
Format pending Rust source updates

pull/8539/head
Chris Lu 2 weeks ago
parent commit f2c834e79c
  1. seaweed-volume/src/storage/disk_location.rs (102)
  2. seaweed-volume/src/storage/erasure_coding/ec_encoder.rs (15)
  3. seaweed-volume/src/storage/erasure_coding/ec_volume.rs (16)
  4. seaweed-volume/src/storage/needle_map.rs (78)
  5. seaweed-volume/src/storage/needle_map/compact_map.rs (9)
  6. seaweed-volume/src/storage/store.rs (97)
  7. seaweed-volume/src/storage/types.rs (3)
  8. seaweed-volume/src/storage/volume.rs (53)
  9. seaweed-volume/src/version.rs (12)

102 seaweed-volume/src/storage/disk_location.rs

@@ -611,15 +611,13 @@ pub fn get_disk_stats(path: &str) -> (u64, u64) {
/// Matches Go's `calculateExpectedShardSize`: large blocks (1GB * data_shards) first,
/// then small blocks (1MB * data_shards) for the remainder.
fn calculate_expected_shard_size(dat_file_size: i64) -> i64 {
let large_batch_size =
ERASURE_CODING_LARGE_BLOCK_SIZE as i64 * DATA_SHARDS_COUNT as i64;
let large_batch_size = ERASURE_CODING_LARGE_BLOCK_SIZE as i64 * DATA_SHARDS_COUNT as i64;
let num_large_batches = dat_file_size / large_batch_size;
let mut shard_size = num_large_batches * ERASURE_CODING_LARGE_BLOCK_SIZE as i64;
let remaining = dat_file_size - (num_large_batches * large_batch_size);
if remaining > 0 {
let small_batch_size =
ERASURE_CODING_SMALL_BLOCK_SIZE as i64 * DATA_SHARDS_COUNT as i64;
let small_batch_size = ERASURE_CODING_SMALL_BLOCK_SIZE as i64 * DATA_SHARDS_COUNT as i64;
// Ceiling division
let num_small_batches = (remaining + small_batch_size - 1) / small_batch_size;
shard_size += num_small_batches * ERASURE_CODING_SMALL_BLOCK_SIZE as i64;
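For reference, a worked run of the formula above as a standalone sketch. The concrete values (1 GiB large block, 1 MiB small block, 10 data shards) are assumptions taken from the doc comment, not from this diff:

const LARGE_BLOCK: i64 = 1024 * 1024 * 1024; // assumed 1 GiB large block
const SMALL_BLOCK: i64 = 1024 * 1024; // assumed 1 MiB small block
const DATA_SHARDS: i64 = 10; // assumed 10 data shards

fn expected_shard_size(dat_file_size: i64) -> i64 {
    let large_batch = LARGE_BLOCK * DATA_SHARDS; // 10 GiB consumed per large round
    let mut shard = (dat_file_size / large_batch) * LARGE_BLOCK;
    let remaining = dat_file_size % large_batch;
    if remaining > 0 {
        let small_batch = SMALL_BLOCK * DATA_SHARDS;
        // Ceiling division, as in the hunk above
        shard += ((remaining + small_batch - 1) / small_batch) * SMALL_BLOCK;
    }
    shard
}

fn main() {
    // A 25 GiB .dat: two 10 GiB large rounds give 2 GiB per shard, and the
    // 5 GiB remainder needs ceil(5 GiB / 10 MiB) = 512 small blocks per shard.
    assert_eq!(
        expected_shard_size(25 * LARGE_BLOCK),
        2 * LARGE_BLOCK + 512 * SMALL_BLOCK
    );
}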
@@ -679,8 +677,16 @@ mod tests {
)
.unwrap();
loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0, Version::current())
.unwrap();
loc.create_volume(
VolumeId(1),
"",
NeedleMapKind::InMemory,
None,
None,
0,
Version::current(),
)
.unwrap();
assert_eq!(loc.volumes_len(), 1);
assert!(loc.find_volume(VolumeId(1)).is_some());
@@ -704,10 +710,26 @@ mod tests {
Vec::new(),
)
.unwrap();
loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0, Version::current())
.unwrap();
loc.create_volume(VolumeId(2), "test", NeedleMapKind::InMemory, None, None, 0, Version::current())
.unwrap();
loc.create_volume(
VolumeId(1),
"",
NeedleMapKind::InMemory,
None,
None,
0,
Version::current(),
)
.unwrap();
loc.create_volume(
VolumeId(2),
"test",
NeedleMapKind::InMemory,
None,
None,
0,
Version::current(),
)
.unwrap();
loc.close();
}
@@ -743,10 +765,26 @@ mod tests {
)
.unwrap();
loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0, Version::current())
.unwrap();
loc.create_volume(VolumeId(2), "", NeedleMapKind::InMemory, None, None, 0, Version::current())
.unwrap();
loc.create_volume(
VolumeId(1),
"",
NeedleMapKind::InMemory,
None,
None,
0,
Version::current(),
)
.unwrap();
loc.create_volume(
VolumeId(2),
"",
NeedleMapKind::InMemory,
None,
None,
0,
Version::current(),
)
.unwrap();
assert_eq!(loc.volumes_len(), 2);
loc.delete_volume(VolumeId(1)).unwrap();
@@ -768,12 +806,36 @@ mod tests {
)
.unwrap();
loc.create_volume(VolumeId(1), "pics", NeedleMapKind::InMemory, None, None, 0, Version::current())
.unwrap();
loc.create_volume(VolumeId(2), "pics", NeedleMapKind::InMemory, None, None, 0, Version::current())
.unwrap();
loc.create_volume(VolumeId(3), "docs", NeedleMapKind::InMemory, None, None, 0, Version::current())
.unwrap();
loc.create_volume(
VolumeId(1),
"pics",
NeedleMapKind::InMemory,
None,
None,
0,
Version::current(),
)
.unwrap();
loc.create_volume(
VolumeId(2),
"pics",
NeedleMapKind::InMemory,
None,
None,
0,
Version::current(),
)
.unwrap();
loc.create_volume(
VolumeId(3),
"docs",
NeedleMapKind::InMemory,
None,
None,
0,
Version::current(),
)
.unwrap();
assert_eq!(loc.volumes_len(), 3);
loc.delete_collection("pics");

15 seaweed-volume/src/storage/erasure_coding/ec_encoder.rs

@@ -588,7 +588,15 @@ mod tests {
let data_shards = 10;
let parity_shards = 4;
let total_shards = data_shards + parity_shards;
write_ec_files(dat_dir, idx_dir, "", VolumeId(1), data_shards, parity_shards).unwrap();
write_ec_files(
dat_dir,
idx_dir,
"",
VolumeId(1),
data_shards,
parity_shards,
)
.unwrap();
// Verify all 14 shard files in data dir
for i in 0..total_shards {
@@ -653,6 +661,9 @@ mod tests {
// Should fail: .idx is in idx_dir, not wrong_dir
let result = write_ec_files(dat_dir, wrong_dir, "", VolumeId(1), 10, 4);
assert!(result.is_err(), "should fail when idx_dir doesn't contain .idx");
assert!(
result.is_err(),
"should fail when idx_dir doesn't contain .idx"
);
}
}
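The 10 + 4 split exercised by this test is standard Reed-Solomon erasure coding: any 10 of the 14 shards can rebuild the volume. Below is a minimal encode/verify/reconstruct round trip, assuming the reed-solomon-erasure crate — the diff does not say which RS implementation seaweed-volume actually uses:

use reed_solomon_erasure::galois_8::ReedSolomon;

fn main() -> Result<(), reed_solomon_erasure::Error> {
    let r = ReedSolomon::new(10, 4)?; // 10 data + 4 parity, as in the test
    // 14 equal-length shards; the last 4 are placeholders encode() fills in.
    let mut shards: Vec<Vec<u8>> = (0..14).map(|i| vec![i as u8; 64]).collect();
    r.encode(&mut shards)?;
    assert!(r.verify(&shards)?);

    // Any 4 shards may be lost; the remaining 10 reconstruct them.
    let mut with_holes: Vec<Option<Vec<u8>>> =
        shards.iter().cloned().map(Some).collect();
    with_holes[0] = None;
    with_holes[13] = None;
    r.reconstruct(&mut with_holes)?;
    assert_eq!(with_holes[0].as_deref(), Some(&shards[0][..]));
    Ok(())
}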

16 seaweed-volume/src/storage/erasure_coding/ec_volume.rs

@@ -72,8 +72,7 @@ impl EcVolume {
// Read expire_at_sec from .vif if present
let expire_at_sec = {
let base =
crate::storage::volume::volume_file_name(dir, collection, volume_id);
let base = crate::storage::volume::volume_file_name(dir, collection, volume_id);
let vif_path = format!("{}.vif", base);
if let Ok(vif_content) = std::fs::read_to_string(&vif_path) {
if let Ok(vif_info) =
@@ -130,9 +129,8 @@ impl EcVolume {
}
// Open .ecj file (deletion journal) — use ecx_actual_dir for consistency
let ecj_base = crate::storage::volume::volume_file_name(
&vol.ecx_actual_dir, collection, volume_id,
);
let ecj_base =
crate::storage::volume::volume_file_name(&vol.ecx_actual_dir, collection, volume_id);
let ecj_path = format!("{}.ecj", ecj_base);
let ecj_file = OpenOptions::new()
.read(true)
@@ -399,7 +397,9 @@ impl EcVolume {
}
// Remove .ecx/.ecj from ecx_actual_dir (where they were found)
let actual_base = crate::storage::volume::volume_file_name(
&self.ecx_actual_dir, &self.collection, self.volume_id,
&self.ecx_actual_dir,
&self.collection,
self.volume_id,
);
let _ = fs::remove_file(format!("{}.ecx", actual_base));
let _ = fs::remove_file(format!("{}.ecj", actual_base));
@@ -410,7 +410,9 @@ impl EcVolume {
}
if self.ecx_actual_dir != self.dir && self.dir_idx != self.dir {
let data_base = crate::storage::volume::volume_file_name(
&self.dir, &self.collection, self.volume_id,
&self.dir,
&self.collection,
self.volume_id,
);
let _ = fs::remove_file(format!("{}.ecx", data_base));
let _ = fs::remove_file(format!("{}.ecj", data_base));
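volume_file_name recurs throughout these hunks; judging from the paths in the diff it builds the base path from directory, optional collection prefix, and volume id. A hedged sketch — the signature and exact format are assumptions:

// Sketch of the naming convention implied by the calls above: an empty
// collection yields "<dir>/<vid>", otherwise "<dir>/<collection>_<vid>".
fn volume_file_name(dir: &str, collection: &str, volume_id: u32) -> String {
    if collection.is_empty() {
        format!("{}/{}", dir, volume_id)
    } else {
        format!("{}/{}_{}", dir, collection, volume_id)
    }
}

fn main() {
    assert_eq!(volume_file_name("/data", "", 7), "/data/7");
    assert_eq!(volume_file_name("/data", "pics", 7), "/data/pics_7");
    // Extensions such as .ecx, .ecj, .vif are appended to this base path.
}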

78 seaweed-volume/src/storage/needle_map.rs

@@ -384,19 +384,21 @@ impl RedbNeedleMap {
io::Error::new(io::ErrorKind::Other, format!("redb insert meta: {}", e))
})?;
}
txn.commit()
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb commit meta: {}", e)))?;
txn.commit().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb commit meta: {}", e))
})?;
Ok(())
}
/// Read the stored .idx file size from redb metadata.
fn read_idx_size_meta(&self) -> io::Result<Option<u64>> {
let txn = self.db.begin_read().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb begin_read: {}", e))
})?;
let meta = txn.open_table(META_TABLE).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb open meta: {}", e))
})?;
let txn = self
.db
.begin_read()
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb begin_read: {}", e)))?;
let meta = txn
.open_table(META_TABLE)
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb open meta: {}", e)))?;
match meta.get(META_IDX_SIZE) {
Ok(Some(guard)) => Ok(Some(guard.value())),
Ok(None) => Ok(None),
@@ -410,15 +412,16 @@ impl RedbNeedleMap {
/// Rebuild metrics by scanning all entries in the redb table.
/// Called when reusing an existing .rdb without a full rebuild.
fn rebuild_metrics_from_db(&self) -> io::Result<()> {
let txn = self.db.begin_read().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb begin_read: {}", e))
})?;
let table = txn.open_table(NEEDLE_TABLE).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e))
})?;
let iter = table.iter().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb iter: {}", e))
})?;
let txn = self
.db
.begin_read()
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb begin_read: {}", e)))?;
let table = txn
.open_table(NEEDLE_TABLE)
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e)))?;
let iter = table
.iter()
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb iter: {}", e)))?;
for entry in iter {
let (key_guard, val_guard) = entry.map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb iter next: {}", e))
@@ -477,9 +480,8 @@ impl RedbNeedleMap {
reader: &mut R,
idx_size: u64,
) -> io::Result<Self> {
let db = Database::open(db_path).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb open: {}", e))
})?;
let db = Database::open(db_path)
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb open: {}", e)))?;
let nm = RedbNeedleMap {
db,
@@ -488,9 +490,9 @@ impl RedbNeedleMap {
idx_file_offset: 0,
};
let stored_idx_size = nm.read_idx_size_meta()?.ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "no idx_size in redb meta")
})?;
let stored_idx_size = nm
.read_idx_size_meta()?
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no idx_size in redb meta"))?;
if stored_idx_size > idx_size {
// .idx shrank — corrupted or truncated, need full rebuild
@@ -524,14 +526,12 @@ impl RedbNeedleMap {
size: Size(-(old.size.0)),
};
let packed = pack_needle_value(&deleted_nv);
table
.insert(key_u64, packed.as_slice())
.map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("redb insert: {}", e),
)
})?;
table.insert(key_u64, packed.as_slice()).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("redb insert: {}", e),
)
})?;
}
}
} else {
@@ -539,22 +539,16 @@ impl RedbNeedleMap {
let old = nm.get_via_table(&table, key_u64).ok().flatten();
let nv = NeedleValue { offset, size };
let packed = pack_needle_value(&nv);
table
.insert(key_u64, packed.as_slice())
.map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("redb insert: {}", e),
)
})?;
table.insert(key_u64, packed.as_slice()).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb insert: {}", e))
})?;
nm.metric.on_put(key, old.as_ref(), size);
}
Ok(())
})?;
}
txn.commit().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb commit: {}", e))
})?;
txn.commit()
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb commit: {}", e)))?;
nm.save_idx_size_meta(idx_size)?;
}
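Note the deletion path in the hunks above re-inserts the entry with its size negated (Size(-(old.size.0))) instead of removing the key. A condensed sketch of that tombstone pattern, with the Size newtype stubbed out for illustration:

// Illustration only: the real Size type lives in storage/types.rs.
#[derive(Clone, Copy)]
struct Size(i32);

impl Size {
    fn is_deleted(self) -> bool {
        self.0 <= 0 // a non-positive size marks a deletion tombstone
    }
}

fn main() {
    let live = Size(1024);
    let tombstone = Size(-live.0); // same magnitude, negated, as in the diff
    assert!(!live.is_deleted());
    assert!(tombstone.is_deleted());
    // Keeping the magnitude lets metrics still account for reclaimed bytes.
}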

9 seaweed-volume/src/storage/needle_map/compact_map.rs

@@ -10,8 +10,8 @@
use std::collections::HashMap;
use crate::storage::types::*;
use super::NeedleValue;
use crate::storage::types::*;
/// Maximum entries per segment. Must be <= u16::MAX (65535).
const SEGMENT_CHUNK_SIZE: u64 = 50_000;
@@ -25,9 +25,9 @@ type Chunk = u64;
/// Compact entry: 10 bytes (2 + 4 + 4) vs 16 bytes for full NeedleId + NeedleValue.
#[derive(Clone, Copy)]
struct CompactEntry {
key: CompactKey, // 2 bytes
key: CompactKey, // 2 bytes
offset: [u8; OFFSET_SIZE], // 4 bytes
size: Size, // 4 bytes
size: Size, // 4 bytes
}
impl CompactEntry {
@@ -186,7 +186,8 @@ impl CompactMap {
/// Insert or update. Returns old NeedleValue if updating.
pub fn set(&mut self, id: NeedleId, offset: Offset, size: Size) -> Option<NeedleValue> {
let chunk = id.0 / SEGMENT_CHUNK_SIZE;
let segment = self.segments
let segment = self
.segments
.entry(chunk)
.or_insert_with(|| Segment::new(chunk));
segment.set(id, offset, size)
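The segmentation works because SEGMENT_CHUNK_SIZE (50,000) fits in a u16: the chunk index selects the segment and only the 2-byte remainder is stored in each 10-byte entry. A sketch of the split implied by the constants above (the helper names are hypothetical):

const SEGMENT_CHUNK_SIZE: u64 = 50_000; // <= u16::MAX, per the comment above

fn split(id: u64) -> (u64, u16) {
    // Chunk index picks the segment; the remainder is the CompactKey.
    (id / SEGMENT_CHUNK_SIZE, (id % SEGMENT_CHUNK_SIZE) as u16)
}

fn join(chunk: u64, key: u16) -> u64 {
    chunk * SEGMENT_CHUNK_SIZE + key as u64
}

fn main() {
    let id = 123_456_789_u64;
    let (chunk, key) = split(id);
    assert_eq!(join(chunk, key), id); // lossless round trip
}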

97 seaweed-volume/src/storage/store.rs

@@ -217,7 +217,15 @@ impl Store {
let base = crate::storage::volume::volume_file_name(&loc.directory, collection, vid);
let dat_path = format!("{}.dat", base);
if std::path::Path::new(&dat_path).exists() {
return loc.create_volume(vid, collection, self.needle_map_kind, None, None, 0, Version::current());
return loc.create_volume(
vid,
collection,
self.needle_map_kind,
None,
None,
0,
Version::current(),
);
}
}
Err(VolumeError::Io(io::Error::new(
@@ -275,11 +283,7 @@ impl Store {
/// Configure a volume's replica placement on disk.
/// The volume must already be unmounted. This opens the .dat file directly,
/// modifies the replica_placement byte (offset 1), and writes it back.
pub fn configure_volume(
&self,
vid: VolumeId,
rp: ReplicaPlacement,
) -> Result<(), VolumeError> {
pub fn configure_volume(&self, vid: VolumeId, rp: ReplicaPlacement) -> Result<(), VolumeError> {
let (_, base_path, _) = self.find_volume_file_base(vid).ok_or_else(|| {
VolumeError::Io(io::Error::new(
io::ErrorKind::NotFound,
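A minimal sketch of the in-place byte patch that the configure_volume doc comment describes; the only detail taken from the diff is that the replica_placement byte sits at offset 1 of the .dat file:

use std::fs::OpenOptions;
use std::io::{self, Seek, SeekFrom, Write};

fn patch_replica_placement(dat_path: &str, rp_byte: u8) -> io::Result<()> {
    let mut f = OpenOptions::new().read(true).write(true).open(dat_path)?;
    f.seek(SeekFrom::Start(1))?; // replica_placement byte, per the doc comment
    f.write_all(&[rp_byte])?;
    f.sync_all()
}

fn main() -> io::Result<()> {
    // e.g. patch_replica_placement("/tmp/1.dat", 1)? — hypothetical path/value
    Ok(())
}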
@@ -586,13 +590,15 @@ impl Store {
pub fn find_ec_dir(&self, vid: VolumeId, collection: &str) -> Option<String> {
for loc in &self.locations {
// Check idx directory first
let idx_base = crate::storage::volume::volume_file_name(&loc.idx_directory, collection, vid);
let idx_base =
crate::storage::volume::volume_file_name(&loc.idx_directory, collection, vid);
if std::path::Path::new(&format!("{}.ecx", idx_base)).exists() {
return Some(loc.directory.clone());
}
// Fall back to data directory if .ecx was created before -dir.idx was configured
if loc.idx_directory != loc.directory {
let data_base = crate::storage::volume::volume_file_name(&loc.directory, collection, vid);
let data_base =
crate::storage::volume::volume_file_name(&loc.directory, collection, vid);
if std::path::Path::new(&format!("{}.ecx", data_base)).exists() {
return Some(loc.directory.clone());
}
@@ -750,9 +756,8 @@ fn load_vif_volume_info(path: &str) -> Result<VifVolumeInfo, VolumeError> {
}
fn save_vif_volume_info(path: &str, info: &VifVolumeInfo) -> Result<(), VolumeError> {
let content = serde_json::to_string_pretty(info).map_err(|e| {
VolumeError::Io(io::Error::new(io::ErrorKind::Other, e.to_string()))
})?;
let content = serde_json::to_string_pretty(info)
.map_err(|e| VolumeError::Io(io::Error::new(io::ErrorKind::Other, e.to_string())))?;
std::fs::write(path, content)?;
Ok(())
}
@@ -811,7 +816,15 @@ mod tests {
let mut store = make_test_store(&[dir]);
store
.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive, Version::current())
.add_volume(
VolumeId(1),
"",
None,
None,
0,
DiskType::HardDrive,
Version::current(),
)
.unwrap();
assert!(store.has_volume(VolumeId(1)));
assert!(!store.has_volume(VolumeId(2)));
@@ -824,7 +837,15 @@ mod tests {
let dir = tmp.path().to_str().unwrap();
let mut store = make_test_store(&[dir]);
store
.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive, Version::current())
.add_volume(
VolumeId(1),
"",
None,
None,
0,
DiskType::HardDrive,
Version::current(),
)
.unwrap();
// Write
@@ -890,10 +911,26 @@ mod tests {
// Add volumes — should go to location with fewest volumes
store
.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive, Version::current())
.add_volume(
VolumeId(1),
"",
None,
None,
0,
DiskType::HardDrive,
Version::current(),
)
.unwrap();
store
.add_volume(VolumeId(2), "", None, None, 0, DiskType::HardDrive, Version::current())
.add_volume(
VolumeId(2),
"",
None,
None,
0,
DiskType::HardDrive,
Version::current(),
)
.unwrap();
assert_eq!(store.total_volume_count(), 2);
@@ -909,13 +946,37 @@ mod tests {
let mut store = make_test_store(&[dir]);
store
.add_volume(VolumeId(1), "pics", None, None, 0, DiskType::HardDrive, Version::current())
.add_volume(
VolumeId(1),
"pics",
None,
None,
0,
DiskType::HardDrive,
Version::current(),
)
.unwrap();
store
.add_volume(VolumeId(2), "pics", None, None, 0, DiskType::HardDrive, Version::current())
.add_volume(
VolumeId(2),
"pics",
None,
None,
0,
DiskType::HardDrive,
Version::current(),
)
.unwrap();
store
.add_volume(VolumeId(3), "docs", None, None, 0, DiskType::HardDrive, Version::current())
.add_volume(
VolumeId(3),
"docs",
None,
None,
0,
DiskType::HardDrive,
Version::current(),
)
.unwrap();
assert_eq!(store.total_volume_count(), 3);

3 seaweed-volume/src/storage/types.rs

@@ -582,7 +582,8 @@ mod tests {
{
assert_eq!(OFFSET_SIZE, 5);
assert_eq!(NEEDLE_MAP_ENTRY_SIZE, 17); // 8 + 5 + 4
assert_eq!(MAX_POSSIBLE_VOLUME_SIZE, 4 * 1024 * 1024 * 1024 * 8 * 256); // 8TB
assert_eq!(MAX_POSSIBLE_VOLUME_SIZE, 4 * 1024 * 1024 * 1024 * 8 * 256);
// 8TB
}
#[cfg(not(feature = "5bytes"))]
{
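The 8TB comment in this hunk checks out arithmetically: a 4-byte base offset addresses 4 GiB, offsets count 8-byte units, and the fifth offset byte multiplies the range by 256. The unit-size reading is an inference from the constants, not stated in the diff:

fn main() {
    // 4 GiB (u32 range) * 8 (assumed bytes per offset unit) * 256 (5th byte)
    let max: u64 = 4 * 1024 * 1024 * 1024 * 8 * 256;
    assert_eq!(max, 8u64 << 40); // 8 TiB
}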

53 seaweed-volume/src/storage/volume.rs

@@ -725,7 +725,11 @@ impl Volume {
self.read_needle_with_option(n, &mut read_option)
}
pub fn read_needle_with_option(&self, n: &mut Needle, read_option: &mut ReadOption) -> Result<i32, VolumeError> {
pub fn read_needle_with_option(
&self,
n: &mut Needle,
read_option: &mut ReadOption,
) -> Result<i32, VolumeError> {
let _guard = self.data_file_access_control.read_lock();
let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
let nv = nm.get(n.id).ok_or(VolumeError::NotFound)?;
@@ -796,11 +800,7 @@ impl Volume {
{
// Double-read: in 4-byte offset mode, the actual data may be
// beyond 32GB due to offset wrapping. Retry at offset + 32GB.
self.read_needle_blob_and_parse(
n,
offset + MAX_POSSIBLE_VOLUME_SIZE as i64,
size,
)
self.read_needle_blob_and_parse(n, offset + MAX_POSSIBLE_VOLUME_SIZE as i64, size)
}
Err(e) => Err(e),
}
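The retry above handles 4-byte offset wrap-around: stored offsets count 8-byte units (inferred from the 32GB comment), so a u32 offset addresses at most 2^32 * 8 = 32 GiB, and data past that boundary lives at the stored offset plus one 32 GiB wrap. A sketch of the two candidate positions:

const MAX_POSSIBLE_VOLUME_SIZE: i64 = (1i64 << 32) * 8; // 32 GiB in 4-byte mode

fn candidate_offsets(stored_offset: i64) -> [i64; 2] {
    // Try the offset as stored first; if the parsed needle doesn't match,
    // the data may live one 32 GiB wrap further into the file.
    [stored_offset, stored_offset + MAX_POSSIBLE_VOLUME_SIZE]
}

fn main() {
    let stored = 4096_i64;
    assert_eq!(candidate_offsets(stored)[1], stored + (1i64 << 35)); // +32 GiB
}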
@@ -2714,7 +2714,11 @@ mod tests {
// After compaction, the revision should have changed
let new_rev = v.super_block.compaction_revision;
assert_eq!(new_rev, initial_rev + 1, "compaction should increment revision");
assert_eq!(
new_rev,
initial_rev + 1,
"compaction should increment revision"
);
// Re-lookup needle 1 — should still be found with the new revision
let (new_offset, relookup_rev) = v.re_lookup_needle_data_offset(NeedleId(1)).unwrap();
@@ -2731,7 +2735,10 @@ mod tests {
// Deleted needle should not be found
let result = v.re_lookup_needle_data_offset(NeedleId(2));
assert!(result.is_err(), "deleted needle should not be found after compaction");
assert!(
result.is_err(),
"deleted needle should not be found after compaction"
);
}
#[test]
@@ -2799,11 +2806,20 @@ mod tests {
v.destroy().unwrap();
// .dat and .idx should be gone
assert!(!std::path::Path::new(&dat_path).exists(), ".dat should be removed");
assert!(!std::path::Path::new(&idx_path).exists(), ".idx should be removed");
assert!(
!std::path::Path::new(&dat_path).exists(),
".dat should be removed"
);
assert!(
!std::path::Path::new(&idx_path).exists(),
".idx should be removed"
);
// .vif MUST be preserved for EC volumes
assert!(std::path::Path::new(&vif_path).exists(), ".vif must survive destroy");
assert!(
std::path::Path::new(&vif_path).exists(),
".vif must survive destroy"
);
}
/// Volume destroy with separate idx directory must clean up both dirs.
@@ -2847,8 +2863,17 @@ mod tests {
v.destroy().unwrap();
assert!(!std::path::Path::new(&dat_path).exists(), ".dat removed from data dir");
assert!(!std::path::Path::new(&idx_path).exists(), ".idx removed from idx dir");
assert!(std::path::Path::new(&vif_path).exists(), ".vif preserved in data dir");
assert!(
!std::path::Path::new(&dat_path).exists(),
".dat removed from data dir"
);
assert!(
!std::path::Path::new(&idx_path).exists(),
".idx removed from idx dir"
);
assert!(
std::path::Path::new(&vif_path).exists(),
".vif preserved in data dir"
);
}
}

12 seaweed-volume/src/version.rs

@@ -21,7 +21,9 @@ pub fn commit() -> &'static str {
pub fn version_number() -> &'static str {
static VERSION_NUMBER: OnceLock<String> = OnceLock::new();
VERSION_NUMBER
.get_or_init(|| parse_go_version_number().unwrap_or_else(|| env!("CARGO_PKG_VERSION").to_string()))
.get_or_init(|| {
parse_go_version_number().unwrap_or_else(|| env!("CARGO_PKG_VERSION").to_string())
})
.as_str()
}
@@ -34,7 +36,8 @@ pub fn version() -> &'static str {
pub fn full_version() -> &'static str {
static FULL: OnceLock<String> = OnceLock::new();
FULL.get_or_init(|| format!("{} {}", version(), commit())).as_str()
FULL.get_or_init(|| format!("{} {}", version(), commit()))
.as_str()
}
pub fn server_header() -> &'static str {
@@ -45,7 +48,10 @@ pub fn server_header() -> &'static str {
}
fn parse_go_version_number() -> Option<String> {
let src = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../weed/util/version/constants.go"));
let src = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/../weed/util/version/constants.go"
));
let mut major: Option<u32> = None;
let mut minor: Option<u32> = None;
for line in src.lines() {
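A hedged sketch of the scraping that parse_go_version_number appears to do over the included Go source. The identifier names ("major", "minor") and line shapes are assumptions; only the constants.go path appears in the diff:

fn parse_version(src: &str) -> Option<String> {
    let mut major: Option<u32> = None;
    let mut minor: Option<u32> = None;
    for line in src.lines() {
        let line = line.trim();
        // Assumed line shapes: `major = 3` / `minor = 80`
        if let Some(rest) = line.strip_prefix("major") {
            major = rest.trim_start_matches(|c: char| c == '=' || c == ' ').parse().ok();
        } else if let Some(rest) = line.strip_prefix("minor") {
            minor = rest.trim_start_matches(|c: char| c == '=' || c == ' ').parse().ok();
        }
    }
    Some(format!("{}.{:02}", major?, minor?))
}

fn main() {
    let go_src = "const (\n\tmajor = 3\n\tminor = 80\n)";
    assert_eq!(parse_version(go_src).as_deref(), Some("3.80"));
}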
