Browse Source

Add erasure coding module with encode/decode and shard management

Implements Reed-Solomon 10+4 erasure coding matching Go's
erasure_coding package. Features:
- EcVolumeShard: individual shard file (.ec00-.ec13) management
- ShardBits: bitmap for tracking locally available shards
- EcVolume: manages shards + sorted .ecx index + .ecj deletion journal
- Binary search needle lookup in sorted .ecx index
- ec_encoder: encodes .dat file into 14 shard files with large/small blocks
- ec_decoder: reconstructs .dat + .idx from data shards and EC index
- ec_locate: maps needle offset/size to shard intervals
- Full round-trip test: volume → EC encode → delete originals → reconstruct → verify
- 15 unit tests including Reed-Solomon reconstruction
rust-volume-server
Chris Lu 5 days ago
parent
commit
38b1a6d6a6
  1. 197
      seaweed-volume/src/storage/erasure_coding/ec_decoder.rs
  2. 248
      seaweed-volume/src/storage/erasure_coding/ec_encoder.rs
  3. 207
      seaweed-volume/src/storage/erasure_coding/ec_locate.rs
  4. 202
      seaweed-volume/src/storage/erasure_coding/ec_shard.rs
  5. 352
      seaweed-volume/src/storage/erasure_coding/ec_volume.rs
  6. 13
      seaweed-volume/src/storage/erasure_coding/mod.rs
  7. 1
      seaweed-volume/src/storage/mod.rs

197
seaweed-volume/src/storage/erasure_coding/ec_decoder.rs

@ -0,0 +1,197 @@
//! EC decoding: reconstruct a .dat file from EC shards.
//!
//! Rebuilds the original .dat + .idx files from data shards (.ec00-.ec09)
//! and the sorted index (.ecx) + deletion journal (.ecj).
use std::fs::File;
use std::io::{self, Read, Write};
use crate::storage::erasure_coding::ec_shard::*;
use crate::storage::idx;
use crate::storage::types::*;
use crate::storage::volume::volume_file_name;
/// Reconstruct a .dat file from EC data shards.
///
/// Reads from .ec00-.ec09 and writes a new .dat file of `dat_file_size`
/// bytes; trailing shard padding beyond that size is dropped.
///
/// # Errors
/// Fails if any data shard cannot be opened or on any read/write error.
pub fn write_dat_file_from_shards(
    dir: &str,
    collection: &str,
    volume_id: VolumeId,
    dat_file_size: i64,
) -> io::Result<()> {
    let base = volume_file_name(dir, collection, volume_id);
    let dat_path = format!("{}.dat", base);
    // Open the data shards only; parity shards are not needed when all
    // ten data shards are locally available.
    let mut shards: Vec<EcVolumeShard> = (0..DATA_SHARDS_COUNT as u8)
        .map(|i| EcVolumeShard::new(dir, collection, volume_id, i))
        .collect();
    for shard in &mut shards {
        shard.open()?;
    }
    let mut dat_file = File::create(&dat_path)?;
    let mut remaining = dat_file_size;
    let large_row_size = (ERASURE_CODING_LARGE_BLOCK_SIZE * DATA_SHARDS_COUNT) as i64;
    let mut shard_offset: u64 = 0;
    // Large-block section: rows of 1GB per shard, mirroring the encoder.
    while remaining >= large_row_size {
        copy_one_row(
            &shards,
            &mut dat_file,
            ERASURE_CODING_LARGE_BLOCK_SIZE,
            shard_offset,
            &mut remaining,
        )?;
        shard_offset += ERASURE_CODING_LARGE_BLOCK_SIZE as u64;
    }
    // Small-block section: the tail is stored in 1MB-per-shard rows.
    while remaining > 0 {
        copy_one_row(
            &shards,
            &mut dat_file,
            ERASURE_CODING_SMALL_BLOCK_SIZE,
            shard_offset,
            &mut remaining,
        )?;
        shard_offset += ERASURE_CODING_SMALL_BLOCK_SIZE as u64;
    }
    for shard in &mut shards {
        shard.close();
    }
    dat_file.sync_all()?;
    Ok(())
}

