From e63c34eb6dc8c57f9fd900759a9234535eb0012d Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Sat, 7 Mar 2026 03:40:32 -0800
Subject: [PATCH] implement vacuum volume compaction (compact/commit/cleanup)

Adds full volume compaction support:
- compact_by_index: iterates live needles, writes to new .cpd/.cpx files,
  skipping deleted and TTL-expired entries
- commit_compact: swaps .cpd->.dat and .cpx->.idx, reloads volume
- cleanup_compact: removes leftover .cpd/.cpx files

Wires VacuumVolumeCompact (streaming progress), VacuumVolumeCommit,
and VacuumVolumeCleanup gRPC RPCs. Also adds save_to_idx and
ascending_visit methods to CompactNeedleMap.

Includes unit test verifying compact removes deleted needles and
preserves live data through the full compact/commit cycle.
---
 seaweed-volume/src/server/grpc_server.rs |  64 +++++-
 seaweed-volume/src/storage/needle_map.rs |  33 +++
 seaweed-volume/src/storage/store.rs      |  53 +++++
 seaweed-volume/src/storage/volume.rs     | 259 +++++++++++++++++++++++
 4 files changed, 403 insertions(+), 6 deletions(-)

diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs
index f458d2aef..a98ce0079 100644
--- a/seaweed-volume/src/server/grpc_server.rs
+++ b/seaweed-volume/src/server/grpc_server.rs
@@ -158,26 +158,78 @@ impl VolumeServer for VolumeGrpcService {
     type VacuumVolumeCompactStream = BoxStream<volume_server_pb::VacuumVolumeCompactResponse>;
     async fn vacuum_volume_compact(
         &self,
-        _request: Request<volume_server_pb::VacuumVolumeCompactRequest>,
+        request: Request<volume_server_pb::VacuumVolumeCompactRequest>,
     ) -> Result<Response<Self::VacuumVolumeCompactStream>, Status> {
         self.state.check_maintenance()?;
-        Err(Status::unimplemented("vacuum_volume_compact not yet implemented"))
+        let req = request.into_inner();
+        let vid = VolumeId(req.volume_id);
+        let preallocate = req.preallocate as u64;
+        let state = self.state.clone();
+
+        let (tx, rx) = tokio::sync::mpsc::channel(16);
+
+        tokio::task::spawn_blocking(move || {
+            let report_interval: i64 = 128 * 1024 * 1024;
+            let next_report = std::sync::atomic::AtomicI64::new(report_interval);
+
+            let tx_clone = tx.clone();
+            let result = {
+                let mut store = state.store.write().unwrap();
+                store.compact_volume(vid, preallocate, 0, |processed| {
+                    if processed > next_report.load(std::sync::atomic::Ordering::Relaxed) {
+                        let resp = volume_server_pb::VacuumVolumeCompactResponse {
+                            processed_bytes: processed,
+                            load_avg_1m: 0.0,
+                        };
+                        // Receiver dropped => client went away: interrupt the compaction.
+                        if tx_clone.blocking_send(Ok(resp)).is_err() {
+                            return false;
+                        }
+                        let next = processed + report_interval;
+                        next_report.store(next, std::sync::atomic::Ordering::Relaxed);
+                    }
+                    true
+                })
+            };
+
+            if let Err(e) = result {
+                let _ = tx.blocking_send(Err(Status::internal(e)));
+            }
+        });
+
+        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
+        Ok(Response::new(Box::pin(stream) as Self::VacuumVolumeCompactStream))
     }
 
     async fn vacuum_volume_commit(
         &self,
-        _request: Request<volume_server_pb::VacuumVolumeCommitRequest>,
+        request: Request<volume_server_pb::VacuumVolumeCommitRequest>,
     ) -> Result<Response<volume_server_pb::VacuumVolumeCommitResponse>, Status> {
         self.state.check_maintenance()?;
-        Err(Status::unimplemented("vacuum_volume_commit not yet implemented"))
+        let vid = VolumeId(request.into_inner().volume_id);
+        let mut store = self.state.store.write().unwrap();
+        match store.commit_compact_volume(vid) {
+            Ok((is_read_only, volume_size)) => {
+                Ok(Response::new(volume_server_pb::VacuumVolumeCommitResponse {
+                    is_read_only,
+                    volume_size,
+                }))
+            }
+            Err(e) => Err(Status::internal(e)),
+        }
     }
 
     async fn vacuum_volume_cleanup(
         &self,
-        _request: Request<volume_server_pb::VacuumVolumeCleanupRequest>,
+        request: Request<volume_server_pb::VacuumVolumeCleanupRequest>,
     ) -> Result<Response<volume_server_pb::VacuumVolumeCleanupResponse>, Status> {
         self.state.check_maintenance()?;
-        Err(Status::unimplemented("vacuum_volume_cleanup not yet implemented"))
+        let vid = VolumeId(request.into_inner().volume_id);
+        let mut store = self.state.store.write().unwrap();
+        match store.cleanup_compact_volume(vid) {
+            Ok(()) => Ok(Response::new(volume_server_pb::VacuumVolumeCleanupResponse {})),
+            Err(e) => Err(Status::internal(e)),
+        }
     }
 
     async fn delete_collection(
diff --git a/seaweed-volume/src/storage/needle_map.rs b/seaweed-volume/src/storage/needle_map.rs
index 378f1da2e..124067900 100644
--- a/seaweed-volume/src/storage/needle_map.rs
+++ b/seaweed-volume/src/storage/needle_map.rs
@@ -245,6 +245,39 @@ impl CompactNeedleMap {
         let _ = self.sync();
         self.idx_file = None;
     }
+
+    /// Save the in-memory map to an index file, sorted by needle ID ascending.
+    pub fn save_to_idx(&self, path: &str) -> io::Result<()> {
+        let mut entries: Vec<_> = self.map.iter()
+            .filter(|(_, nv)| nv.size.is_valid())
+            .collect();
+        entries.sort_by_key(|(id, _)| **id);
+
+        let mut file = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(path)?;
+
+        for (id, nv) in entries {
+            idx::write_index_entry(&mut file, *id, nv.offset, nv.size)?;
+        }
+        file.sync_all()?;
+        Ok(())
+    }
+
+    /// Visit every entry (no liveness filtering is applied) in ascending order by needle ID.
+    pub fn ascending_visit<F>(&self, mut f: F) -> Result<(), String>
+    where
+        F: FnMut(NeedleId, &NeedleValue) -> Result<(), String>,
+    {
+        let mut entries: Vec<_> = self.map.iter().collect();
+        entries.sort_by_key(|(id, _)| **id);
+        for (&id, nv) in entries {
+            f(id, nv)?;
+        }
+        Ok(())
+    }
 }
 
 #[cfg(test)]
diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs
index ae37d86b0..beaad6ce7 100644
--- a/seaweed-volume/src/storage/store.rs
+++ b/seaweed-volume/src/storage/store.rs
@@ -381,6 +381,59 @@ impl Store {
         None
     }
 
+    // ---- Vacuum / Compaction ----
+
+    /// Check the garbage level of a volume.
+    pub fn check_compact_volume(&self, vid: VolumeId) -> Result<f64, String> {
+        if let Some((_, v)) = self.find_volume(vid) {
+            Ok(v.garbage_level())
+        } else {
+            Err(format!("volume id {} is not found during check compact", vid.0))
+        }
+    }
+
+    /// Compact a volume by rewriting only live needles.
+    pub fn compact_volume<F>(
+        &mut self,
+        vid: VolumeId,
+        preallocate: u64,
+        max_bytes_per_second: i64,
+        progress_fn: F,
+    ) -> Result<(), String>
+    where
+        F: Fn(i64) -> bool,
+    {
+        if let Some((_, v)) = self.find_volume_mut(vid) {
+            v.compact_by_index(preallocate, max_bytes_per_second, progress_fn)
+                .map_err(|e| format!("compact volume {}: {}", vid.0, e))
+        } else {
+            Err(format!("volume id {} is not found during compact", vid.0))
+        }
+    }
+
+    /// Commit a completed compaction: swap files and reload.
+    pub fn commit_compact_volume(&mut self, vid: VolumeId) -> Result<(bool, u64), String> {
+        if let Some((_, v)) = self.find_volume_mut(vid) {
+            let is_read_only = v.is_read_only();
+            v.commit_compact()
+                .map_err(|e| format!("commit compact volume {}: {}", vid.0, e))?;
+            let volume_size = v.dat_file_size().unwrap_or(0);
+            Ok((is_read_only, volume_size))
+        } else {
+            Err(format!("volume id {} is not found during commit compact", vid.0))
+        }
+    }
+
+    /// Clean up leftover compaction files.
+    pub fn cleanup_compact_volume(&mut self, vid: VolumeId) -> Result<(), String> {
+        if let Some((_, v)) = self.find_volume_mut(vid) {
+            v.cleanup_compact()
+                .map_err(|e| format!("cleanup volume {}: {}", vid.0, e))
+        } else {
+            Err(format!("volume id {} is not found during cleaning up", vid.0))
+        }
+    }
+
     /// Close all locations and their volumes.
     pub fn close(&mut self) {
         for loc in &mut self.locations {
diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs
index 0af6a8bf4..676cd9f7d 100644
--- a/seaweed-volume/src/storage/volume.rs
+++ b/seaweed-volume/src/storage/volume.rs
@@ -815,6 +815,195 @@ impl Volume {
         self.nm.as_ref().map_or(0, |nm| nm.index_file_size())
     }
 
+    // ---- Compaction / Vacuum ----
+
+    /// Compact the volume by copying only live needles to new .cpd/.cpx files.
+    /// This reads from the current .dat/.idx and writes to .cpd/.cpx.
+    /// Call `commit_compact()` after to swap the files.
+    pub fn compact_by_index<F>(
+        &mut self,
+        _preallocate: u64,
+        _max_bytes_per_second: i64,
+        progress_fn: F,
+    ) -> Result<(), VolumeError>
+    where
+        F: Fn(i64) -> bool,
+    {
+        if self.is_compacting {
+            return Ok(()); // already compacting
+        }
+        self.is_compacting = true;
+
+        let result = self.do_compact_by_index(progress_fn);
+
+        self.is_compacting = false;
+        result
+    }
+
+    fn do_compact_by_index<F>(&mut self, progress_fn: F) -> Result<(), VolumeError>
+    where
+        F: Fn(i64) -> bool,
+    {
+        // Record state before compaction for makeupDiff
+        self.last_compact_index_offset = self.nm.as_ref().map_or(0, |nm| nm.index_file_size());
+        self.last_compact_revision = self.super_block.compaction_revision;
+
+        // Sync current data
+        self.sync_to_disk()?;
+
+        let cpd_path = self.file_name(".cpd");
+        let cpx_path = self.file_name(".cpx");
+        let version = self.version();
+
+        // Write new super block with incremented compaction revision
+        let mut new_sb = self.super_block.clone();
+        new_sb.compaction_revision += 1;
+        let sb_bytes = new_sb.to_bytes();
+
+        let mut dst = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(&cpd_path)?;
+        dst.write_all(&sb_bytes)?;
+        let mut new_offset = sb_bytes.len() as i64;
+
+        // Build new index in memory
+        let mut new_nm = CompactNeedleMap::new();
+        let now = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_secs();
+
+        // Collect live entries from needle map (sorted ascending)
+        let nm = self.nm.as_ref().ok_or(VolumeError::NotInitialized)?;
+        let mut entries: Vec<(NeedleId, Offset, Size)> = Vec::new();
+        for (&id, nv) in nm.iter() {
+            if nv.offset.is_zero() || nv.size.is_deleted() {
+                continue;
+            }
+            entries.push((id, nv.offset, nv.size));
+        }
+        entries.sort_by_key(|(id, _, _)| *id);
+
+        for (id, offset, size) in entries {
+            // Progress callback
+            if !progress_fn(offset.to_actual_offset()) {
+                // Interrupted
+                let _ = fs::remove_file(&cpd_path);
+                return Err(VolumeError::Io(io::Error::new(
+                    io::ErrorKind::Interrupted,
+                    "compaction interrupted",
+                )));
+            }
+
+            // Read needle from source
+            let mut n = Needle {
+                id,
+                ..Needle::default()
+            };
+            self.read_needle_data_at(&mut n, offset.to_actual_offset(), size)?;
+
+            // Skip TTL-expired needles
+            if n.has_ttl() {
+                if let Some(ref ttl) = n.ttl {
+                    let ttl_minutes = ttl.minutes();
+                    if ttl_minutes > 0 && n.last_modified > 0 {
+                        let expire_at = n.last_modified + (ttl_minutes as u64) * 60;
+                        if now >= expire_at {
+                            continue;
+                        }
+                    }
+                }
+            }
+
+            // Write needle to destination
+            let bytes = n.write_bytes(version);
+            dst.write_all(&bytes)?;
+
+            // Update new index
+            new_nm.put(id, Offset::from_actual_offset(new_offset), n.size)?;
+            new_offset += bytes.len() as i64;
+        }
+
+        dst.sync_all()?;
+
+        // Save new index
+        new_nm.save_to_idx(&cpx_path)?;
+
+        Ok(())
+    }
+
+    /// Commit a previously completed compaction: swap .cpd/.cpx to .dat/.idx and reload.
+    pub fn commit_compact(&mut self) -> Result<(), VolumeError> {
+        let cpd_path = self.file_name(".cpd");
+        let cpx_path = self.file_name(".cpx");
+        let dat_path = self.file_name(".dat");
+        let idx_path = self.file_name(".idx");
+
+        // Check that compact files exist BEFORE tearing down the live volume handles
+        if !Path::new(&cpd_path).exists() || !Path::new(&cpx_path).exists() {
+            return Err(VolumeError::Io(io::Error::new(
+                io::ErrorKind::NotFound,
+                "compact files (.cpd/.cpx) not found",
+            )));
+        }
+
+        // Close current files
+        if let Some(ref mut nm) = self.nm {
+            nm.close();
+        }
+        self.nm = None;
+        if let Some(ref dat_file) = self.dat_file {
+            let _ = dat_file.sync_all();
+        }
+        self.dat_file = None;
+
+        // Swap files: .cpd → .dat, .cpx → .idx
+        fs::rename(&cpd_path, &dat_path)?;
+        fs::rename(&cpx_path, &idx_path)?;
+
+        // Remove any leveldb files
+        let ldb_path = self.file_name(".ldb");
+        let _ = fs::remove_dir_all(&ldb_path);
+
+        // Reload
+        self.load(true, false, 0, self.version())?;
+
+        Ok(())
+    }
+
+    /// Clean up leftover compaction files (.cpd, .cpx, and the .cpldb directory).
+    pub fn cleanup_compact(&self) -> Result<(), VolumeError> {
+        let cpd_path = self.file_name(".cpd");
+        let cpx_path = self.file_name(".cpx");
+        let cpldb_path = self.file_name(".cpldb");
+
+        let e1 = fs::remove_file(&cpd_path);
+        let e2 = fs::remove_file(&cpx_path);
+        let e3 = fs::remove_dir_all(&cpldb_path);
+
+        // All three removals were attempted above; NotFound is ignored, other errors returned
+        if let Err(e) = e1 {
+            if e.kind() != io::ErrorKind::NotFound {
+                return Err(e.into());
+            }
+        }
+        if let Err(e) = e2 {
+            if e.kind() != io::ErrorKind::NotFound {
+                return Err(e.into());
+            }
+        }
+        if let Err(e) = e3 {
+            if e.kind() != io::ErrorKind::NotFound {
+                return Err(e.into());
+            }
+        }
+
+        Ok(())
+    }
+
     // ---- Sync / Close ----
 
     pub fn sync_to_disk(&mut self) -> io::Result<()> {
@@ -1213,4 +1402,74 @@ mod tests {
         let t3 = get_append_at_ns(future);
         assert_eq!(t3, future + 1);
     }
+
+    #[test]
+    fn test_volume_compact() {
+        let tmp = TempDir::new().unwrap();
+        let dir = tmp.path().to_str().unwrap();
+        let mut v = make_test_volume(dir);
+
+        // Write 3 needles
+        for i in 1..=3u64 {
+            let mut n = Needle {
+                id: NeedleId(i),
+                cookie: Cookie(i as u32),
+                data: format!("data-{}", i).into_bytes(),
+                data_size: format!("data-{}", i).len() as u32,
+                ..Needle::default()
+            };
+            v.write_needle(&mut n, true).unwrap();
+        }
+        assert_eq!(v.file_count(), 3);
+
+        // Delete needle 2
+        let mut del = Needle {
+            id: NeedleId(2),
+            cookie: Cookie(2),
+            ..Needle::default()
+        };
+        v.delete_needle(&mut del).unwrap();
+        assert_eq!(v.file_count(), 2);
+        assert_eq!(v.deleted_count(), 1);
+
+        let dat_size_before = v.dat_file_size().unwrap();
+
+        // Compact
+        v.compact_by_index(0, 0, |_| true).unwrap();
+
+        // Verify compact files exist
+        assert!(Path::new(&v.file_name(".cpd")).exists());
+        assert!(Path::new(&v.file_name(".cpx")).exists());
+
+        // Commit: swap files and reload
+        v.commit_compact().unwrap();
+
+        // After compaction: 2 live needles, 0 deleted
+        assert_eq!(v.file_count(), 2);
+        assert_eq!(v.deleted_count(), 0);
+
+        // Dat should be smaller (deleted needle removed)
+        let dat_size_after = v.dat_file_size().unwrap();
+        assert!(dat_size_after < dat_size_before, "dat should shrink after compact");
+
+        // Read back live needles
+        let mut n1 = Needle { id: NeedleId(1), ..Needle::default() };
+        v.read_needle(&mut n1).unwrap();
+        assert_eq!(n1.data, b"data-1");
+
+        let mut n3 = Needle { id: NeedleId(3), ..Needle::default() };
+        v.read_needle(&mut n3).unwrap();
+        assert_eq!(n3.data, b"data-3");
+
+        // Needle 2 should not exist
+        let mut n2 = Needle { id: NeedleId(2), ..Needle::default() };
+        assert!(v.read_needle(&mut n2).is_err());
+
+        // Compact files should not exist after commit
+        assert!(!Path::new(&v.file_name(".cpd")).exists());
+        assert!(!Path::new(&v.file_name(".cpx")).exists());
+
+        // Nothing is left to remove after commit, so cleanup must still succeed
+        v.cleanup_compact().unwrap();
+    }
 }