Browse Source

feat(ec): implement Reed-Solomon shard reconstruction

rust-volume-server
Chris Lu 2 days ago
parent
commit
adb18b77fb
  1. 14
      seaweed-volume/src/server/grpc_server.rs
  2. 86
      seaweed-volume/src/storage/erasure_coding/ec_encoder.rs

14
seaweed-volume/src/server/grpc_server.rs

@ -1359,7 +1359,9 @@ impl VolumeServer for VolumeGrpcService {
store.locations[loc_idx].directory.clone() store.locations[loc_idx].directory.clone()
}; };
crate::storage::erasure_coding::ec_encoder::write_ec_files(&dir, collection, vid)
let (data_shards, parity_shards) = crate::storage::erasure_coding::ec_volume::read_ec_shard_config(&dir, vid);
crate::storage::erasure_coding::ec_encoder::write_ec_files(&dir, collection, vid, data_shards as usize, parity_shards as usize)
.map_err(|e| Status::internal(e.to_string()))?; .map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(volume_server_pb::VolumeEcShardsGenerateResponse {})) Ok(Response::new(volume_server_pb::VolumeEcShardsGenerateResponse {}))
@ -1389,9 +1391,11 @@ impl VolumeServer for VolumeGrpcService {
}; };
// Check which shards are missing // Check which shards are missing
use crate::storage::erasure_coding::ec_shard::TOTAL_SHARDS_COUNT;
let (data_shards, parity_shards) = crate::storage::erasure_coding::ec_volume::read_ec_shard_config(&dir, vid);
let total_shards = data_shards + parity_shards;
let mut missing: Vec<u32> = Vec::new(); let mut missing: Vec<u32> = Vec::new();
for shard_id in 0..TOTAL_SHARDS_COUNT as u8 {
for shard_id in 0..total_shards as u8 {
let shard = crate::storage::erasure_coding::ec_shard::EcVolumeShard::new(&dir, collection, vid, shard_id); let shard = crate::storage::erasure_coding::ec_shard::EcVolumeShard::new(&dir, collection, vid, shard_id);
if !std::path::Path::new(&shard.file_name()).exists() { if !std::path::Path::new(&shard.file_name()).exists() {
missing.push(shard_id as u32); missing.push(shard_id as u32);
@ -1404,8 +1408,8 @@ impl VolumeServer for VolumeGrpcService {
})); }));
} }
// Rebuild missing shards by regenerating all EC files
crate::storage::erasure_coding::ec_encoder::write_ec_files(&dir, collection, vid)
// Rebuild missing shards by regenerating all missing EC files via Reed-Solomon reconstruct
crate::storage::erasure_coding::ec_encoder::rebuild_ec_files(&dir, collection, vid, &missing, data_shards as usize, parity_shards as usize)
.map_err(|e| Status::internal(format!("rebuild ec shards: {}", e)))?; .map_err(|e| Status::internal(format!("rebuild ec shards: {}", e)))?;
Ok(Response::new(volume_server_pb::VolumeEcShardsRebuildResponse { Ok(Response::new(volume_server_pb::VolumeEcShardsRebuildResponse {

86
seaweed-volume/src/storage/erasure_coding/ec_encoder.rs

@ -62,6 +62,92 @@ pub fn write_ec_files(
Ok(()) Ok(())
} }
/// Rebuild missing shards by reconstructing them using Reed-Solomon.
pub fn rebuild_ec_files(
dir: &str,
collection: &str,
vid: VolumeId,
missing_shards: &[u32],
data_shards: usize,
parity_shards: usize,
) -> io::Result<()> {
let rs = ReedSolomon::new(data_shards, parity_shards).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("reed-solomon init: {:?}", e))
})?;
let total_shards = data_shards + parity_shards;
let mut shards: Vec<EcVolumeShard> = (0..total_shards as u8)
.map(|i| EcVolumeShard::new(dir, collection, vid, i))
.collect();
let mut shard_size = 0;
for shard in &mut shards {
if shard.open().is_ok() {
shard_size = shard.file_size();
break;
}
}
if shard_size == 0 {
return Err(io::Error::new(io::ErrorKind::NotFound, "no valid EC shards found to rebuild from"));
}
// Open/Create all shards
for (i, shard) in shards.iter_mut().enumerate() {
if missing_shards.contains(&(i as u32)) {
let path = shard.file_name();
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&path)?;
shard.set_file(file);
} else {
shard.open()?;
}
}
let block_size = ERASURE_CODING_SMALL_BLOCK_SIZE;
let mut remaining = shard_size;
let mut offset: u64 = 0;
while remaining > 0 {
let to_process = remaining.min(block_size as i64) as usize;
let mut buffers: Vec<Option<Vec<u8>>> = vec![None; total_shards];
for i in 0..total_shards {
if !missing_shards.contains(&(i as u32)) {
let mut buf = vec![0u8; to_process];
shards[i].read_at(&mut buf, offset)?;
buffers[i] = Some(buf);
}
}
rs.reconstruct(&mut buffers).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("rs reconstruct: {:?}", e))
})?;
for i in 0..total_shards {
if missing_shards.contains(&(i as u32)) {
if let Some(ref buf) = buffers[i] {
shards[i].write_at(buf, offset)?;
}
}
}
offset += to_process as u64;
remaining -= to_process as i64;
}
// Close all
for shard in &mut shards {
shard.close();
}
Ok(())
}
/// Write sorted .ecx index from .idx file. /// Write sorted .ecx index from .idx file.
fn write_sorted_ecx_from_idx(idx_path: &str, ecx_path: &str) -> io::Result<()> { fn write_sorted_ecx_from_idx(idx_path: &str, ecx_path: &str) -> io::Result<()> {
if !std::path::Path::new(idx_path).exists() { if !std::path::Path::new(idx_path).exists() {

Loading…
Cancel
Save