Browse Source

Add CheckVolumeDataIntegrity on volume load matching Go

rust-volume-server
Chris Lu 3 days ago
parent
commit
e861f14aa4
  1. 128
      seaweed-volume/src/storage/volume.rs

128
seaweed-volume/src/storage/volume.rs

@ -653,6 +653,19 @@ impl Volume {
if also_load_index {
self.load_index()?;
// Match Go: CheckVolumeDataIntegrity after loading index (volume_loading.go L154-159)
// Only for non-remote volumes (remote storage may not have local .dat)
if !self.has_remote_file {
if let Err(e) = self.check_volume_data_integrity() {
self.no_write_or_delete = true;
warn!(
volume_id = self.id.0,
error = %e,
"volumeDataIntegrityChecking failed"
);
}
}
}
// Match Go: if no .vif file existed, create one with version and bytes_offset
@ -1610,6 +1623,121 @@ impl Volume {
Ok(needles)
}
/// Check volume data integrity by verifying the last index entries against the .dat file.
/// Matches Go's CheckVolumeDataIntegrity (volume_checking.go L117-141).
/// Reads the last few index entries, verifies each needle header is readable and
/// consistent. On failure, marks the volume read-only.
fn check_volume_data_integrity(&mut self) -> Result<(), VolumeError> {
let idx_path = self.file_name(".idx");
if !Path::new(&idx_path).exists() {
return Ok(());
}
let idx_size = fs::metadata(&idx_path).map(|m| m.len()).unwrap_or(0) as i64;
if idx_size == 0 {
return Ok(());
}
if idx_size % NEEDLE_MAP_ENTRY_SIZE as i64 != 0 {
return Err(VolumeError::Io(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"index file's size is {} bytes, maybe corrupted",
idx_size
),
)));
}
let version = self.super_block.version;
// Check last 10 index entries (matching Go's loop: for i := 1; i <= 10 ...)
let mut idx_file = File::open(&idx_path)?;
let max_entries = std::cmp::min(10, idx_size / NEEDLE_MAP_ENTRY_SIZE as i64);
for i in 1..=max_entries {
let entry_offset = idx_size - i * NEEDLE_MAP_ENTRY_SIZE as i64;
let mut buf = [0u8; NEEDLE_MAP_ENTRY_SIZE];
idx_file.seek(SeekFrom::Start(entry_offset as u64))?;
idx_file.read_exact(&mut buf)?;
let (key, offset, size) = idx_entry_from_bytes(&buf);
if offset.is_zero() {
continue;
}
let actual_offset = offset.to_actual_offset() as u64;
// Read needle header at the offset
let mut header = [0u8; NEEDLE_HEADER_SIZE];
match self.read_exact_at_backend(&mut header, actual_offset) {
Ok(()) => {}
Err(VolumeError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
// Needle beyond end of file -- corrupt
return Err(VolumeError::Io(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"needle at index offset {} beyond dat file end",
actual_offset
),
)));
}
Err(e) => return Err(e),
}
let (_cookie, needle_id, needle_size) = Needle::parse_header(&header);
// Verify the needle ID matches the index entry
if !key.is_empty() && needle_id != key {
return Err(VolumeError::Io(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"index key {:?} does not match needle Id {:?} at offset {}",
key, needle_id, actual_offset
),
)));
}
// For non-deleted entries, verify the size matches
if !size.is_deleted() && size.0 > 0 && needle_size.0 != size.0 {
// Try with MaxPossibleVolumeSize offset adjustment (Go parity)
let alt_offset = actual_offset + MAX_POSSIBLE_VOLUME_SIZE as u64;
let mut alt_header = [0u8; NEEDLE_HEADER_SIZE];
if self
.read_exact_at_backend(&mut alt_header, alt_offset)
.is_ok()
{
let (_, _, alt_size) = Needle::parse_header(&alt_header);
if alt_size.0 == size.0 {
continue; // alternative offset worked
}
}
return Err(VolumeError::Io(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"needle size {} does not match index size {} at offset {}",
needle_size.0, size.0, actual_offset
),
)));
}
// If V3, try to read the append timestamp from the last verified entry
if version == VERSION_3 && !size.is_deleted() && size.0 > 0 {
let ts_offset = actual_offset + NEEDLE_HEADER_SIZE as u64 + size.0 as u64 + 4; // skip checksum
let mut ts_buf = [0u8; 8];
if self.read_exact_at_backend(&mut ts_buf, ts_offset).is_ok() {
let ts = u64::from_be_bytes(ts_buf);
if ts > 0 {
self.last_append_at_ns = ts;
}
}
}
// First successfully verified entry is enough
break;
}
Ok(())
}
/// Scrub the volume index by verifying each needle map entry against the dat file.
/// For each entry, reads only the 16-byte needle header at the given offset to verify:
/// correct needle ID, correct cookie (non-zero), and valid size.

Loading…
Cancel
Save