Browse Source

Add rebuild_ecx_file for EC index reconstruction from data shards

rust-volume-server
Chris Lu 4 days ago
parent
commit
0b43e548f7
  1. 155
      seaweed-volume/src/storage/erasure_coding/ec_encoder.rs

155
seaweed-volume/src/storage/erasure_coding/ec_encoder.rs

@ -314,6 +314,161 @@ fn write_sorted_ecx_from_idx(idx_path: &str, ecx_path: &str) -> io::Result<()> {
Ok(())
}
/// Rebuild the .ecx index file by walking needles in the EC data shards.
///
/// This is the equivalent of Go's `RebuildEcxFile`. It reads the logical .dat
/// content from the EC data shards, walks through needle headers to extract
/// (needle_id, offset, size) entries, deduplicates them, and writes a sorted
/// .ecx index file.
pub fn rebuild_ecx_file(
dir: &str,
collection: &str,
volume_id: VolumeId,
data_shards: usize,
) -> io::Result<()> {
use crate::storage::needle::needle::get_actual_size;
use crate::storage::super_block::SUPER_BLOCK_SIZE;
let base = volume_file_name(dir, collection, volume_id);
let ecx_path = format!("{}.ecx", base);
// Open data shards to read logical .dat content
let mut shards: Vec<EcVolumeShard> = (0..data_shards as u8)
.map(|i| EcVolumeShard::new(dir, collection, volume_id, i))
.collect();
for shard in &mut shards {
if let Err(_) = shard.open() {
// If a data shard is missing, we can't rebuild ecx
for s in &mut shards {
s.close();
}
return Err(io::Error::new(
io::ErrorKind::NotFound,
format!("cannot open data shard for ecx rebuild"),
));
}
}
// Determine total logical data size from shard sizes
let shard_size = shards.iter().map(|s| s.file_size()).max().unwrap_or(0);
let total_data_size = shard_size as i64 * data_shards as i64;
// Read version from superblock (first byte of logical data)
let mut sb_buf = [0u8; SUPER_BLOCK_SIZE];
read_from_data_shards(&shards, &mut sb_buf, 0, data_shards)?;
let version = Version(sb_buf[0]);
// Walk needles starting after superblock
let mut offset = SUPER_BLOCK_SIZE as i64;
let header_size = NEEDLE_HEADER_SIZE;
let mut entries: Vec<(NeedleId, Offset, Size)> = Vec::new();
while offset + header_size as i64 <= total_data_size {
// Read needle header (cookie + needle_id + size = 16 bytes)
let mut header_buf = [0u8; NEEDLE_HEADER_SIZE];
if read_from_data_shards(&shards, &mut header_buf, offset as u64, data_shards).is_err() {
break;
}
let cookie = Cookie::from_bytes(&header_buf[..COOKIE_SIZE]);
let needle_id = NeedleId::from_bytes(&header_buf[COOKIE_SIZE..COOKIE_SIZE + NEEDLE_ID_SIZE]);
let size = Size::from_bytes(&header_buf[COOKIE_SIZE + NEEDLE_ID_SIZE..header_size]);
// Validate: stop if we hit zero cookie+id (end of data)
if cookie.0 == 0 && needle_id.0 == 0 {
break;
}
// Validate size is reasonable
if size.0 < 0 && !size.is_deleted() {
break;
}
let actual_size = get_actual_size(size, version);
if actual_size <= 0 || offset + actual_size > total_data_size {
break;
}
entries.push((needle_id, Offset::from_actual_offset(offset), size));
// Advance to next needle (aligned to NEEDLE_PADDING_SIZE)
offset += actual_size;
let padding_rem = offset % NEEDLE_PADDING_SIZE as i64;
if padding_rem != 0 {
offset += NEEDLE_PADDING_SIZE as i64 - padding_rem;
}
}
for shard in &mut shards {
shard.close();
}
// Sort by NeedleId, then by offset (later entries override earlier)
entries.sort_by_key(|&(key, offset, _)| (key, offset.to_actual_offset()));
// Deduplicate: keep latest entry per needle_id
entries.reverse();
entries.dedup_by_key(|entry| entry.0);
entries.reverse();
// Write sorted .ecx
let mut ecx_file = File::create(&ecx_path)?;
for &(key, offset, size) in &entries {
idx::write_index_entry(&mut ecx_file, key, offset, size)?;
}
ecx_file.sync_all()?;
Ok(())
}
/// Read bytes from EC data shards at a logical offset in the .dat file.
fn read_from_data_shards(
shards: &[EcVolumeShard],
buf: &mut [u8],
logical_offset: u64,
data_shards: usize,
) -> io::Result<()> {
let small_block = ERASURE_CODING_SMALL_BLOCK_SIZE as u64;
let data_shards_u64 = data_shards as u64;
let mut bytes_read = 0u64;
let mut remaining = buf.len() as u64;
let mut current_offset = logical_offset;
while remaining > 0 {
// Determine which shard and at what shard-offset this logical offset maps to.
// The data is interleaved: large blocks first, then small blocks.
// For simplicity, use the small block size for all calculations since
// large blocks are multiples of small blocks.
let row_size = small_block * data_shards_u64;
let row_index = current_offset / row_size;
let row_offset = current_offset % row_size;
let shard_index = (row_offset / small_block) as usize;
let shard_offset = row_index * small_block + (row_offset % small_block);
if shard_index >= data_shards {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"shard index out of range",
));
}
// How many bytes can we read from this position in this shard block
let bytes_left_in_block = small_block - (row_offset % small_block);
let to_read = remaining.min(bytes_left_in_block) as usize;
let dest = &mut buf[bytes_read as usize..bytes_read as usize + to_read];
shards[shard_index].read_at(dest, shard_offset)?;
bytes_read += to_read as u64;
remaining -= to_read as u64;
current_offset += to_read as u64;
}
Ok(())
}
/// Encode the .dat file data into shard files.
///
/// Uses a two-phase approach matching Go's ec_encoder.go:

Loading…
Cancel
Save