diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 9d5da6d53..322d35883 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -2270,28 +2270,68 @@ impl VolumeServer for VolumeGrpcService { _ => return Err(Status::invalid_argument(format!("unsupported EC volume scrub mode {}", mode))), } - if req.volume_ids.is_empty() { - // Auto-select: no EC volumes exist in our implementation - return Ok(Response::new(volume_server_pb::ScrubEcVolumeResponse { - total_volumes: 0, - total_files: 0, - broken_volume_ids: vec![], - broken_shard_infos: vec![], - details: vec![], - })); - } + let store = self.state.store.read().unwrap(); + let vids: Vec<VolumeId> = if req.volume_ids.is_empty() { + store.ec_volumes.keys().copied().collect() + } else { + req.volume_ids.iter().map(|&id| VolumeId(id)).collect() + }; - // Specific volume IDs requested — EC volumes don't exist, so error - for &vid in &req.volume_ids { - return Err(Status::not_found(format!("EC volume id {} not found", vid))); + let mut total_volumes: u64 = 0; + let total_files: u64 = 0; + let mut broken_volume_ids: Vec<u32> = Vec::new(); + let mut broken_shard_infos: Vec<volume_server_pb::EcShardInfo> = Vec::new(); + let mut details: Vec<String> = Vec::new(); + + for vid in &vids { + let collection = { + if let Some(ecv) = store.ec_volumes.get(vid) { + ecv.collection.clone() + } else { + continue; + } + }; + + let dir = store.find_ec_dir(*vid, &collection).unwrap_or_else(|| String::from("")); + if dir.is_empty() { + continue; + } + + total_volumes += 1; + let (data_shards, parity_shards) = crate::storage::erasure_coding::ec_volume::read_ec_shard_config(&dir, *vid); + + match crate::storage::erasure_coding::ec_encoder::verify_ec_shards( + &dir, &collection, *vid, data_shards as usize, parity_shards as usize + ) { + Ok((broken, msgs)) => { + if !broken.is_empty() { + broken_volume_ids.push(vid.0); + for b in broken { + broken_shard_infos.push(volume_server_pb::EcShardInfo { 
volume_id: vid.0, + collection: collection.clone(), + shard_id: b, + ..Default::default() + }); + } + } + for msg in msgs { + details.push(format!("ecvol {}: {}", vid.0, msg)); + } + } + Err(e) => { + broken_volume_ids.push(vid.0); + details.push(format!("ecvol {}: scrub error: {}", vid.0, e)); + } + } } Ok(Response::new(volume_server_pb::ScrubEcVolumeResponse { - total_volumes: 0, - total_files: 0, - broken_volume_ids: vec![], - broken_shard_infos: vec![], - details: vec![], + total_volumes, + total_files, + broken_volume_ids, + broken_shard_infos, + details, })) } diff --git a/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs b/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs index 93c785b2f..3a19158cc 100644 --- a/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs +++ b/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs @@ -62,49 +62,59 @@ pub fn write_ec_files( Ok(()) } -/// Rebuild missing shards by reconstructing them using Reed-Solomon. +/// Rebuild missing EC shard files from existing shards using Reed-Solomon reconstruct. +/// +/// This does not require the `.dat` file, only the existing `.ecXX` shard files. 
pub fn rebuild_ec_files( dir: &str, collection: &str, - vid: VolumeId, - missing_shards: &[u32], + volume_id: VolumeId, + missing_shard_ids: &[u32], data_shards: usize, parity_shards: usize, ) -> io::Result<()> { + if missing_shard_ids.is_empty() { + return Ok(()); + } + let rs = ReedSolomon::new(data_shards, parity_shards).map_err(|e| { io::Error::new(io::ErrorKind::Other, format!("reed-solomon init: {:?}", e)) })?; let total_shards = data_shards + parity_shards; let mut shards: Vec<EcVolumeShard> = (0..total_shards as u8) - .map(|i| EcVolumeShard::new(dir, collection, vid, i)) + .map(|i| EcVolumeShard::new(dir, collection, volume_id, i)) .collect(); + // Determine the exact shard size from the first available existing shard let mut shard_size = 0; - for shard in &mut shards { - if shard.open().is_ok() { - shard_size = shard.file_size(); - break; + for (i, shard) in shards.iter_mut().enumerate() { + if !missing_shard_ids.contains(&(i as u32)) { + if let Ok(_) = shard.open() { + let size = shard.file_size(); + if size > shard_size { + shard_size = size; + } + } else { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("missing non-rebuild shard {}", i), + )); + } } } if shard_size == 0 { - return Err(io::Error::new(io::ErrorKind::NotFound, "no valid EC shards found to rebuild from")); + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "all existing shards are empty or cannot find an existing shard to determine size", + )); } - // Open/Create all shards - for (i, shard) in shards.iter_mut().enumerate() { - if missing_shards.contains(&(i as u32)) { - let path = shard.file_name(); - let file = OpenOptions::new() - .read(true) - .write(true) - .create(true) - .truncate(true) - .open(&path)?; - shard.set_file(file); - } else { - shard.open()?; + // Create the missing shards for writing + for i in missing_shard_ids { + if let Some(shard) = shards.get_mut(*i as usize) { + shard.create()?; + } } @@ -112,26 +122,130 @@ pub fn rebuild_ec_files( let mut remaining = 
shard_size; let mut offset: u64 = 0; + // Process all data in blocks while remaining > 0 { let to_process = remaining.min(block_size as i64) as usize; + + // Allocate buffers for all shards. Option<Vec<u8>> is required by rs.reconstruct() let mut buffers: Vec<Option<Vec<u8>>> = vec![None; total_shards]; - for i in 0..total_shards { - if !missing_shards.contains(&(i as u32)) { + // Read available shards + for (i, shard) in shards.iter().enumerate() { + if !missing_shard_ids.contains(&(i as u32)) { let mut buf = vec![0u8; to_process]; - shards[i].read_at(&mut buf, offset)?; + shard.read_at(&mut buf, offset)?; buffers[i] = Some(buf); } } + // Reconstruct missing shards rs.reconstruct(&mut buffers).map_err(|e| { - io::Error::new(io::ErrorKind::Other, format!("rs reconstruct: {:?}", e)) + io::Error::new(io::ErrorKind::Other, format!("reed-solomon reconstruct: {:?}", e)) })?; + // Write recovered data into the missing shards + for i in missing_shard_ids { + let idx = *i as usize; + if let Some(buf) = buffers[idx].take() { + shards[idx].write_all(&buf)?; + } + } + + offset += to_process as u64; + remaining -= to_process as i64; + } + + // Close all shards + for shard in &mut shards { + shard.close(); + } + + Ok(()) +} + +/// Verify EC shards by computing parity against the existing data and identifying corrupted shards. 
+pub fn verify_ec_shards( + dir: &str, + collection: &str, + volume_id: VolumeId, + data_shards: usize, + parity_shards: usize, +) -> io::Result<(Vec<u32>, Vec<String>)> { + let rs = ReedSolomon::new(data_shards, parity_shards).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("reed-solomon init: {:?}", e)) + })?; + + let total_shards = data_shards + parity_shards; + let mut shards: Vec<EcVolumeShard> = (0..total_shards as u8) + .map(|i| EcVolumeShard::new(dir, collection, volume_id, i)) + .collect(); + + let mut shard_size = 0; + let mut broken_shards = std::collections::HashSet::new(); + let mut details = Vec::new(); + + for (i, shard) in shards.iter_mut().enumerate() { + if let Ok(_) = shard.open() { + let size = shard.file_size(); + if size > shard_size { + shard_size = size; + } + } else { + broken_shards.insert(i as u32); + details.push(format!("failed to open or missing shard {}", i)); + } + } + + if shard_size == 0 || broken_shards.len() >= parity_shards { + // Can't do much if we don't know the size or have too many missing + return Ok((broken_shards.into_iter().collect(), details)); + } + + let block_size = ERASURE_CODING_SMALL_BLOCK_SIZE; + let mut remaining = shard_size; + let mut offset: u64 = 0; + + while remaining > 0 { + let to_process = remaining.min(block_size as i64) as usize; + let mut buffers = vec![vec![0u8; to_process]; total_shards]; + + let mut read_failed = false; for i in 0..total_shards { - if missing_shards.contains(&(i as u32)) { - if let Some(ref buf) = buffers[i] { - shards[i].write_at(buf, offset)?; + if !broken_shards.contains(&(i as u32)) { + if let Err(e) = shards[i].read_at(&mut buffers[i], offset) { + broken_shards.insert(i as u32); + details.push(format!("read error shard {}: {}", i, e)); + read_failed = true; + } + } else { + read_failed = true; + } + } + + // Only do verification if all shards were readable + if !read_failed { + // Need to convert Vec<Vec<u8>> to &[&[u8]] for rs.verify + let slice_ptrs: Vec<&[u8]> = buffers.iter().map(|v| 
v.as_slice()).collect(); + if let Ok(is_valid) = rs.verify(&slice_ptrs) { + if !is_valid { + // Verification failed somewhere in this block. To pinpoint the broken + // shard(s), re-encode parity from the data shards and compare against + // the shards read from disk. Note: a corrupted data shard surfaces as + // a mismatch on every re-encoded parity shard, not on itself. + + let mut verify_buffers = buffers.clone(); + // Clear the parity parts + for i in data_shards..total_shards { + verify_buffers[i].fill(0); + } + if rs.encode(&mut verify_buffers).is_ok() { + for i in 0..total_shards { + if buffers[i] != verify_buffers[i] { + broken_shards.insert(i as u32); + details.push(format!("parity mismatch on shard {} at offset {}", i, offset)); + } + } + } } } } @@ -140,12 +254,15 @@ pub fn rebuild_ec_files( remaining -= to_process as i64; } - // Close all + // Close all shards for shard in &mut shards { shard.close(); } - Ok(()) + let mut broken_vec: Vec<u32> = broken_shards.into_iter().collect(); + broken_vec.sort_unstable(); + + Ok((broken_vec, details)) } /// Write sorted .ecx index from .idx file.