From 4c14bbe38b0593ed2db9f0d35c689f450f30c8f3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 8 Mar 2026 10:23:50 -0700 Subject: [PATCH] feat(storage): implement makeup_diff for safe concurrent compaction --- seaweed-volume/src/storage/volume.rs | 122 +++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 2b1ad1aec..29258539d 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -1701,6 +1701,11 @@ impl Volume { /// Commit a previously completed compaction: swap .cpd/.cpx to .dat/.idx and reload. pub fn commit_compact(&mut self) -> Result<(), VolumeError> { + self.makeup_diff().map_err(|e| { + warn!("makeup_diff failed: {}", e); + e + })?; + // Close current files if let Some(ref mut nm) = self.nm { nm.close(); @@ -1768,6 +1773,123 @@ impl Volume { Ok(()) } + /// Read any new needles appended during compaction and append them to .cpd/.cpx + fn makeup_diff(&mut self) -> Result<(), VolumeError> { + let old_idx_size = self.nm.as_ref().map_or(0, |nm| nm.index_file_size()); + if old_idx_size == 0 || old_idx_size <= self.last_compact_index_offset { + return Ok(()); + } + + let old_super_block = &self.super_block; + if old_super_block.compaction_revision != self.last_compact_revision { + return Err(VolumeError::Io(io::Error::new( + io::ErrorKind::Other, + format!( + "current old dat file's compact revision {} is not the expected one {}", + old_super_block.compaction_revision, self.last_compact_revision + ), + ))); + } + + let old_idx_path = self.file_name(".idx"); + let mut old_idx_file = File::open(&old_idx_path)?; + + // Read new entries from .idx + let mut incremented_entries = std::collections::HashMap::new(); + let offset = self.last_compact_index_offset; + + old_idx_file.seek(SeekFrom::Start(offset))?; + let entry_count = (old_idx_size - offset) / NEEDLE_MAP_ENTRY_SIZE as u64; + for _ in 0..entry_count { + let mut buf = [0u8; NEEDLE_MAP_ENTRY_SIZE]; + old_idx_file.read_exact(&mut buf)?; + let (key, needle_offset, size) = crate::storage::types::idx_entry_from_bytes(&buf); + incremented_entries.insert(key, (needle_offset, size)); + } + + if incremented_entries.is_empty() { + return Ok(()); + } + + let cpd_path = self.file_name(".cpd"); + let cpx_path = self.file_name(".cpx"); + + let mut dst_dat = OpenOptions::new().read(true).write(true).open(&cpd_path)?; + let mut dst_idx = OpenOptions::new().write(true).append(true).open(&cpx_path)?; + + let mut dat_offset = dst_dat.seek(SeekFrom::End(0))?; + let padding_rem = dat_offset % NEEDLE_PADDING_SIZE as u64; + if padding_rem != 0 { + dat_offset += NEEDLE_PADDING_SIZE as u64 - padding_rem; + dst_dat.seek(SeekFrom::Start(dat_offset))?; + } + + let version = self.version(); + let old_dat_path = self.file_name(".dat"); + let old_dat_file = File::open(&old_dat_path)?; + + for (key, (needle_offset, size)) in incremented_entries { + if !needle_offset.is_zero() && !size.is_deleted() && size.0 > 0 { + let actual_size = crate::storage::needle::needle::get_actual_size(size, version); + let mut blob = vec![0u8; actual_size as usize]; + + #[cfg(unix)] + { + use std::os::unix::fs::FileExt; + old_dat_file.read_exact_at(&mut blob, needle_offset.to_actual_offset() as u64)?; + } + #[cfg(windows)] + { + crate::storage::volume::read_exact_at(&old_dat_file, &mut blob, needle_offset.to_actual_offset() as u64)?; + } + + dst_dat.write_all(&blob)?; + + let mut idx_entry_buf = [0u8; NEEDLE_MAP_ENTRY_SIZE]; + crate::storage::types::idx_entry_to_bytes( + &mut idx_entry_buf, + key, + Offset::from_actual_offset(dat_offset as i64), + size, + ); + dst_idx.write_all(&idx_entry_buf)?; + + dat_offset += actual_size as u64; + } else { + let mut fake_del_needle = Needle { + id: key, + cookie: Cookie(0x12345678), + append_at_ns: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as u64, + ..Needle::default() + }; + let bytes = fake_del_needle.write_bytes(version); + dst_dat.write_all(&bytes)?; + + let mut idx_entry_buf = [0u8; NEEDLE_MAP_ENTRY_SIZE]; + crate::storage::types::idx_entry_to_bytes( + &mut idx_entry_buf, + key, + Offset::from_actual_offset(0), + Size(crate::storage::types::TOMBSTONE_FILE_SIZE.into()), + ); + dst_idx.write_all(&idx_entry_buf)?; + + dat_offset += bytes.len() as u64; + } + + let padding = NEEDLE_PADDING_SIZE as u64 - (dat_offset % NEEDLE_PADDING_SIZE as u64); + if padding != NEEDLE_PADDING_SIZE as u64 { + dat_offset += padding; + dst_dat.seek(SeekFrom::Start(dat_offset))?; + } + } + + dst_dat.sync_all()?; + dst_idx.sync_all()?; + + Ok(()) + } + // ---- Sync / Close ---- pub fn sync_to_disk(&mut self) -> io::Result<()> {