From 8ade1c51d43bd3f1a7a9d4bc0e458a84eb3dfa54 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 7 Mar 2026 19:19:56 -0800 Subject: [PATCH] fix: send go-compatible heartbeat metadata --- seaweed-volume/src/main.rs | 91 +++++--- seaweed-volume/src/server/heartbeat.rs | 161 ++++++++++++-- seaweed-volume/src/storage/disk_location.rs | 142 +++++++++++-- seaweed-volume/src/storage/store.rs | 221 +++++++++++++++----- seaweed-volume/tests/http_integration.rs | 41 ++-- 5 files changed, 509 insertions(+), 147 deletions(-) diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs index b52fe5bcd..17a6d875f 100644 --- a/seaweed-volume/src/main.rs +++ b/seaweed-volume/src/main.rs @@ -1,16 +1,16 @@ use std::sync::{Arc, RwLock}; -use tracing::{info, error}; +use tracing::{error, info}; use seaweed_volume::config::{self, VolumeServerConfig}; use seaweed_volume::metrics; +use seaweed_volume::pb::volume_server_pb::volume_server_server::VolumeServerServer; use seaweed_volume::security::{Guard, SigningKey}; use seaweed_volume::server::grpc_server::VolumeGrpcService; use seaweed_volume::server::volume_server::VolumeServerState; use seaweed_volume::server::write_queue::WriteQueue; use seaweed_volume::storage::store::Store; use seaweed_volume::storage::types::DiskType; -use seaweed_volume::pb::volume_server_pb::volume_server_server::VolumeServerServer; use tokio_rustls::TlsAcceptor; @@ -24,7 +24,10 @@ fn main() { .init(); let config = config::parse_cli(); - info!("SeaweedFS Volume Server (Rust) v{}", env!("CARGO_PKG_VERSION")); + info!( + "SeaweedFS Volume Server (Rust) v{}", + env!("CARGO_PKG_VERSION") + ); // Register Prometheus metrics metrics::register_metrics(); @@ -64,6 +67,7 @@ fn load_rustls_config(cert_path: &str, key_path: &str) -> rustls::ServerConfig { async fn run(config: VolumeServerConfig) -> Result<(), Box> { // Initialize the store let mut store = Store::new(config.index_type); + store.id = config.id.clone(); store.ip = config.ip.clone(); store.port = config.port; store.grpc_port = config.grpc_port; @@ -80,13 +84,15 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box, mut shutdown_rx: broadcast::Receiver<()>, ) { - info!("Starting heartbeat to master nodes: {:?}", config.master_addresses); + info!( + "Starting heartbeat to master nodes: {:?}", + config.master_addresses + ); let pulse = Duration::from_secs(config.pulse_seconds.max(1)); @@ -164,17 +168,24 @@ async fn do_heartbeat( _ = shutdown_rx.recv() => { state.is_heartbeating.store(false, Ordering::Relaxed); - let empty = master_pb::Heartbeat { - ip: config.ip.clone(), - port: config.port as u32, - public_url: config.public_url.clone(), - max_file_key: 0, - data_center: config.data_center.clone(), - rack: config.rack.clone(), - has_no_volumes: true, - has_no_ec_shards: true, - grpc_port: config.grpc_port as u32, - ..Default::default() + let empty = { + let store = state.store.read().unwrap(); + let (location_uuids, disk_tags) = collect_location_metadata(&store); + master_pb::Heartbeat { + id: store.id.clone(), + ip: config.ip.clone(), + port: config.port as u32, + public_url: config.public_url.clone(), + max_file_key: 0, + data_center: config.data_center.clone(), + rack: config.rack.clone(), + has_no_volumes: true, + has_no_ec_shards: true, + grpc_port: config.grpc_port as u32, + location_uuids, + disk_tags, + ..Default::default() + } }; let _ = tx.send(empty).await; tokio::time::sleep(Duration::from_millis(200)).await; @@ -186,16 +197,42 @@ async fn do_heartbeat( } /// Collect volume information into a Heartbeat message. -fn collect_heartbeat(config: &HeartbeatConfig, state: &Arc) -> master_pb::Heartbeat { +fn collect_heartbeat( + config: &HeartbeatConfig, + state: &Arc, +) -> master_pb::Heartbeat { let store = state.store.read().unwrap(); + build_heartbeat(config, &store) +} + +fn collect_location_metadata(store: &Store) -> (Vec, Vec) { + let location_uuids = store + .locations + .iter() + .map(|loc| loc.directory_uuid.clone()) + .collect(); + let disk_tags = store + .locations + .iter() + .enumerate() + .map(|(disk_id, loc)| master_pb::DiskTag { + disk_id: disk_id as u32, + tags: loc.tags.clone(), + }) + .collect(); + (location_uuids, disk_tags) +} +fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartbeat { let mut volumes = Vec::new(); let mut max_file_key = NeedleId(0); let mut max_volume_counts: HashMap = HashMap::new(); for loc in &store.locations { let disk_type_str = loc.disk_type.to_string(); - let max_count = loc.max_volume_count.load(std::sync::atomic::Ordering::Relaxed); + let max_count = loc + .max_volume_count + .load(std::sync::atomic::Ordering::Relaxed); *max_volume_counts.entry(disk_type_str).or_insert(0) += max_count as u32; for (_, vol) in loc.iter_volumes() { @@ -222,8 +259,11 @@ fn collect_heartbeat(config: &HeartbeatConfig, state: &Arc) - }); } } + let has_no_volumes = volumes.is_empty(); + let (location_uuids, disk_tags) = collect_location_metadata(store); master_pb::Heartbeat { + id: store.id.clone(), ip: config.ip.clone(), port: config.port as u32, public_url: config.public_url.clone(), @@ -232,9 +272,11 @@ fn collect_heartbeat(config: &HeartbeatConfig, state: &Arc) - rack: config.rack.clone(), admin_port: config.port as u32, volumes, - has_no_volumes: false, + has_no_volumes, max_volume_counts, grpc_port: config.grpc_port as u32, + location_uuids, + disk_tags, ..Default::default() } } @@ -268,3 +310,86 @@ fn collect_ec_heartbeat(state: &Arc) -> master_pb::Heartbeat ..Default::default() } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::MinFreeSpace; + use crate::storage::needle_map::NeedleMapKind; + use crate::storage::types::{DiskType, VolumeId}; + + fn test_config() -> HeartbeatConfig { + HeartbeatConfig { + ip: "127.0.0.1".to_string(), + port: 8080, + grpc_port: 18080, + public_url: "127.0.0.1:8080".to_string(), + data_center: "dc1".to_string(), + rack: "rack1".to_string(), + master_addresses: Vec::new(), + pulse_seconds: 5, + } + } + + #[test] + fn test_build_heartbeat_includes_store_identity_and_disk_metadata() { + let temp_dir = tempfile::tempdir().unwrap(); + let dir = temp_dir.path().to_str().unwrap(); + + let mut store = Store::new(NeedleMapKind::InMemory); + store.id = "volume-node-a".to_string(); + store + .add_location( + dir, + dir, + 3, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + vec!["fast".to_string(), "ssd".to_string()], + ) + .unwrap(); + store + .add_volume(VolumeId(7), "pics", None, None, 0, DiskType::HardDrive) + .unwrap(); + + let heartbeat = build_heartbeat(&test_config(), &store); + + assert_eq!(heartbeat.id, "volume-node-a"); + assert_eq!(heartbeat.volumes.len(), 1); + assert!(!heartbeat.has_no_volumes); + assert_eq!( + heartbeat.location_uuids, + vec![store.locations[0].directory_uuid.clone()] + ); + assert_eq!(heartbeat.disk_tags.len(), 1); + assert_eq!(heartbeat.disk_tags[0].disk_id, 0); + assert_eq!( + heartbeat.disk_tags[0].tags, + vec!["fast".to_string(), "ssd".to_string()] + ); + } + + #[test] + fn test_build_heartbeat_marks_empty_store_as_has_no_volumes() { + let temp_dir = tempfile::tempdir().unwrap(); + let dir = temp_dir.path().to_str().unwrap(); + + let mut store = Store::new(NeedleMapKind::InMemory); + store.id = "volume-node-b".to_string(); + store + .add_location( + dir, + dir, + 2, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); + + let heartbeat = build_heartbeat(&test_config(), &store); + + assert!(heartbeat.volumes.is_empty()); + assert!(heartbeat.has_no_volumes); + } +} diff --git a/seaweed-volume/src/storage/disk_location.rs b/seaweed-volume/src/storage/disk_location.rs index 47cd85892..2515ff0c4 100644 --- a/seaweed-volume/src/storage/disk_location.rs +++ b/seaweed-volume/src/storage/disk_location.rs @@ -19,7 +19,9 @@ use crate::storage::volume::{Volume, VolumeError}; pub struct DiskLocation { pub directory: String, pub idx_directory: String, + pub directory_uuid: String, pub disk_type: DiskType, + pub tags: Vec, pub max_volume_count: AtomicI32, pub original_max_volume_count: i32, volumes: HashMap, @@ -29,30 +31,53 @@ pub struct DiskLocation { } impl DiskLocation { + const UUID_FILE_NAME: &'static str = "vol_dir.uuid"; + pub fn new( directory: &str, idx_directory: &str, max_volume_count: i32, disk_type: DiskType, min_free_space: MinFreeSpace, - ) -> Self { + tags: Vec, + ) -> io::Result { + fs::create_dir_all(directory)?; + let idx_dir = if idx_directory.is_empty() { directory.to_string() } else { + fs::create_dir_all(idx_directory)?; idx_directory.to_string() }; + let directory_uuid = Self::generate_directory_uuid(directory)?; - DiskLocation { + Ok(DiskLocation { directory: directory.to_string(), idx_directory: idx_dir, + directory_uuid, disk_type, + tags, max_volume_count: AtomicI32::new(max_volume_count), original_max_volume_count: max_volume_count, volumes: HashMap::new(), is_disk_space_low: AtomicBool::new(false), available_space: AtomicU64::new(0), min_free_space, + }) + } + + fn generate_directory_uuid(directory: &str) -> io::Result { + let path = std::path::Path::new(directory).join(Self::UUID_FILE_NAME); + if path.exists() { + let existing = fs::read_to_string(&path)?; + if !existing.trim().is_empty() { + return Ok(existing); + } } + + let dir_uuid = uuid::Uuid::new_v4().to_string(); + fs::write(path, &dir_uuid)?; + Ok(dir_uuid) } // ---- Volume management ---- @@ -313,12 +338,18 @@ mod tests { fn test_disk_location_create_volume() { let tmp = TempDir::new().unwrap(); let dir = tmp.path().to_str().unwrap(); - let mut loc = DiskLocation::new(dir, dir, 10, DiskType::HardDrive, MinFreeSpace::Percent(1.0)); - - loc.create_volume( - VolumeId(1), "", NeedleMapKind::InMemory, - None, None, 0, - ).unwrap(); + let mut loc = DiskLocation::new( + dir, + dir, + 10, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); + + loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0) + .unwrap(); assert_eq!(loc.volumes_len(), 1); assert!(loc.find_volume(VolumeId(1)).is_some()); @@ -333,14 +364,32 @@ mod tests { // Create volumes { - let mut loc = DiskLocation::new(dir, dir, 10, DiskType::HardDrive, MinFreeSpace::Percent(1.0)); - loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0).unwrap(); - loc.create_volume(VolumeId(2), "test", NeedleMapKind::InMemory, None, None, 0).unwrap(); + let mut loc = DiskLocation::new( + dir, + dir, + 10, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); + loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0) + .unwrap(); + loc.create_volume(VolumeId(2), "test", NeedleMapKind::InMemory, None, None, 0) + .unwrap(); loc.close(); } // Reload - let mut loc = DiskLocation::new(dir, dir, 10, DiskType::HardDrive, MinFreeSpace::Percent(1.0)); + let mut loc = DiskLocation::new( + dir, + dir, + 10, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); loc.load_existing_volumes(NeedleMapKind::InMemory).unwrap(); assert_eq!(loc.volumes_len(), 2); @@ -353,10 +402,20 @@ mod tests { fn test_disk_location_delete_volume() { let tmp = TempDir::new().unwrap(); let dir = tmp.path().to_str().unwrap(); - let mut loc = DiskLocation::new(dir, dir, 10, DiskType::HardDrive, MinFreeSpace::Percent(1.0)); - - loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0).unwrap(); - loc.create_volume(VolumeId(2), "", NeedleMapKind::InMemory, None, None, 0).unwrap(); + let mut loc = DiskLocation::new( + dir, + dir, + 10, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); + + loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0) + .unwrap(); + loc.create_volume(VolumeId(2), "", NeedleMapKind::InMemory, None, None, 0) + .unwrap(); assert_eq!(loc.volumes_len(), 2); loc.delete_volume(VolumeId(1)).unwrap(); @@ -368,15 +427,56 @@ mod tests { fn test_disk_location_delete_collection() { let tmp = TempDir::new().unwrap(); let dir = tmp.path().to_str().unwrap(); - let mut loc = DiskLocation::new(dir, dir, 10, DiskType::HardDrive, MinFreeSpace::Percent(1.0)); - - loc.create_volume(VolumeId(1), "pics", NeedleMapKind::InMemory, None, None, 0).unwrap(); - loc.create_volume(VolumeId(2), "pics", NeedleMapKind::InMemory, None, None, 0).unwrap(); - loc.create_volume(VolumeId(3), "docs", NeedleMapKind::InMemory, None, None, 0).unwrap(); + let mut loc = DiskLocation::new( + dir, + dir, + 10, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); + + loc.create_volume(VolumeId(1), "pics", NeedleMapKind::InMemory, None, None, 0) + .unwrap(); + loc.create_volume(VolumeId(2), "pics", NeedleMapKind::InMemory, None, None, 0) + .unwrap(); + loc.create_volume(VolumeId(3), "docs", NeedleMapKind::InMemory, None, None, 0) + .unwrap(); assert_eq!(loc.volumes_len(), 3); loc.delete_collection("pics"); assert_eq!(loc.volumes_len(), 1); assert!(loc.find_volume(VolumeId(3)).is_some()); } + + #[test] + fn test_disk_location_persists_directory_uuid_and_tags() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + + let loc = DiskLocation::new( + dir, + dir, + 10, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + vec!["fast".to_string(), "ssd".to_string()], + ) + .unwrap(); + let directory_uuid = loc.directory_uuid.clone(); + assert_eq!(loc.tags, vec!["fast".to_string(), "ssd".to_string()]); + drop(loc); + + let reloaded = DiskLocation::new( + dir, + dir, + 10, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); + assert_eq!(reloaded.directory_uuid, directory_uuid); + } } diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs index 1e9e52d0c..9a4faea05 100644 --- a/seaweed-volume/src/storage/store.rs +++ b/seaweed-volume/src/storage/store.rs @@ -10,8 +10,8 @@ use std::sync::atomic::{AtomicU64, Ordering}; use crate::config::MinFreeSpace; use crate::storage::disk_location::DiskLocation; -use crate::storage::erasure_coding::ec_volume::EcVolume; use crate::storage::erasure_coding::ec_shard::EcVolumeShard; +use crate::storage::erasure_coding::ec_volume::EcVolume; use crate::storage::needle::needle::Needle; use crate::storage::needle_map::NeedleMapKind; use crate::storage::super_block::ReplicaPlacement; @@ -23,6 +23,7 @@ pub struct Store { pub locations: Vec, pub needle_map_kind: NeedleMapKind, pub volume_size_limit: AtomicU64, + pub id: String, pub ip: String, pub port: u16, pub grpc_port: u16, @@ -38,6 +39,7 @@ impl Store { locations: Vec::new(), needle_map_kind, volume_size_limit: AtomicU64::new(0), + id: String::new(), ip: String::new(), port: 0, grpc_port: 0, @@ -56,8 +58,16 @@ impl Store { max_volume_count: i32, disk_type: DiskType, min_free_space: MinFreeSpace, + tags: Vec, ) -> io::Result<()> { - let mut loc = DiskLocation::new(directory, idx_directory, max_volume_count, disk_type, min_free_space); + let mut loc = DiskLocation::new( + directory, + idx_directory, + max_volume_count, + disk_type, + min_free_space, + tags, + )?; loc.load_existing_volumes(self.needle_map_kind)?; // Check for duplicate volume IDs across existing locations @@ -65,7 +75,10 @@ impl Store { if self.find_volume(vid).is_some() { return Err(io::Error::new( io::ErrorKind::AlreadyExists, - format!("volume {} already exists in another location, conflicting dir: {}", vid, directory), + format!( + "volume {} already exists in another location, conflicting dir: {}", + vid, directory + ), )); } } @@ -87,7 +100,10 @@ impl Store { } /// Find which location contains a volume (mutable). - pub fn find_volume_mut(&mut self, vid: VolumeId) -> Option<(usize, &mut crate::storage::volume::Volume)> { + pub fn find_volume_mut( + &mut self, + vid: VolumeId, + ) -> Option<(usize, &mut crate::storage::volume::Volume)> { for (i, loc) in self.locations.iter_mut().enumerate() { if let Some(v) = loc.find_volume_mut(vid) { return Some((i, v)); @@ -145,8 +161,12 @@ impl Store { })?; self.locations[loc_idx].create_volume( - vid, collection, self.needle_map_kind, - replica_placement, ttl, preallocate, + vid, + collection, + self.needle_map_kind, + replica_placement, + ttl, + preallocate, ) } @@ -185,15 +205,10 @@ impl Store { if &loc.disk_type != &disk_type { continue; } - let base = crate::storage::volume::volume_file_name( - &loc.directory, collection, vid, - ); + let base = crate::storage::volume::volume_file_name(&loc.directory, collection, vid); let dat_path = format!("{}.dat", base); if std::path::Path::new(&dat_path).exists() { - return loc.create_volume( - vid, collection, self.needle_map_kind, - None, None, 0, - ); + return loc.create_volume(vid, collection, self.needle_map_kind, None, None, 0); } } Err(VolumeError::Io(io::Error::new( @@ -211,14 +226,22 @@ impl Store { } /// Read a needle from a volume, optionally reading deleted needles. - pub fn read_volume_needle_opt(&self, vid: VolumeId, n: &mut Needle, read_deleted: bool) -> Result { + pub fn read_volume_needle_opt( + &self, + vid: VolumeId, + n: &mut Needle, + read_deleted: bool, + ) -> Result { let (_, vol) = self.find_volume(vid).ok_or(VolumeError::NotFound)?; vol.read_needle_opt(n, read_deleted) } /// Read needle metadata and return streaming info for large file reads. pub fn read_volume_needle_stream_info( - &self, vid: VolumeId, n: &mut Needle, read_deleted: bool, + &self, + vid: VolumeId, + n: &mut Needle, + read_deleted: bool, ) -> Result { let (_, vol) = self.find_volume(vid).ok_or(VolumeError::NotFound)?; vol.read_needle_stream_info(n, read_deleted) @@ -226,12 +249,20 @@ impl Store { /// Write a needle to a volume. pub fn write_volume_needle( - &mut self, vid: VolumeId, n: &mut Needle, + &mut self, + vid: VolumeId, + n: &mut Needle, ) -> Result<(u64, Size, bool), VolumeError> { // Check disk space on the location containing this volume. // We do this before the mutable borrow to avoid borrow conflicts. - let loc_idx = self.find_volume(vid).map(|(i, _)| i).ok_or(VolumeError::NotFound)?; - if self.locations[loc_idx].is_disk_space_low.load(Ordering::Relaxed) { + let loc_idx = self + .find_volume(vid) + .map(|(i, _)| i) + .ok_or(VolumeError::NotFound)?; + if self.locations[loc_idx] + .is_disk_space_low + .load(Ordering::Relaxed) + { return Err(VolumeError::ReadOnly); } @@ -241,7 +272,9 @@ impl Store { /// Delete a needle from a volume. pub fn delete_volume_needle( - &mut self, vid: VolumeId, n: &mut Needle, + &mut self, + vid: VolumeId, + n: &mut Needle, ) -> Result { let (_, vol) = self.find_volume_mut(vid).ok_or(VolumeError::NotFound)?; vol.delete_needle(n) @@ -265,21 +298,25 @@ impl Store { /// Total max volumes across all locations. pub fn max_volume_count(&self) -> i32 { - self.locations.iter() + self.locations + .iter() .map(|loc| loc.max_volume_count.load(Ordering::Relaxed)) .sum() } /// Free volume slots across all locations. pub fn free_volume_count(&self) -> i32 { - self.locations.iter() + self.locations + .iter() .map(|loc| loc.free_volume_count()) .sum() } /// All volume IDs across all locations. pub fn all_volume_ids(&self) -> Vec { - let mut ids: Vec = self.locations.iter() + let mut ids: Vec = self + .locations + .iter() .flat_map(|loc| loc.volume_ids()) .collect(); ids.sort(); @@ -297,15 +334,17 @@ impl Store { shard_ids: &[u32], ) -> Result<(), VolumeError> { // Find the directory where the EC files live - let dir = self.find_ec_dir(vid, collection) - .ok_or_else(|| VolumeError::Io(io::Error::new( + let dir = self.find_ec_dir(vid, collection).ok_or_else(|| { + VolumeError::Io(io::Error::new( io::ErrorKind::NotFound, format!("ec volume {} shards not found on disk", vid), - )))?; + )) + })?; - let ec_vol = self.ec_volumes.entry(vid).or_insert_with(|| { - EcVolume::new(&dir, &dir, collection, vid).unwrap() - }); + let ec_vol = self + .ec_volumes + .entry(vid) + .or_insert_with(|| EcVolume::new(&dir, &dir, collection, vid).unwrap()); for &shard_id in shard_ids { let shard = EcVolumeShard::new(&dir, collection, vid, shard_id as u8); @@ -316,11 +355,7 @@ impl Store { } /// Unmount EC shards for a volume. - pub fn unmount_ec_shards( - &mut self, - vid: VolumeId, - shard_ids: &[u32], - ) { + pub fn unmount_ec_shards(&mut self, vid: VolumeId, shard_ids: &[u32]) { if let Some(ec_vol) = self.ec_volumes.get_mut(&vid) { for &shard_id in shard_ids { ec_vol.remove_shard(shard_id as u8); @@ -333,12 +368,7 @@ impl Store { } /// Delete EC shard files from disk. - pub fn delete_ec_shards( - &mut self, - vid: VolumeId, - collection: &str, - shard_ids: &[u32], - ) { + pub fn delete_ec_shards(&mut self, vid: VolumeId, collection: &str, shard_ids: &[u32]) { // Delete shard files from disk for loc in &self.locations { for &shard_id in shard_ids { @@ -355,7 +385,8 @@ impl Store { let all_gone = self.check_all_ec_shards_deleted(vid, collection); if all_gone { for loc in &self.locations { - let base = crate::storage::volume::volume_file_name(&loc.directory, collection, vid); + let base = + crate::storage::volume::volume_file_name(&loc.directory, collection, vid); let _ = std::fs::remove_file(format!("{}.ecx", base)); let _ = std::fs::remove_file(format!("{}.ecj", base)); } @@ -388,7 +419,12 @@ impl Store { } /// Find the directory containing a specific EC shard file. - pub fn find_ec_shard_dir(&self, vid: VolumeId, collection: &str, shard_id: u8) -> Option { + pub fn find_ec_shard_dir( + &self, + vid: VolumeId, + collection: &str, + shard_id: u8, + ) -> Option { for loc in &self.locations { let shard = EcVolumeShard::new(&loc.directory, collection, vid, shard_id); if std::path::Path::new(&shard.file_name()).exists() { @@ -405,7 +441,10 @@ impl Store { if let Some((_, v)) = self.find_volume(vid) { Ok(v.garbage_level()) } else { - Err(format!("volume id {} is not found during check compact", vid.0)) + Err(format!( + "volume id {} is not found during check compact", + vid.0 + )) } } @@ -437,7 +476,10 @@ impl Store { let volume_size = v.dat_file_size().unwrap_or(0); Ok((is_read_only, volume_size)) } else { - Err(format!("volume id {} is not found during commit compact", vid.0)) + Err(format!( + "volume id {} is not found during commit compact", + vid.0 + )) } } @@ -447,7 +489,10 @@ impl Store { v.cleanup_compact() .map_err(|e| format!("cleanup volume {}: {}", vid.0, e)) } else { - Err(format!("volume id {} is not found during cleaning up", vid.0)) + Err(format!( + "volume id {} is not found during cleaning up", + vid.0 + )) } } @@ -475,7 +520,16 @@ mod tests { fn make_test_store(dirs: &[&str]) -> Store { let mut store = Store::new(NeedleMapKind::InMemory); for dir in dirs { - store.add_location(dir, dir, 10, DiskType::HardDrive, MinFreeSpace::Percent(1.0)).unwrap(); + store + .add_location( + dir, + dir, + 10, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); } store } @@ -486,7 +540,16 @@ mod tests { let dir = tmp.path().to_str().unwrap(); let mut store = Store::new(NeedleMapKind::InMemory); - store.add_location(dir, dir, 10, DiskType::HardDrive, MinFreeSpace::Percent(1.0)).unwrap(); + store + .add_location( + dir, + dir, + 10, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); assert_eq!(store.locations.len(), 1); assert_eq!(store.max_volume_count(), 10); } @@ -497,7 +560,9 @@ mod tests { let dir = tmp.path().to_str().unwrap(); let mut store = make_test_store(&[dir]); - store.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive).unwrap(); + store + .add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive) + .unwrap(); assert!(store.has_volume(VolumeId(1))); assert!(!store.has_volume(VolumeId(2))); assert_eq!(store.total_volume_count(), 1); @@ -508,7 +573,9 @@ mod tests { let tmp = TempDir::new().unwrap(); let dir = tmp.path().to_str().unwrap(); let mut store = make_test_store(&[dir]); - store.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive).unwrap(); + store + .add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive) + .unwrap(); // Write let mut n = Needle { @@ -523,13 +590,20 @@ mod tests { assert!(offset > 0); // Read - let mut read_n = Needle { id: NeedleId(1), ..Needle::default() }; + let mut read_n = Needle { + id: NeedleId(1), + ..Needle::default() + }; let count = store.read_volume_needle(VolumeId(1), &mut read_n).unwrap(); assert_eq!(count, 11); assert_eq!(read_n.data, b"hello store"); // Delete - let mut del_n = Needle { id: NeedleId(1), cookie: Cookie(0xaa), ..Needle::default() }; + let mut del_n = Needle { + id: NeedleId(1), + cookie: Cookie(0xaa), + ..Needle::default() + }; let deleted = store.delete_volume_needle(VolumeId(1), &mut del_n).unwrap(); assert!(deleted.0 > 0); } @@ -542,13 +616,35 @@ mod tests { let dir2 = tmp2.path().to_str().unwrap(); let mut store = Store::new(NeedleMapKind::InMemory); - store.add_location(dir1, dir1, 5, DiskType::HardDrive, MinFreeSpace::Percent(1.0)).unwrap(); - store.add_location(dir2, dir2, 5, DiskType::HardDrive, MinFreeSpace::Percent(1.0)).unwrap(); + store + .add_location( + dir1, + dir1, + 5, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); + store + .add_location( + dir2, + dir2, + 5, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); assert_eq!(store.max_volume_count(), 10); // Add volumes — should go to location with fewest volumes - store.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive).unwrap(); - store.add_volume(VolumeId(2), "", None, None, 0, DiskType::HardDrive).unwrap(); + store + .add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive) + .unwrap(); + store + .add_volume(VolumeId(2), "", None, None, 0, DiskType::HardDrive) + .unwrap(); assert_eq!(store.total_volume_count(), 2); // Both locations should have 1 volume each (load-balanced) @@ -562,9 +658,15 @@ mod tests { let dir = tmp.path().to_str().unwrap(); let mut store = make_test_store(&[dir]); - store.add_volume(VolumeId(1), "pics", None, None, 0, DiskType::HardDrive).unwrap(); - store.add_volume(VolumeId(2), "pics", None, None, 0, DiskType::HardDrive).unwrap(); - store.add_volume(VolumeId(3), "docs", None, None, 0, DiskType::HardDrive).unwrap(); + store + .add_volume(VolumeId(1), "pics", None, None, 0, DiskType::HardDrive) + .unwrap(); + store + .add_volume(VolumeId(2), "pics", None, None, 0, DiskType::HardDrive) + .unwrap(); + store + .add_volume(VolumeId(3), "docs", None, None, 0, DiskType::HardDrive) + .unwrap(); assert_eq!(store.total_volume_count(), 3); store.delete_collection("pics"); @@ -578,7 +680,10 @@ mod tests { let dir = tmp.path().to_str().unwrap(); let store = make_test_store(&[dir]); - let mut n = Needle { id: NeedleId(1), ..Needle::default() }; + let mut n = Needle { + id: NeedleId(1), + ..Needle::default() + }; let err = store.read_volume_needle(VolumeId(99), &mut n); assert!(matches!(err, Err(VolumeError::NotFound))); } diff --git a/seaweed-volume/tests/http_integration.rs b/seaweed-volume/tests/http_integration.rs index 0bc1cae8f..6130248ed 100644 --- a/seaweed-volume/tests/http_integration.rs +++ b/seaweed-volume/tests/http_integration.rs @@ -10,7 +10,7 @@ use axum::http::{Request, StatusCode}; use tower::ServiceExt; // for `oneshot` use seaweed_volume::security::{Guard, SigningKey}; -use seaweed_volume::server::volume_server::{VolumeServerState, build_admin_router}; +use seaweed_volume::server::volume_server::{build_admin_router, VolumeServerState}; use seaweed_volume::storage::needle_map::NeedleMapKind; use seaweed_volume::storage::store::Store; use seaweed_volume::storage::types::{DiskType, VolumeId}; @@ -25,7 +25,14 @@ fn test_state() -> (Arc, TempDir) { let mut store = Store::new(NeedleMapKind::InMemory); store - .add_location(dir, dir, 10, DiskType::HardDrive, seaweed_volume::config::MinFreeSpace::Percent(1.0)) + .add_location( + dir, + dir, + 10, + DiskType::HardDrive, + seaweed_volume::config::MinFreeSpace::Percent(1.0), + Vec::new(), + ) .expect("failed to add location"); store .add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive) @@ -145,16 +152,10 @@ async fn status_returns_json_with_version_and_volumes() { serde_json::from_slice(&body).expect("response is not valid JSON"); assert!(json.get("Version").is_some(), "missing 'Version' field"); - assert!( - json["Version"].is_string(), - "'Version' should be a string" - ); + assert!(json["Version"].is_string(), "'Version' should be a string"); assert!(json.get("Volumes").is_some(), "missing 'Volumes' field"); - assert!( - json["Volumes"].is_array(), - "'Volumes' should be an array" - ); + assert!(json["Volumes"].is_array(), "'Volumes' should be an array"); // We created one volume in test_state, so the array should have one entry let volumes = json["Volumes"].as_array().unwrap(); @@ -201,12 +202,7 @@ async fn write_then_read_needle() { // --- GET (read back) --- let app = build_admin_router(state.clone()); let response = app - .oneshot( - Request::builder() - .uri(uri) - .body(Body::empty()) - .unwrap(), - ) + .oneshot(Request::builder().uri(uri).body(Body::empty()).unwrap()) .await .unwrap(); @@ -261,12 +257,7 @@ async fn delete_then_get_returns_404() { // GET should now return 404 let app = build_admin_router(state.clone()); let response = app - .oneshot( - Request::builder() - .uri(uri) - .body(Body::empty()) - .unwrap(), - ) + .oneshot(Request::builder().uri(uri).body(Body::empty()).unwrap()) .await .unwrap(); assert_eq!( @@ -325,7 +316,11 @@ async fn head_returns_headers_without_body() { .unwrap() .parse() .expect("Content-Length should be a number"); - assert_eq!(len, payload.len(), "Content-Length should match payload size"); + assert_eq!( + len, + payload.len(), + "Content-Length should match payload size" + ); // Body should be empty for HEAD let body = body_bytes(response).await;