Browse Source

fix: update vif replication on configure

rust-volume-server
Chris Lu 4 days ago
parent
commit
bcb8a27849
  1. 102
      seaweed-volume/src/storage/store.rs

102
seaweed-volume/src/storage/store.rs

@ -16,7 +16,7 @@ use crate::storage::needle::needle::Needle;
use crate::storage::needle_map::NeedleMapKind;
use crate::storage::super_block::ReplicaPlacement;
use crate::storage::types::*;
use crate::storage::volume::VolumeError;
use crate::storage::volume::{VifVolumeInfo, VolumeError};
/// Top-level storage manager containing all disk locations and their volumes.
pub struct Store {
@ -273,60 +273,17 @@ impl Store {
vid: VolumeId,
rp: ReplicaPlacement,
) -> Result<(), VolumeError> {
use std::io::{Read, Seek, SeekFrom, Write};
// Find the .dat file across all locations
for loc in &self.locations {
// Try both empty and all known collection prefixes
// We scan the directory for matching .dat files
let dir = &loc.directory;
let patterns = [
format!("{}/{}.dat", dir, vid.0),
];
for dat_path in &patterns {
if std::path::Path::new(dat_path).exists() {
let mut file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(dat_path)
.map_err(VolumeError::Io)?;
// Read the super block header (at least 8 bytes)
let mut header = [0u8; 8];
file.read_exact(&mut header).map_err(VolumeError::Io)?;
// Byte 1 is the replica_placement
header[1] = rp.to_byte();
file.seek(SeekFrom::Start(0)).map_err(VolumeError::Io)?;
file.write_all(&header).map_err(VolumeError::Io)?;
file.sync_all().map_err(VolumeError::Io)?;
return Ok(());
}
}
// Also check collection-prefixed files
if let Ok(entries) = std::fs::read_dir(dir) {
for entry in entries.flatten() {
let name = entry.file_name();
let name = name.to_string_lossy();
if name.ends_with(&format!("_{}.dat", vid.0)) {
let dat_path = entry.path();
let mut file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&dat_path)
.map_err(VolumeError::Io)?;
let mut header = [0u8; 8];
file.read_exact(&mut header).map_err(VolumeError::Io)?;
header[1] = rp.to_byte();
file.seek(SeekFrom::Start(0)).map_err(VolumeError::Io)?;
file.write_all(&header).map_err(VolumeError::Io)?;
file.sync_all().map_err(VolumeError::Io)?;
return Ok(());
}
}
}
}
Err(VolumeError::Io(io::Error::new(
io::ErrorKind::NotFound,
format!("volume {} not found on disk", vid),
)))
let (_, base_path, _) = self.find_volume_file_base(vid).ok_or_else(|| {
VolumeError::Io(io::Error::new(
io::ErrorKind::NotFound,
format!("volume {} not found on disk", vid),
))
})?;
let vif_path = format!("{}.vif", base_path);
let mut vif = load_vif_volume_info(&vif_path)?;
vif.replication = rp.to_string();
save_vif_volume_info(&vif_path, &vif)?;
Ok(())
}
// ---- Read / Write / Delete ----
@ -654,6 +611,41 @@ fn parse_volume_filename(filename: &str) -> Option<(String, VolumeId)> {
}
}
fn load_vif_volume_info(path: &str) -> Result<VifVolumeInfo, VolumeError> {
let content = match std::fs::read_to_string(path) {
Ok(c) => c,
Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(VifVolumeInfo::default()),
Err(e) => return Err(VolumeError::Io(e)),
};
if content.trim().is_empty() {
return Ok(VifVolumeInfo::default());
}
if let Ok(vif) = serde_json::from_str::<VifVolumeInfo>(&content) {
return Ok(vif);
}
#[derive(serde::Deserialize)]
struct LegacyVolumeInfo {
read_only: bool,
}
if let Ok(legacy) = serde_json::from_str::<LegacyVolumeInfo>(&content) {
let mut vif = VifVolumeInfo::default();
vif.read_only = legacy.read_only;
return Ok(vif);
}
Err(VolumeError::Io(io::Error::new(
io::ErrorKind::InvalidData,
format!("invalid volume info file {}", path),
)))
}
fn save_vif_volume_info(path: &str, info: &VifVolumeInfo) -> Result<(), VolumeError> {
let content = serde_json::to_string_pretty(info).map_err(|e| {
VolumeError::Io(io::Error::new(io::ErrorKind::Other, e.to_string()))
})?;
std::fs::write(path, content)?;
Ok(())
}
// ============================================================================
// Tests
// ============================================================================

Loading…
Cancel
Save