Browse Source

Match Go EC volume: mark deletions in .ecx and replay .ecj at startup

Go's DeleteNeedleFromEcx marks needles as deleted in the .ecx index
in-place (writing TOMBSTONE_FILE_SIZE at the size field) in addition
to appending to the .ecj journal. Go's RebuildEcxFile replays .ecj
entries into .ecx on startup, then removes the .ecj file.

Rust was only appending to .ecj without marking .ecx, which meant
deleted EC needles remained readable via .ecx binary search. This
fix:
- Opens .ecx in read/write mode (was read-only)
- Adds mark_needle_deleted_in_ecx: binary search + in-place write
- Calls it from journal_delete before appending to .ecj
- Adds rebuild_ecx_from_journal: replays .ecj into .ecx on startup
rust-volume-server
Chris Lu 3 days ago
parent
commit
70e976ca7f
  1. 108
      seaweed-volume/src/storage/erasure_coding/ec_volume.rs

108
seaweed-volume/src/storage/erasure_coding/ec_volume.rs

@ -113,10 +113,11 @@ impl EcVolume {
expire_at_sec, expire_at_sec,
}; };
// Open .ecx file (sorted index), with fallback to data dir
// Open .ecx file (sorted index) in read/write mode for in-place deletion marking.
// Matches Go which opens ecx for writing via MarkNeedleDeleted.
let ecx_path = vol.ecx_file_name(); let ecx_path = vol.ecx_file_name();
if std::path::Path::new(&ecx_path).exists() { if std::path::Path::new(&ecx_path).exists() {
let file = File::open(&ecx_path)?;
let file = OpenOptions::new().read(true).write(true).open(&ecx_path)?;
vol.ecx_file_size = file.metadata()?.len() as i64; vol.ecx_file_size = file.metadata()?.len() as i64;
vol.ecx_file = Some(file); vol.ecx_file = Some(file);
} else if dir_idx != dir { } else if dir_idx != dir {
@ -128,13 +129,16 @@ impl EcVolume {
volume_id = volume_id.0, volume_id = volume_id.0,
"ecx file not found in idx dir, falling back to data dir" "ecx file not found in idx dir, falling back to data dir"
); );
let file = File::open(&fallback_ecx)?;
let file = OpenOptions::new().read(true).write(true).open(&fallback_ecx)?;
vol.ecx_file_size = file.metadata()?.len() as i64; vol.ecx_file_size = file.metadata()?.len() as i64;
vol.ecx_file = Some(file); vol.ecx_file = Some(file);
vol.ecx_actual_dir = dir.to_string(); vol.ecx_actual_dir = dir.to_string();
} }
} }
// Replay .ecj journal into .ecx on startup (matches Go's RebuildEcxFile).
vol.rebuild_ecx_from_journal()?;
// Open .ecj file (deletion journal) — use ecx_actual_dir for consistency // Open .ecj file (deletion journal) — use ecx_actual_dir for consistency
let ecj_base = let ecj_base =
crate::storage::volume::volume_file_name(&vol.ecx_actual_dir, collection, volume_id); crate::storage::volume::volume_file_name(&vol.ecx_actual_dir, collection, volume_id);
@ -448,10 +452,106 @@ impl EcVolume {
Ok((files, files_deleted, total_size)) Ok((files, files_deleted, total_size))
} }
// ---- Deletion ----
/// Mark a needle as deleted in the .ecx file in-place.
/// Matches Go's MarkNeedleDeleted: binary search the .ecx, then overwrite
/// the size field with TOMBSTONE_FILE_SIZE.
fn mark_needle_deleted_in_ecx(&self, needle_id: NeedleId) -> io::Result<bool> {
let ecx_file = match self.ecx_file.as_ref() {
Some(f) => f,
None => return Ok(false),
};
let entry_count = self.ecx_file_size as usize / NEEDLE_MAP_ENTRY_SIZE;
if entry_count == 0 {
return Ok(false);
}
// Binary search for the needle
let mut lo: usize = 0;
let mut hi: usize = entry_count;
let mut entry_buf = [0u8; NEEDLE_MAP_ENTRY_SIZE];
while lo < hi {
let mid = lo + (hi - lo) / 2;
let file_offset = (mid * NEEDLE_MAP_ENTRY_SIZE) as u64;
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
ecx_file.read_exact_at(&mut entry_buf, file_offset)?;
}
let (key, _offset, _size) = idx_entry_from_bytes(&entry_buf);
if key == needle_id {
// Found — overwrite the size field with TOMBSTONE_FILE_SIZE
let size_offset = file_offset + NEEDLE_ID_SIZE as u64 + OFFSET_SIZE as u64;
let mut size_buf = [0u8; SIZE_SIZE];
TOMBSTONE_FILE_SIZE.to_bytes(&mut size_buf);
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
ecx_file.write_all_at(&size_buf, size_offset)?;
}
return Ok(true);
} else if key < needle_id {
lo = mid + 1;
} else {
hi = mid;
}
}
Ok(false) // not found
}
/// Replay .ecj journal entries into .ecx on startup.
/// Matches Go's RebuildEcxFile: for each needle ID in .ecj, marks it
/// deleted in .ecx, then removes the .ecj file.
fn rebuild_ecx_from_journal(&mut self) -> io::Result<()> {
let ecj_path = self.ecj_file_name();
if !std::path::Path::new(&ecj_path).exists() {
return Ok(());
}
let data = fs::read(&ecj_path)?;
if data.is_empty() {
return Ok(());
}
let count = data.len() / NEEDLE_ID_SIZE;
for i in 0..count {
let start = i * NEEDLE_ID_SIZE;
if start + NEEDLE_ID_SIZE > data.len() {
break;
}
let needle_id = NeedleId::from_bytes(&data[start..start + NEEDLE_ID_SIZE]);
// Errors for individual entries are non-fatal (needle may not exist in .ecx)
let _ = self.mark_needle_deleted_in_ecx(needle_id);
}
// Remove the .ecj file after replay (matches Go)
let _ = fs::remove_file(&ecj_path);
// Re-create .ecj for future deletions
let ecj_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.append(true)
.open(&ecj_path)?;
self.ecj_file = Some(ecj_file);
Ok(())
}
// ---- Deletion journal ---- // ---- Deletion journal ----
/// Append a deleted needle ID to the .ecj journal.
/// Append a deleted needle ID to the .ecj journal and mark in .ecx.
/// Matches Go's DeleteNeedleFromEcx: marks in .ecx first, then journals.
pub fn journal_delete(&mut self, needle_id: NeedleId) -> io::Result<()> { pub fn journal_delete(&mut self, needle_id: NeedleId) -> io::Result<()> {
// Mark deleted in .ecx in-place (matches Go's MarkNeedleDeleted)
let _ = self.mark_needle_deleted_in_ecx(needle_id);
let ecj_file = self let ecj_file = self
.ecj_file .ecj_file
.as_mut() .as_mut()

Loading…
Cancel
Save