From 5cf148eb33d40f03c28c1d608bfa6f826760993c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 17 Mar 2026 16:06:40 -0700 Subject: [PATCH] Match Go volume: fix version(), integrity check, scrub, and commit_compact - version(): use self.version() instead of self.super_block.version in read_all_needles, check_volume_data_integrity, scan_raw_needles_from to respect volumeInfo.version override - check_volume_data_integrity: initialize healthy_index_size to idx_size (matching Go) and continue on EOF instead of returning error - scrub(): count deleted needles in total_read since they still occupy space in the .dat file (matches Go's totalRead += actualSize for deleted) - commit_compact: clean up .cpd/.cpx files on makeup_diff failure (matches Go's error path cleanup) --- seaweed-volume/src/storage/volume.rs | 61 +++++++++++++--------------- 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index c27527d81..426d84a4e 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -1670,7 +1670,7 @@ impl Volume { pub fn read_all_needles(&self) -> Result, VolumeError> { let _guard = self.data_file_access_control.read_lock(); let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?; - let version = self.super_block.version; + let version = self.version(); let dat_size = self.current_dat_file_size()? as i64; let mut needles = Vec::new(); let mut offset = self.super_block.block_size() as i64; @@ -1742,15 +1742,17 @@ impl Volume { ))); } - let version = self.super_block.version; + let version = self.version(); - // Check last 10 index entries (matching Go's loop: for i := 1; i <= 10 ...) - // Go does NOT break on first success; it tracks healthyIndexSize and only - // breaks on ErrorSizeMismatch. After the loop, it returns an error if - // healthyIndexSize < indexSize (detecting trailing corrupt entries). + // Check last 10 index entries (matching Go's CheckVolumeDataIntegrity). + // Go starts healthyIndexSize = indexSize and reduces on EOF. + // On success: break (err != ErrorSizeMismatch when err == nil). + // On EOF: set healthyIndexSize = position of corrupt entry, continue. + // On ErrorSizeMismatch: continue (try next entry). + // After loop: if healthyIndexSize < indexSize → error. let mut idx_file = File::open(&idx_path)?; let max_entries = std::cmp::min(10, idx_size / NEEDLE_MAP_ENTRY_SIZE as i64); - let mut healthy_index_size: i64 = 0; + let mut healthy_index_size: i64 = idx_size; for i in 1..=max_entries { let entry_offset = idx_size - i * NEEDLE_MAP_ENTRY_SIZE as i64; @@ -1770,14 +1772,10 @@ impl Volume { match self.read_exact_at_backend(&mut header, actual_offset) { Ok(()) => {} Err(VolumeError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => { - // Needle beyond end of file -- corrupt - return Err(VolumeError::Io(io::Error::new( - io::ErrorKind::InvalidData, - format!( - "needle at index offset {} beyond dat file end", - actual_offset - ), - ))); + // Match Go: on EOF, mark this entry as corrupt and continue + // checking earlier entries (healthyIndexSize tracks the boundary). + healthy_index_size = entry_offset; + continue; } Err(e) => return Err(e), } @@ -1806,11 +1804,6 @@ impl Volume { { let (_, _, alt_size) = Needle::parse_header(&alt_header); if alt_size.0 == size.0 { - // Update healthy_index_size and continue - let pos = entry_offset + NEEDLE_MAP_ENTRY_SIZE as i64; - if pos > healthy_index_size { - healthy_index_size = pos; - } continue; } } @@ -1829,12 +1822,6 @@ impl Volume { } } } - - // Track the highest verified index position - let pos = entry_offset + NEEDLE_MAP_ENTRY_SIZE as i64; - if pos > healthy_index_size { - healthy_index_size = pos; - } } // Match Go: if healthyIndexSize < indexSize, trailing entries are corrupt @@ -1930,13 +1917,18 @@ impl Volume { let mut total_read: i64 = 0; for (needle_id, nv) in nm.iter_entries() { - if nv.offset.is_zero() || nv.size.is_deleted() { + if nv.offset.is_zero() { continue; } - // Accumulate actual needle size (matches Go's totalRead += GetActualSize) + // Accumulate actual needle size for ALL entries including deleted ones + // (matches Go: deleted needles still occupy space in the .dat file). total_read += get_actual_size(nv.size, version); + if nv.size.is_deleted() { + continue; + } + let offset = nv.offset.to_actual_offset(); if offset < 0 || offset as u64 >= dat_size { broken.push(format!( @@ -1985,7 +1977,7 @@ impl Volume { &self, from_offset: u64, ) -> Result, Vec, u64)>, VolumeError> { - let version = self.super_block.version; + let version = self.version(); let dat_size = self.current_dat_file_size()?; let mut entries = Vec::new(); let mut offset = from_offset; @@ -2709,10 +2701,15 @@ impl Volume { } fn do_commit_compact(&mut self) -> Result<(), VolumeError> { - self.makeup_diff().map_err(|e| { + if let Err(e) = self.makeup_diff() { warn!("makeup_diff failed: {}", e); - e - })?; + // Match Go: clean up .cpd/.cpx on makeup_diff failure + let cpd = self.file_name(".cpd"); + let cpx = self.file_name(".cpx"); + let _ = fs::remove_file(&cpd); + let _ = fs::remove_file(&cpx); + return Err(e); + } // Close current files if let Some(ref mut nm) = self.nm {