/// Copy one row (`block_size` bytes from each data shard, in shard order,
/// all read at the same `shard_offset`) into `dat_file`, writing at most
/// `remaining` bytes total and decrementing `remaining` by what was written.
fn copy_one_row(
    shards: &[EcVolumeShard],
    dat_file: &mut File,
    block_size: usize,
    shard_offset: u64,
    remaining: &mut i64,
) -> io::Result<()> {
    for shard in shards {
        // Fresh zeroed buffer per read: a short read leaves zeros, matching
        // the zero padding the encoder wrote past end-of-data.
        let mut buf = vec![0u8; block_size];
        shard.read_at(&mut buf, shard_offset)?;
        let to_write = block_size.min(*remaining as usize);
        dat_file.write_all(&buf[..to_write])?;
        *remaining -= to_write as i64;
        if *remaining <= 0 {
            break;
        }
    }
    Ok(())
}
/// Write .idx file from .ecx index + .ecj deletion journal.
///
/// Copies sorted .ecx entries to .idx, then appends tombstones for
/// deleted needles from .ecj.
pub fn write_idx_file_from_ec_index(
    dir: &str,
    collection: &str,
    volume_id: VolumeId,
) -> io::Result<()> {
    let base = volume_file_name(dir, collection, volume_id);
    let ecx_path = format!("{}.ecx", base);
    let ecj_path = format!("{}.ecj", base);
    let idx_path = format!("{}.idx", base);
    // The .ecx already holds every live entry; start the .idx as a copy.
    std::fs::copy(&ecx_path, &idx_path)?;
    // Replay the deletion journal: each .ecj record is one needle id whose
    // entry must be tombstoned in the rebuilt index.
    if std::path::Path::new(&ecj_path).exists() {
        let journal = std::fs::read(&ecj_path)?;
        if journal.is_empty() {
            return Ok(());
        }
        let mut idx_file = std::fs::OpenOptions::new()
            .write(true)
            .append(true)
            .open(&idx_path)?;
        // chunks_exact ignores any trailing partial record, same as the
        // integer-division record count.
        for record in journal.chunks_exact(NEEDLE_ID_SIZE) {
            let needle_id = NeedleId::from_bytes(record);
            idx::write_index_entry(
                &mut idx_file,
                needle_id,
                Offset::default(),
                TOMBSTONE_FILE_SIZE,
            )?;
        }
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::erasure_coding::ec_encoder;
    use crate::storage::needle::needle::Needle;
    use crate::storage::needle_map::NeedleMapKind;
    use crate::storage::volume::Volume;
    use tempfile::TempDir;

    /// End-to-end round trip: write a volume, EC-encode it, delete the
    /// originals, reconstruct from shards, and verify both raw bytes and
    /// needle-level reads.
    #[test]
    fn test_ec_full_round_trip() {
        let tmp = TempDir::new().unwrap();
        let dir = tmp.path().to_str().unwrap();
        // Create volume with data
        let mut v = Volume::new(
            dir, dir, "", VolumeId(1),
            NeedleMapKind::InMemory, None, None, 0, Version::current(),
        ).unwrap();
        let test_data: Vec<(NeedleId, Vec<u8>)> = (1..=3).map(|i| {
            let data = format!("EC round trip data for needle {}", i);
            (NeedleId(i), data.into_bytes())
        }).collect();
        for (id, data) in &test_data {
            let mut n = Needle {
                id: *id,
                cookie: Cookie(id.0 as u32),
                data: data.clone(),
                data_size: data.len() as u32,
                ..Needle::default()
            };
            v.write_needle(&mut n, true).unwrap();
        }
        v.sync_to_disk().unwrap();
        // Capture the logical .dat size before closing; the reconstructed
        // file is compared only up to this length (shards are zero-padded).
        let original_dat_size = v.dat_file_size().unwrap();
        v.close();
        // Read original .dat for comparison
        let original_dat = std::fs::read(format!("{}/1.dat", dir)).unwrap();
        // Encode to EC
        ec_encoder::write_ec_files(dir, "", VolumeId(1)).unwrap();
        // Delete original .dat and .idx
        std::fs::remove_file(format!("{}/1.dat", dir)).unwrap();
        std::fs::remove_file(format!("{}/1.idx", dir)).unwrap();
        // Reconstruct from EC shards
        write_dat_file_from_shards(dir, "", VolumeId(1), original_dat_size as i64).unwrap();
        write_idx_file_from_ec_index(dir, "", VolumeId(1)).unwrap();
        // Verify reconstructed .dat matches original
        let reconstructed_dat = std::fs::read(format!("{}/1.dat", dir)).unwrap();
        assert_eq!(
            original_dat[..original_dat_size as usize],
            reconstructed_dat[..original_dat_size as usize],
            "reconstructed .dat should match original"
        );
        // Verify we can load and read from reconstructed volume
        let v2 = Volume::new(
            dir, dir, "", VolumeId(1),
            NeedleMapKind::InMemory, None, None, 0, Version::current(),
        ).unwrap();
        for (id, expected_data) in &test_data {
            let mut n = Needle { id: *id, ..Needle::default() };
            v2.read_needle(&mut n).unwrap();
            assert_eq!(&n.data, expected_data, "needle {} data should match", id);
        }
    }
}

248
seaweed-volume/src/storage/erasure_coding/ec_encoder.rs

@ -0,0 +1,248 @@
//! EC encoding: convert a .dat file into 10 data + 4 parity shards.
//!
//! Uses Reed-Solomon erasure coding. The .dat file is split into blocks
//! (1GB large, 1MB small) and encoded across 14 shard files.
use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom, Write};
use reed_solomon_erasure::galois_8::ReedSolomon;
use crate::storage::erasure_coding::ec_shard::*;
use crate::storage::idx;
use crate::storage::types::*;
use crate::storage::volume::volume_file_name;
/// Encode a .dat file into EC shard files.
///
/// Creates .ec00-.ec13 files in the same directory.
/// Also creates a sorted .ecx index from the .idx file.
pub fn write_ec_files(
    dir: &str,
    collection: &str,
    volume_id: VolumeId,
) -> io::Result<()> {
    let base = volume_file_name(dir, collection, volume_id);
    let dat_path = format!("{}.dat", base);
    let idx_path = format!("{}.idx", base);
    let ecx_path = format!("{}.ecx", base);
    // Build the sorted .ecx index first so a missing .idx aborts before
    // any shard file is created.
    write_sorted_ecx_from_idx(&idx_path, &ecx_path)?;
    let dat_file = File::open(&dat_path)?;
    let dat_size = dat_file.metadata()?.len() as i64;
    let rs = ReedSolomon::new(DATA_SHARDS_COUNT, PARITY_SHARDS_COUNT)
        .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("reed-solomon init: {:?}", e)))?;
    // Create one empty shard file per shard id (.ec00-.ec13).
    let mut shards: Vec<EcVolumeShard> = (0..TOTAL_SHARDS_COUNT as u8)
        .map(|id| EcVolumeShard::new(dir, collection, volume_id, id))
        .collect();
    for shard in shards.iter_mut() {
        shard.create()?;
    }
    // Stripe and encode the .dat contents across all shards.
    encode_dat_file(&dat_file, dat_size, &rs, &mut shards)?;
    for shard in shards.iter_mut() {
        shard.close();
    }
    Ok(())
}
/// Write sorted .ecx index from .idx file.
///
/// The .idx file is append-ordered: a needle written again appears a second
/// time with its new offset/size, and the *last* entry is authoritative.
///
/// Bug fixed: the previous `sort_by_key` (stable) + `dedup_by_key` kept the
/// FIRST entry of each duplicate run, contradicting the "keep last" intent.
/// A BTreeMap makes later inserts overwrite earlier ones and iterates in
/// sorted key order, which is exactly the .ecx contract.
fn write_sorted_ecx_from_idx(idx_path: &str, ecx_path: &str) -> io::Result<()> {
    if !std::path::Path::new(idx_path).exists() {
        return Err(io::Error::new(io::ErrorKind::NotFound, "idx file not found"));
    }
    // Read all idx entries in file order; later entries overwrite earlier
    // ones for the same needle id.
    let mut idx_file = File::open(idx_path)?;
    let mut entries: std::collections::BTreeMap<NeedleId, (Offset, Size)> =
        std::collections::BTreeMap::new();
    idx::walk_index_file(&mut idx_file, 0, |key, offset, size| {
        entries.insert(key, (offset, size));
        Ok(())
    })?;
    // BTreeMap iteration is ascending by needle id — the sorted order the
    // binary search in EcVolume::find_needle_from_ecx relies on.
    let mut ecx_file = File::create(ecx_path)?;
    for (&key, &(offset, size)) in &entries {
        idx::write_index_entry(&mut ecx_file, key, offset, size)?;
    }
    Ok(())
}
/// Encode the .dat file data into shard files.
///
/// Full 1GB-per-shard rows are emitted while enough input remains; the
/// tail is then encoded in 1MB-per-shard rows.
fn encode_dat_file(
    dat_file: &File,
    dat_size: i64,
    rs: &ReedSolomon,
    shards: &mut [EcVolumeShard],
) -> io::Result<()> {
    let large_row = (ERASURE_CODING_LARGE_BLOCK_SIZE * DATA_SHARDS_COUNT) as i64;
    let small_row = (ERASURE_CODING_SMALL_BLOCK_SIZE * DATA_SHARDS_COUNT) as i64;
    let mut remaining = dat_size;
    let mut offset: u64 = 0;
    // Large-block rows while a full row of input is available.
    while remaining >= large_row {
        encode_one_batch(dat_file, offset, ERASURE_CODING_LARGE_BLOCK_SIZE, rs, shards)?;
        offset += large_row as u64;
        remaining -= large_row;
    }
    // Small-block rows for the remainder; the final (possibly partial) row
    // is zero-padded by the batch reader.
    while remaining > 0 {
        let consumed = remaining.min(small_row);
        encode_one_batch(dat_file, offset, ERASURE_CODING_SMALL_BLOCK_SIZE, rs, shards)?;
        offset += consumed as u64;
        remaining -= consumed;
    }
    Ok(())
}
/// Encode one batch (row) of data.
///
/// Reads `block_size` bytes for each of the ten data shards starting at
/// `offset` in the .dat file, computes the four parity blocks, and appends
/// all fourteen blocks to their shard files.
///
/// Bug fixed: the read was `#[cfg(unix)]`-only, so non-unix builds silently
/// encoded all-zero data shards. A portable seek+read fallback is provided.
fn encode_one_batch(
    dat_file: &File,
    offset: u64,
    block_size: usize,
    rs: &ReedSolomon,
    shards: &mut [EcVolumeShard],
) -> io::Result<()> {
    // One zero-initialized buffer per shard; bytes past end-of-data stay
    // zero, which is the padding the decoder expects.
    let mut buffers: Vec<Vec<u8>> = (0..TOTAL_SHARDS_COUNT)
        .map(|_| vec![0u8; block_size])
        .collect();
    // Read data shards from the .dat file.
    for i in 0..DATA_SHARDS_COUNT {
        let read_offset = offset + (i * block_size) as u64;
        #[cfg(unix)]
        {
            use std::os::unix::fs::FileExt;
            // Best-effort positional read; a short read / EOF leaves the
            // rest of the buffer zero-filled.
            let _ = dat_file.read_at(&mut buffers[i], read_offset);
        }
        #[cfg(not(unix))]
        {
            // Portable fallback via the `impl Seek/Read for &File`; moves
            // the shared cursor, which is harmless here (every read seeks).
            let mut f = dat_file;
            if f.seek(SeekFrom::Start(read_offset)).is_ok() {
                let _ = f.read(&mut buffers[i]);
            }
        }
    }
    // Compute the parity shards in place.
    rs.encode(&mut buffers).map_err(|e| {
        io::Error::new(io::ErrorKind::Other, format!("reed-solomon encode: {:?}", e))
    })?;
    // Append every block to its shard file.
    for (i, buf) in buffers.iter().enumerate() {
        shards[i].write_all(buf)?;
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::needle::needle::Needle;
    use crate::storage::needle_map::NeedleMapKind;
    use crate::storage::volume::Volume;
    use tempfile::TempDir;

    /// Encoding a real volume produces all 14 shard files plus the .ecx index.
    #[test]
    fn test_ec_encode_decode_round_trip() {
        let tmp = TempDir::new().unwrap();
        let dir = tmp.path().to_str().unwrap();
        // Create a volume with some data
        let mut v = Volume::new(
            dir, dir, "", VolumeId(1),
            NeedleMapKind::InMemory, None, None, 0, Version::current(),
        ).unwrap();
        for i in 1..=5 {
            let data = format!("test data for needle {}", i);
            let mut n = Needle {
                id: NeedleId(i),
                cookie: Cookie(i as u32),
                data: data.as_bytes().to_vec(),
                data_size: data.len() as u32,
                ..Needle::default()
            };
            v.write_needle(&mut n, true).unwrap();
        }
        v.sync_to_disk().unwrap();
        v.close();
        // Encode to EC shards
        write_ec_files(dir, "", VolumeId(1)).unwrap();
        // Verify shard files exist (.ec00 .. .ec13)
        for i in 0..TOTAL_SHARDS_COUNT {
            let path = format!("{}/{}.ec{:02}", dir, 1, i);
            assert!(
                std::path::Path::new(&path).exists(),
                "shard file {} should exist", path
            );
        }
        // Verify .ecx exists
        let ecx_path = format!("{}/1.ecx", dir);
        assert!(std::path::Path::new(&ecx_path).exists());
    }

    /// Pure Reed-Solomon sanity check: encode 10+4, drop 4 shards (the
    /// tolerated maximum), reconstruct, and compare against the originals.
    #[test]
    fn test_reed_solomon_basic() {
        let rs = ReedSolomon::new(DATA_SHARDS_COUNT, PARITY_SHARDS_COUNT).unwrap();
        let block_size = 1024;
        let mut shards: Vec<Vec<u8>> = (0..TOTAL_SHARDS_COUNT)
            .map(|i| {
                if i < DATA_SHARDS_COUNT {
                    // Distinct constant fill per data shard.
                    vec![(i as u8).wrapping_mul(7); block_size]
                } else {
                    // Parity shards start zeroed; encode() fills them.
                    vec![0u8; block_size]
                }
            })
            .collect();
        // Encode
        rs.encode(&mut shards).unwrap();
        // Verify parity is non-zero (at least some)
        let parity_nonzero: bool = shards[DATA_SHARDS_COUNT..].iter()
            .any(|s| s.iter().any(|&b| b != 0));
        assert!(parity_nonzero);
        // Simulate losing 4 shards and reconstructing
        let original_0 = shards[0].clone();
        let original_1 = shards[1].clone();
        let mut shard_opts: Vec<Option<Vec<u8>>> = shards.into_iter().map(Some).collect();
        shard_opts[0] = None;
        shard_opts[1] = None;
        shard_opts[2] = None;
        shard_opts[3] = None;
        rs.reconstruct(&mut shard_opts).unwrap();
        assert_eq!(shard_opts[0].as_ref().unwrap(), &original_0);
        assert_eq!(shard_opts[1].as_ref().unwrap(), &original_1);
    }
}

207
seaweed-volume/src/storage/erasure_coding/ec_locate.rs

@ -0,0 +1,207 @@
//! EC data location: maps needle offset/size to shard intervals.
//!
//! Determines which shard(s) contain data for a given needle and at what
//! offsets within those shards. Handles both large (1GB) and small (1MB)
//! block sections.
use crate::storage::erasure_coding::ec_shard::*;
use crate::storage::types::*;
/// An interval to read from EC shards.
#[derive(Debug, Clone)]
pub struct Interval {
    // Index of the block within its section, counted row by row across all
    // data shards (so block_index % DATA_SHARDS_COUNT is the shard).
    pub block_index: usize,
    // Byte offset of the interval inside that block.
    pub inner_block_offset: i64,
    // Number of bytes covered by this interval.
    pub size: i64,
    // Whether the block lives in the large-block section.
    pub is_large_block: bool,
    // How many large-block rows each shard file holds; small blocks are
    // laid out after that many large blocks within a shard file.
    pub large_block_rows_count: usize,
}

impl Interval {
    /// Convert an interval to the specific shard ID and offset within that shard.
    pub fn to_shard_id_and_offset(&self) -> (ShardId, i64) {
        // Blocks are striped round-robin across the data shards.
        let shard_id = (self.block_index % DATA_SHARDS_COUNT) as ShardId;
        let row_index = self.block_index / DATA_SHARDS_COUNT;
        let block_size = if self.is_large_block {
            ERASURE_CODING_LARGE_BLOCK_SIZE as i64
        } else {
            ERASURE_CODING_SMALL_BLOCK_SIZE as i64
        };
        let mut offset = row_index as i64 * block_size + self.inner_block_offset;
        if !self.is_large_block {
            // Small blocks come after large blocks in the shard file
            offset += self.large_block_rows_count as i64 * ERASURE_CODING_LARGE_BLOCK_SIZE as i64;
        }
        (shard_id, offset)
    }
}
/// Locate the EC shard intervals needed to read data at the given offset and size.
///
/// `shard_size` is the size of a single shard file.
/// Returns an empty vec for zero-sized reads or a non-positive shard size.
pub fn locate_data(offset: i64, size: Size, shard_size: i64) -> Vec<Interval> {
    let mut intervals = Vec::new();
    let data_size = size.0 as i64;
    if data_size <= 0 || shard_size <= 0 {
        return intervals;
    }
    let large_block_size = ERASURE_CODING_LARGE_BLOCK_SIZE as i64;
    let small_block_size = ERASURE_CODING_SMALL_BLOCK_SIZE as i64;
    let large_row_size = large_block_size * DATA_SHARDS_COUNT as i64;
    let small_row_size = small_block_size * DATA_SHARDS_COUNT as i64;
    // Number of large block rows, inferred from the shard file size.
    // NOTE(review): when shard_size is an exact multiple of the large block
    // size, the `- 1` rounds the count down (e.g. a shard of exactly one
    // large block yields 0 large rows), yet the encoder emits a large row
    // whenever the .dat size reaches a full large row. Confirm this
    // inference matches the encoder layout at that boundary — deriving the
    // count from the original .dat size would be unambiguous.
    let n_large_block_rows = if shard_size > 0 {
        ((shard_size - 1) / large_block_size) as usize
    } else {
        0
    };
    // Total .dat bytes covered by the large-block section.
    let large_section_size = n_large_block_rows as i64 * large_row_size;
    let mut remaining_offset = offset;
    let mut remaining_size = data_size;
    // In large block section?
    if remaining_offset < large_section_size {
        let available_in_large = large_section_size - remaining_offset;
        let to_read = remaining_size.min(available_in_large);
        add_intervals(
            &mut intervals,
            remaining_offset,
            to_read,
            large_block_size,
            large_row_size,
            true,
            n_large_block_rows,
        );
        remaining_offset += to_read;
        remaining_size -= to_read;
    }
    // In small block section? (offsets there are relative to its start)
    if remaining_size > 0 {
        let small_offset = remaining_offset - large_section_size;
        add_intervals(
            &mut intervals,
            small_offset,
            remaining_size,
            small_block_size,
            small_row_size,
            false,
            n_large_block_rows,
        );
    }
    intervals
}
/// Split the byte range `[offset, offset + size)` into per-block intervals
/// and append them to `intervals`. Offsets are relative to the start of the
/// section (large or small) being walked. `row_size` is accepted for
/// symmetry with the caller but is not needed by the computation.
fn add_intervals(
    intervals: &mut Vec<Interval>,
    offset: i64,
    size: i64,
    block_size: i64,
    row_size: i64,
    is_large_block: bool,
    large_block_rows_count: usize,
) {
    let _ = row_size;
    let end = offset + size;
    let mut cursor = offset;
    // Emit one interval per block the range touches, never crossing a
    // block boundary within a single interval.
    while cursor < end {
        let inner = cursor % block_size;
        let len = (block_size - inner).min(end - cursor);
        intervals.push(Interval {
            block_index: (cursor / block_size) as usize,
            inner_block_offset: inner,
            size: len,
            is_large_block,
            large_block_rows_count,
        });
        cursor += len;
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Interval → (shard, offset) mapping for large blocks across rows.
    #[test]
    fn test_interval_to_shard_id() {
        // Block index 0 → shard 0
        let interval = Interval {
            block_index: 0,
            inner_block_offset: 100,
            size: 50,
            is_large_block: true,
            large_block_rows_count: 1,
        };
        let (shard_id, offset) = interval.to_shard_id_and_offset();
        assert_eq!(shard_id, 0);
        assert_eq!(offset, 100);
        // Block index 5 → shard 5
        let interval = Interval {
            block_index: 5,
            inner_block_offset: 0,
            size: 1024,
            is_large_block: true,
            large_block_rows_count: 1,
        };
        let (shard_id, _offset) = interval.to_shard_id_and_offset();
        assert_eq!(shard_id, 5);
        // Block index 10 → shard 0 (second row)
        let interval = Interval {
            block_index: 10,
            inner_block_offset: 0,
            size: 100,
            is_large_block: true,
            large_block_rows_count: 2,
        };
        let (shard_id, offset) = interval.to_shard_id_and_offset();
        assert_eq!(shard_id, 0);
        assert_eq!(offset, ERASURE_CODING_LARGE_BLOCK_SIZE as i64); // row 1 offset
    }

    /// A read that fits inside one small block yields a single interval.
    #[test]
    fn test_locate_data_small_file() {
        // Small file: 100 bytes at offset 50, shard size = 1MB
        let intervals = locate_data(50, Size(100), 1024 * 1024);
        assert!(!intervals.is_empty());
        // Should be a single small block interval (no large block rows for 1MB shard)
        assert_eq!(intervals.len(), 1);
        assert!(!intervals[0].is_large_block);
    }

    /// Zero-size reads locate nothing.
    #[test]
    fn test_locate_data_empty() {
        let intervals = locate_data(0, Size(0), 1024 * 1024);
        assert!(intervals.is_empty());
    }

    /// Small-block offsets are shifted past the large-block section.
    #[test]
    fn test_small_block_after_large() {
        let interval = Interval {
            block_index: 0,
            inner_block_offset: 0,
            size: 100,
            is_large_block: false,
            large_block_rows_count: 2,
        };
        let (_shard_id, offset) = interval.to_shard_id_and_offset();
        // Should be after 2 large block rows
        assert_eq!(offset, 2 * ERASURE_CODING_LARGE_BLOCK_SIZE as i64);
    }
}

202
seaweed-volume/src/storage/erasure_coding/ec_shard.rs

@ -0,0 +1,202 @@
//! EcVolumeShard: a single shard file (.ec00-.ec13) of an erasure-coded volume.
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::Path;
use crate::storage::types::*;
// Number of data shards (.ec00-.ec09) holding the original bytes.
pub const DATA_SHARDS_COUNT: usize = 10;
// Number of Reed-Solomon parity shards (.ec10-.ec13).
pub const PARITY_SHARDS_COUNT: usize = 4;
// Total shard files per volume; per the module docs, any 10 of the 14
// suffice to reconstruct the data.
pub const TOTAL_SHARDS_COUNT: usize = DATA_SHARDS_COUNT + PARITY_SHARDS_COUNT;
// Block sizes used to stripe the .dat file across shards.
pub const ERASURE_CODING_LARGE_BLOCK_SIZE: usize = 1024 * 1024 * 1024; // 1GB
pub const ERASURE_CODING_SMALL_BLOCK_SIZE: usize = 1024 * 1024; // 1MB
// Identifier of a shard within a volume (0..TOTAL_SHARDS_COUNT).
pub type ShardId = u8;
/// A single erasure-coded shard file.
pub struct EcVolumeShard {
    pub volume_id: VolumeId,
    // Shard index within the volume, encoded into the file extension
    // (.ec00-.ec13).
    pub shard_id: ShardId,
    pub collection: String,
    // Directory containing the shard file.
    pub dir: String,
    pub disk_type: DiskType,
    // Open file handle; None until open()/create() is called.
    ecd_file: Option<File>,
    // Cached file size, maintained by open() and write_all().
    ecd_file_size: i64,
}
impl EcVolumeShard {
    /// Create a new shard reference (does not open the file).
    pub fn new(dir: &str, collection: &str, volume_id: VolumeId, shard_id: ShardId) -> Self {
        EcVolumeShard {
            volume_id,
            shard_id,
            collection: collection.to_string(),
            dir: dir.to_string(),
            disk_type: DiskType::default(),
            ecd_file: None,
            ecd_file_size: 0,
        }
    }

    /// Shard file name, e.g. "dir/collection_42.ec03"
    pub fn file_name(&self) -> String {
        let base = crate::storage::volume::volume_file_name(&self.dir, &self.collection, self.volume_id);
        format!("{}.ec{:02}", base, self.shard_id)
    }

    /// Open the shard file for reading and cache its size.
    pub fn open(&mut self) -> io::Result<()> {
        let path = self.file_name();
        let file = File::open(&path)?;
        self.ecd_file_size = file.metadata()?.len() as i64;
        self.ecd_file = Some(file);
        Ok(())
    }

    /// Create (or truncate) the shard file for writing.
    pub fn create(&mut self) -> io::Result<()> {
        let path = self.file_name();
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&path)?;
        self.ecd_file = Some(file);
        self.ecd_file_size = 0;
        Ok(())
    }

    /// Read data at a specific offset. Returns the number of bytes read,
    /// which may be less than `buf.len()` (short read / EOF).
    ///
    /// Bug fixed: the body previously existed only under `#[cfg(unix)]`, so
    /// the function did not compile on other targets (no return value). A
    /// seek+read fallback via `impl Seek/Read for &File` is now provided;
    /// unlike the unix positional read it moves the file cursor.
    pub fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
        let file = self.ecd_file.as_ref().ok_or_else(|| {
            io::Error::new(io::ErrorKind::Other, "shard file not open")
        })?;
        #[cfg(unix)]
        {
            use std::os::unix::fs::FileExt;
            file.read_at(buf, offset)
        }
        #[cfg(not(unix))]
        {
            let mut f = file;
            f.seek(SeekFrom::Start(offset))?;
            f.read(buf)
        }
    }

    /// Write data to the shard file (appends), tracking the size.
    pub fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
        let file = self.ecd_file.as_mut().ok_or_else(|| {
            io::Error::new(io::ErrorKind::Other, "shard file not open")
        })?;
        file.write_all(data)?;
        self.ecd_file_size += data.len() as i64;
        Ok(())
    }

    /// Size of the shard file as tracked by open()/write_all().
    pub fn file_size(&self) -> i64 {
        self.ecd_file_size
    }

    /// Close the shard file, flushing to disk best-effort.
    pub fn close(&mut self) {
        if let Some(ref file) = self.ecd_file {
            let _ = file.sync_all();
        }
        self.ecd_file = None;
    }

    /// Delete the shard file from disk (best-effort).
    pub fn destroy(&mut self) {
        self.close();
        let _ = fs::remove_file(self.file_name());
    }
}
/// ShardBits: bitmap tracking which shards are present, one bit per shard id.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ShardBits(pub u32);

