From 38b1a6d6a6ed3b9043d1060c410848140b594086 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 6 Mar 2026 15:17:00 -0800 Subject: [PATCH] Add erasure coding module with encode/decode and shard management MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements Reed-Solomon 10+4 erasure coding matching Go's erasure_coding package. Features: - EcVolumeShard: individual shard file (.ec00-.ec13) management - ShardBits: bitmap for tracking locally available shards - EcVolume: manages shards + sorted .ecx index + .ecj deletion journal - Binary search needle lookup in sorted .ecx index - ec_encoder: encodes .dat file into 14 shard files with large/small blocks - ec_decoder: reconstructs .dat + .idx from data shards and EC index - ec_locate: maps needle offset/size to shard intervals - Full round-trip test: volume → EC encode → delete originals → reconstruct → verify - 15 unit tests including Reed-Solomon reconstruction --- .../src/storage/erasure_coding/ec_decoder.rs | 197 ++++++++++ .../src/storage/erasure_coding/ec_encoder.rs | 248 ++++++++++++ .../src/storage/erasure_coding/ec_locate.rs | 207 ++++++++++ .../src/storage/erasure_coding/ec_shard.rs | 202 ++++++++++ .../src/storage/erasure_coding/ec_volume.rs | 352 ++++++++++++++++++ .../src/storage/erasure_coding/mod.rs | 13 + seaweed-volume/src/storage/mod.rs | 1 + 7 files changed, 1220 insertions(+) create mode 100644 seaweed-volume/src/storage/erasure_coding/ec_decoder.rs create mode 100644 seaweed-volume/src/storage/erasure_coding/ec_encoder.rs create mode 100644 seaweed-volume/src/storage/erasure_coding/ec_locate.rs create mode 100644 seaweed-volume/src/storage/erasure_coding/ec_shard.rs create mode 100644 seaweed-volume/src/storage/erasure_coding/ec_volume.rs create mode 100644 seaweed-volume/src/storage/erasure_coding/mod.rs diff --git a/seaweed-volume/src/storage/erasure_coding/ec_decoder.rs b/seaweed-volume/src/storage/erasure_coding/ec_decoder.rs new file mode 100644 index 000000000..613ccb0c0 --- /dev/null +++ b/seaweed-volume/src/storage/erasure_coding/ec_decoder.rs @@ -0,0 +1,197 @@ +//! EC decoding: reconstruct a .dat file from EC shards. +//! +//! Rebuilds the original .dat + .idx files from data shards (.ec00-.ec09) +//! and the sorted index (.ecx) + deletion journal (.ecj). + +use std::fs::File; +use std::io::{self, Read, Write}; + +use crate::storage::erasure_coding::ec_shard::*; +use crate::storage::idx; +use crate::storage::types::*; +use crate::storage::volume::volume_file_name; + +/// Reconstruct a .dat file from EC data shards. +/// +/// Reads from .ec00-.ec09 and writes a new .dat file. +pub fn write_dat_file_from_shards( + dir: &str, + collection: &str, + volume_id: VolumeId, + dat_file_size: i64, +) -> io::Result<()> { + let base = volume_file_name(dir, collection, volume_id); + let dat_path = format!("{}.dat", base); + + // Open data shards + let mut shards: Vec = (0..DATA_SHARDS_COUNT as u8) + .map(|i| EcVolumeShard::new(dir, collection, volume_id, i)) + .collect(); + + for shard in &mut shards { + shard.open()?; + } + + let mut dat_file = File::create(&dat_path)?; + let mut remaining = dat_file_size; + let large_block_size = ERASURE_CODING_LARGE_BLOCK_SIZE; + let small_block_size = ERASURE_CODING_SMALL_BLOCK_SIZE; + let large_row_size = (large_block_size * DATA_SHARDS_COUNT) as i64; + + let mut shard_offset: u64 = 0; + + // Read large blocks + while remaining >= large_row_size { + for i in 0..DATA_SHARDS_COUNT { + let mut buf = vec![0u8; large_block_size]; + shards[i].read_at(&mut buf, shard_offset)?; + let to_write = large_block_size.min(remaining as usize); + dat_file.write_all(&buf[..to_write])?; + remaining -= to_write as i64; + if remaining <= 0 { + break; + } + } + shard_offset += large_block_size as u64; + } + + // Read small blocks + while remaining > 0 { + for i in 0..DATA_SHARDS_COUNT { + let mut buf = vec![0u8; small_block_size]; + shards[i].read_at(&mut buf, shard_offset)?; + let to_write = small_block_size.min(remaining as usize); + dat_file.write_all(&buf[..to_write])?; + remaining -= to_write as i64; + if remaining <= 0 { + break; + } + } + shard_offset += small_block_size as u64; + } + + for shard in &mut shards { + shard.close(); + } + + dat_file.sync_all()?; + Ok(()) +} + +/// Write .idx file from .ecx index + .ecj deletion journal. +/// +/// Copies sorted .ecx entries to .idx, then appends tombstones for +/// deleted needles from .ecj. +pub fn write_idx_file_from_ec_index( + dir: &str, + collection: &str, + volume_id: VolumeId, +) -> io::Result<()> { + let base = volume_file_name(dir, collection, volume_id); + let ecx_path = format!("{}.ecx", base); + let ecj_path = format!("{}.ecj", base); + let idx_path = format!("{}.idx", base); + + // Copy .ecx to .idx + std::fs::copy(&ecx_path, &idx_path)?; + + // Append deletions from .ecj as tombstones + if std::path::Path::new(&ecj_path).exists() { + let ecj_data = std::fs::read(&ecj_path)?; + if !ecj_data.is_empty() { + let mut idx_file = std::fs::OpenOptions::new() + .write(true) + .append(true) + .open(&idx_path)?; + + let count = ecj_data.len() / NEEDLE_ID_SIZE; + for i in 0..count { + let start = i * NEEDLE_ID_SIZE; + let needle_id = NeedleId::from_bytes(&ecj_data[start..start + NEEDLE_ID_SIZE]); + idx::write_index_entry( + &mut idx_file, + needle_id, + Offset::default(), + TOMBSTONE_FILE_SIZE, + )?; + } + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::storage::erasure_coding::ec_encoder; + use crate::storage::needle::needle::Needle; + use crate::storage::needle_map::NeedleMapKind; + use crate::storage::volume::Volume; + use tempfile::TempDir; + + #[test] + fn test_ec_full_round_trip() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + + // Create volume with data + let mut v = Volume::new( + dir, dir, "", VolumeId(1), + NeedleMapKind::InMemory, None, None, 0, Version::current(), + ).unwrap(); + + let test_data: Vec<(NeedleId, Vec)> = (1..=3).map(|i| { + let data = format!("EC round trip data for needle {}", i); + (NeedleId(i), data.into_bytes()) + }).collect(); + + for (id, data) in &test_data { + let mut n = Needle { + id: *id, + cookie: Cookie(id.0 as u32), + data: data.clone(), + data_size: data.len() as u32, + ..Needle::default() + }; + v.write_needle(&mut n, true).unwrap(); + } + v.sync_to_disk().unwrap(); + let original_dat_size = v.dat_file_size().unwrap(); + v.close(); + + // Read original .dat for comparison + let original_dat = std::fs::read(format!("{}/1.dat", dir)).unwrap(); + + // Encode to EC + ec_encoder::write_ec_files(dir, "", VolumeId(1)).unwrap(); + + // Delete original .dat and .idx + std::fs::remove_file(format!("{}/1.dat", dir)).unwrap(); + std::fs::remove_file(format!("{}/1.idx", dir)).unwrap(); + + // Reconstruct from EC shards + write_dat_file_from_shards(dir, "", VolumeId(1), original_dat_size as i64).unwrap(); + write_idx_file_from_ec_index(dir, "", VolumeId(1)).unwrap(); + + // Verify reconstructed .dat matches original + let reconstructed_dat = std::fs::read(format!("{}/1.dat", dir)).unwrap(); + assert_eq!( + original_dat[..original_dat_size as usize], + reconstructed_dat[..original_dat_size as usize], + "reconstructed .dat should match original" + ); + + // Verify we can load and read from reconstructed volume + let v2 = Volume::new( + dir, dir, "", VolumeId(1), + NeedleMapKind::InMemory, None, None, 0, Version::current(), + ).unwrap(); + + for (id, expected_data) in &test_data { + let mut n = Needle { id: *id, ..Needle::default() }; + v2.read_needle(&mut n).unwrap(); + assert_eq!(&n.data, expected_data, "needle {} data should match", id); + } + } +} diff --git a/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs b/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs new file mode 100644 index 000000000..fcd08c674 --- /dev/null +++ b/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs @@ -0,0 +1,248 @@ +//! EC encoding: convert a .dat file into 10 data + 4 parity shards. +//! +//! Uses Reed-Solomon erasure coding. The .dat file is split into blocks +//! (1GB large, 1MB small) and encoded across 14 shard files. + +use std::fs::File; +use std::io::{self, Read, Seek, SeekFrom, Write}; + +use reed_solomon_erasure::galois_8::ReedSolomon; + +use crate::storage::erasure_coding::ec_shard::*; +use crate::storage::idx; +use crate::storage::types::*; +use crate::storage::volume::volume_file_name; + +/// Encode a .dat file into EC shard files. +/// +/// Creates .ec00-.ec13 files in the same directory. +/// Also creates a sorted .ecx index from the .idx file. +pub fn write_ec_files( + dir: &str, + collection: &str, + volume_id: VolumeId, +) -> io::Result<()> { + let base = volume_file_name(dir, collection, volume_id); + let dat_path = format!("{}.dat", base); + let idx_path = format!("{}.idx", base); + + // Create sorted .ecx from .idx + write_sorted_ecx_from_idx(&idx_path, &format!("{}.ecx", base))?; + + // Encode .dat into shards + let dat_file = File::open(&dat_path)?; + let dat_size = dat_file.metadata()?.len() as i64; + + let rs = ReedSolomon::new(DATA_SHARDS_COUNT, PARITY_SHARDS_COUNT).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("reed-solomon init: {:?}", e)) + })?; + + // Create shard files + let mut shards: Vec = (0..TOTAL_SHARDS_COUNT as u8) + .map(|i| EcVolumeShard::new(dir, collection, volume_id, i)) + .collect(); + + for shard in &mut shards { + shard.create()?; + } + + // Encode in large blocks, then small blocks + encode_dat_file(&dat_file, dat_size, &rs, &mut shards)?; + + // Close all shards + for shard in &mut shards { + shard.close(); + } + + Ok(()) +} + +/// Write sorted .ecx index from .idx file. +fn write_sorted_ecx_from_idx(idx_path: &str, ecx_path: &str) -> io::Result<()> { + if !std::path::Path::new(idx_path).exists() { + return Err(io::Error::new(io::ErrorKind::NotFound, "idx file not found")); + } + + // Read all idx entries + let mut idx_file = File::open(idx_path)?; + let mut entries: Vec<(NeedleId, Offset, Size)> = Vec::new(); + + idx::walk_index_file(&mut idx_file, 0, |key, offset, size| { + entries.push((key, offset, size)); + Ok(()) + })?; + + // Sort by NeedleId + entries.sort_by_key(|&(key, _, _)| key); + + // Remove duplicates (keep last entry for each key) + entries.dedup_by_key(|entry| entry.0); + + // Write sorted entries to .ecx + let mut ecx_file = File::create(ecx_path)?; + for &(key, offset, size) in &entries { + idx::write_index_entry(&mut ecx_file, key, offset, size)?; + } + + Ok(()) +} + +/// Encode the .dat file data into shard files. +fn encode_dat_file( + dat_file: &File, + dat_size: i64, + rs: &ReedSolomon, + shards: &mut [EcVolumeShard], +) -> io::Result<()> { + let large_block_size = ERASURE_CODING_LARGE_BLOCK_SIZE; + let small_block_size = ERASURE_CODING_SMALL_BLOCK_SIZE; + let large_row_size = large_block_size * DATA_SHARDS_COUNT; + + let mut remaining = dat_size; + let mut offset: u64 = 0; + + // Process large blocks + while remaining >= large_row_size as i64 { + encode_one_batch(dat_file, offset, large_block_size, rs, shards)?; + offset += large_row_size as u64; + remaining -= large_row_size as i64; + } + + // Process remaining data in small blocks + while remaining > 0 { + let row_size = small_block_size * DATA_SHARDS_COUNT; + let to_process = remaining.min(row_size as i64); + encode_one_batch(dat_file, offset, small_block_size, rs, shards)?; + offset += to_process as u64; + remaining -= to_process; + } + + Ok(()) +} + +/// Encode one batch (row) of data. +fn encode_one_batch( + dat_file: &File, + offset: u64, + block_size: usize, + rs: &ReedSolomon, + shards: &mut [EcVolumeShard], +) -> io::Result<()> { + // Allocate buffers for all shards + let mut buffers: Vec> = (0..TOTAL_SHARDS_COUNT) + .map(|_| vec![0u8; block_size]) + .collect(); + + // Read data shards from .dat file + for i in 0..DATA_SHARDS_COUNT { + let read_offset = offset + (i * block_size) as u64; + + #[cfg(unix)] + { + use std::os::unix::fs::FileExt; + // Read what we can; zeros fill the rest (already initialized) + let _ = dat_file.read_at(&mut buffers[i], read_offset); + } + } + + // Encode parity shards + rs.encode(&mut buffers).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("reed-solomon encode: {:?}", e)) + })?; + + // Write all shard buffers to files + for (i, buf) in buffers.iter().enumerate() { + shards[i].write_all(buf)?; + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::storage::needle::needle::Needle; + use crate::storage::needle_map::NeedleMapKind; + use crate::storage::volume::Volume; + use tempfile::TempDir; + + #[test] + fn test_ec_encode_decode_round_trip() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + + // Create a volume with some data + let mut v = Volume::new( + dir, dir, "", VolumeId(1), + NeedleMapKind::InMemory, None, None, 0, Version::current(), + ).unwrap(); + + for i in 1..=5 { + let data = format!("test data for needle {}", i); + let mut n = Needle { + id: NeedleId(i), + cookie: Cookie(i as u32), + data: data.as_bytes().to_vec(), + data_size: data.len() as u32, + ..Needle::default() + }; + v.write_needle(&mut n, true).unwrap(); + } + v.sync_to_disk().unwrap(); + v.close(); + + // Encode to EC shards + write_ec_files(dir, "", VolumeId(1)).unwrap(); + + // Verify shard files exist + for i in 0..TOTAL_SHARDS_COUNT { + let path = format!("{}/{}.ec{:02}", dir, 1, i); + assert!( + std::path::Path::new(&path).exists(), + "shard file {} should exist", path + ); + } + + // Verify .ecx exists + let ecx_path = format!("{}/1.ecx", dir); + assert!(std::path::Path::new(&ecx_path).exists()); + } + + #[test] + fn test_reed_solomon_basic() { + let rs = ReedSolomon::new(DATA_SHARDS_COUNT, PARITY_SHARDS_COUNT).unwrap(); + let block_size = 1024; + let mut shards: Vec> = (0..TOTAL_SHARDS_COUNT) + .map(|i| { + if i < DATA_SHARDS_COUNT { + vec![(i as u8).wrapping_mul(7); block_size] + } else { + vec![0u8; block_size] + } + }) + .collect(); + + // Encode + rs.encode(&mut shards).unwrap(); + + // Verify parity is non-zero (at least some) + let parity_nonzero: bool = shards[DATA_SHARDS_COUNT..].iter() + .any(|s| s.iter().any(|&b| b != 0)); + assert!(parity_nonzero); + + // Simulate losing 4 shards and reconstructing + let original_0 = shards[0].clone(); + let original_1 = shards[1].clone(); + + let mut shard_opts: Vec>> = shards.into_iter().map(Some).collect(); + shard_opts[0] = None; + shard_opts[1] = None; + shard_opts[2] = None; + shard_opts[3] = None; + + rs.reconstruct(&mut shard_opts).unwrap(); + + assert_eq!(shard_opts[0].as_ref().unwrap(), &original_0); + assert_eq!(shard_opts[1].as_ref().unwrap(), &original_1); + } +} diff --git a/seaweed-volume/src/storage/erasure_coding/ec_locate.rs b/seaweed-volume/src/storage/erasure_coding/ec_locate.rs new file mode 100644 index 000000000..73947c595 --- /dev/null +++ b/seaweed-volume/src/storage/erasure_coding/ec_locate.rs @@ -0,0 +1,207 @@ +//! EC data location: maps needle offset/size to shard intervals. +//! +//! Determines which shard(s) contain data for a given needle and at what +//! offsets within those shards. Handles both large (1GB) and small (1MB) +//! block sections. + +use crate::storage::erasure_coding::ec_shard::*; +use crate::storage::types::*; + +/// An interval to read from EC shards. +#[derive(Debug, Clone)] +pub struct Interval { + pub block_index: usize, + pub inner_block_offset: i64, + pub size: i64, + pub is_large_block: bool, + pub large_block_rows_count: usize, +} + +impl Interval { + /// Convert an interval to the specific shard ID and offset within that shard. + pub fn to_shard_id_and_offset(&self) -> (ShardId, i64) { + let shard_id = (self.block_index % DATA_SHARDS_COUNT) as ShardId; + let row_index = self.block_index / DATA_SHARDS_COUNT; + + let block_size = if self.is_large_block { + ERASURE_CODING_LARGE_BLOCK_SIZE as i64 + } else { + ERASURE_CODING_SMALL_BLOCK_SIZE as i64 + }; + + let mut offset = row_index as i64 * block_size + self.inner_block_offset; + if !self.is_large_block { + // Small blocks come after large blocks in the shard file + offset += self.large_block_rows_count as i64 * ERASURE_CODING_LARGE_BLOCK_SIZE as i64; + } + + (shard_id, offset) + } +} + +/// Locate the EC shard intervals needed to read data at the given offset and size. +/// +/// `shard_size` is the size of a single shard file. +pub fn locate_data(offset: i64, size: Size, shard_size: i64) -> Vec { + let mut intervals = Vec::new(); + let data_size = size.0 as i64; + + if data_size <= 0 || shard_size <= 0 { + return intervals; + } + + let large_block_size = ERASURE_CODING_LARGE_BLOCK_SIZE as i64; + let small_block_size = ERASURE_CODING_SMALL_BLOCK_SIZE as i64; + let large_row_size = large_block_size * DATA_SHARDS_COUNT as i64; + let small_row_size = small_block_size * DATA_SHARDS_COUNT as i64; + + // Number of large block rows + let n_large_block_rows = if shard_size > 0 { + ((shard_size - 1) / large_block_size) as usize + } else { + 0 + }; + let large_section_size = n_large_block_rows as i64 * large_row_size; + + let mut remaining_offset = offset; + let mut remaining_size = data_size; + + // In large block section? + if remaining_offset < large_section_size { + let available_in_large = large_section_size - remaining_offset; + let to_read = remaining_size.min(available_in_large); + + add_intervals( + &mut intervals, + remaining_offset, + to_read, + large_block_size, + large_row_size, + true, + n_large_block_rows, + ); + + remaining_offset += to_read; + remaining_size -= to_read; + } + + // In small block section? + if remaining_size > 0 { + let small_offset = remaining_offset - large_section_size; + add_intervals( + &mut intervals, + small_offset, + remaining_size, + small_block_size, + small_row_size, + false, + n_large_block_rows, + ); + } + + intervals +} + +fn add_intervals( + intervals: &mut Vec, + offset: i64, + size: i64, + block_size: i64, + row_size: i64, + is_large_block: bool, + large_block_rows_count: usize, +) { + let mut pos = offset; + let end = offset + size; + + while pos < end { + let block_index = (pos / block_size) as usize; + let inner_offset = pos % block_size; + let remaining_in_block = block_size - inner_offset; + let interval_size = remaining_in_block.min(end - pos); + + intervals.push(Interval { + block_index, + inner_block_offset: inner_offset, + size: interval_size, + is_large_block, + large_block_rows_count, + }); + + pos += interval_size; + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_interval_to_shard_id() { + // Block index 0 → shard 0 + let interval = Interval { + block_index: 0, + inner_block_offset: 100, + size: 50, + is_large_block: true, + large_block_rows_count: 1, + }; + let (shard_id, offset) = interval.to_shard_id_and_offset(); + assert_eq!(shard_id, 0); + assert_eq!(offset, 100); + + // Block index 5 → shard 5 + let interval = Interval { + block_index: 5, + inner_block_offset: 0, + size: 1024, + is_large_block: true, + large_block_rows_count: 1, + }; + let (shard_id, _offset) = interval.to_shard_id_and_offset(); + assert_eq!(shard_id, 5); + + // Block index 10 → shard 0 (second row) + let interval = Interval { + block_index: 10, + inner_block_offset: 0, + size: 100, + is_large_block: true, + large_block_rows_count: 2, + }; + let (shard_id, offset) = interval.to_shard_id_and_offset(); + assert_eq!(shard_id, 0); + assert_eq!(offset, ERASURE_CODING_LARGE_BLOCK_SIZE as i64); // row 1 offset + } + + #[test] + fn test_locate_data_small_file() { + // Small file: 100 bytes at offset 50, shard size = 1MB + let intervals = locate_data(50, Size(100), 1024 * 1024); + assert!(!intervals.is_empty()); + + // Should be a single small block interval (no large block rows for 1MB shard) + assert_eq!(intervals.len(), 1); + assert!(!intervals[0].is_large_block); + } + + #[test] + fn test_locate_data_empty() { + let intervals = locate_data(0, Size(0), 1024 * 1024); + assert!(intervals.is_empty()); + } + + #[test] + fn test_small_block_after_large() { + let interval = Interval { + block_index: 0, + inner_block_offset: 0, + size: 100, + is_large_block: false, + large_block_rows_count: 2, + }; + let (_shard_id, offset) = interval.to_shard_id_and_offset(); + // Should be after 2 large block rows + assert_eq!(offset, 2 * ERASURE_CODING_LARGE_BLOCK_SIZE as i64); + } +} diff --git a/seaweed-volume/src/storage/erasure_coding/ec_shard.rs b/seaweed-volume/src/storage/erasure_coding/ec_shard.rs new file mode 100644 index 000000000..2fa4d6f66 --- /dev/null +++ b/seaweed-volume/src/storage/erasure_coding/ec_shard.rs @@ -0,0 +1,202 @@ +//! EcVolumeShard: a single shard file (.ec00-.ec13) of an erasure-coded volume. + +use std::fs::{self, File, OpenOptions}; +use std::io::{self, Read, Seek, SeekFrom, Write}; +use std::path::Path; + +use crate::storage::types::*; + +pub const DATA_SHARDS_COUNT: usize = 10; +pub const PARITY_SHARDS_COUNT: usize = 4; +pub const TOTAL_SHARDS_COUNT: usize = DATA_SHARDS_COUNT + PARITY_SHARDS_COUNT; +pub const ERASURE_CODING_LARGE_BLOCK_SIZE: usize = 1024 * 1024 * 1024; // 1GB +pub const ERASURE_CODING_SMALL_BLOCK_SIZE: usize = 1024 * 1024; // 1MB + +pub type ShardId = u8; + +/// A single erasure-coded shard file. +pub struct EcVolumeShard { + pub volume_id: VolumeId, + pub shard_id: ShardId, + pub collection: String, + pub dir: String, + pub disk_type: DiskType, + ecd_file: Option, + ecd_file_size: i64, +} + +impl EcVolumeShard { + /// Create a new shard reference (does not open the file). + pub fn new(dir: &str, collection: &str, volume_id: VolumeId, shard_id: ShardId) -> Self { + EcVolumeShard { + volume_id, + shard_id, + collection: collection.to_string(), + dir: dir.to_string(), + disk_type: DiskType::default(), + ecd_file: None, + ecd_file_size: 0, + } + } + + /// Shard file name, e.g. "dir/collection_42.ec03" + pub fn file_name(&self) -> String { + let base = crate::storage::volume::volume_file_name(&self.dir, &self.collection, self.volume_id); + format!("{}.ec{:02}", base, self.shard_id) + } + + /// Open the shard file for reading. + pub fn open(&mut self) -> io::Result<()> { + let path = self.file_name(); + let file = File::open(&path)?; + self.ecd_file_size = file.metadata()?.len() as i64; + self.ecd_file = Some(file); + Ok(()) + } + + /// Create the shard file for writing. + pub fn create(&mut self) -> io::Result<()> { + let path = self.file_name(); + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(&path)?; + self.ecd_file = Some(file); + self.ecd_file_size = 0; + Ok(()) + } + + /// Read data at a specific offset. + pub fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result { + let file = self.ecd_file.as_ref().ok_or_else(|| { + io::Error::new(io::ErrorKind::Other, "shard file not open") + })?; + + #[cfg(unix)] + { + use std::os::unix::fs::FileExt; + file.read_at(buf, offset) + } + } + + /// Write data to the shard file (appends). + pub fn write_all(&mut self, data: &[u8]) -> io::Result<()> { + let file = self.ecd_file.as_mut().ok_or_else(|| { + io::Error::new(io::ErrorKind::Other, "shard file not open") + })?; + file.write_all(data)?; + self.ecd_file_size += data.len() as i64; + Ok(()) + } + + pub fn file_size(&self) -> i64 { + self.ecd_file_size + } + + /// Close the shard file. + pub fn close(&mut self) { + if let Some(ref file) = self.ecd_file { + let _ = file.sync_all(); + } + self.ecd_file = None; + } + + /// Delete the shard file from disk. + pub fn destroy(&mut self) { + self.close(); + let _ = fs::remove_file(self.file_name()); + } +} + +/// ShardBits: bitmap tracking which shards are present. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub struct ShardBits(pub u32); + +impl ShardBits { + pub fn add_shard_id(&mut self, id: ShardId) { + self.0 |= 1 << id; + } + + pub fn remove_shard_id(&mut self, id: ShardId) { + self.0 &= !(1 << id); + } + + pub fn has_shard_id(&self, id: ShardId) -> bool { + self.0 & (1 << id) != 0 + } + + pub fn shard_count(&self) -> u32 { + self.0.count_ones() + } + + /// Iterator over present shard IDs. + pub fn shard_ids(&self) -> Vec { + (0..TOTAL_SHARDS_COUNT as u8) + .filter(|&id| self.has_shard_id(id)) + .collect() + } + + pub fn minus(&self, other: ShardBits) -> ShardBits { + ShardBits(self.0 & !other.0) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_shard_bits() { + let mut bits = ShardBits::default(); + assert_eq!(bits.shard_count(), 0); + + bits.add_shard_id(0); + bits.add_shard_id(3); + bits.add_shard_id(13); + assert_eq!(bits.shard_count(), 3); + assert!(bits.has_shard_id(0)); + assert!(bits.has_shard_id(3)); + assert!(!bits.has_shard_id(1)); + + bits.remove_shard_id(3); + assert!(!bits.has_shard_id(3)); + assert_eq!(bits.shard_count(), 2); + } + + #[test] + fn test_shard_bits_ids() { + let mut bits = ShardBits::default(); + bits.add_shard_id(1); + bits.add_shard_id(5); + bits.add_shard_id(9); + assert_eq!(bits.shard_ids(), vec![1, 5, 9]); + } + + #[test] + fn test_shard_bits_minus() { + let mut a = ShardBits::default(); + a.add_shard_id(0); + a.add_shard_id(1); + a.add_shard_id(2); + + let mut b = ShardBits::default(); + b.add_shard_id(1); + + let c = a.minus(b); + assert_eq!(c.shard_ids(), vec![0, 2]); + } + + #[test] + fn test_shard_file_name() { + let shard = EcVolumeShard::new("/data", "pics", VolumeId(42), 3); + assert_eq!(shard.file_name(), "/data/pics_42.ec03"); + } + + #[test] + fn test_shard_file_name_no_collection() { + let shard = EcVolumeShard::new("/data", "", VolumeId(7), 13); + assert_eq!(shard.file_name(), "/data/7.ec13"); + } +} diff --git a/seaweed-volume/src/storage/erasure_coding/ec_volume.rs b/seaweed-volume/src/storage/erasure_coding/ec_volume.rs new file mode 100644 index 000000000..1d3799147 --- /dev/null +++ b/seaweed-volume/src/storage/erasure_coding/ec_volume.rs @@ -0,0 +1,352 @@ +//! EcVolume: an erasure-coded volume with up to 14 shards. +//! +//! Each EcVolume has a sorted index (.ecx) and a deletion journal (.ecj). +//! Shards (.ec00-.ec13) may be distributed across multiple servers. + +use std::collections::HashMap; +use std::fs::{self, File, OpenOptions}; +use std::io::{self, Read, Seek, SeekFrom, Write}; + +use crate::storage::erasure_coding::ec_locate; +use crate::storage::erasure_coding::ec_shard::*; +use crate::storage::types::*; + +/// An erasure-coded volume managing its local shards and index. +pub struct EcVolume { + pub volume_id: VolumeId, + pub collection: String, + pub dir: String, + pub dir_idx: String, + pub version: Version, + pub shards: Vec>, // indexed by ShardId (0..14) + pub dat_file_size: i64, + ecx_file: Option, + ecx_file_size: i64, + ecj_file: Option, + pub disk_type: DiskType, +} + +impl EcVolume { + /// Create a new EcVolume. Loads .ecx index and .ecj journal if present. + pub fn new( + dir: &str, + dir_idx: &str, + collection: &str, + volume_id: VolumeId, + ) -> io::Result { + let mut shards = Vec::with_capacity(TOTAL_SHARDS_COUNT); + for _ in 0..TOTAL_SHARDS_COUNT { + shards.push(None); + } + + let mut vol = EcVolume { + volume_id, + collection: collection.to_string(), + dir: dir.to_string(), + dir_idx: dir_idx.to_string(), + version: Version::current(), + shards, + dat_file_size: 0, + ecx_file: None, + ecx_file_size: 0, + ecj_file: None, + disk_type: DiskType::default(), + }; + + // Open .ecx file (sorted index) + let ecx_path = vol.ecx_file_name(); + if std::path::Path::new(&ecx_path).exists() { + let file = File::open(&ecx_path)?; + vol.ecx_file_size = file.metadata()?.len() as i64; + vol.ecx_file = Some(file); + } + + // Open .ecj file (deletion journal) + let ecj_path = vol.ecj_file_name(); + let ecj_file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .append(true) + .open(&ecj_path)?; + vol.ecj_file = Some(ecj_file); + + Ok(vol) + } + + // ---- File names ---- + + fn base_name(&self) -> String { + crate::storage::volume::volume_file_name(&self.dir, &self.collection, self.volume_id) + } + + fn idx_base_name(&self) -> String { + crate::storage::volume::volume_file_name(&self.dir_idx, &self.collection, self.volume_id) + } + + pub fn ecx_file_name(&self) -> String { + format!("{}.ecx", self.idx_base_name()) + } + + pub fn ecj_file_name(&self) -> String { + format!("{}.ecj", self.idx_base_name()) + } + + // ---- Shard management ---- + + /// Add a shard to this volume. + pub fn add_shard(&mut self, mut shard: EcVolumeShard) -> io::Result<()> { + let id = shard.shard_id as usize; + if id >= TOTAL_SHARDS_COUNT { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("invalid shard id: {}", id), + )); + } + shard.open()?; + self.shards[id] = Some(shard); + Ok(()) + } + + /// Remove and close a shard. + pub fn remove_shard(&mut self, shard_id: ShardId) { + if let Some(ref mut shard) = self.shards[shard_id as usize] { + shard.close(); + } + self.shards[shard_id as usize] = None; + } + + /// Get a ShardBits bitmap of locally available shards. + pub fn shard_bits(&self) -> ShardBits { + let mut bits = ShardBits::default(); + for (i, shard) in self.shards.iter().enumerate() { + if shard.is_some() { + bits.add_shard_id(i as ShardId); + } + } + bits + } + + /// Count of locally available shards. + pub fn shard_count(&self) -> usize { + self.shards.iter().filter(|s| s.is_some()).count() + } + + // ---- Index operations ---- + + /// Find a needle's offset and size in the sorted .ecx index via binary search. + pub fn find_needle_from_ecx(&self, needle_id: NeedleId) -> io::Result> { + let ecx_file = self.ecx_file.as_ref().ok_or_else(|| { + io::Error::new(io::ErrorKind::Other, "ecx file not open") + })?; + + let entry_count = self.ecx_file_size as usize / NEEDLE_MAP_ENTRY_SIZE; + if entry_count == 0 { + return Ok(None); + } + + // Binary search + let mut lo: usize = 0; + let mut hi: usize = entry_count; + let mut entry_buf = [0u8; NEEDLE_MAP_ENTRY_SIZE]; + + while lo < hi { + let mid = lo + (hi - lo) / 2; + let file_offset = (mid * NEEDLE_MAP_ENTRY_SIZE) as u64; + + #[cfg(unix)] + { + use std::os::unix::fs::FileExt; + ecx_file.read_exact_at(&mut entry_buf, file_offset)?; + } + + let (key, offset, size) = idx_entry_from_bytes(&entry_buf); + if key == needle_id { + return Ok(Some((offset, size))); + } else if key < needle_id { + lo = mid + 1; + } else { + hi = mid; + } + } + + Ok(None) + } + + /// Locate the EC shard intervals needed to read a needle. + pub fn locate_needle( + &self, + needle_id: NeedleId, + ) -> io::Result)>> { + let (offset, size) = match self.find_needle_from_ecx(needle_id)? { + Some((o, s)) => (o, s), + None => return Ok(None), + }; + + if size.is_deleted() || offset.is_zero() { + return Ok(None); + } + + let shard_size = self.shard_file_size(); + let intervals = ec_locate::locate_data( + offset.to_actual_offset(), + size, + shard_size, + ); + + Ok(Some((offset, size, intervals))) + } + + /// Get the size of a single shard (all shards are the same size). + fn shard_file_size(&self) -> i64 { + for shard in &self.shards { + if let Some(s) = shard { + return s.file_size(); + } + } + 0 + } + + // ---- Deletion journal ---- + + /// Append a deleted needle ID to the .ecj journal. + pub fn journal_delete(&mut self, needle_id: NeedleId) -> io::Result<()> { + let ecj_file = self.ecj_file.as_mut().ok_or_else(|| { + io::Error::new(io::ErrorKind::Other, "ecj file not open") + })?; + + let mut buf = [0u8; NEEDLE_ID_SIZE]; + needle_id.to_bytes(&mut buf); + ecj_file.write_all(&buf)?; + ecj_file.sync_all()?; + Ok(()) + } + + /// Read all deleted needle IDs from the .ecj journal. + pub fn read_deleted_needles(&self) -> io::Result> { + let ecj_path = self.ecj_file_name(); + if !std::path::Path::new(&ecj_path).exists() { + return Ok(Vec::new()); + } + + let data = fs::read(&ecj_path)?; + let count = data.len() / NEEDLE_ID_SIZE; + let mut needles = Vec::with_capacity(count); + for i in 0..count { + let start = i * NEEDLE_ID_SIZE; + let id = NeedleId::from_bytes(&data[start..start + NEEDLE_ID_SIZE]); + needles.push(id); + } + Ok(needles) + } + + // ---- Lifecycle ---- + + pub fn close(&mut self) { + for shard in &mut self.shards { + if let Some(s) = shard { + s.close(); + } + *shard = None; + } + self.ecx_file = None; + self.ecj_file = None; + } + + pub fn destroy(&mut self) { + for shard in &mut self.shards { + if let Some(s) = shard { + s.destroy(); + } + *shard = None; + } + let _ = fs::remove_file(self.ecx_file_name()); + let _ = fs::remove_file(self.ecj_file_name()); + self.ecx_file = None; + self.ecj_file = None; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::storage::idx; + use tempfile::TempDir; + + fn write_ecx_file(dir: &str, collection: &str, vid: VolumeId, entries: &[(NeedleId, Offset, Size)]) { + let base = crate::storage::volume::volume_file_name(dir, collection, vid); + let ecx_path = format!("{}.ecx", base); + let mut file = File::create(&ecx_path).unwrap(); + + // Write sorted entries + for &(key, offset, size) in entries { + let mut buf = [0u8; NEEDLE_MAP_ENTRY_SIZE]; + idx_entry_to_bytes(&mut buf, key, offset, size); + file.write_all(&buf).unwrap(); + } + } + + #[test] + fn test_ec_volume_find_needle() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + + // Write sorted ecx entries + let entries = vec![ + (NeedleId(1), Offset::from_actual_offset(8), Size(100)), + (NeedleId(5), Offset::from_actual_offset(200), Size(200)), + (NeedleId(10), Offset::from_actual_offset(500), Size(300)), + ]; + write_ecx_file(dir, "", VolumeId(1), &entries); + + let vol = EcVolume::new(dir, dir, "", VolumeId(1)).unwrap(); + + // Found + let result = vol.find_needle_from_ecx(NeedleId(5)).unwrap(); + assert!(result.is_some()); + let (offset, size) = result.unwrap(); + assert_eq!(offset.to_actual_offset(), 200); + assert_eq!(size, Size(200)); + + // Not found + let result = vol.find_needle_from_ecx(NeedleId(7)).unwrap(); + assert!(result.is_none()); + } + + #[test] + fn test_ec_volume_journal() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + + // Need ecx file for EcVolume::new to succeed + write_ecx_file(dir, "", VolumeId(1), &[]); + + let mut vol = EcVolume::new(dir, dir, "", VolumeId(1)).unwrap(); + + vol.journal_delete(NeedleId(10)).unwrap(); + vol.journal_delete(NeedleId(20)).unwrap(); + + let deleted = vol.read_deleted_needles().unwrap(); + assert_eq!(deleted, vec![NeedleId(10), NeedleId(20)]); + } + + #[test] + fn test_ec_volume_shard_bits() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + write_ecx_file(dir, "", VolumeId(1), &[]); + + let mut vol = EcVolume::new(dir, dir, "", VolumeId(1)).unwrap(); + assert_eq!(vol.shard_count(), 0); + + // Create a shard file so we can add it + let mut shard = EcVolumeShard::new(dir, "", VolumeId(1), 3); + shard.create().unwrap(); + shard.write_all(&[0u8; 100]).unwrap(); + shard.close(); + + vol.add_shard(EcVolumeShard::new(dir, "", VolumeId(1), 3)).unwrap(); + assert_eq!(vol.shard_count(), 1); + assert!(vol.shard_bits().has_shard_id(3)); + } +} diff --git a/seaweed-volume/src/storage/erasure_coding/mod.rs b/seaweed-volume/src/storage/erasure_coding/mod.rs new file mode 100644 index 000000000..9c129abf2 --- /dev/null +++ b/seaweed-volume/src/storage/erasure_coding/mod.rs @@ -0,0 +1,13 @@ +//! Erasure coding module for volume data protection. +//! +//! Encodes a volume's .dat file into 10 data + 4 parity shards using +//! Reed-Solomon erasure coding. Can reconstruct from any 10 of 14 shards. + +pub mod ec_shard; +pub mod ec_volume; +pub mod ec_encoder; +pub mod ec_decoder; +pub mod ec_locate; + +pub use ec_shard::{ShardId, EcVolumeShard, TOTAL_SHARDS_COUNT, DATA_SHARDS_COUNT, PARITY_SHARDS_COUNT}; +pub use ec_volume::EcVolume; diff --git a/seaweed-volume/src/storage/mod.rs b/seaweed-volume/src/storage/mod.rs index 151b6055c..50ef03016 100644 --- a/seaweed-volume/src/storage/mod.rs +++ b/seaweed-volume/src/storage/mod.rs @@ -6,3 +6,4 @@ pub mod needle_map; pub mod volume; pub mod disk_location; pub mod store; +pub mod erasure_coding;