Browse Source

feat(ec): implement EC volume scrubbing and parity verification

rust-volume-server
Chris Lu 2 days ago
parent
commit
9ffab64b69
  1. 76
      seaweed-volume/src/server/grpc_server.rs
  2. 179
      seaweed-volume/src/storage/erasure_coding/ec_encoder.rs

76
seaweed-volume/src/server/grpc_server.rs

@ -2270,28 +2270,68 @@ impl VolumeServer for VolumeGrpcService {
_ => return Err(Status::invalid_argument(format!("unsupported EC volume scrub mode {}", mode))),
}
if req.volume_ids.is_empty() {
// Auto-select: no EC volumes exist in our implementation
return Ok(Response::new(volume_server_pb::ScrubEcVolumeResponse {
total_volumes: 0,
total_files: 0,
broken_volume_ids: vec![],
broken_shard_infos: vec![],
details: vec![],
}));
}
let store = self.state.store.read().unwrap();
let vids: Vec<VolumeId> = if req.volume_ids.is_empty() {
store.ec_volumes.keys().copied().collect()
} else {
req.volume_ids.iter().map(|&id| VolumeId(id)).collect()
};
// Specific volume IDs requested — EC volumes don't exist, so error
for &vid in &req.volume_ids {
return Err(Status::not_found(format!("EC volume id {} not found", vid)));
let mut total_volumes: u64 = 0;
let total_files: u64 = 0;
let mut broken_volume_ids: Vec<u32> = Vec::new();
let mut broken_shard_infos: Vec<volume_server_pb::EcShardInfo> = Vec::new();
let mut details: Vec<String> = Vec::new();
for vid in &vids {
let collection = {
if let Some(ecv) = store.ec_volumes.get(vid) {
ecv.collection.clone()
} else {
continue;
}
};
let dir = store.find_ec_dir(*vid, &collection).unwrap_or_else(|| String::from(""));
if dir.is_empty() {
continue;
}
total_volumes += 1;
let (data_shards, parity_shards) = crate::storage::erasure_coding::ec_volume::read_ec_shard_config(&dir, *vid);
match crate::storage::erasure_coding::ec_encoder::verify_ec_shards(
&dir, &collection, *vid, data_shards as usize, parity_shards as usize
) {
Ok((broken, msgs)) => {
if !broken.is_empty() {
broken_volume_ids.push(vid.0);
for b in broken {
broken_shard_infos.push(volume_server_pb::EcShardInfo {
volume_id: vid.0,
collection: collection.clone(),
shard_id: b,
..Default::default()
});
}
}
for msg in msgs {
details.push(format!("ecvol {}: {}", vid.0, msg));
}
}
Err(e) => {
broken_volume_ids.push(vid.0);
details.push(format!("ecvol {}: scrub error: {}", vid.0, e));
}
}
}
Ok(Response::new(volume_server_pb::ScrubEcVolumeResponse {
total_volumes: 0,
total_files: 0,
broken_volume_ids: vec![],
broken_shard_infos: vec![],
details: vec![],
total_volumes,
total_files,
broken_volume_ids,
broken_shard_infos,
details,
}))
}

179
seaweed-volume/src/storage/erasure_coding/ec_encoder.rs