impl ShardBits {
    /// Mark shard `id` as present.
    pub fn add_shard_id(&mut self, id: ShardId) {
        self.0 |= 1u32 << id;
    }

    /// Mark shard `id` as absent.
    pub fn remove_shard_id(&mut self, id: ShardId) {
        self.0 &= !(1u32 << id);
    }

    /// Whether shard `id` is present.
    pub fn has_shard_id(&self, id: ShardId) -> bool {
        (self.0 >> id) & 1 == 1
    }

    /// Number of present shards.
    pub fn shard_count(&self) -> u32 {
        self.0.count_ones()
    }

    /// The present shard IDs in ascending order.
    pub fn shard_ids(&self) -> Vec<ShardId> {
        let mut ids = Vec::new();
        for id in 0..TOTAL_SHARDS_COUNT as u8 {
            if self.has_shard_id(id) {
                ids.push(id);
            }
        }
        ids
    }

    /// Shards present in `self` but not in `other`.
    pub fn minus(&self, other: ShardBits) -> ShardBits {
        ShardBits(self.0 & !other.0)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Basic add/remove/query on the shard bitmap.
    #[test]
    fn test_shard_bits() {
        let mut bits = ShardBits::default();
        assert_eq!(bits.shard_count(), 0);
        bits.add_shard_id(0);
        bits.add_shard_id(3);
        bits.add_shard_id(13);
        assert_eq!(bits.shard_count(), 3);
        assert!(bits.has_shard_id(0));
        assert!(bits.has_shard_id(3));
        assert!(!bits.has_shard_id(1));
        bits.remove_shard_id(3);
        assert!(!bits.has_shard_id(3));
        assert_eq!(bits.shard_count(), 2);
    }

    /// shard_ids() returns present ids in ascending order.
    #[test]
    fn test_shard_bits_ids() {
        let mut bits = ShardBits::default();
        bits.add_shard_id(1);
        bits.add_shard_id(5);
        bits.add_shard_id(9);
        assert_eq!(bits.shard_ids(), vec![1, 5, 9]);
    }

    /// minus() computes set difference of bitmaps.
    #[test]
    fn test_shard_bits_minus() {
        let mut a = ShardBits::default();
        a.add_shard_id(0);
        a.add_shard_id(1);
        a.add_shard_id(2);
        let mut b = ShardBits::default();
        b.add_shard_id(1);
        let c = a.minus(b);
        assert_eq!(c.shard_ids(), vec![0, 2]);
    }

    /// Collection name is embedded in the shard file name.
    #[test]
    fn test_shard_file_name() {
        let shard = EcVolumeShard::new("/data", "pics", VolumeId(42), 3);
        assert_eq!(shard.file_name(), "/data/pics_42.ec03");
    }

    /// Without a collection the file name is just "<volume id>.ecNN".
    #[test]
    fn test_shard_file_name_no_collection() {
        let shard = EcVolumeShard::new("/data", "", VolumeId(7), 13);
        assert_eq!(shard.file_name(), "/data/7.ec13");
    }
}

352
seaweed-volume/src/storage/erasure_coding/ec_volume.rs

@ -0,0 +1,352 @@
//! EcVolume: an erasure-coded volume with up to 14 shards.
//!
//! Each EcVolume has a sorted index (.ecx) and a deletion journal (.ecj).
//! Shards (.ec00-.ec13) may be distributed across multiple servers.
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use crate::storage::erasure_coding::ec_locate;
use crate::storage::erasure_coding::ec_shard::*;
use crate::storage::types::*;
/// An erasure-coded volume managing its local shards and index.
pub struct EcVolume {
    pub volume_id: VolumeId,
    pub collection: String,
    // Directory holding the shard files.
    pub dir: String,
    // Directory holding the .ecx / .ecj index files (may equal `dir`).
    pub dir_idx: String,
    pub version: Version,
    pub shards: Vec<Option<EcVolumeShard>>, // indexed by ShardId (0..14)
    // Size of the original .dat file.
    // NOTE(review): initialized to 0 in `new` and never updated anywhere in
    // this file — confirm callers populate it before relying on it.
    pub dat_file_size: i64,
    // Sorted index handle (.ecx); None if the file did not exist at open time.
    ecx_file: Option<File>,
    ecx_file_size: i64,
    // Append-only deletion journal handle (.ecj), always opened by `new`.
    ecj_file: Option<File>,
    pub disk_type: DiskType,
}
impl EcVolume {
    /// Create a new EcVolume. Loads the .ecx index if present and opens
    /// (creating if absent) the .ecj deletion journal.
    ///
    /// Shards are not opened here; attach them with [`EcVolume::add_shard`].
    ///
    /// # Errors
    /// Fails if an existing .ecx cannot be opened or the .ecj cannot be
    /// created/opened.
    pub fn new(
        dir: &str,
        dir_idx: &str,
        collection: &str,
        volume_id: VolumeId,
    ) -> io::Result<Self> {
        // One slot per possible shard id; None until add_shard fills it.
        let mut shards = Vec::with_capacity(TOTAL_SHARDS_COUNT);
        for _ in 0..TOTAL_SHARDS_COUNT {
            shards.push(None);
        }
        let mut vol = EcVolume {
            volume_id,
            collection: collection.to_string(),
            dir: dir.to_string(),
            dir_idx: dir_idx.to_string(),
            version: Version::current(),
            shards,
            dat_file_size: 0,
            ecx_file: None,
            ecx_file_size: 0,
            ecj_file: None,
            disk_type: DiskType::default(),
        };
        // Open .ecx file (sorted index) if present; needle lookups need it.
        let ecx_path = vol.ecx_file_name();
        if std::path::Path::new(&ecx_path).exists() {
            let file = File::open(&ecx_path)?;
            vol.ecx_file_size = file.metadata()?.len() as i64;
            vol.ecx_file = Some(file);
        }
        // Open .ecj file (deletion journal), append-only, created if missing.
        let ecj_path = vol.ecj_file_name();
        let ecj_file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .append(true)
            .open(&ecj_path)?;
        vol.ecj_file = Some(ecj_file);
        Ok(vol)
    }

