diff --git a/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs b/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs
index 6acf1746c..b98db9fb0 100644
--- a/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs
+++ b/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs
@@ -314,6 +314,161 @@ fn write_sorted_ecx_from_idx(idx_path: &str, ecx_path: &str) -> io::Result<()> {
     Ok(())
 }
 
+/// Rebuild the .ecx index file by walking needles in the EC data shards.
+///
+/// This is the equivalent of Go's `RebuildEcxFile`. It reads the logical .dat
+/// content from the EC data shards, walks through needle headers to extract
+/// (needle_id, offset, size) entries, deduplicates them, and writes a sorted
+/// .ecx index file.
+pub fn rebuild_ecx_file(
+    dir: &str,
+    collection: &str,
+    volume_id: VolumeId,
+    data_shards: usize,
+) -> io::Result<()> {
+    use crate::storage::needle::needle::get_actual_size;
+    use crate::storage::super_block::SUPER_BLOCK_SIZE;
+
+    let base = volume_file_name(dir, collection, volume_id);
+    let ecx_path = format!("{}.ecx", base);
+
+    // Open data shards to read logical .dat content
+    let mut shards: Vec<EcVolumeShard> = (0..data_shards as u8)
+        .map(|i| EcVolumeShard::new(dir, collection, volume_id, i))
+        .collect();
+
+    // Index loop (not `for shard in &mut shards`) so the outer borrow ends
+    // before the cleanup loop below re-borrows `shards` mutably.
+    for i in 0..shards.len() {
+        if shards[i].open().is_err() {
+            // A data shard is missing, so the ecx rebuild is impossible.
+            for s in &mut shards {
+                s.close();
+            }
+            return Err(io::Error::new(
+                io::ErrorKind::NotFound,
+                "cannot open data shard for ecx rebuild",
+            ));
+        }
+    }
+
+    // Shards should be equal-sized; take the max defensively for the logical size.
+    let shard_size = shards.iter().map(|s| s.file_size()).max().unwrap_or(0);
+    let total_data_size = shard_size as i64 * data_shards as i64;
+
+    // Read version from superblock (first byte of logical data)
+    let mut sb_buf = [0u8; SUPER_BLOCK_SIZE];
+    read_from_data_shards(&shards, &mut sb_buf, 0, data_shards)?;
+    let version = Version(sb_buf[0]);
+
+    // Walk needles starting after superblock
+    let mut offset = SUPER_BLOCK_SIZE as i64;
+    let header_size = NEEDLE_HEADER_SIZE;
+    let mut entries: Vec<(NeedleId, Offset, Size)> = Vec::new();
+
+    while offset + header_size as i64 <= total_data_size {
+        // Read needle header (cookie + needle_id + size = 16 bytes)
+        let mut header_buf = [0u8; NEEDLE_HEADER_SIZE];
+        if read_from_data_shards(&shards, &mut header_buf, offset as u64, data_shards).is_err() {
+            break;
+        }
+
+        let cookie = Cookie::from_bytes(&header_buf[..COOKIE_SIZE]);
+        let needle_id = NeedleId::from_bytes(&header_buf[COOKIE_SIZE..COOKIE_SIZE + NEEDLE_ID_SIZE]);
+        let size = Size::from_bytes(&header_buf[COOKIE_SIZE + NEEDLE_ID_SIZE..header_size]);
+
+        // Validate: stop if we hit zero cookie+id (end of data)
+        if cookie.0 == 0 && needle_id.0 == 0 {
+            break;
+        }
+
+        // Negative size that is not a deletion marker means corrupt data; stop.
+        if size.0 < 0 && !size.is_deleted() {
+            break;
+        }
+
+        let actual_size = get_actual_size(size, version);
+        if actual_size <= 0 || offset + actual_size > total_data_size {
+            break;
+        }
+
+        entries.push((needle_id, Offset::from_actual_offset(offset), size));
+
+        // Advance to next needle (aligned to NEEDLE_PADDING_SIZE)
+        offset += actual_size;
+        let padding_rem = offset % NEEDLE_PADDING_SIZE as i64;
+        if padding_rem != 0 {
+            offset += NEEDLE_PADDING_SIZE as i64 - padding_rem;
+        }
+    }
+
+    for shard in &mut shards {
+        shard.close();
+    }
+
+    // Sort by NeedleId, then by offset (later entries override earlier)
+    entries.sort_by_key(|&(key, offset, _)| (key, offset.to_actual_offset()));
+
+    // Deduplicate: keep latest entry per needle_id
+    entries.reverse();
+    entries.dedup_by_key(|entry| entry.0);
+    entries.reverse();
+
+    // Write sorted .ecx
+    let mut ecx_file = File::create(&ecx_path)?;
+    for &(key, offset, size) in &entries {
+        idx::write_index_entry(&mut ecx_file, key, offset, size)?;
+    }
+    ecx_file.sync_all()?;
+
+    Ok(())
+}
+
+/// Read bytes from EC data shards at a logical offset in the .dat file.
+fn read_from_data_shards(
+    shards: &[EcVolumeShard],
+    buf: &mut [u8],
+    logical_offset: u64,
+    data_shards: usize,
+) -> io::Result<()> {
+    let small_block = ERASURE_CODING_SMALL_BLOCK_SIZE as u64;
+    let data_shards_u64 = data_shards as u64;
+
+    let mut bytes_read = 0u64;
+    let mut remaining = buf.len() as u64;
+    let mut current_offset = logical_offset;
+
+    while remaining > 0 {
+        // Map the logical .dat offset to (shard_index, shard_offset), assuming
+        // the encoder striped the data purely in small-block rows across shards.
+        // NOTE(review): Go's encoder writes large-block stripes first and small
+        // blocks only for the tail; confirm this matches our encoder's layout.
+        let row_size = small_block * data_shards_u64;
+        let row_index = current_offset / row_size;
+        let row_offset = current_offset % row_size;
+        let shard_index = (row_offset / small_block) as usize;
+        let shard_offset = row_index * small_block + (row_offset % small_block);
+
+        if shard_index >= data_shards {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                "shard index out of range",
+            ));
+        }
+
+        // Bytes remaining in the current small block cap this read's length.
+        let bytes_left_in_block = small_block - (row_offset % small_block);
+        let to_read = remaining.min(bytes_left_in_block) as usize;
+
+        let dest = &mut buf[bytes_read as usize..bytes_read as usize + to_read];
+        shards[shard_index].read_at(dest, shard_offset)?;
+
+        bytes_read += to_read as u64;
+        remaining -= to_read as u64;
+        current_offset += to_read as u64;
+    }
+
+    Ok(())
+}
+
 /// Encode the .dat file data into shard files.
 ///
 /// Uses a two-phase approach matching Go's ec_encoder.go: