Browse Source

feat(ec): support custom EC shard configurations from .vif files

rust-volume-server
Chris Lu 2 days ago
parent
commit
4fc094259f
  1. 15
      seaweed-volume/src/storage/erasure_coding/ec_decoder.rs
  2. 46
      seaweed-volume/src/storage/erasure_coding/ec_encoder.rs
  3. 42
      seaweed-volume/src/storage/erasure_coding/ec_locate.rs
  4. 32
      seaweed-volume/src/storage/erasure_coding/ec_shard.rs
  5. 34
      seaweed-volume/src/storage/erasure_coding/ec_volume.rs
  6. 45
      seaweed-volume/src/storage/volume.rs

15
seaweed-volume/src/storage/erasure_coding/ec_decoder.rs

@ -19,12 +19,13 @@ pub fn write_dat_file_from_shards(
collection: &str,
volume_id: VolumeId,
dat_file_size: i64,
data_shards: usize,
) -> io::Result<()> {
let base = volume_file_name(dir, collection, volume_id);
let dat_path = format!("{}.dat", base);
// Open data shards
let mut shards: Vec<EcVolumeShard> = (0..DATA_SHARDS_COUNT as u8)
let mut shards: Vec<EcVolumeShard> = (0..data_shards as u8)
.map(|i| EcVolumeShard::new(dir, collection, volume_id, i))
.collect();
@ -36,13 +37,13 @@ pub fn write_dat_file_from_shards(
let mut remaining = dat_file_size;
let large_block_size = ERASURE_CODING_LARGE_BLOCK_SIZE;
let small_block_size = ERASURE_CODING_SMALL_BLOCK_SIZE;
let large_row_size = (large_block_size * DATA_SHARDS_COUNT) as i64;
let large_row_size = (large_block_size * data_shards) as i64;
let mut shard_offset: u64 = 0;
// Read large blocks
while remaining >= large_row_size {
for i in 0..DATA_SHARDS_COUNT {
for i in 0..data_shards {
let mut buf = vec![0u8; large_block_size];
shards[i].read_at(&mut buf, shard_offset)?;
let to_write = large_block_size.min(remaining as usize);
@ -57,7 +58,7 @@ pub fn write_dat_file_from_shards(
// Read small blocks
while remaining > 0 {
for i in 0..DATA_SHARDS_COUNT {
for i in 0..data_shards {
let mut buf = vec![0u8; small_block_size];
shards[i].read_at(&mut buf, shard_offset)?;
let to_write = small_block_size.min(remaining as usize);
@ -164,14 +165,16 @@ mod tests {
let original_dat = std::fs::read(format!("{}/1.dat", dir)).unwrap();
// Encode to EC
ec_encoder::write_ec_files(dir, "", VolumeId(1)).unwrap();
let data_shards = 10;
let parity_shards = 4;
ec_encoder::write_ec_files(dir, "", VolumeId(1), data_shards, parity_shards).unwrap();
// Delete original .dat and .idx
std::fs::remove_file(format!("{}/1.dat", dir)).unwrap();
std::fs::remove_file(format!("{}/1.idx", dir)).unwrap();
// Reconstruct from EC shards
write_dat_file_from_shards(dir, "", VolumeId(1), original_dat_size as i64).unwrap();
write_dat_file_from_shards(dir, "", VolumeId(1), original_dat_size as i64, data_shards).unwrap();
write_idx_file_from_ec_index(dir, "", VolumeId(1)).unwrap();
// Verify reconstructed .dat matches original

46
seaweed-volume/src/storage/erasure_coding/ec_encoder.rs

@ -23,6 +23,8 @@ pub fn write_ec_files(
dir: &str,
collection: &str,
volume_id: VolumeId,
data_shards: usize,
parity_shards: usize,
) -> io::Result<()> {
let base = volume_file_name(dir, collection, volume_id);
let dat_path = format!("{}.dat", base);
@ -35,12 +37,13 @@ pub fn write_ec_files(
let dat_file = File::open(&dat_path)?;
let dat_size = dat_file.metadata()?.len() as i64;
let rs = ReedSolomon::new(DATA_SHARDS_COUNT, PARITY_SHARDS_COUNT).map_err(|e| {
let rs = ReedSolomon::new(data_shards, parity_shards).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("reed-solomon init: {:?}", e))
})?;
// Create shard files
let mut shards: Vec<EcVolumeShard> = (0..TOTAL_SHARDS_COUNT as u8)
let total_shards = data_shards + parity_shards;
let mut shards: Vec<EcVolumeShard> = (0..total_shards as u8)
.map(|i| EcVolumeShard::new(dir, collection, volume_id, i))
.collect();
@ -49,7 +52,7 @@ pub fn write_ec_files(
}
// Encode in large blocks, then small blocks
encode_dat_file(&dat_file, dat_size, &rs, &mut shards)?;
encode_dat_file(&dat_file, dat_size, &rs, &mut shards, data_shards, parity_shards)?;
// Close all shards
for shard in &mut shards {
@ -99,9 +102,11 @@ fn encode_dat_file(
dat_size: i64,
rs: &ReedSolomon,
shards: &mut [EcVolumeShard],
data_shards: usize,
parity_shards: usize,
) -> io::Result<()> {
let block_size = ERASURE_CODING_SMALL_BLOCK_SIZE;
let row_size = block_size * DATA_SHARDS_COUNT;
let row_size = block_size * data_shards;
let mut remaining = dat_size;
let mut offset: u64 = 0;
@ -109,7 +114,7 @@ fn encode_dat_file(
// Process all data in small blocks to avoid large memory allocations
while remaining > 0 {
let to_process = remaining.min(row_size as i64);
encode_one_batch(dat_file, offset, block_size, rs, shards)?;
encode_one_batch(dat_file, offset, block_size, rs, shards, data_shards, parity_shards)?;
offset += to_process as u64;
remaining -= to_process;
}
@ -124,10 +129,13 @@ fn encode_one_batch(
block_size: usize,
rs: &ReedSolomon,
shards: &mut [EcVolumeShard],
data_shards: usize,
parity_shards: usize,
) -> io::Result<()> {
// Each batch allocates block_size * TOTAL_SHARDS_COUNT bytes.
let total_shards = data_shards + parity_shards;
// Each batch allocates block_size * total_shards bytes.
// With large blocks (1 GiB) and the default 10+4 layout this is 14 GiB -- guard against OOM.
let total_alloc = block_size.checked_mul(TOTAL_SHARDS_COUNT).ok_or_else(|| {
let total_alloc = block_size.checked_mul(total_shards).ok_or_else(|| {
io::Error::new(io::ErrorKind::InvalidInput, "block_size * shard count overflows usize")
})?;
const MAX_BATCH_ALLOC: usize = 1024 * 1024 * 1024; // 1 GiB safety limit
@ -136,18 +144,18 @@ fn encode_one_batch(
io::ErrorKind::InvalidInput,
format!(
"batch allocation too large ({} bytes, limit {} bytes); block_size={} shards={}",
total_alloc, MAX_BATCH_ALLOC, block_size, TOTAL_SHARDS_COUNT,
total_alloc, MAX_BATCH_ALLOC, block_size, total_shards,
),
));
}
// Allocate buffers for all shards
let mut buffers: Vec<Vec<u8>> = (0..TOTAL_SHARDS_COUNT)
let mut buffers: Vec<Vec<u8>> = (0..total_shards)
.map(|_| vec![0u8; block_size])
.collect();
// Read data shards from .dat file
for i in 0..DATA_SHARDS_COUNT {
for i in 0..data_shards {
let read_offset = offset + (i * block_size) as u64;
#[cfg(unix)]
@ -211,10 +219,13 @@ mod tests {
v.close();
// Encode to EC shards
write_ec_files(dir, "", VolumeId(1)).unwrap();
let data_shards = 10;
let parity_shards = 4;
let total_shards = data_shards + parity_shards;
write_ec_files(dir, "", VolumeId(1), data_shards, parity_shards).unwrap();
// Verify shard files exist
for i in 0..TOTAL_SHARDS_COUNT {
for i in 0..total_shards {
let path = format!("{}/{}.ec{:02}", dir, 1, i);
assert!(
std::path::Path::new(&path).exists(),
@ -229,11 +240,14 @@ mod tests {
#[test]
fn test_reed_solomon_basic() {
let rs = ReedSolomon::new(DATA_SHARDS_COUNT, PARITY_SHARDS_COUNT).unwrap();
let data_shards = 10;
let parity_shards = 4;
let total_shards = data_shards + parity_shards;
let rs = ReedSolomon::new(data_shards, parity_shards).unwrap();
let block_size = 1024;
let mut shards: Vec<Vec<u8>> = (0..TOTAL_SHARDS_COUNT)
let mut shards: Vec<Vec<u8>> = (0..total_shards)
.map(|i| {
if i < DATA_SHARDS_COUNT {
if i < data_shards {
vec![(i as u8).wrapping_mul(7); block_size]
} else {
vec![0u8; block_size]
@ -245,7 +259,7 @@ mod tests {
rs.encode(&mut shards).unwrap();
// Verify parity is non-zero (at least some)
let parity_nonzero: bool = shards[DATA_SHARDS_COUNT..].iter()
let parity_nonzero: bool = shards[data_shards..].iter()
.any(|s| s.iter().any(|&b| b != 0));
assert!(parity_nonzero);

42
seaweed-volume/src/storage/erasure_coding/ec_locate.rs

@ -18,10 +18,10 @@ pub struct Interval {
}
impl Interval {
/// Convert an interval to the specific shard ID and offset within that shard.
pub fn to_shard_id_and_offset(&self) -> (ShardId, i64) {
let shard_id = (self.block_index % DATA_SHARDS_COUNT) as ShardId;
let row_index = self.block_index / DATA_SHARDS_COUNT;
pub fn to_shard_id_and_offset(&self, data_shards: u32) -> (ShardId, i64) {
let data_shards_usize = data_shards as usize;
let shard_id = (self.block_index % data_shards_usize) as ShardId;
let row_index = self.block_index / data_shards_usize;
let block_size = if self.is_large_block {
ERASURE_CODING_LARGE_BLOCK_SIZE as i64
@ -42,7 +42,7 @@ impl Interval {
/// Locate the EC shard intervals needed to read data at the given offset and size.
///
/// `shard_size` is the size of a single shard file.
pub fn locate_data(offset: i64, size: Size, shard_size: i64) -> Vec<Interval> {
pub fn locate_data(offset: i64, size: Size, shard_size: i64, data_shards: u32) -> Vec<Interval> {
let mut intervals = Vec::new();
let data_size = size.0 as i64;
@ -52,8 +52,8 @@ pub fn locate_data(offset: i64, size: Size, shard_size: i64) -> Vec<Interval> {
let large_block_size = ERASURE_CODING_LARGE_BLOCK_SIZE as i64;
let small_block_size = ERASURE_CODING_SMALL_BLOCK_SIZE as i64;
let large_row_size = large_block_size * DATA_SHARDS_COUNT as i64;
let small_row_size = small_block_size * DATA_SHARDS_COUNT as i64;
let large_row_size = large_block_size * data_shards as i64;
let small_row_size = small_block_size * data_shards as i64;
// Number of large block rows
let n_large_block_rows = if shard_size > 0 {
@ -138,6 +138,10 @@ mod tests {
#[test]
fn test_interval_to_shard_id() {
let data_shards = 10;
let large_block_size = ERASURE_CODING_LARGE_BLOCK_SIZE as i64;
let _shard_size = 1024 * 1024; // Example shard size
// Block index 0 → shard 0
let interval = Interval {
block_index: 0,
@ -146,7 +150,7 @@ mod tests {
is_large_block: true,
large_block_rows_count: 1,
};
let (shard_id, offset) = interval.to_shard_id_and_offset();
let (shard_id, offset) = interval.to_shard_id_and_offset(data_shards);
assert_eq!(shard_id, 0);
assert_eq!(offset, 100);
@ -158,9 +162,21 @@ mod tests {
is_large_block: true,
large_block_rows_count: 1,
};
let (shard_id, _offset) = interval.to_shard_id_and_offset();
let (shard_id, _offset) = interval.to_shard_id_and_offset(data_shards);
assert_eq!(shard_id, 5);
// Block index 12 (data_shards=10) → row_index 1, shard_id 2
let interval = Interval {
block_index: 12,
inner_block_offset: 200,
size: 50,
is_large_block: true,
large_block_rows_count: 5,
};
let (shard_id, offset) = interval.to_shard_id_and_offset(data_shards);
assert_eq!(shard_id, 2); // 12 % 10 = 2
assert_eq!(offset, large_block_size + 200); // row 1 offset + inner_block_offset
// Block index 10 → shard 0 (second row)
let interval = Interval {
block_index: 10,
@ -169,7 +185,7 @@ mod tests {
is_large_block: true,
large_block_rows_count: 2,
};
let (shard_id, offset) = interval.to_shard_id_and_offset();
let (shard_id, offset) = interval.to_shard_id_and_offset(data_shards);
assert_eq!(shard_id, 0);
assert_eq!(offset, ERASURE_CODING_LARGE_BLOCK_SIZE as i64); // row 1 offset
}
@ -177,7 +193,7 @@ mod tests {
#[test]
fn test_locate_data_small_file() {
// Small file: 100 bytes at offset 50, shard size = 1MB
let intervals = locate_data(50, Size(100), 1024 * 1024);
let intervals = locate_data(50, Size(100), 1024 * 1024, 10);
assert!(!intervals.is_empty());
// Should be a single small block interval (no large block rows for 1MB shard)
@ -187,7 +203,7 @@ mod tests {
#[test]
fn test_locate_data_empty() {
let intervals = locate_data(0, Size(0), 1024 * 1024);
let intervals = locate_data(0, Size(0), 1024 * 1024, 10);
assert!(intervals.is_empty());
}
@ -200,7 +216,7 @@ mod tests {
is_large_block: false,
large_block_rows_count: 2,
};
let (_shard_id, offset) = interval.to_shard_id_and_offset();
let (_shard_id, offset) = interval.to_shard_id_and_offset(10);
// Should be after 2 large block rows
assert_eq!(offset, 2 * ERASURE_CODING_LARGE_BLOCK_SIZE as i64);
}

32
seaweed-volume/src/storage/erasure_coding/ec_shard.rs

@ -126,40 +126,42 @@ pub struct ShardBits(pub u32);
impl ShardBits {
pub fn add_shard_id(&mut self, id: ShardId) {
assert!(
(id as usize) < TOTAL_SHARDS_COUNT,
"shard id {} out of bounds (max {})",
(id as usize) < 32,
"shard id {} out of bounds (max 31)",
id,
TOTAL_SHARDS_COUNT - 1,
);
self.0 |= 1 << id;
}
pub fn remove_shard_id(&mut self, id: ShardId) {
assert!(
(id as usize) < TOTAL_SHARDS_COUNT,
"shard id {} out of bounds (max {})",
(id as usize) < 32,
"shard id {} out of bounds (max 31)",
id,
TOTAL_SHARDS_COUNT - 1,
);
self.0 &= !(1 << id);
}
pub fn has_shard_id(&self, id: ShardId) -> bool {
if (id as usize) >= TOTAL_SHARDS_COUNT {
if (id as usize) >= 32 {
return false;
}
self.0 & (1 << id) != 0
}
pub fn shard_count(&self) -> u32 {
self.0.count_ones()
pub fn shard_id_count(&self) -> usize {
self.0.count_ones() as usize
}
/// Iterator over present shard IDs.
pub fn shard_ids(&self) -> Vec<ShardId> {
(0..TOTAL_SHARDS_COUNT as u8)
.filter(|&id| self.has_shard_id(id))
.collect()
let mut ids = Vec::with_capacity(self.shard_id_count());
for i in 0..32 {
if self.has_shard_id(i) {
ids.push(i);
}
}
ids
}
pub fn minus(&self, other: ShardBits) -> ShardBits {
@ -174,19 +176,19 @@ mod tests {
#[test]
fn test_shard_bits() {
let mut bits = ShardBits::default();
assert_eq!(bits.shard_count(), 0);
assert_eq!(bits.shard_id_count(), 0);
bits.add_shard_id(0);
bits.add_shard_id(3);
bits.add_shard_id(13);
assert_eq!(bits.shard_count(), 3);
assert_eq!(bits.shard_id_count(), 3);
assert!(bits.has_shard_id(0));
assert!(bits.has_shard_id(3));
assert!(!bits.has_shard_id(1));
bits.remove_shard_id(3);
assert!(!bits.has_shard_id(3));
assert_eq!(bits.shard_count(), 2);
assert_eq!(bits.shard_id_count(), 2);
}
#[test]

34
seaweed-volume/src/storage/erasure_coding/ec_volume.rs

@ -19,12 +19,31 @@ pub struct EcVolume {
pub version: Version,
pub shards: Vec<Option<EcVolumeShard>>, // indexed by ShardId (0..data_shards + parity_shards)
pub dat_file_size: i64,
pub data_shards: u32,
pub parity_shards: u32,
ecx_file: Option<File>,
ecx_file_size: i64,
ecj_file: Option<File>,
pub disk_type: DiskType,
}
pub fn read_ec_shard_config(dir: &str, volume_id: VolumeId) -> (u32, u32) {
let mut data_shards = 10;
let mut parity_shards = 4;
let vif_path = format!("{}/{}.vif", dir, volume_id.0);
if let Ok(vif_content) = std::fs::read_to_string(&vif_path) {
if let Ok(vif_info) = serde_json::from_str::<crate::storage::volume::VifVolumeInfo>(&vif_content) {
if let Some(ec) = vif_info.ec_shard_config {
if ec.data_shards > 0 && ec.parity_shards > 0 {
data_shards = ec.data_shards;
parity_shards = ec.parity_shards;
}
}
}
}
(data_shards, parity_shards)
}
impl EcVolume {
/// Create a new EcVolume. Loads .ecx index and .ecj journal if present.
pub fn new(
@ -33,8 +52,11 @@ impl EcVolume {
collection: &str,
volume_id: VolumeId,
) -> io::Result<Self> {
let mut shards = Vec::with_capacity(TOTAL_SHARDS_COUNT);
for _ in 0..TOTAL_SHARDS_COUNT {
let (data_shards, parity_shards) = read_ec_shard_config(dir, volume_id);
let total_shards = (data_shards + parity_shards) as usize;
let mut shards = Vec::with_capacity(total_shards);
for _ in 0..total_shards {
shards.push(None);
}
@ -46,6 +68,8 @@ impl EcVolume {
version: Version::current(),
shards,
dat_file_size: 0,
data_shards,
parity_shards,
ecx_file: None,
ecx_file_size: 0,
ecj_file: None,
@ -97,10 +121,11 @@ impl EcVolume {
/// Add a shard to this volume.
pub fn add_shard(&mut self, mut shard: EcVolumeShard) -> io::Result<()> {
let id = shard.shard_id as usize;
if id >= TOTAL_SHARDS_COUNT {
let total_shards = (self.data_shards + self.parity_shards) as usize;
if id >= total_shards {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("invalid shard id: {}", id),
format!("invalid shard id: {} (max {})", id, total_shards - 1),
));
}
shard.open()?;
@ -192,6 +217,7 @@ impl EcVolume {
offset.to_actual_offset(),
size,
shard_size,
self.data_shards,
);
Ok(Some((offset, size, intervals)))

45
seaweed-volume/src/storage/volume.rs

@ -159,30 +159,38 @@ pub struct VifRemoteFile {
pub extension: String,
}
/// Erasure-coding shard layout as stored in a volume's `.vif` JSON file.
///
/// Field names mirror the protobuf `EcShardConfig` message (`dataShards`,
/// `parityShards`). Both default to 0 when absent; readers treat a zero
/// count as "not configured" and fall back to the standard 10+4 layout.
#[derive(serde::Serialize, serde::Deserialize, Default, Clone)]
pub struct VifEcShardConfig {
    /// Number of data shards; 0 means "not configured".
    #[serde(default, rename = "dataShards")]
    pub data_shards: u32,
    /// Number of parity shards; 0 means "not configured".
    #[serde(default, rename = "parityShards")]
    pub parity_shards: u32,
}
/// Serde-compatible representation of VolumeInfo for .vif JSON serialization.
/// Matches Go's protobuf JSON format (jsonpb with EmitUnpopulated=true).
/// NOTE(review): jsonpb encodes 64-bit integers (int64/uint64) as JSON
/// *strings*; plain `u64` fields here may fail to deserialize such values
/// (e.g. `"datFileSize":"12345"`) — confirm against Go-written .vif files.
#[derive(serde::Serialize, serde::Deserialize, Default, Clone)]
pub struct VifVolumeInfo {
#[serde(default)]
pub files: Vec<VifRemoteFile>,
#[serde(default)]
pub version: u32,
#[serde(default)]
pub replication: String,
#[serde(default, rename = "bytesOffset")]
pub bytes_offset: u32,
#[serde(default, rename = "datFileSize", with = "string_or_i64")]
pub dat_file_size: i64,
#[serde(default, rename = "expireAtSec", with = "string_or_u64")]
pub collection: String,
pub replica_placement: u32,
pub ttl: String,
#[serde(default, rename = "datFileSize")]
pub dat_file_size: u64,
#[serde(default, rename = "expireAtSec")]
pub expire_at_sec: u64,
#[serde(default, rename = "readOnly")]
pub read_only: bool,
#[serde(default, rename = "ecShardConfig", skip_serializing_if = "Option::is_none")]
pub ec_shard_config: Option<VifEcShardConfig>,
}
impl VifVolumeInfo {
/// Convert from protobuf VolumeInfo to the serde-compatible struct.
pub fn from_pb(pb: &PbVolumeInfo) -> Self {
VifVolumeInfo {
Self {
files: pb
.files
.iter()
@ -197,11 +205,16 @@ impl VifVolumeInfo {
})
.collect(),
version: pb.version,
replication: pb.replication.clone(),
bytes_offset: pb.bytes_offset,
collection: pb.collection.clone(),
replica_placement: pb.replica_placement,
ttl: pb.ttl.clone(),
dat_file_size: pb.dat_file_size,
expire_at_sec: pb.expire_at_sec,
read_only: pb.read_only,
ec_shard_config: pb.ec_shard_config.as_ref().map(|c| VifEcShardConfig {
data_shards: c.data_shards,
parity_shards: c.parity_shards,
}),
}
}
@ -222,12 +235,16 @@ impl VifVolumeInfo {
})
.collect(),
version: self.version,
replication: self.replication.clone(),
bytes_offset: self.bytes_offset,
collection: self.collection.clone(),
replica_placement: self.replica_placement,
ttl: self.ttl.clone(),
dat_file_size: self.dat_file_size,
expire_at_sec: self.expire_at_sec,
read_only: self.read_only,
ec_shard_config: None,
ec_shard_config: self.ec_shard_config.as_ref().map(|c| crate::pb::volume_server_pb::EcShardConfig {
data_shards: c.data_shards,
parity_shards: c.parity_shards,
}),
}
}
}

Loading…
Cancel
Save