Browse Source

feat(storage): implement makeup_diff for safe concurrent compaction

rust-volume-server
Chris Lu 2 days ago
parent
commit
4c14bbe38b
  1. 122
      seaweed-volume/src/storage/volume.rs

122
seaweed-volume/src/storage/volume.rs

@ -1701,6 +1701,11 @@ impl Volume {
/// Commit a previously completed compaction: swap .cpd/.cpx to .dat/.idx and reload.
pub fn commit_compact(&mut self) -> Result<(), VolumeError> {
self.makeup_diff().map_err(|e| {
warn!("makeup_diff failed: {}", e);
e
})?;
// Close current files
if let Some(ref mut nm) = self.nm {
nm.close();
@ -1768,6 +1773,123 @@ impl Volume {
Ok(())
}
/// Read any new needles appended during compaction and append them to .cpd/.cpx
fn makeup_diff(&mut self) -> Result<(), VolumeError> {
let old_idx_size = self.nm.as_ref().map_or(0, |nm| nm.index_file_size());
if old_idx_size == 0 || old_idx_size <= self.last_compact_index_offset {
return Ok(());
}
let old_super_block = &self.super_block;
if old_super_block.compaction_revision != self.last_compact_revision {
return Err(VolumeError::Io(io::Error::new(
io::ErrorKind::Other,
format!(
"current old dat file's compact revision {} is not the expected one {}",
old_super_block.compaction_revision, self.last_compact_revision
),
)));
}
let old_idx_path = self.file_name(".idx");
let mut old_idx_file = File::open(&old_idx_path)?;
// Read new entries from .idx
let mut incremented_entries = std::collections::HashMap::new();
let offset = self.last_compact_index_offset;
old_idx_file.seek(SeekFrom::Start(offset))?;
let entry_count = (old_idx_size - offset) / NEEDLE_MAP_ENTRY_SIZE as u64;
for _ in 0..entry_count {
let mut buf = [0u8; NEEDLE_MAP_ENTRY_SIZE];
old_idx_file.read_exact(&mut buf)?;
let (key, needle_offset, size) = crate::storage::types::idx_entry_from_bytes(&buf);
incremented_entries.insert(key, (needle_offset, size));
}
if incremented_entries.is_empty() {
return Ok(());
}
let cpd_path = self.file_name(".cpd");
let cpx_path = self.file_name(".cpx");
let mut dst_dat = OpenOptions::new().read(true).write(true).open(&cpd_path)?;
let mut dst_idx = OpenOptions::new().write(true).append(true).open(&cpx_path)?;
let mut dat_offset = dst_dat.seek(SeekFrom::End(0))?;
let padding_rem = dat_offset % NEEDLE_PADDING_SIZE as u64;
if padding_rem != 0 {
dat_offset += NEEDLE_PADDING_SIZE as u64 - padding_rem;
dst_dat.seek(SeekFrom::Start(dat_offset))?;
}
let version = self.version();
let old_dat_path = self.file_name(".dat");
let old_dat_file = File::open(&old_dat_path)?;
for (key, (needle_offset, size)) in incremented_entries {
if !needle_offset.is_zero() && !size.is_deleted() && size.0 > 0 {
let actual_size = crate::storage::needle::needle::get_actual_size(size, version);
let mut blob = vec![0u8; actual_size as usize];
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
old_dat_file.read_exact_at(&mut blob, needle_offset.to_actual_offset() as u64)?;
}
#[cfg(windows)]
{
crate::storage::volume::read_exact_at(&old_dat_file, &mut blob, needle_offset.to_actual_offset() as u64)?;
}
dst_dat.write_all(&blob)?;
let mut idx_entry_buf = [0u8; NEEDLE_MAP_ENTRY_SIZE];
crate::storage::types::idx_entry_to_bytes(
&mut idx_entry_buf,
key,
Offset::from_actual_offset(dat_offset as i64),
size,
);
dst_idx.write_all(&idx_entry_buf)?;
dat_offset += actual_size as u64;
} else {
let mut fake_del_needle = Needle {
id: key,
cookie: Cookie(0x12345678),
append_at_ns: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as u64,
..Needle::default()
};
let bytes = fake_del_needle.write_bytes(version);
dst_dat.write_all(&bytes)?;
let mut idx_entry_buf = [0u8; NEEDLE_MAP_ENTRY_SIZE];
crate::storage::types::idx_entry_to_bytes(
&mut idx_entry_buf,
key,
Offset::from_actual_offset(0),
Size(crate::storage::types::TOMBSTONE_FILE_SIZE.into()),
);
dst_idx.write_all(&idx_entry_buf)?;
dat_offset += bytes.len() as u64;
}
let padding = NEEDLE_PADDING_SIZE as u64 - (dat_offset % NEEDLE_PADDING_SIZE as u64);
if padding != NEEDLE_PADDING_SIZE as u64 {
dat_offset += padding;
dst_dat.seek(SeekFrom::Start(dat_offset))?;
}
}
dst_dat.sync_all()?;
dst_idx.sync_all()?;
Ok(())
}
// ---- Sync / Close ----
pub fn sync_to_disk(&mut self) -> io::Result<()> {

Loading…
Cancel
Save