Browse Source

implement vacuum volume compaction (compact/commit/cleanup)

Adds full volume compaction support:
- compact_by_index: iterates live needles, writes to new .cpd/.cpx
  files, skipping deleted and TTL-expired entries
- commit_compact: swaps .cpd->.dat and .cpx->.idx, reloads volume
- cleanup_compact: removes leftover .cpd/.cpx files

Wires VacuumVolumeCompact (streaming progress), VacuumVolumeCommit,
and VacuumVolumeCleanup gRPC RPCs. Also adds save_to_idx and
ascending_visit methods to CompactNeedleMap.

Includes unit test verifying compact removes deleted needles and
preserves live data through the full compact/commit cycle.
rust-volume-server
Chris Lu 3 days ago
parent
commit
e63c34eb6d
  1. 64
      seaweed-volume/src/server/grpc_server.rs
  2. 33
      seaweed-volume/src/storage/needle_map.rs
  3. 53
      seaweed-volume/src/storage/store.rs
  4. 259
      seaweed-volume/src/storage/volume.rs

64
seaweed-volume/src/server/grpc_server.rs

@@ -158,26 +158,78 @@ impl VolumeServer for VolumeGrpcService {
type VacuumVolumeCompactStream = BoxStream<volume_server_pb::VacuumVolumeCompactResponse>;

/// Stream compaction progress for one volume back to the caller.
///
/// The (potentially long-running) compaction is executed on a blocking
/// thread; progress reports are forwarded over an mpsc channel that backs
/// the returned gRPC stream.
async fn vacuum_volume_compact(
    &self,
    request: Request<volume_server_pb::VacuumVolumeCompactRequest>,
) -> Result<Response<Self::VacuumVolumeCompactStream>, Status> {
    self.state.check_maintenance()?;
    let req = request.into_inner();
    let volume_id = VolumeId(req.volume_id);
    let preallocate = req.preallocate as u64;
    let state = self.state.clone();
    let (progress_tx, progress_rx) = tokio::sync::mpsc::channel(16);

    tokio::task::spawn_blocking(move || {
        // Emit a progress message roughly every 128 MiB processed.
        let report_interval: i64 = 128 * 1024 * 1024;
        let next_report = std::sync::atomic::AtomicI64::new(report_interval);
        let report_tx = progress_tx.clone();
        let outcome = {
            // NOTE(review): the store write lock is held for the whole
            // compaction, blocking other store operations until it ends —
            // confirm this is acceptable for large volumes.
            let mut store = state.store.write().unwrap();
            store.compact_volume(volume_id, preallocate, 0, |processed| {
                let threshold = next_report.load(std::sync::atomic::Ordering::Relaxed);
                if processed > threshold {
                    let update = volume_server_pb::VacuumVolumeCompactResponse {
                        processed_bytes: processed,
                        load_avg_1m: 0.0,
                    };
                    // Receiver may have gone away; progress loss is benign.
                    let _ = report_tx.blocking_send(Ok(update));
                    next_report.store(
                        processed + report_interval,
                        std::sync::atomic::Ordering::Relaxed,
                    );
                }
                true
            })
        };
        if let Err(e) = outcome {
            let _ = progress_tx.blocking_send(Err(Status::internal(e)));
        }
    });

    let stream = tokio_stream::wrappers::ReceiverStream::new(progress_rx);
    Ok(Response::new(Box::pin(stream) as Self::VacuumVolumeCompactStream))
}
/// Finalize a completed compaction: swap the compacted files into place,
/// reload the volume, and report its read-only flag and new size.
async fn vacuum_volume_commit(
    &self,
    request: Request<volume_server_pb::VacuumVolumeCommitRequest>,
) -> Result<Response<volume_server_pb::VacuumVolumeCommitResponse>, Status> {
    self.state.check_maintenance()?;
    let volume_id = VolumeId(request.into_inner().volume_id);
    let mut store = self.state.store.write().unwrap();
    store
        .commit_compact_volume(volume_id)
        .map(|(is_read_only, volume_size)| {
            Response::new(volume_server_pb::VacuumVolumeCommitResponse {
                is_read_only,
                volume_size,
            })
        })
        .map_err(Status::internal)
}
/// Remove leftover compaction artifacts (.cpd/.cpx) for a volume.
async fn vacuum_volume_cleanup(
    &self,
    request: Request<volume_server_pb::VacuumVolumeCleanupRequest>,
) -> Result<Response<volume_server_pb::VacuumVolumeCleanupResponse>, Status> {
    self.state.check_maintenance()?;
    let volume_id = VolumeId(request.into_inner().volume_id);
    let mut store = self.state.store.write().unwrap();
    store
        .cleanup_compact_volume(volume_id)
        .map(|()| Response::new(volume_server_pb::VacuumVolumeCleanupResponse {}))
        .map_err(Status::internal)
}
async fn delete_collection(

33
seaweed-volume/src/storage/needle_map.rs

@@ -245,6 +245,39 @@ impl CompactNeedleMap {
let _ = self.sync();
self.idx_file = None;
}
/// Save the in-memory map to an index file, sorted by needle ID ascending.
///
/// Only valid (non-deleted) entries are written. The target file is
/// created or truncated, and fsynced before returning.
pub fn save_to_idx(&self, path: &str) -> io::Result<()> {
    // Snapshot the live entries and order them by needle ID.
    let mut live: Vec<_> = self
        .map
        .iter()
        .filter(|(_, value)| value.size.is_valid())
        .collect();
    // Needle IDs are unique keys, so an unstable sort is equivalent.
    live.sort_unstable_by_key(|(id, _)| **id);

    // File::create is write + create + truncate.
    let mut file = std::fs::File::create(path)?;
    for (id, value) in live {
        idx::write_index_entry(&mut file, *id, value.offset, value.size)?;
    }
    file.sync_all()?;
    Ok(())
}
/// Visit all entries in the map in ascending order by needle ID.
///
/// NOTE(review): unlike `save_to_idx`, this does not filter on
/// `size.is_valid()`, so deleted/invalid entries are visited as well —
/// confirm whether callers expect live-only entries here.
///
/// Stops early and propagates the first error returned by `f`.
pub fn ascending_visit<F>(&self, mut f: F) -> Result<(), String>
where
    F: FnMut(NeedleId, &NeedleValue) -> Result<(), String>,
{
    // Sort a snapshot of the entries; the underlying map is unordered.
    let mut entries: Vec<_> = self.map.iter().collect();
    entries.sort_by_key(|(id, _)| **id);
    for (&id, nv) in entries {
        f(id, nv)?;
    }
    Ok(())
}
}
#[cfg(test)]

53
seaweed-volume/src/storage/store.rs

@@ -381,6 +381,59 @@ impl Store {
None
}
// ---- Vacuum / Compaction ----
/// Check the garbage level of a volume.
///
/// Returns the fraction of reclaimable space, or an error message when
/// the volume is not found in this store.
pub fn check_compact_volume(&self, vid: VolumeId) -> Result<f64, String> {
    match self.find_volume(vid) {
        Some((_, volume)) => Ok(volume.garbage_level()),
        None => Err(format!("volume id {} is not found during check compact", vid.0)),
    }
}
/// Compact a volume by rewriting only live needles.
///
/// `progress_fn` receives processed-byte counts and may return `false`
/// to abort the compaction.
pub fn compact_volume<F>(
    &mut self,
    vid: VolumeId,
    preallocate: u64,
    max_bytes_per_second: i64,
    progress_fn: F,
) -> Result<(), String>
where
    F: Fn(i64) -> bool,
{
    match self.find_volume_mut(vid) {
        Some((_, volume)) => volume
            .compact_by_index(preallocate, max_bytes_per_second, progress_fn)
            .map_err(|e| format!("compact volume {}: {}", vid.0, e)),
        None => Err(format!("volume id {} is not found during compact", vid.0)),
    }
}
/// Commit a completed compaction: swap files and reload.
///
/// Returns the volume's read-only flag (sampled before the commit) and
/// its data file size after the reload.
pub fn commit_compact_volume(&mut self, vid: VolumeId) -> Result<(bool, u64), String> {
    let (_, volume) = self
        .find_volume_mut(vid)
        .ok_or_else(|| format!("volume id {} is not found during commit compact", vid.0))?;
    let is_read_only = volume.is_read_only();
    volume
        .commit_compact()
        .map_err(|e| format!("commit compact volume {}: {}", vid.0, e))?;
    Ok((is_read_only, volume.dat_file_size().unwrap_or(0)))
}
/// Clean up leftover compaction files.
pub fn cleanup_compact_volume(&mut self, vid: VolumeId) -> Result<(), String> {
    match self.find_volume_mut(vid) {
        Some((_, volume)) => volume
            .cleanup_compact()
            .map_err(|e| format!("cleanup volume {}: {}", vid.0, e)),
        None => Err(format!("volume id {} is not found during cleaning up", vid.0)),
    }
}
/// Close all locations and their volumes.
pub fn close(&mut self) {
for loc in &mut self.locations {

259
seaweed-volume/src/storage/volume.rs

@@ -815,6 +815,195 @@ impl Volume {
self.nm.as_ref().map_or(0, |nm| nm.index_file_size())
}
// ---- Compaction / Vacuum ----
/// Compact the volume by copying only live needles to new .cpd/.cpx files.
/// This reads from the current .dat/.idx and writes to .cpd/.cpx.
/// Call `commit_compact()` after to swap the files.
///
/// `_preallocate` and `_max_bytes_per_second` are accepted for interface
/// compatibility but currently unused (no preallocation or rate limiting).
/// `progress_fn` is invoked with processed-byte counts; returning `false`
/// aborts the compaction.
pub fn compact_by_index<F>(
    &mut self,
    _preallocate: u64,
    _max_bytes_per_second: i64,
    progress_fn: F,
) -> Result<(), VolumeError>
where
    F: Fn(i64) -> bool,
{
    // A concurrent compaction request is treated as success, not an error.
    if self.is_compacting {
        return Ok(()); // already compacting
    }
    self.is_compacting = true;
    let result = self.do_compact_by_index(progress_fn);
    // NOTE(review): the flag is not reset if `do_compact_by_index` panics
    // (no drop guard), which would leave the volume stuck "compacting".
    self.is_compacting = false;
    result
}
/// Core of `compact_by_index`: copy live needles from the current data
/// file into fresh .cpd/.cpx files, skipping deleted and TTL-expired
/// entries. On success the .cpd/.cpx pair is left on disk for
/// `commit_compact` to swap in.
fn do_compact_by_index<F>(&mut self, progress_fn: F) -> Result<(), VolumeError>
where
    F: Fn(i64) -> bool,
{
    // Record state before compaction for makeupDiff
    // NOTE(review): these snapshots are recorded but no makeup-diff pass is
    // visible in this file — confirm that writes arriving during compaction
    // are reconciled before commit.
    self.last_compact_index_offset = self.nm.as_ref().map_or(0, |nm| nm.index_file_size());
    self.last_compact_revision = self.super_block.compaction_revision;
    // Sync current data
    self.sync_to_disk()?;
    let cpd_path = self.file_name(".cpd");
    let cpx_path = self.file_name(".cpx");
    let version = self.version();
    // Write new super block with incremented compaction revision
    let mut new_sb = self.super_block.clone();
    new_sb.compaction_revision += 1;
    let sb_bytes = new_sb.to_bytes();
    // NOTE(review): read(true) appears unnecessary for a write-only
    // destination file.
    let mut dst = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(&cpd_path)?;
    dst.write_all(&sb_bytes)?;
    // Offsets in the new file start immediately after the super block.
    let mut new_offset = sb_bytes.len() as i64;
    // Build new index in memory
    let mut new_nm = CompactNeedleMap::new();
    // Wall-clock seconds, used below for TTL-expiry checks.
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    // Collect live entries from needle map (sorted ascending)
    let nm = self.nm.as_ref().ok_or(VolumeError::NotInitialized)?;
    let mut entries: Vec<(NeedleId, Offset, Size)> = Vec::new();
    for (&id, nv) in nm.iter() {
        if nv.offset.is_zero() || nv.size.is_deleted() {
            continue;
        }
        entries.push((id, nv.offset, nv.size));
    }
    entries.sort_by_key(|(id, _, _)| *id);
    for (id, offset, size) in entries {
        // Progress callback. The reported value is the needle's offset in
        // the SOURCE file, i.e. bytes scanned, not bytes written so far.
        if !progress_fn(offset.to_actual_offset()) {
            // Interrupted: drop the partial .cpd. No .cpx exists yet —
            // it is only written after the loop completes.
            let _ = fs::remove_file(&cpd_path);
            return Err(VolumeError::Io(io::Error::new(
                io::ErrorKind::Interrupted,
                "compaction interrupted",
            )));
        }
        // Read needle from source
        let mut n = Needle {
            id,
            ..Needle::default()
        };
        self.read_needle_data_at(&mut n, offset.to_actual_offset(), size)?;
        // Skip TTL-expired needles
        if n.has_ttl() {
            if let Some(ref ttl) = n.ttl {
                let ttl_minutes = ttl.minutes();
                if ttl_minutes > 0 && n.last_modified > 0 {
                    // Expiry = last modification + TTL (minutes → seconds).
                    let expire_at = n.last_modified + (ttl_minutes as u64) * 60;
                    if now >= expire_at {
                        continue;
                    }
                }
            }
        }
        // Write needle to destination
        let bytes = n.write_bytes(version);
        dst.write_all(&bytes)?;
        // Update new index; offsets reference the new .cpd layout.
        new_nm.put(id, Offset::from_actual_offset(new_offset), n.size)?;
        new_offset += bytes.len() as i64;
    }
    dst.sync_all()?;
    // Save new index (live entries only, ascending by needle ID)
    new_nm.save_to_idx(&cpx_path)?;
    Ok(())
}
/// Commit a previously completed compaction: swap .cpd/.cpx to .dat/.idx and reload.
///
/// # Errors
/// Returns `NotFound` if the compact files are missing, or any I/O error
/// from closing, renaming, or reloading the volume.
pub fn commit_compact(&mut self) -> Result<(), VolumeError> {
    let cpd_path = self.file_name(".cpd");
    let cpx_path = self.file_name(".cpx");
    let dat_path = self.file_name(".dat");
    let idx_path = self.file_name(".idx");
    // Verify the compact files exist BEFORE tearing down the live volume.
    // (Previously this check ran after closing nm/dat_file, so a missing
    // .cpd/.cpx left the volume closed and unusable while erroring out.)
    if !Path::new(&cpd_path).exists() || !Path::new(&cpx_path).exists() {
        return Err(VolumeError::Io(io::Error::new(
            io::ErrorKind::NotFound,
            "compact files (.cpd/.cpx) not found",
        )));
    }
    // Close the current index and data files so they can be replaced.
    if let Some(ref mut nm) = self.nm {
        nm.close();
    }
    self.nm = None;
    if let Some(ref dat_file) = self.dat_file {
        let _ = dat_file.sync_all();
    }
    self.dat_file = None;
    // Swap files: .cpd → .dat, .cpx → .idx
    fs::rename(&cpd_path, &dat_path)?;
    fs::rename(&cpx_path, &idx_path)?;
    // Remove any leveldb files left from a previous index backend.
    let ldb_path = self.file_name(".ldb");
    let _ = fs::remove_dir_all(&ldb_path);
    // Reload from the swapped-in files.
    // NOTE(review): `self.version()` is read from the pre-commit super
    // block — confirm `load` re-reads the super block from the new .dat.
    self.load(true, false, 0, self.version())?;
    Ok(())
}
/// Clean up leftover compaction files (.cpd, .cpx, .cpldb).
///
/// Missing files are not an error, making cleanup idempotent; any other
/// I/O failure is returned.
pub fn cleanup_compact(&self) -> Result<(), VolumeError> {
    // Treat NotFound as success; propagate every other I/O error.
    fn ignore_missing(result: io::Result<()>) -> io::Result<()> {
        match result {
            Err(e) if e.kind() != io::ErrorKind::NotFound => Err(e),
            _ => Ok(()),
        }
    }
    ignore_missing(fs::remove_file(self.file_name(".cpd")))?;
    ignore_missing(fs::remove_file(self.file_name(".cpx")))?;
    // .cpldb is a directory (leveldb-style index), hence remove_dir_all.
    ignore_missing(fs::remove_dir_all(self.file_name(".cpldb")))?;
    Ok(())
}
// ---- Sync / Close ----
pub fn sync_to_disk(&mut self) -> io::Result<()> {
@@ -1213,4 +1402,74 @@ mod tests {
let t3 = get_append_at_ns(future);
assert_eq!(t3, future + 1);
}
#[test]
fn test_volume_compact() {
    // End-to-end compaction test: write three needles, delete one, run
    // compact + commit, then verify only live needles survive, the data
    // file shrinks, and the temporary .cpd/.cpx files are gone.
    let tmp = TempDir::new().unwrap();
    let dir = tmp.path().to_str().unwrap();
    let mut v = make_test_volume(dir);
    // Write 3 needles
    for i in 1..=3u64 {
        let mut n = Needle {
            id: NeedleId(i),
            cookie: Cookie(i as u32),
            data: format!("data-{}", i).into_bytes(),
            data_size: format!("data-{}", i).len() as u32,
            ..Needle::default()
        };
        v.write_needle(&mut n, true).unwrap();
    }
    assert_eq!(v.file_count(), 3);
    // Delete needle 2
    let mut del = Needle {
        id: NeedleId(2),
        cookie: Cookie(2),
        ..Needle::default()
    };
    v.delete_needle(&mut del).unwrap();
    assert_eq!(v.file_count(), 2);
    assert_eq!(v.deleted_count(), 1);
    let dat_size_before = v.dat_file_size().unwrap();
    // Compact (progress callback always continues)
    v.compact_by_index(0, 0, |_| true).unwrap();
    // Verify compact files exist
    assert!(Path::new(&v.file_name(".cpd")).exists());
    assert!(Path::new(&v.file_name(".cpx")).exists());
    // Commit: swap files and reload
    v.commit_compact().unwrap();
    // After compaction: 2 live needles, 0 deleted
    assert_eq!(v.file_count(), 2);
    assert_eq!(v.deleted_count(), 0);
    // Dat should be smaller (deleted needle removed)
    let dat_size_after = v.dat_file_size().unwrap();
    assert!(dat_size_after < dat_size_before, "dat should shrink after compact");
    // Read back live needles
    let mut n1 = Needle { id: NeedleId(1), ..Needle::default() };
    v.read_needle(&mut n1).unwrap();
    assert_eq!(n1.data, b"data-1");
    let mut n3 = Needle { id: NeedleId(3), ..Needle::default() };
    v.read_needle(&mut n3).unwrap();
    assert_eq!(n3.data, b"data-3");
    // Needle 2 should not exist
    let mut n2 = Needle { id: NeedleId(2), ..Needle::default() };
    assert!(v.read_needle(&mut n2).is_err());
    // Compact files should not exist after commit (renamed to .dat/.idx)
    assert!(!Path::new(&v.file_name(".cpd")).exists());
    assert!(!Path::new(&v.file_name(".cpx")).exists());
    // Cleanup should be a no-op
    v.cleanup_compact().unwrap();
}
}
Loading…
Cancel
Save