@ -62,49 +62,59 @@ pub fn write_ec_files(
Ok(())
}
/// Rebuild missing shards by reconstructing them using Reed-Solomon.
/// Rebuild missing EC shard files from existing shards using Reed-Solomon reconstruct.
///
/// This does not require the `.dat` file, only the existing `.ecXX` shard files.
pub fn rebuild_ec_files(
dir: &str,
collection: &str,
vid: VolumeId,
missing_shards: &[u32],
volume_id: VolumeId,
missing_shard_ids: &[u32],
data_shards: usize,
parity_shards: usize,
) -> io::Result<()> {
if missing_shard_ids.is_empty() {
return Ok(());
}
let rs = ReedSolomon::new(data_shards, parity_shards).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("reed-solomon init: {:?}", e))
})?;
let total_shards = data_shards + parity_shards;
let mut shards: Vec<EcVolumeShard> = (0..total_shards as u8)
.map(|i| EcVolumeShard::new(dir, collection, vid, i))
.map(|i| EcVolumeShard::new(dir, collection, volume_id, i))
.collect();
// Determine the exact shard size from the first available existing shard
let mut shard_size = 0;
for shard in &mut shards {
if shard.open().is_ok() {
shard_size = shard.file_size();
break;
for (i, shard) in shards.iter_mut().enumerate() {
if !missing_shard_ids.contains(&(i as u32)) {
if let Ok(_) = shard.open() {
let size = shard.file_size();
if size > shard_size {
shard_size = size;
}
} else {
return Err(io::Error::new(
io::ErrorKind::NotFound,
format!("missing non-rebuild shard {}", i),
));
}
}
}
if shard_size == 0 {
return Err(io::Error::new(io::ErrorKind::NotFound, "no valid EC shards found to rebuild from"));
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"all existing shards are empty or cannot find an existing shard to determine size",
));
}
// Open/Create all shards
for (i, shard) in shards.iter_mut().enumerate() {
if missing_shards.contains(&(i as u32)) {
let path = shard.file_name();
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&path)?;
shard.set_file(file);
} else {
shard.open()?;
// Create the missing shards for writing
for i in missing_shard_ids {
if let Some(shard) = shards.get_mut(*i as usize) {
shard.create()?;
}
}
@ -112,26 +122,130 @@ pub fn rebuild_ec_files(
let mut remaining = shard_size;
let mut offset: u64 = 0;
// Process all data in blocks
while remaining > 0 {
let to_process = remaining.min(block_size as i64) as usize;
// Allocate buffers for all shards. Option<Vec<u8>> is required by rs.reconstruct()
let mut buffers: Vec<Option<Vec<u8>>> = vec![None; total_shards];
for i in 0..total_shards {
if !missing_shards.contains(&(i as u32)) {
// Read available shards
for (i, shard) in shards.iter().enumerate() {
if !missing_shard_ids.contains(&(i as u32)) {
let mut buf = vec![0u8; to_process];
shards[i].read_at(&mut buf, offset)?;
shard.read_at(&mut buf, offset)?;
buffers[i] = Some(buf);
}
}
// Reconstruct missing shards
rs.reconstruct(&mut buffers).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("rs reconstruct: {:?}", e))
io::Error::new(io::ErrorKind::Other, format!("reed-solomon reconstruct: {:?}", e))
})?;
// Write recovered data into the missing shards
for i in missing_shard_ids {
let idx = *i as usize;
if let Some(buf) = buffers[idx].take() {
shards[idx].write_all(&buf)?;
}
}
offset += to_process as u64;
remaining -= to_process as i64;
}
// Close all shards
for shard in &mut shards {
shard.close();
}
Ok(())
}
/// Verify EC shards by computing parity against the existing data and identifying corrupted shards.
pub fn verify_ec_shards(
dir: &str,
collection: &str,
volume_id: VolumeId,
data_shards: usize,
parity_shards: usize,
) -> io::Result<(Vec<u32>, Vec<String>)> {
let rs = ReedSolomon::new(data_shards, parity_shards).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("reed-solomon init: {:?}", e))
})?;
let total_shards = data_shards + parity_shards;
let mut shards: Vec<EcVolumeShard> = (0..total_shards as u8)
.map(|i| EcVolumeShard::new(dir, collection, volume_id, i))
.collect();
let mut shard_size = 0;
let mut broken_shards = std::collections::HashSet::new();
let mut details = Vec::new();
for (i, shard) in shards.iter_mut().enumerate() {
if let Ok(_) = shard.open() {
let size = shard.file_size();
if size > shard_size {
shard_size = size;
}
} else {
broken_shards.insert(i as u32);
details.push(format!("failed to open or missing shard {}", i));
}
}
if shard_size == 0 || broken_shards.len() >= parity_shards {
// Can't do much if we don't know the size or have too many missing
return Ok((broken_shards.into_iter().collect(), details));
}
let block_size = ERASURE_CODING_SMALL_BLOCK_SIZE;
let mut remaining = shard_size;
let mut offset: u64 = 0;
while remaining > 0 {
let to_process = remaining.min(block_size as i64) as usize;
let mut buffers = vec![vec![0u8; to_process]; total_shards];
let mut read_failed = false;
for i in 0..total_shards {
if missing_shards.contains(&(i as u32)) {
if let Some(ref buf) = buffers[i] {
shards[i].write_at(buf, offset)?;
if !broken_shards.contains(&(i as u32)) {
if let Err(e) = shards[i].read_at(&mut buffers[i], offset) {
broken_shards.insert(i as u32);
details.push(format!("read error shard {}: {}", i, e));
read_failed = true;
}
} else {
read_failed = true;
}
}
// Only do verification if all shards were readable
if !read_failed {
// Need to convert Vec<Vec<u8>> to &[&[u8]] for rs.verify
let slice_ptrs: Vec<&[u8]> = buffers.iter().map(|v| v.as_slice()).collect();
if let Ok(is_valid) = rs.verify(&slice_ptrs) {
if !is_valid {
// Reed-Solomon verification failed, which tells us the batch is corrupt but
// not which shard is at fault. To narrow it down, re-encode parity from the
// first `data_shards` data shards and compare each computed parity shard
// against the corresponding `parity_shards` bytes read from disk.
let mut verify_buffers = buffers.clone();
// Clear the parity parts
for i in data_shards..total_shards {
verify_buffers[i].fill(0);
}
if rs.encode(&mut verify_buffers).is_ok() {
for i in 0..total_shards {
if buffers[i] != verify_buffers[i] {
broken_shards.insert(i as u32);
details.push(format!("parity mismatch on shard {} at offset {}", i, offset));
}
}
}
}
}
}
@ -140,12 +254,15 @@ pub fn rebuild_ec_files(
remaining -= to_process as i64;
}
// Close all
// Close all shards
for shard in &mut shards {
shard.close();
}
Ok(())
let mut broken_vec: Vec<u32> = broken_shards.into_iter().collect();
broken_vec.sort_unstable();
Ok((broken_vec, details))
}
/// Write sorted .ecx index from .idx file.

Loading…
Cancel
Save