Browse Source

seaweed-volume: add idx file walker and wire up storage module tree

Index file (.idx) format: sequential 17-byte entries walked in 1024-row batches.
walk_index_file() mirrors Go's idx.WalkIndexFile(); write_index_entry() serializes a single entry.
3 unit tests. Added crc32c dependency for Castagnoli CRC.
rust-volume-server
Chris Lu 6 days ago
parent
commit
fe161578db
  1. 19
      seaweed-volume/Cargo.lock
  2. 3
      seaweed-volume/Cargo.toml
  3. 1
      seaweed-volume/src/main.rs
  4. 100
      seaweed-volume/src/storage/idx/mod.rs
  5. 4
      seaweed-volume/src/storage/mod.rs

19
seaweed-volume/Cargo.lock

@ -373,6 +373,15 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
[[package]]
name = "crc32c"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47"
dependencies = [
"rustc_version",
]
[[package]]
name = "crc32fast"
version = "1.5.0"
@ -1940,6 +1949,15 @@ version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]]
name = "rustc_version"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92"
dependencies = [
"semver",
]
[[package]]
name = "rustix"
version = "0.38.44"
@ -2064,6 +2082,7 @@ dependencies = [
"bytes",
"chrono",
"clap",
"crc32c",
"crc32fast",
"dashmap",
"futures",

3
seaweed-volume/Cargo.toml

@ -50,7 +50,8 @@ toml = "0.8"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
# CRC32
# CRC32 — using Castagnoli polynomial (CRC32-C), matching Go's crc32.Castagnoli
crc32c = "0.6"
crc32fast = "1"
# Memory-mapped files

1
seaweed-volume/src/main.rs

@ -1,4 +1,5 @@
mod config;
mod storage;
fn main() {
let cli = config::parse_cli();

100
seaweed-volume/src/storage/idx/mod.rs

@ -0,0 +1,100 @@
//! Index file (.idx) format: sequential 17-byte entries.
//!
//! Each entry: NeedleId(8) + Offset(5) + Size(4) = 17 bytes.
use crate::storage::types::*;
use std::io::{self, Read, Seek, SeekFrom};
/// Number of index entries fetched per batch while walking an index file.
const ROWS_TO_READ: usize = 1024;

/// Walk all entries in an .idx file, calling `f` for each.
/// Mirrors Go's `WalkIndexFile()`.
///
/// `start_from` is the zero-based index of the first entry to visit; the
/// reader is positioned at `start_from * NEEDLE_MAP_ENTRY_SIZE` before reading.
/// Entries are decoded in batches of up to `ROWS_TO_READ` rows. A trailing
/// partial entry at end of input (fewer than `NEEDLE_MAP_ENTRY_SIZE` bytes) is
/// ignored.
///
/// # Errors
///
/// Returns `InvalidInput` if the starting byte offset does not fit in a `u64`,
/// any error from seeking or reading (other than `Interrupted`, which is
/// retried, and `UnexpectedEof`, which is treated as end of input), and the
/// first error returned by `f`, which stops the walk immediately.
pub fn walk_index_file<R, F>(reader: &mut R, start_from: u64, mut f: F) -> io::Result<()>
where
    R: Read + Seek,
    F: FnMut(NeedleId, Offset, Size) -> io::Result<()>,
{
    let reader_offset = start_from
        .checked_mul(NEEDLE_MAP_ENTRY_SIZE as u64)
        .ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "index start offset overflows u64",
            )
        })?;
    reader.seek(SeekFrom::Start(reader_offset))?;

    let mut buf = vec![0u8; NEEDLE_MAP_ENTRY_SIZE * ROWS_TO_READ];
    loop {
        // `Read::read` may legally return fewer bytes than requested. Keep
        // reading until the batch is full or the input ends so that entry
        // boundaries never drift when a short read is not a multiple of the
        // entry size.
        let mut filled = 0;
        let mut read_error = None;
        while filled < buf.len() {
            match reader.read(&mut buf[filled..]) {
                Ok(0) => break,
                Ok(n) => filled += n,
                // A signal interrupted the call; retrying is the documented remedy.
                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
                Err(ref e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
                Err(e) => {
                    read_error = Some(e);
                    break;
                }
            }
        }

        // Deliver every complete entry that was read, even if the read loop
        // stopped on an error, so callers see all data preceding the failure.
        for entry in buf[..filled].chunks_exact(NEEDLE_MAP_ENTRY_SIZE) {
            let (key, offset, size) = idx_entry_from_bytes(entry);
            f(key, offset, size)?;
        }

        if let Some(e) = read_error {
            return Err(e);
        }
        // A batch that could not be filled means the input is exhausted.
        if filled < buf.len() {
            return Ok(());
        }
    }
}
/// Serialize one `(key, offset, size)` index entry and append it to `writer`.
///
/// The entry is encoded into a fixed `NEEDLE_MAP_ENTRY_SIZE`-byte buffer by
/// `idx_entry_to_bytes` and written in full with `write_all`.
///
/// # Errors
///
/// Returns any error reported by the underlying writer.
pub fn write_index_entry<W: io::Write>(
    writer: &mut W,
    key: NeedleId,
    offset: Offset,
    size: Size,
) -> io::Result<()> {
    let mut entry = [0u8; NEEDLE_MAP_ENTRY_SIZE];
    idx_entry_to_bytes(&mut entry, key, offset, size);
    writer.write_all(&entry)?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Three serialized entries are handed back by the walker in file order.
    #[test]
    fn test_walk_index_file() {
        let entries = [
            (NeedleId(1), Offset::from_actual_offset(0), Size(100)),
            (NeedleId(2), Offset::from_actual_offset(128), Size(200)),
            (NeedleId(3), Offset::from_actual_offset(384), Size(300)),
        ];

        // Build the raw index image by encoding each entry back to back.
        let mut data = Vec::new();
        for (key, offset, size) in &entries {
            let mut raw = [0u8; NEEDLE_MAP_ENTRY_SIZE];
            idx_entry_to_bytes(&mut raw, *key, *offset, *size);
            data.extend_from_slice(&raw);
        }

        let mut cursor = Cursor::new(data);
        let mut collected = Vec::new();
        let outcome = walk_index_file(&mut cursor, 0, |key, offset, size| {
            collected.push((key, offset.to_actual_offset(), size));
            Ok(())
        });
        outcome.unwrap();

        assert_eq!(collected.len(), 3);

        let first = &collected[0];
        assert_eq!(first.0, NeedleId(1));
        assert_eq!(first.1, 0);
        assert_eq!(first.2, Size(100));

        assert_eq!(collected[1].0, NeedleId(2));
        assert_eq!(collected[2].0, NeedleId(3));
    }

    /// An empty input never invokes the callback.
    #[test]
    fn test_walk_empty() {
        let mut cursor = Cursor::new(Vec::<u8>::new());
        let mut calls = 0;
        walk_index_file(&mut cursor, 0, |_, _, _| {
            calls += 1;
            Ok(())
        })
        .unwrap();
        assert_eq!(calls, 0);
    }

    /// A written entry is exactly one entry long and decodes to the same values.
    #[test]
    fn test_write_index_entry() {
        let mut out = Vec::new();
        let offset = Offset::from_actual_offset(8 * 10);
        write_index_entry(&mut out, NeedleId(42), offset, Size(512)).unwrap();
        assert_eq!(out.len(), NEEDLE_MAP_ENTRY_SIZE);

        let (key, decoded_offset, size) = idx_entry_from_bytes(&out);
        assert_eq!(key, NeedleId(42));
        assert_eq!(decoded_offset.to_actual_offset(), 80);
        assert_eq!(size, Size(512));
    }
}

4
seaweed-volume/src/storage/mod.rs

@ -0,0 +1,4 @@
pub mod types;
pub mod needle;
pub mod super_block;
pub mod idx;
Loading…
Cancel
Save