diff --git a/seaweed-volume/src/storage/erasure_coding/ec_decoder.rs b/seaweed-volume/src/storage/erasure_coding/ec_decoder.rs
index c06ed57a0..e897bcbf5 100644
--- a/seaweed-volume/src/storage/erasure_coding/ec_decoder.rs
+++ b/seaweed-volume/src/storage/erasure_coding/ec_decoder.rs
@@ -19,12 +19,13 @@ pub fn write_dat_file_from_shards(
     collection: &str,
     volume_id: VolumeId,
     dat_file_size: i64,
+    data_shards: usize,
 ) -> io::Result<()> {
     let base = volume_file_name(dir, collection, volume_id);
     let dat_path = format!("{}.dat", base);
 
     // Open data shards
-    let mut shards: Vec<EcVolumeShard> = (0..DATA_SHARDS_COUNT as u8)
+    let mut shards: Vec<EcVolumeShard> = (0..data_shards as u8)
         .map(|i| EcVolumeShard::new(dir, collection, volume_id, i))
         .collect();
 
@@ -36,13 +37,13 @@ pub fn write_dat_file_from_shards(
     let mut remaining = dat_file_size;
     let large_block_size = ERASURE_CODING_LARGE_BLOCK_SIZE;
     let small_block_size = ERASURE_CODING_SMALL_BLOCK_SIZE;
-    let large_row_size = (large_block_size * DATA_SHARDS_COUNT) as i64;
+    let large_row_size = (large_block_size * data_shards) as i64;
 
     let mut shard_offset: u64 = 0;
 
     // Read large blocks
     while remaining >= large_row_size {
-        for i in 0..DATA_SHARDS_COUNT {
+        for i in 0..data_shards {
             let mut buf = vec![0u8; large_block_size];
             shards[i].read_at(&mut buf, shard_offset)?;
             let to_write = large_block_size.min(remaining as usize);
@@ -57,7 +58,7 @@ pub fn write_dat_file_from_shards(
 
     // Read small blocks
     while remaining > 0 {
-        for i in 0..DATA_SHARDS_COUNT {
+        for i in 0..data_shards {
             let mut buf = vec![0u8; small_block_size];
             shards[i].read_at(&mut buf, shard_offset)?;
             let to_write = small_block_size.min(remaining as usize);
@@ -164,14 +165,16 @@ mod tests {
         let original_dat = std::fs::read(format!("{}/1.dat", dir)).unwrap();
 
         // Encode to EC
-        ec_encoder::write_ec_files(dir, "", VolumeId(1)).unwrap();
+        let data_shards = 10;
+        let parity_shards = 4;
+        ec_encoder::write_ec_files(dir, "", VolumeId(1), data_shards, parity_shards).unwrap();
 
         // Delete original .dat and .idx
         std::fs::remove_file(format!("{}/1.dat", dir)).unwrap();
         std::fs::remove_file(format!("{}/1.idx", dir)).unwrap();
 
         // Reconstruct from EC shards
-        write_dat_file_from_shards(dir, "", VolumeId(1), original_dat_size as i64).unwrap();
+        write_dat_file_from_shards(dir, "", VolumeId(1), original_dat_size as i64, data_shards).unwrap();
         write_idx_file_from_ec_index(dir, "", VolumeId(1)).unwrap();
 
         // Verify reconstructed .dat matches original
diff --git a/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs b/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs
index e1180d4f4..a6fafe7e1 100644
--- a/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs
+++ b/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs
@@ -23,6 +23,8 @@ pub fn write_ec_files(
     dir: &str,
     collection: &str,
     volume_id: VolumeId,
+    data_shards: usize,
+    parity_shards: usize,
 ) -> io::Result<()> {
     let base = volume_file_name(dir, collection, volume_id);
     let dat_path = format!("{}.dat", base);
@@ -35,12 +37,13 @@ pub fn write_ec_files(
     let dat_file = File::open(&dat_path)?;
     let dat_size = dat_file.metadata()?.len() as i64;
 
-    let rs = ReedSolomon::new(DATA_SHARDS_COUNT, PARITY_SHARDS_COUNT).map_err(|e| {
+    let rs = ReedSolomon::new(data_shards, parity_shards).map_err(|e| {
         io::Error::new(io::ErrorKind::Other, format!("reed-solomon init: {:?}", e))
     })?;
 
     // Create shard files
-    let mut shards: Vec<EcVolumeShard> = (0..TOTAL_SHARDS_COUNT as u8)
+    let total_shards = data_shards + parity_shards;
+    let mut shards: Vec<EcVolumeShard> = (0..total_shards as u8)
         .map(|i| EcVolumeShard::new(dir, collection, volume_id, i))
         .collect();
 
@@ -49,7 +52,7 @@ pub fn write_ec_files(
     }
 
     // Encode in large blocks, then small blocks
-    encode_dat_file(&dat_file, dat_size, &rs, &mut shards)?;
+    encode_dat_file(&dat_file, dat_size, &rs, &mut shards, data_shards, parity_shards)?;
 
     // Close all shards
     for shard in &mut shards {
@@ -99,9 +102,11 @@ fn encode_dat_file(
     dat_size: i64,
     rs: &ReedSolomon,
     shards: &mut [EcVolumeShard],
+    data_shards: usize,
+    parity_shards: usize,
 ) -> io::Result<()> {
     let block_size = ERASURE_CODING_SMALL_BLOCK_SIZE;
-    let row_size = block_size * DATA_SHARDS_COUNT;
+    let row_size = block_size * data_shards;
 
     let mut remaining = dat_size;
     let mut offset: u64 = 0;
@@ -109,7 +114,7 @@ fn encode_dat_file(
     // Process all data in small blocks to avoid large memory allocations
     while remaining > 0 {
         let to_process = remaining.min(row_size as i64);
-        encode_one_batch(dat_file, offset, block_size, rs, shards)?;
+        encode_one_batch(dat_file, offset, block_size, rs, shards, data_shards, parity_shards)?;
         offset += to_process as u64;
         remaining -= to_process;
     }
@@ -124,10 +129,13 @@ fn encode_one_batch(
     block_size: usize,
     rs: &ReedSolomon,
     shards: &mut [EcVolumeShard],
+    data_shards: usize,
+    parity_shards: usize,
 ) -> io::Result<()> {
-    // Each batch allocates block_size * TOTAL_SHARDS_COUNT bytes.
+    let total_shards = data_shards + parity_shards;
+    // Each batch allocates block_size * total_shards bytes.
     // With large blocks (1 GiB) this is 14 GiB -- guard against OOM.
-    let total_alloc = block_size.checked_mul(TOTAL_SHARDS_COUNT).ok_or_else(|| {
+    let total_alloc = block_size.checked_mul(total_shards).ok_or_else(|| {
         io::Error::new(io::ErrorKind::InvalidInput, "block_size * shard count overflows usize")
     })?;
     const MAX_BATCH_ALLOC: usize = 1024 * 1024 * 1024; // 1 GiB safety limit
@@ -136,18 +144,18 @@ fn encode_one_batch(
             io::ErrorKind::InvalidInput,
             format!(
                 "batch allocation too large ({} bytes, limit {} bytes); block_size={} shards={}",
-                total_alloc, MAX_BATCH_ALLOC, block_size, TOTAL_SHARDS_COUNT,
+                total_alloc, MAX_BATCH_ALLOC, block_size, total_shards,
             ),
         ));
     }
 
     // Allocate buffers for all shards
-    let mut buffers: Vec<Vec<u8>> = (0..TOTAL_SHARDS_COUNT)
+    let mut buffers: Vec<Vec<u8>> = (0..total_shards)
         .map(|_| vec![0u8; block_size])
         .collect();
 
     // Read data shards from .dat file
-    for i in 0..DATA_SHARDS_COUNT {
+    for i in 0..data_shards {
         let read_offset = offset + (i * block_size) as u64;
 
         #[cfg(unix)]
@@ -211,10 +219,13 @@ mod tests {
         v.close();
 
         // Encode to EC shards
-        write_ec_files(dir, "", VolumeId(1)).unwrap();
+        let data_shards = 10;
+        let parity_shards = 4;
+        let total_shards = data_shards + parity_shards;
+        write_ec_files(dir, "", VolumeId(1), data_shards, parity_shards).unwrap();
 
         // Verify shard files exist
-        for i in 0..TOTAL_SHARDS_COUNT {
+        for i in 0..total_shards {
            let path = format!("{}/{}.ec{:02}", dir, 1, i);
            assert!(
                std::path::Path::new(&path).exists(),
@@ -229,11 +240,14 @@ mod tests {
 
     #[test]
     fn test_reed_solomon_basic() {
-        let rs = ReedSolomon::new(DATA_SHARDS_COUNT, PARITY_SHARDS_COUNT).unwrap();
+        let data_shards = 10;
+        let parity_shards = 4;
+        let total_shards = data_shards + parity_shards;
+        let rs = ReedSolomon::new(data_shards, parity_shards).unwrap();
         let block_size = 1024;
-        let mut shards: Vec<Vec<u8>> = (0..TOTAL_SHARDS_COUNT)
+        let mut shards: Vec<Vec<u8>> = (0..total_shards)
             .map(|i| {
-                if i < DATA_SHARDS_COUNT {
+                if i < data_shards {
                     vec![(i as u8).wrapping_mul(7); block_size]
                 } else {
                     vec![0u8; block_size]
@@ -245,7 +259,7 @@ mod tests {
         rs.encode(&mut shards).unwrap();
 
         // Verify parity is non-zero (at least some)
-        let parity_nonzero: bool = shards[DATA_SHARDS_COUNT..].iter()
+        let parity_nonzero: bool = shards[data_shards..].iter()
             .any(|s| s.iter().any(|&b| b != 0));
         assert!(parity_nonzero);
 
diff --git a/seaweed-volume/src/storage/erasure_coding/ec_locate.rs b/seaweed-volume/src/storage/erasure_coding/ec_locate.rs
index 0aca61688..4c1f06aa2 100644
--- a/seaweed-volume/src/storage/erasure_coding/ec_locate.rs
+++ b/seaweed-volume/src/storage/erasure_coding/ec_locate.rs
@@ -18,10 +18,10 @@ pub struct Interval {
 }
 
 impl Interval {
-    /// Convert an interval to the specific shard ID and offset within that shard.
-    pub fn to_shard_id_and_offset(&self) -> (ShardId, i64) {
-        let shard_id = (self.block_index % DATA_SHARDS_COUNT) as ShardId;
-        let row_index = self.block_index / DATA_SHARDS_COUNT;
+    pub fn to_shard_id_and_offset(&self, data_shards: u32) -> (ShardId, i64) {
+        let data_shards_usize = data_shards as usize;
+        let shard_id = (self.block_index % data_shards_usize) as ShardId;
+        let row_index = self.block_index / data_shards_usize;
 
         let block_size = if self.is_large_block {
             ERASURE_CODING_LARGE_BLOCK_SIZE as i64
@@ -42,7 +42,7 @@ impl Interval {
 /// Locate the EC shard intervals needed to read data at the given offset and size.
 ///
 /// `shard_size` is the size of a single shard file.
-pub fn locate_data(offset: i64, size: Size, shard_size: i64) -> Vec<Interval> {
+pub fn locate_data(offset: i64, size: Size, shard_size: i64, data_shards: u32) -> Vec<Interval> {
     let mut intervals = Vec::new();
     let data_size = size.0 as i64;
 
@@ -52,8 +52,8 @@ pub fn locate_data(offset: i64, size: Size, shard_size: i64) -> Vec<Interval> {
 
     let large_block_size = ERASURE_CODING_LARGE_BLOCK_SIZE as i64;
     let small_block_size = ERASURE_CODING_SMALL_BLOCK_SIZE as i64;
-    let large_row_size = large_block_size * DATA_SHARDS_COUNT as i64;
-    let small_row_size = small_block_size * DATA_SHARDS_COUNT as i64;
+    let large_row_size = large_block_size * data_shards as i64;
+    let small_row_size = small_block_size * data_shards as i64;
 
     // Number of large block rows
     let n_large_block_rows = if shard_size > 0 {
@@ -138,6 +138,10 @@ mod tests {
 
     #[test]
     fn test_interval_to_shard_id() {
+        let data_shards = 10;
+        let large_block_size = ERASURE_CODING_LARGE_BLOCK_SIZE as i64;
+        let _shard_size = 1024 * 1024; // Example shard size
+
         // Block index 0 → shard 0
         let interval = Interval {
             block_index: 0,
@@ -146,7 +150,7 @@ mod tests {
             is_large_block: true,
             large_block_rows_count: 1,
         };
-        let (shard_id, offset) = interval.to_shard_id_and_offset();
+        let (shard_id, offset) = interval.to_shard_id_and_offset(data_shards);
         assert_eq!(shard_id, 0);
         assert_eq!(offset, 100);
 
@@ -158,9 +162,21 @@ mod tests {
             is_large_block: true,
             large_block_rows_count: 1,
         };
-        let (shard_id, _offset) = interval.to_shard_id_and_offset();
+        let (shard_id, _offset) = interval.to_shard_id_and_offset(data_shards);
         assert_eq!(shard_id, 5);
 
+        // Block index 12 (data_shards=10) → row_index 1, shard_id 2
+        let interval = Interval {
+            block_index: 12,
+            inner_block_offset: 200,
+            size: 50,
+            is_large_block: true,
+            large_block_rows_count: 5,
+        };
+        let (shard_id, offset) = interval.to_shard_id_and_offset(data_shards);
+        assert_eq!(shard_id, 2); // 12 % 10 = 2
+        assert_eq!(offset, large_block_size + 200); // row 1 offset + inner_block_offset
+
         // Block index 10 → shard 0 (second row)
         let interval = Interval {
             block_index: 10,
@@ -169,7 +185,7 @@ mod tests {
             is_large_block: true,
             large_block_rows_count: 2,
         };
-        let (shard_id, offset) = interval.to_shard_id_and_offset();
+        let (shard_id, offset) = interval.to_shard_id_and_offset(data_shards);
         assert_eq!(shard_id, 0);
         assert_eq!(offset, ERASURE_CODING_LARGE_BLOCK_SIZE as i64); // row 1 offset
     }
@@ -177,7 +193,7 @@ mod tests {
     #[test]
     fn test_locate_data_small_file() {
         // Small file: 100 bytes at offset 50, shard size = 1MB
-        let intervals = locate_data(50, Size(100), 1024 * 1024);
+        let intervals = locate_data(50, Size(100), 1024 * 1024, 10);
         assert!(!intervals.is_empty());
 
         // Should be a single small block interval (no large block rows for 1MB shard)
@@ -187,7 +203,7 @@ mod tests {
 
     #[test]
     fn test_locate_data_empty() {
-        let intervals = locate_data(0, Size(0), 1024 * 1024);
+        let intervals = locate_data(0, Size(0), 1024 * 1024, 10);
         assert!(intervals.is_empty());
     }
 
@@ -200,7 +216,7 @@ mod tests {
             is_large_block: false,
             large_block_rows_count: 2,
         };
-        let (_shard_id, offset) = interval.to_shard_id_and_offset();
+        let (_shard_id, offset) = interval.to_shard_id_and_offset(10);
         // Should be after 2 large block rows
         assert_eq!(offset, 2 * ERASURE_CODING_LARGE_BLOCK_SIZE as i64);
     }
diff --git a/seaweed-volume/src/storage/erasure_coding/ec_shard.rs b/seaweed-volume/src/storage/erasure_coding/ec_shard.rs
index 389863673..e3a174734 100644
--- a/seaweed-volume/src/storage/erasure_coding/ec_shard.rs
+++ b/seaweed-volume/src/storage/erasure_coding/ec_shard.rs
@@ -126,40 +126,42 @@ pub struct ShardBits(pub u32);
 impl ShardBits {
     pub fn add_shard_id(&mut self, id: ShardId) {
         assert!(
-            (id as usize) < TOTAL_SHARDS_COUNT,
-            "shard id {} out of bounds (max {})",
+            (id as usize) < 32,
+            "shard id {} out of bounds (max 31)",
             id,
-            TOTAL_SHARDS_COUNT - 1,
         );
         self.0 |= 1 << id;
     }
 
     pub fn remove_shard_id(&mut self, id: ShardId) {
         assert!(
-            (id as usize) < TOTAL_SHARDS_COUNT,
-            "shard id {} out of bounds (max {})",
+            (id as usize) < 32,
+            "shard id {} out of bounds (max 31)",
             id,
-            TOTAL_SHARDS_COUNT - 1,
         );
         self.0 &= !(1 << id);
     }
 
     pub fn has_shard_id(&self, id: ShardId) -> bool {
-        if (id as usize) >= TOTAL_SHARDS_COUNT {
+        if (id as usize) >= 32 {
             return false;
         }
         self.0 & (1 << id) != 0
     }
 
-    pub fn shard_count(&self) -> u32 {
-        self.0.count_ones()
+    pub fn shard_id_count(&self) -> usize {
+        self.0.count_ones() as usize
     }
 
     /// Iterator over present shard IDs.
    pub fn shard_ids(&self) -> Vec<ShardId> {
-        (0..TOTAL_SHARDS_COUNT as u8)
-            .filter(|&id| self.has_shard_id(id))
-            .collect()
+        let mut ids = Vec::with_capacity(self.shard_id_count());
+        for i in 0..32 {
+            if self.has_shard_id(i) {
+                ids.push(i);
+            }
+        }
+        ids
     }
 
     pub fn minus(&self, other: ShardBits) -> ShardBits {
@@ -174,19 +176,19 @@ mod tests {
     #[test]
     fn test_shard_bits() {
         let mut bits = ShardBits::default();
-        assert_eq!(bits.shard_count(), 0);
+        assert_eq!(bits.shard_id_count(), 0);
 
         bits.add_shard_id(0);
         bits.add_shard_id(3);
         bits.add_shard_id(13);
-        assert_eq!(bits.shard_count(), 3);
+        assert_eq!(bits.shard_id_count(), 3);
 
         assert!(bits.has_shard_id(0));
         assert!(bits.has_shard_id(3));
         assert!(!bits.has_shard_id(1));
         bits.remove_shard_id(3);
         assert!(!bits.has_shard_id(3));
-        assert_eq!(bits.shard_count(), 2);
+        assert_eq!(bits.shard_id_count(), 2);
     }
 
     #[test]
diff --git a/seaweed-volume/src/storage/erasure_coding/ec_volume.rs b/seaweed-volume/src/storage/erasure_coding/ec_volume.rs
index dedf01eef..63436c9f2 100644
--- a/seaweed-volume/src/storage/erasure_coding/ec_volume.rs
+++ b/seaweed-volume/src/storage/erasure_coding/ec_volume.rs
@@ -19,12 +19,31 @@ pub struct EcVolume {
     pub version: Version,
     pub shards: Vec<Option<EcVolumeShard>>, // indexed by ShardId (0..14)
     pub dat_file_size: i64,
+    pub data_shards: u32,
+    pub parity_shards: u32,
     ecx_file: Option<File>,
     ecx_file_size: i64,
     ecj_file: Option<File>,
     pub disk_type: DiskType,
 }
 
+pub fn read_ec_shard_config(dir: &str, volume_id: VolumeId) -> (u32, u32) {
+    let mut data_shards = 10;
+    let mut parity_shards = 4;
+    let vif_path = format!("{}/{}.vif", dir, volume_id.0);
+    if let Ok(vif_content) = std::fs::read_to_string(&vif_path) {
+        if let Ok(vif_info) = serde_json::from_str::<VifVolumeInfo>(&vif_content) {
+            if let Some(ec) = vif_info.ec_shard_config {
+                if ec.data_shards > 0 && ec.parity_shards > 0 {
+                    data_shards = ec.data_shards;
+                    parity_shards = ec.parity_shards;
+                }
+            }
+        }
+    }
+    (data_shards, parity_shards)
+}
+
 impl EcVolume {
     /// Create a new EcVolume. Loads .ecx index and .ecj journal if present.
     pub fn new(
@@ -33,8 +52,11 @@ impl EcVolume {
         collection: &str,
         volume_id: VolumeId,
     ) -> io::Result<Self> {
-        let mut shards = Vec::with_capacity(TOTAL_SHARDS_COUNT);
-        for _ in 0..TOTAL_SHARDS_COUNT {
+        let (data_shards, parity_shards) = read_ec_shard_config(dir, volume_id);
+
+        let total_shards = (data_shards + parity_shards) as usize;
+        let mut shards = Vec::with_capacity(total_shards);
+        for _ in 0..total_shards {
             shards.push(None);
         }
 
@@ -46,6 +68,8 @@ impl EcVolume {
             version: Version::current(),
             shards,
             dat_file_size: 0,
+            data_shards,
+            parity_shards,
             ecx_file: None,
             ecx_file_size: 0,
             ecj_file: None,
@@ -97,10 +121,11 @@ impl EcVolume {
     /// Add a shard to this volume.
     pub fn add_shard(&mut self, mut shard: EcVolumeShard) -> io::Result<()> {
         let id = shard.shard_id as usize;
-        if id >= TOTAL_SHARDS_COUNT {
+        let total_shards = (self.data_shards + self.parity_shards) as usize;
+        if id >= total_shards {
             return Err(io::Error::new(
                 io::ErrorKind::InvalidInput,
-                format!("invalid shard id: {}", id),
+                format!("invalid shard id: {} (max {})", id, total_shards - 1),
             ));
         }
         shard.open()?;
@@ -192,6 +217,7 @@ impl EcVolume {
             offset.to_actual_offset(),
             size,
             shard_size,
+            self.data_shards,
         );
 
         Ok(Some((offset, size, intervals)))
diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs
index 29258539d..72a97af06 100644
--- a/seaweed-volume/src/storage/volume.rs
+++ b/seaweed-volume/src/storage/volume.rs
@@ -159,30 +159,38 @@ pub struct VifRemoteFile {
     pub extension: String,
 }
 
+#[derive(serde::Serialize, serde::Deserialize, Default, Clone)]
+pub struct VifEcShardConfig {
+    #[serde(default, rename = "dataShards")]
+    pub data_shards: u32,
+    #[serde(default, rename = "parityShards")]
+    pub parity_shards: u32,
+}
+
 /// Serde-compatible representation of VolumeInfo for .vif JSON serialization.
 /// Matches Go's protobuf JSON format (jsonpb with EmitUnpopulated=true).
 #[derive(serde::Serialize, serde::Deserialize, Default, Clone)]
 pub struct VifVolumeInfo {
     #[serde(default)]
     pub files: Vec<VifRemoteFile>,
-    #[serde(default)]
     pub version: u32,
-    #[serde(default)]
-    pub replication: String,
-    #[serde(default, rename = "bytesOffset")]
-    pub bytes_offset: u32,
-    #[serde(default, rename = "datFileSize", with = "string_or_i64")]
-    pub dat_file_size: i64,
-    #[serde(default, rename = "expireAtSec", with = "string_or_u64")]
+    pub collection: String,
+    pub replica_placement: u32,
+    pub ttl: String,
+    #[serde(default, rename = "datFileSize")]
+    pub dat_file_size: u64,
+    #[serde(default, rename = "expireAtSec")]
     pub expire_at_sec: u64,
     #[serde(default, rename = "readOnly")]
     pub read_only: bool,
+    #[serde(default, rename = "ecShardConfig", skip_serializing_if = "Option::is_none")]
+    pub ec_shard_config: Option<VifEcShardConfig>,
 }
 
 impl VifVolumeInfo {
     /// Convert from protobuf VolumeInfo to the serde-compatible struct.
     pub fn from_pb(pb: &PbVolumeInfo) -> Self {
-        VifVolumeInfo {
+        Self {
             files: pb
                 .files
                 .iter()
@@ -197,11 +205,16 @@ impl VifVolumeInfo {
                 })
                 .collect(),
             version: pb.version,
-            replication: pb.replication.clone(),
-            bytes_offset: pb.bytes_offset,
+            collection: pb.collection.clone(),
+            replica_placement: pb.replica_placement,
+            ttl: pb.ttl.clone(),
             dat_file_size: pb.dat_file_size,
             expire_at_sec: pb.expire_at_sec,
             read_only: pb.read_only,
+            ec_shard_config: pb.ec_shard_config.as_ref().map(|c| VifEcShardConfig {
+                data_shards: c.data_shards,
+                parity_shards: c.parity_shards,
+            }),
         }
     }
 
@@ -222,12 +235,16 @@ impl VifVolumeInfo {
                 })
                 .collect(),
             version: self.version,
-            replication: self.replication.clone(),
-            bytes_offset: self.bytes_offset,
+            collection: self.collection.clone(),
+            replica_placement: self.replica_placement,
+            ttl: self.ttl.clone(),
             dat_file_size: self.dat_file_size,
             expire_at_sec: self.expire_at_sec,
             read_only: self.read_only,
-            ec_shard_config: None,
+            ec_shard_config: self.ec_shard_config.as_ref().map(|c| crate::pb::volume_server_pb::EcShardConfig {
+                data_shards: c.data_shards,
+                parity_shards: c.parity_shards,
+            }),
         }
     }
 }