diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index d677be1db..9d5da6d53 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -1359,7 +1359,9 @@ impl VolumeServer for VolumeGrpcService { store.locations[loc_idx].directory.clone() }; - crate::storage::erasure_coding::ec_encoder::write_ec_files(&dir, collection, vid) + let (data_shards, parity_shards) = crate::storage::erasure_coding::ec_volume::read_ec_shard_config(&dir, vid); + + crate::storage::erasure_coding::ec_encoder::write_ec_files(&dir, collection, vid, data_shards as usize, parity_shards as usize) .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(volume_server_pb::VolumeEcShardsGenerateResponse {})) @@ -1389,9 +1391,11 @@ impl VolumeServer for VolumeGrpcService { }; // Check which shards are missing - use crate::storage::erasure_coding::ec_shard::TOTAL_SHARDS_COUNT; + let (data_shards, parity_shards) = crate::storage::erasure_coding::ec_volume::read_ec_shard_config(&dir, vid); + let total_shards = data_shards + parity_shards; + let mut missing: Vec = Vec::new(); - for shard_id in 0..TOTAL_SHARDS_COUNT as u8 { + for shard_id in 0..total_shards as u8 { let shard = crate::storage::erasure_coding::ec_shard::EcVolumeShard::new(&dir, collection, vid, shard_id); if !std::path::Path::new(&shard.file_name()).exists() { missing.push(shard_id as u32); @@ -1404,8 +1408,8 @@ impl VolumeServer for VolumeGrpcService { })); } - // Rebuild missing shards by regenerating all EC files - crate::storage::erasure_coding::ec_encoder::write_ec_files(&dir, collection, vid) + // Rebuild missing shards by regenerating all missing EC files via Reed-Solomon reconstruct + crate::storage::erasure_coding::ec_encoder::rebuild_ec_files(&dir, collection, vid, &missing, data_shards as usize, parity_shards as usize) .map_err(|e| Status::internal(format!("rebuild ec shards: {}", e)))?; Ok(Response::new(volume_server_pb::VolumeEcShardsRebuildResponse { diff --git a/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs b/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs index a6fafe7e1..93c785b2f 100644 --- a/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs +++ b/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs @@ -62,6 +62,92 @@ pub fn write_ec_files( Ok(()) } +/// Rebuild missing shards by reconstructing them using Reed-Solomon. +pub fn rebuild_ec_files( + dir: &str, + collection: &str, + vid: VolumeId, + missing_shards: &[u32], + data_shards: usize, + parity_shards: usize, +) -> io::Result<()> { + let rs = ReedSolomon::new(data_shards, parity_shards).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("reed-solomon init: {:?}", e)) + })?; + + let total_shards = data_shards + parity_shards; + let mut shards: Vec = (0..total_shards as u8) + .map(|i| EcVolumeShard::new(dir, collection, vid, i)) + .collect(); + + let mut shard_size = 0; + for shard in &mut shards { + if shard.open().is_ok() { + shard_size = shard.file_size(); + break; + } + } + + if shard_size == 0 { + return Err(io::Error::new(io::ErrorKind::NotFound, "no valid EC shards found to rebuild from")); + } + + // Open/Create all shards + for (i, shard) in shards.iter_mut().enumerate() { + if missing_shards.contains(&(i as u32)) { + let path = shard.file_name(); + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(&path)?; + shard.set_file(file); + } else { + shard.open()?; + } + } + + let block_size = ERASURE_CODING_SMALL_BLOCK_SIZE; + let mut remaining = shard_size; + let mut offset: u64 = 0; + + while remaining > 0 { + let to_process = remaining.min(block_size as i64) as usize; + let mut buffers: Vec>> = vec![None; total_shards]; + + for i in 0..total_shards { + if !missing_shards.contains(&(i as u32)) { + let mut buf = vec![0u8; to_process]; + shards[i].read_at(&mut buf, offset)?; + buffers[i] = Some(buf); + } + } + + rs.reconstruct(&mut buffers).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("rs reconstruct: {:?}", e)) + })?; + + for i in 0..total_shards { + if missing_shards.contains(&(i as u32)) { + if let Some(ref buf) = buffers[i] { + shards[i].write_at(buf, offset)?; + } + } + } + + offset += to_process as u64; + remaining -= to_process as i64; + } + + // Close all + for shard in &mut shards { + shard.close(); + } + + Ok(()) +} + /// Write sorted .ecx index from .idx file. fn write_sorted_ecx_from_idx(idx_path: &str, ecx_path: &str) -> io::Result<()> { if !std::path::Path::new(idx_path).exists() {