Browse Source

Match Go volume: fix version(), integrity check, scrub, and commit_compact

- version(): use self.version() instead of self.super_block.version in
  read_all_needles, check_volume_data_integrity, scan_raw_needles_from
  to respect volumeInfo.version override
- check_volume_data_integrity: initialize healthy_index_size to idx_size
  (matching Go) and continue on EOF instead of returning error
- scrub(): count deleted needles in total_read since they still occupy
  space in the .dat file (matches Go's totalRead += actualSize for deleted)
- commit_compact: clean up .cpd/.cpx files on makeup_diff failure
  (matches Go's error path cleanup)
rust-volume-server
Chris Lu 3 days ago
parent
commit
5cf148eb33
  1. 61
      seaweed-volume/src/storage/volume.rs

61
seaweed-volume/src/storage/volume.rs

@ -1670,7 +1670,7 @@ impl Volume {
pub fn read_all_needles(&self) -> Result<Vec<Needle>, VolumeError> {
let _guard = self.data_file_access_control.read_lock();
let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
let version = self.super_block.version;
let version = self.version();
let dat_size = self.current_dat_file_size()? as i64;
let mut needles = Vec::new();
let mut offset = self.super_block.block_size() as i64;
@ -1742,15 +1742,17 @@ impl Volume {
)));
}
let version = self.super_block.version;
let version = self.version();
// Check last 10 index entries (matching Go's loop: for i := 1; i <= 10 ...)
// Go does NOT break on first success; it tracks healthyIndexSize and only
// breaks on ErrorSizeMismatch. After the loop, it returns an error if
// healthyIndexSize < indexSize (detecting trailing corrupt entries).
// Check last 10 index entries (matching Go's CheckVolumeDataIntegrity).
// Go starts healthyIndexSize = indexSize and reduces on EOF.
// On success: break (err != ErrorSizeMismatch when err == nil).
// On EOF: set healthyIndexSize = position of corrupt entry, continue.
// On ErrorSizeMismatch: continue (try next entry).
// After loop: if healthyIndexSize < indexSize → error.
let mut idx_file = File::open(&idx_path)?;
let max_entries = std::cmp::min(10, idx_size / NEEDLE_MAP_ENTRY_SIZE as i64);
let mut healthy_index_size: i64 = 0;
let mut healthy_index_size: i64 = idx_size;
for i in 1..=max_entries {
let entry_offset = idx_size - i * NEEDLE_MAP_ENTRY_SIZE as i64;
@ -1770,14 +1772,10 @@ impl Volume {
match self.read_exact_at_backend(&mut header, actual_offset) {
Ok(()) => {}
Err(VolumeError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
// Needle beyond end of file -- corrupt
return Err(VolumeError::Io(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"needle at index offset {} beyond dat file end",
actual_offset
),
)));
// Match Go: on EOF, mark this entry as corrupt and continue
// checking earlier entries (healthyIndexSize tracks the boundary).
healthy_index_size = entry_offset;
continue;
}
Err(e) => return Err(e),
}
@ -1806,11 +1804,6 @@ impl Volume {
{
let (_, _, alt_size) = Needle::parse_header(&alt_header);
if alt_size.0 == size.0 {
// Update healthy_index_size and continue
let pos = entry_offset + NEEDLE_MAP_ENTRY_SIZE as i64;
if pos > healthy_index_size {
healthy_index_size = pos;
}
continue;
}
}
@ -1829,12 +1822,6 @@ impl Volume {
}
}
}
// Track the highest verified index position
let pos = entry_offset + NEEDLE_MAP_ENTRY_SIZE as i64;
if pos > healthy_index_size {
healthy_index_size = pos;
}
}
// Match Go: if healthyIndexSize < indexSize, trailing entries are corrupt
@ -1930,13 +1917,18 @@ impl Volume {
let mut total_read: i64 = 0;
for (needle_id, nv) in nm.iter_entries() {
if nv.offset.is_zero() || nv.size.is_deleted() {
if nv.offset.is_zero() {
continue;
}
// Accumulate actual needle size (matches Go's totalRead += GetActualSize)
// Accumulate actual needle size for ALL entries including deleted ones
// (matches Go: deleted needles still occupy space in the .dat file).
total_read += get_actual_size(nv.size, version);
if nv.size.is_deleted() {
continue;
}
let offset = nv.offset.to_actual_offset();
if offset < 0 || offset as u64 >= dat_size {
broken.push(format!(
@ -1985,7 +1977,7 @@ impl Volume {
&self,
from_offset: u64,
) -> Result<Vec<(Vec<u8>, Vec<u8>, u64)>, VolumeError> {
let version = self.super_block.version;
let version = self.version();
let dat_size = self.current_dat_file_size()?;
let mut entries = Vec::new();
let mut offset = from_offset;
@ -2709,10 +2701,15 @@ impl Volume {
}
fn do_commit_compact(&mut self) -> Result<(), VolumeError> {
self.makeup_diff().map_err(|e| {
if let Err(e) = self.makeup_diff() {
warn!("makeup_diff failed: {}", e);
e
})?;
// Match Go: clean up .cpd/.cpx on makeup_diff failure
let cpd = self.file_name(".cpd");
let cpx = self.file_name(".cpx");
let _ = fs::remove_file(&cpd);
let _ = fs::remove_file(&cpx);
return Err(e);
}
// Close current files
if let Some(ref mut nm) = self.nm {

Loading…
Cancel
Save