    // ---- File names ----

    /// Base path (no extension) for shard files, rooted at `dir`.
    fn base_name(&self) -> String {
        crate::storage::volume::volume_file_name(&self.dir, &self.collection, self.volume_id)
    }

    /// Base path (no extension) for index files, rooted at `dir_idx`.
    fn idx_base_name(&self) -> String {
        crate::storage::volume::volume_file_name(&self.dir_idx, &self.collection, self.volume_id)
    }

    /// Path of the sorted index file (.ecx).
    pub fn ecx_file_name(&self) -> String {
        format!("{}.ecx", self.idx_base_name())
    }

    /// Path of the deletion journal file (.ecj).
    pub fn ecj_file_name(&self) -> String {
        format!("{}.ecj", self.idx_base_name())
    }

    // ---- Shard management ----

    /// Add a shard to this volume, opening its file.
    ///
    /// # Errors
    /// Fails on an out-of-range shard id or if the shard file cannot open.
    pub fn add_shard(&mut self, mut shard: EcVolumeShard) -> io::Result<()> {
        let id = shard.shard_id as usize;
        if id >= TOTAL_SHARDS_COUNT {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("invalid shard id: {}", id),
            ));
        }
        shard.open()?;
        self.shards[id] = Some(shard);
        Ok(())
    }

    /// Remove and close a shard; a no-op if the shard is absent.
    pub fn remove_shard(&mut self, shard_id: ShardId) {
        if let Some(ref mut shard) = self.shards[shard_id as usize] {
            shard.close();
        }
        self.shards[shard_id as usize] = None;
    }

    /// Get a ShardBits bitmap of locally available shards.
    pub fn shard_bits(&self) -> ShardBits {
        let mut bits = ShardBits::default();
        for (i, shard) in self.shards.iter().enumerate() {
            if shard.is_some() {
                bits.add_shard_id(i as ShardId);
            }
        }
        bits
    }

    /// Count of locally available shards.
    pub fn shard_count(&self) -> usize {
        self.shards.iter().filter(|s| s.is_some()).count()
    }

    // ---- Index operations ----

    /// Find a needle's offset and size in the sorted .ecx index via binary
    /// search over fixed-size entries. Returns `Ok(None)` when not found.
    ///
    /// Bug fixed: the entry read existed only under `#[cfg(unix)]`, so
    /// non-unix builds searched a zeroed buffer and silently returned wrong
    /// results. A portable seek+read fallback is now provided.
    ///
    /// # Errors
    /// Fails if no .ecx file was found at construction time or on I/O error.
    pub fn find_needle_from_ecx(&self, needle_id: NeedleId) -> io::Result<Option<(Offset, Size)>> {
        let ecx_file = self.ecx_file.as_ref().ok_or_else(|| {
            io::Error::new(io::ErrorKind::Other, "ecx file not open")
        })?;
        // Any trailing partial entry is ignored by the integer division.
        let entry_count = self.ecx_file_size as usize / NEEDLE_MAP_ENTRY_SIZE;
        if entry_count == 0 {
            return Ok(None);
        }
        // Binary search
        let mut lo: usize = 0;
        let mut hi: usize = entry_count;
        let mut entry_buf = [0u8; NEEDLE_MAP_ENTRY_SIZE];
        while lo < hi {
            let mid = lo + (hi - lo) / 2;
            let file_offset = (mid * NEEDLE_MAP_ENTRY_SIZE) as u64;
            #[cfg(unix)]
            {
                use std::os::unix::fs::FileExt;
                ecx_file.read_exact_at(&mut entry_buf, file_offset)?;
            }
            #[cfg(not(unix))]
            {
                // Seek + read through `impl Seek/Read for &File`; unlike the
                // unix positional read this moves the shared cursor, which
                // is harmless because every iteration seeks explicitly.
                let mut f = ecx_file;
                f.seek(SeekFrom::Start(file_offset))?;
                f.read_exact(&mut entry_buf)?;
            }
            let (key, offset, size) = idx_entry_from_bytes(&entry_buf);
            if key == needle_id {
                return Ok(Some((offset, size)));
            } else if key < needle_id {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        Ok(None)
    }

    /// Locate the EC shard intervals needed to read a needle.
    ///
    /// Returns `Ok(None)` for unknown, deleted, or zero-offset needles.
    pub fn locate_needle(
        &self,
        needle_id: NeedleId,
    ) -> io::Result<Option<(Offset, Size, Vec<ec_locate::Interval>)>> {
        let (offset, size) = match self.find_needle_from_ecx(needle_id)? {
            Some((o, s)) => (o, s),
            None => return Ok(None),
        };
        if size.is_deleted() || offset.is_zero() {
            return Ok(None);
        }
        let shard_size = self.shard_file_size();
        let intervals = ec_locate::locate_data(
            offset.to_actual_offset(),
            size,
            shard_size,
        );
        Ok(Some((offset, size, intervals)))
    }

    /// Size of a single shard file (all shards are the same size); 0 when
    /// no shard is attached.
    fn shard_file_size(&self) -> i64 {
        for shard in &self.shards {
            if let Some(s) = shard {
                return s.file_size();
            }
        }
        0
    }

    // ---- Deletion journal ----

    /// Append a deleted needle ID to the .ecj journal and fsync it.
    pub fn journal_delete(&mut self, needle_id: NeedleId) -> io::Result<()> {
        let ecj_file = self.ecj_file.as_mut().ok_or_else(|| {
            io::Error::new(io::ErrorKind::Other, "ecj file not open")
        })?;
        let mut buf = [0u8; NEEDLE_ID_SIZE];
        needle_id.to_bytes(&mut buf);
        ecj_file.write_all(&buf)?;
        // Durability per delete: each journal record is synced immediately.
        ecj_file.sync_all()?;
        Ok(())
    }

    /// Read all deleted needle IDs from the .ecj journal on disk.
    pub fn read_deleted_needles(&self) -> io::Result<Vec<NeedleId>> {
        let ecj_path = self.ecj_file_name();
        if !std::path::Path::new(&ecj_path).exists() {
            return Ok(Vec::new());
        }
        let data = fs::read(&ecj_path)?;
        // Any trailing partial record is ignored.
        let count = data.len() / NEEDLE_ID_SIZE;
        let mut needles = Vec::with_capacity(count);
        for i in 0..count {
            let start = i * NEEDLE_ID_SIZE;
            let id = NeedleId::from_bytes(&data[start..start + NEEDLE_ID_SIZE]);
            needles.push(id);
        }
        Ok(needles)
    }

    // ---- Lifecycle ----

    /// Close all shards and the index/journal handles.
    pub fn close(&mut self) {
        for shard in &mut self.shards {
            if let Some(s) = shard {
                s.close();
            }
            *shard = None;
        }
        self.ecx_file = None;
        self.ecj_file = None;
    }

    /// Delete all shard, index, and journal files from disk (best-effort).
    pub fn destroy(&mut self) {
        for shard in &mut self.shards {
            if let Some(s) = shard {
                s.destroy();
            }
            *shard = None;
        }
        let _ = fs::remove_file(self.ecx_file_name());
        let _ = fs::remove_file(self.ecj_file_name());
        self.ecx_file = None;
        self.ecj_file = None;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::idx;
    use tempfile::TempDir;

    /// Write a pre-sorted .ecx file for the given entries; callers must pass
    /// entries already sorted by needle id for the binary search to work.
    fn write_ecx_file(dir: &str, collection: &str, vid: VolumeId, entries: &[(NeedleId, Offset, Size)]) {
        let base = crate::storage::volume::volume_file_name(dir, collection, vid);
        let ecx_path = format!("{}.ecx", base);
        let mut file = File::create(&ecx_path).unwrap();
        // Write sorted entries
        for &(key, offset, size) in entries {
            let mut buf = [0u8; NEEDLE_MAP_ENTRY_SIZE];
            idx_entry_to_bytes(&mut buf, key, offset, size);
            file.write_all(&buf).unwrap();
        }
    }

    /// Binary search hit and miss against a hand-built .ecx.
    #[test]
    fn test_ec_volume_find_needle() {
        let tmp = TempDir::new().unwrap();
        let dir = tmp.path().to_str().unwrap();
        // Write sorted ecx entries
        let entries = vec![
            (NeedleId(1), Offset::from_actual_offset(8), Size(100)),
            (NeedleId(5), Offset::from_actual_offset(200), Size(200)),
            (NeedleId(10), Offset::from_actual_offset(500), Size(300)),
        ];
        write_ecx_file(dir, "", VolumeId(1), &entries);
        let vol = EcVolume::new(dir, dir, "", VolumeId(1)).unwrap();
        // Found
        let result = vol.find_needle_from_ecx(NeedleId(5)).unwrap();
        assert!(result.is_some());
        let (offset, size) = result.unwrap();
        assert_eq!(offset.to_actual_offset(), 200);
        assert_eq!(size, Size(200));
        // Not found
        let result = vol.find_needle_from_ecx(NeedleId(7)).unwrap();
        assert!(result.is_none());
    }

    /// Journal deletes round-trip through the on-disk .ecj file.
    #[test]
    fn test_ec_volume_journal() {
        let tmp = TempDir::new().unwrap();
        let dir = tmp.path().to_str().unwrap();
        // Need ecx file for EcVolume::new to succeed
        write_ecx_file(dir, "", VolumeId(1), &[]);
        let mut vol = EcVolume::new(dir, dir, "", VolumeId(1)).unwrap();
        vol.journal_delete(NeedleId(10)).unwrap();
        vol.journal_delete(NeedleId(20)).unwrap();
        let deleted = vol.read_deleted_needles().unwrap();
        assert_eq!(deleted, vec![NeedleId(10), NeedleId(20)]);
    }

    /// add_shard opens the on-disk file and updates the local bitmap.
    #[test]
    fn test_ec_volume_shard_bits() {
        let tmp = TempDir::new().unwrap();
        let dir = tmp.path().to_str().unwrap();
        write_ecx_file(dir, "", VolumeId(1), &[]);
        let mut vol = EcVolume::new(dir, dir, "", VolumeId(1)).unwrap();
        assert_eq!(vol.shard_count(), 0);
        // Create a shard file so we can add it
        let mut shard = EcVolumeShard::new(dir, "", VolumeId(1), 3);
        shard.create().unwrap();
        shard.write_all(&[0u8; 100]).unwrap();
        shard.close();
        vol.add_shard(EcVolumeShard::new(dir, "", VolumeId(1), 3)).unwrap();
        assert_eq!(vol.shard_count(), 1);
        assert!(vol.shard_bits().has_shard_id(3));
    }
}

13
seaweed-volume/src/storage/erasure_coding/mod.rs

@ -0,0 +1,13 @@
//! Erasure coding module for volume data protection.
//!
//! Encodes a volume's .dat file into 10 data + 4 parity shards using
//! Reed-Solomon erasure coding. Can reconstruct from any 10 of 14 shards.
pub mod ec_shard;
pub mod ec_volume;
pub mod ec_encoder;
pub mod ec_decoder;
pub mod ec_locate;
pub use ec_shard::{ShardId, EcVolumeShard, TOTAL_SHARDS_COUNT, DATA_SHARDS_COUNT, PARITY_SHARDS_COUNT};
pub use ec_volume::EcVolume;

1
seaweed-volume/src/storage/mod.rs

@ -6,3 +6,4 @@ pub mod needle_map;
pub mod volume;
pub mod disk_location;
pub mod store;
pub mod erasure_coding;
Loading…
Cancel
Save