diff --git a/seaweed-volume/src/storage/erasure_coding/ec_volume.rs b/seaweed-volume/src/storage/erasure_coding/ec_volume.rs index 348a9c239..96d2f6ccc 100644 --- a/seaweed-volume/src/storage/erasure_coding/ec_volume.rs +++ b/seaweed-volume/src/storage/erasure_coding/ec_volume.rs @@ -113,10 +113,11 @@ impl EcVolume { expire_at_sec, }; - // Open .ecx file (sorted index), with fallback to data dir + // Open .ecx file (sorted index) in read/write mode for in-place deletion marking. + // Matches Go which opens ecx for writing via MarkNeedleDeleted. let ecx_path = vol.ecx_file_name(); if std::path::Path::new(&ecx_path).exists() { - let file = File::open(&ecx_path)?; + let file = OpenOptions::new().read(true).write(true).open(&ecx_path)?; vol.ecx_file_size = file.metadata()?.len() as i64; vol.ecx_file = Some(file); } else if dir_idx != dir { @@ -128,13 +129,16 @@ impl EcVolume { volume_id = volume_id.0, "ecx file not found in idx dir, falling back to data dir" ); - let file = File::open(&fallback_ecx)?; + let file = OpenOptions::new().read(true).write(true).open(&fallback_ecx)?; vol.ecx_file_size = file.metadata()?.len() as i64; vol.ecx_file = Some(file); vol.ecx_actual_dir = dir.to_string(); } } + // Replay .ecj journal into .ecx on startup (matches Go's RebuildEcxFile). + vol.rebuild_ecx_from_journal()?; + // Open .ecj file (deletion journal) — use ecx_actual_dir for consistency let ecj_base = crate::storage::volume::volume_file_name(&vol.ecx_actual_dir, collection, volume_id); @@ -448,10 +452,106 @@ impl EcVolume { Ok((files, files_deleted, total_size)) } + // ---- Deletion ---- + + /// Mark a needle as deleted in the .ecx file in-place. + /// Matches Go's MarkNeedleDeleted: binary search the .ecx, then overwrite + /// the size field with TOMBSTONE_FILE_SIZE. 
+    ///
+    /// Returns `Ok(true)` when the entry was found and its size field was
+    /// overwritten, and `Ok(false)` when no .ecx file is open, the index holds
+    /// no entries, or `needle_id` is not present.
+    ///
+    /// # Errors
+    /// Propagates any I/O error from the positional read or write. On
+    /// non-unix targets, where no positional I/O is wired up here, a lookup
+    /// that reaches the file fails with `io::ErrorKind::Unsupported` instead
+    /// of silently searching a buffer that was never filled.
+    fn mark_needle_deleted_in_ecx(&self, needle_id: NeedleId) -> io::Result<bool> {
+        // Fill `buf` from `file` starting at byte `offset`, without moving the
+        // file cursor.
+        #[cfg(unix)]
+        fn read_entry_at(file: &std::fs::File, buf: &mut [u8], offset: u64) -> io::Result<()> {
+            use std::os::unix::fs::FileExt;
+            file.read_exact_at(buf, offset)
+        }
+        #[cfg(not(unix))]
+        fn read_entry_at(_: &std::fs::File, _: &mut [u8], _: u64) -> io::Result<()> {
+            Err(io::Error::new(
+                io::ErrorKind::Unsupported,
+                "positional .ecx access is only implemented on unix",
+            ))
+        }
+
+        // Write all of `buf` into `file` starting at byte `offset`, without
+        // moving the file cursor.
+        #[cfg(unix)]
+        fn write_size_at(file: &std::fs::File, buf: &[u8], offset: u64) -> io::Result<()> {
+            use std::os::unix::fs::FileExt;
+            file.write_all_at(buf, offset)
+        }
+        #[cfg(not(unix))]
+        fn write_size_at(_: &std::fs::File, _: &[u8], _: u64) -> io::Result<()> {
+            Err(io::Error::new(
+                io::ErrorKind::Unsupported,
+                "positional .ecx access is only implemented on unix",
+            ))
+        }
+
+        let ecx_file = match self.ecx_file.as_ref() {
+            Some(file) => file,
+            None => return Ok(false),
+        };
+
+        let entry_count = self.ecx_file_size as usize / NEEDLE_MAP_ENTRY_SIZE;
+        if entry_count == 0 {
+            return Ok(false);
+        }
+
+        // Binary search over the fixed-width entries; `lo..hi` is the half-open
+        // range of entry indices that can still hold `needle_id`.
+        let mut lo: usize = 0;
+        let mut hi: usize = entry_count;
+        let mut entry_buf = [0u8; NEEDLE_MAP_ENTRY_SIZE];
+
+        while lo < hi {
+            let mid = lo + (hi - lo) / 2;
+            let file_offset = (mid * NEEDLE_MAP_ENTRY_SIZE) as u64;
+
+            read_entry_at(ecx_file, &mut entry_buf, file_offset)?;
+
+            let (key, _offset, _size) = idx_entry_from_bytes(&entry_buf);
+            if key == needle_id {
+                // Found: the size field follows the needle id and the offset
+                // inside the entry, so only those trailing bytes are rewritten.
+                let size_offset = file_offset + NEEDLE_ID_SIZE as u64 + OFFSET_SIZE as u64;
+                let mut size_buf = [0u8; SIZE_SIZE];
+                TOMBSTONE_FILE_SIZE.to_bytes(&mut size_buf);
+                write_size_at(ecx_file, &size_buf, size_offset)?;
+                return Ok(true);
+            } else if key < needle_id {
+                lo = mid + 1;
+            } else {
+                hi = mid;
+            }
+        }
+
+        Ok(false) // not found
+    }
+
+    /// Replay .ecj journal entries into .ecx on startup.
+    /// Matches Go's RebuildEcxFile: for each needle ID in .ecj, marks it
+    /// deleted in .ecx, then removes the .ecj file.
+    ///
+    /// After a successful replay an empty .ecj is re-created and stored in
+    /// `self.ecj_file` for future deletions. Does nothing when the journal is
+    /// missing or empty. When no .ecx file is open the journal is left in
+    /// place, because none of its entries could be applied.
+    ///
+    /// # Errors
+    /// Returns any I/O error from reading the journal, from marking an entry
+    /// in .ecx, or from re-creating the journal. On a marking error the
+    /// journal is NOT removed, so the recorded deletions are not lost.
+    fn rebuild_ecx_from_journal(&mut self) -> io::Result<()> {
+        let ecj_path = self.ecj_file_name();
+        if !std::path::Path::new(&ecj_path).exists() {
+            return Ok(());
+        }
+
+        let journal = fs::read(&ecj_path)?;
+        if journal.is_empty() {
+            return Ok(());
+        }
+
+        // With no .ecx to replay into, every lookup would report "not found";
+        // keep the journal rather than discarding deletions that were never
+        // applied.
+        if self.ecx_file.is_none() {
+            return Ok(());
+        }
+
+        // The journal is a flat sequence of fixed-width needle ids; a trailing
+        // partial id (fewer than NEEDLE_ID_SIZE bytes) is ignored.
+        for raw_id in journal.chunks_exact(NEEDLE_ID_SIZE) {
+            let needle_id = NeedleId::from_bytes(raw_id);
+            // A needle missing from .ecx is reported as Ok(false) and is
+            // non-fatal; an Err is a real I/O failure and must abort the
+            // replay before the journal is removed.
+            self.mark_needle_deleted_in_ecx(needle_id)?;
+        }
+
+        // Remove the .ecj file after replay (matches Go). Best effort: if the
+        // removal fails, the old entries are simply replayed again next time.
+        let _ = fs::remove_file(&ecj_path);
+
+        // Re-create .ecj for future deletions
+        let ecj_file = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(true)
+            .append(true)
+            .open(&ecj_path)?;
+        self.ecj_file = Some(ecj_file);
+
+        Ok(())
+    }
+
     // ---- Deletion journal ----
 
-    /// Append a deleted needle ID to the .ecj journal.
+    /// Append a deleted needle ID to the .ecj journal and mark in .ecx.
+    /// Matches Go's DeleteNeedleFromEcx: marks in .ecx first, then journals.
     pub fn journal_delete(&mut self, needle_id: NeedleId) -> io::Result<()> {
+        // Mark deleted in .ecx in-place (matches Go's MarkNeedleDeleted)
+        let _ = self.mark_needle_deleted_in_ecx(needle_id);
         let ecj_file = self
             .ecj_file
             .as_mut()