From e861f14aa42ffc70d929e875d0a254b4c8534dcb Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 16 Mar 2026 21:23:17 -0700 Subject: [PATCH] Add CheckVolumeDataIntegrity on volume load matching Go --- seaweed-volume/src/storage/volume.rs | 128 +++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 414c4d296..08d2156d8 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -653,6 +653,19 @@ impl Volume { if also_load_index { self.load_index()?; + + // Match Go: CheckVolumeDataIntegrity after loading index (volume_loading.go L154-159) + // Only for non-remote volumes (remote storage may not have local .dat) + if !self.has_remote_file { + if let Err(e) = self.check_volume_data_integrity() { + self.no_write_or_delete = true; + warn!( + volume_id = self.id.0, + error = %e, + "volumeDataIntegrityChecking failed" + ); + } + } } // Match Go: if no .vif file existed, create one with version and bytes_offset @@ -1610,6 +1623,121 @@ impl Volume { Ok(needles) } + /// Check volume data integrity by verifying the last index entries against the .dat file. + /// Matches Go's CheckVolumeDataIntegrity (volume_checking.go L117-141). + /// Reads the last few index entries, verifies each needle header is readable and + /// consistent. On failure, marks the volume read-only. + fn check_volume_data_integrity(&mut self) -> Result<(), VolumeError> { + let idx_path = self.file_name(".idx"); + if !Path::new(&idx_path).exists() { + return Ok(()); + } + + let idx_size = fs::metadata(&idx_path).map(|m| m.len()).unwrap_or(0) as i64; + if idx_size == 0 { + return Ok(()); + } + if idx_size % NEEDLE_MAP_ENTRY_SIZE as i64 != 0 { + return Err(VolumeError::Io(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "index file's size is {} bytes, maybe corrupted", + idx_size + ), + ))); + } + + let version = self.super_block.version; + + // Check last 10 index entries (matching Go's loop: for i := 1; i <= 10 ...) + let mut idx_file = File::open(&idx_path)?; + let max_entries = std::cmp::min(10, idx_size / NEEDLE_MAP_ENTRY_SIZE as i64); + + for i in 1..=max_entries { + let entry_offset = idx_size - i * NEEDLE_MAP_ENTRY_SIZE as i64; + let mut buf = [0u8; NEEDLE_MAP_ENTRY_SIZE]; + idx_file.seek(SeekFrom::Start(entry_offset as u64))?; + idx_file.read_exact(&mut buf)?; + + let (key, offset, size) = idx_entry_from_bytes(&buf); + if offset.is_zero() { + continue; + } + + let actual_offset = offset.to_actual_offset() as u64; + + // Read needle header at the offset + let mut header = [0u8; NEEDLE_HEADER_SIZE]; + match self.read_exact_at_backend(&mut header, actual_offset) { + Ok(()) => {} + Err(VolumeError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => { + // Needle beyond end of file -- corrupt + return Err(VolumeError::Io(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "needle at index offset {} beyond dat file end", + actual_offset + ), + ))); + } + Err(e) => return Err(e), + } + + let (_cookie, needle_id, needle_size) = Needle::parse_header(&header); + + // Verify the needle ID matches the index entry + if !key.is_empty() && needle_id != key { + return Err(VolumeError::Io(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "index key {:?} does not match needle Id {:?} at offset {}", + key, needle_id, actual_offset + ), + ))); + } + + // For non-deleted entries, verify the size matches + if !size.is_deleted() && size.0 > 0 && needle_size.0 != size.0 { + // Try with MaxPossibleVolumeSize offset adjustment (Go parity) + let alt_offset = actual_offset + MAX_POSSIBLE_VOLUME_SIZE as u64; + let mut alt_header = [0u8; NEEDLE_HEADER_SIZE]; + if self + .read_exact_at_backend(&mut alt_header, alt_offset) + .is_ok() + { + let (_, _, alt_size) = Needle::parse_header(&alt_header); + if alt_size.0 == size.0 { + continue; // alternative offset worked + } + } + return Err(VolumeError::Io(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "needle size {} does not match index size {} at offset {}", + needle_size.0, size.0, actual_offset + ), + ))); + } + + // If V3, try to read the append timestamp from the last verified entry + if version == VERSION_3 && !size.is_deleted() && size.0 > 0 { + let ts_offset = actual_offset + NEEDLE_HEADER_SIZE as u64 + size.0 as u64 + 4; // skip checksum + let mut ts_buf = [0u8; 8]; + if self.read_exact_at_backend(&mut ts_buf, ts_offset).is_ok() { + let ts = u64::from_be_bytes(ts_buf); + if ts > 0 { + self.last_append_at_ns = ts; + } + } + } + + // First successfully verified entry is enough + break; + } + + Ok(()) + } + /// Scrub the volume index by verifying each needle map entry against the dat file. /// For each entry, reads only the 16-byte needle header at the given offset to verify: /// correct needle ID, correct cookie (non-zero), and valid size.