From 70e976ca7fc9042b49d1b4f654ad05ec0d9116f6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 17 Mar 2026 16:14:12 -0700 Subject: [PATCH] Match Go EC volume: mark deletions in .ecx and replay .ecj at startup Go's DeleteNeedleFromEcx marks needles as deleted in the .ecx index in-place (writing TOMBSTONE_FILE_SIZE at the size field) in addition to appending to the .ecj journal. Go's RebuildEcxFile replays .ecj entries into .ecx on startup, then removes the .ecj file. Rust was only appending to .ecj without marking .ecx, which meant deleted EC needles remained readable via .ecx binary search. This fix: - Opens .ecx in read/write mode (was read-only) - Adds mark_needle_deleted_in_ecx: binary search + in-place write - Calls it from journal_delete before appending to .ecj - Adds rebuild_ecx_from_journal: replays .ecj into .ecx on startup --- .../src/storage/erasure_coding/ec_volume.rs | 108 +++++++++++++++++- 1 file changed, 104 insertions(+), 4 deletions(-) diff --git a/seaweed-volume/src/storage/erasure_coding/ec_volume.rs b/seaweed-volume/src/storage/erasure_coding/ec_volume.rs index 348a9c239..96d2f6ccc 100644 --- a/seaweed-volume/src/storage/erasure_coding/ec_volume.rs +++ b/seaweed-volume/src/storage/erasure_coding/ec_volume.rs @@ -113,10 +113,11 @@ impl EcVolume { expire_at_sec, }; - // Open .ecx file (sorted index), with fallback to data dir + // Open .ecx file (sorted index) in read/write mode for in-place deletion marking. + // Matches Go which opens ecx for writing via MarkNeedleDeleted. let ecx_path = vol.ecx_file_name(); if std::path::Path::new(&ecx_path).exists() { - let file = File::open(&ecx_path)?; + let file = OpenOptions::new().read(true).write(true).open(&ecx_path)?; vol.ecx_file_size = file.metadata()?.len() as i64; vol.ecx_file = Some(file); } else if dir_idx != dir { @@ -128,13 +129,16 @@ impl EcVolume { volume_id = volume_id.0, "ecx file not found in idx dir, falling back to data dir" ); - let file = File::open(&fallback_ecx)?; + let file = OpenOptions::new().read(true).write(true).open(&fallback_ecx)?; vol.ecx_file_size = file.metadata()?.len() as i64; vol.ecx_file = Some(file); vol.ecx_actual_dir = dir.to_string(); } } + // Replay .ecj journal into .ecx on startup (matches Go's RebuildEcxFile). + vol.rebuild_ecx_from_journal()?; + // Open .ecj file (deletion journal) — use ecx_actual_dir for consistency let ecj_base = crate::storage::volume::volume_file_name(&vol.ecx_actual_dir, collection, volume_id); @@ -448,10 +452,106 @@ impl EcVolume { Ok((files, files_deleted, total_size)) } + // ---- Deletion ---- + + /// Mark a needle as deleted in the .ecx file in-place. + /// Matches Go's MarkNeedleDeleted: binary search the .ecx, then overwrite + /// the size field with TOMBSTONE_FILE_SIZE. + fn mark_needle_deleted_in_ecx(&self, needle_id: NeedleId) -> io::Result { + let ecx_file = match self.ecx_file.as_ref() { + Some(f) => f, + None => return Ok(false), + }; + + let entry_count = self.ecx_file_size as usize / NEEDLE_MAP_ENTRY_SIZE; + if entry_count == 0 { + return Ok(false); + } + + // Binary search for the needle + let mut lo: usize = 0; + let mut hi: usize = entry_count; + let mut entry_buf = [0u8; NEEDLE_MAP_ENTRY_SIZE]; + + while lo < hi { + let mid = lo + (hi - lo) / 2; + let file_offset = (mid * NEEDLE_MAP_ENTRY_SIZE) as u64; + + #[cfg(unix)] + { + use std::os::unix::fs::FileExt; + ecx_file.read_exact_at(&mut entry_buf, file_offset)?; + } + + let (key, _offset, _size) = idx_entry_from_bytes(&entry_buf); + if key == needle_id { + // Found — overwrite the size field with TOMBSTONE_FILE_SIZE + let size_offset = file_offset + NEEDLE_ID_SIZE as u64 + OFFSET_SIZE as u64; + let mut size_buf = [0u8; SIZE_SIZE]; + TOMBSTONE_FILE_SIZE.to_bytes(&mut size_buf); + #[cfg(unix)] + { + use std::os::unix::fs::FileExt; + ecx_file.write_all_at(&size_buf, size_offset)?; + } + return Ok(true); + } else if key < needle_id { + lo = mid + 1; + } else { + hi = mid; + } + } + + Ok(false) // not found + } + + /// Replay .ecj journal entries into .ecx on startup. + /// Matches Go's RebuildEcxFile: for each needle ID in .ecj, marks it + /// deleted in .ecx, then removes the .ecj file. + fn rebuild_ecx_from_journal(&mut self) -> io::Result<()> { + let ecj_path = self.ecj_file_name(); + if !std::path::Path::new(&ecj_path).exists() { + return Ok(()); + } + + let data = fs::read(&ecj_path)?; + if data.is_empty() { + return Ok(()); + } + + let count = data.len() / NEEDLE_ID_SIZE; + for i in 0..count { + let start = i * NEEDLE_ID_SIZE; + if start + NEEDLE_ID_SIZE > data.len() { + break; + } + let needle_id = NeedleId::from_bytes(&data[start..start + NEEDLE_ID_SIZE]); + // Errors for individual entries are non-fatal (needle may not exist in .ecx) + let _ = self.mark_needle_deleted_in_ecx(needle_id); + } + + // Remove the .ecj file after replay (matches Go) + let _ = fs::remove_file(&ecj_path); + + // Re-create .ecj for future deletions + let ecj_file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .append(true) + .open(&ecj_path)?; + self.ecj_file = Some(ecj_file); + + Ok(()) + } + // ---- Deletion journal ---- - /// Append a deleted needle ID to the .ecj journal. + /// Append a deleted needle ID to the .ecj journal and mark in .ecx. + /// Matches Go's DeleteNeedleFromEcx: marks in .ecx first, then journals. pub fn journal_delete(&mut self, needle_id: NeedleId) -> io::Result<()> { + // Mark deleted in .ecx in-place (matches Go's MarkNeedleDeleted) + let _ = self.mark_needle_deleted_in_ecx(needle_id); let ecj_file = self .ecj_file .as_mut()