diff --git a/seaweed-volume/src/storage/disk_location.rs b/seaweed-volume/src/storage/disk_location.rs new file mode 100644 index 000000000..035d6a936 --- /dev/null +++ b/seaweed-volume/src/storage/disk_location.rs @@ -0,0 +1,325 @@ +//! DiskLocation: manages volumes on a single disk/directory. +//! +//! Each DiskLocation represents one storage directory containing .dat + .idx files. +//! A Store contains one or more DiskLocations (one per configured directory). +//! Matches Go's storage/disk_location.go. + +use std::collections::HashMap; +use std::fs; +use std::io; +use std::path::Path; +use std::sync::atomic::{AtomicI32, AtomicU64, Ordering}; + +use crate::storage::needle_map::NeedleMapKind; +use crate::storage::super_block::ReplicaPlacement; +use crate::storage::types::*; +use crate::storage::volume::{Volume, VolumeError}; + +/// A single disk location managing volumes in one directory. +pub struct DiskLocation { + pub directory: String, + pub idx_directory: String, + pub disk_type: DiskType, + pub max_volume_count: AtomicI32, + pub original_max_volume_count: i32, + volumes: HashMap, + pub is_disk_space_low: bool, + pub available_space: AtomicU64, +} + +impl DiskLocation { + pub fn new(directory: &str, idx_directory: &str, max_volume_count: i32, disk_type: DiskType) -> Self { + let idx_dir = if idx_directory.is_empty() { + directory.to_string() + } else { + idx_directory.to_string() + }; + + DiskLocation { + directory: directory.to_string(), + idx_directory: idx_dir, + disk_type, + max_volume_count: AtomicI32::new(max_volume_count), + original_max_volume_count: max_volume_count, + volumes: HashMap::new(), + is_disk_space_low: false, + available_space: AtomicU64::new(0), + } + } + + // ---- Volume management ---- + + /// Load existing volumes from the directory. + pub fn load_existing_volumes(&mut self, needle_map_kind: NeedleMapKind) -> io::Result<()> { + // Ensure directory exists + fs::create_dir_all(&self.directory)?; + if self.directory != self.idx_directory { + fs::create_dir_all(&self.idx_directory)?; + } + + // Scan for .dat files + let entries = fs::read_dir(&self.directory)?; + let mut dat_files: Vec<(String, VolumeId)> = Vec::new(); + + for entry in entries { + let entry = entry?; + let name = entry.file_name().into_string().unwrap_or_default(); + if name.ends_with(".dat") { + if let Some((collection, vid)) = parse_volume_filename(&name) { + dat_files.push((collection, vid)); + } + } + } + + for (collection, vid) in dat_files { + match Volume::new( + &self.directory, + &self.idx_directory, + &collection, + vid, + needle_map_kind, + None, // replica placement read from superblock + None, // TTL read from superblock + 0, // no preallocate on load + Version::current(), + ) { + Ok(v) => { + self.volumes.insert(vid, v); + } + Err(e) => { + eprintln!("Error loading volume {}: {}", vid, e); + } + } + } + + Ok(()) + } + + /// Find a volume by ID. + pub fn find_volume(&self, vid: VolumeId) -> Option<&Volume> { + self.volumes.get(&vid) + } + + /// Find a volume by ID (mutable). + pub fn find_volume_mut(&mut self, vid: VolumeId) -> Option<&mut Volume> { + self.volumes.get_mut(&vid) + } + + /// Add a volume to this location. + pub fn set_volume(&mut self, vid: VolumeId, volume: Volume) { + self.volumes.insert(vid, volume); + } + + /// Create a new volume in this location. + pub fn create_volume( + &mut self, + vid: VolumeId, + collection: &str, + needle_map_kind: NeedleMapKind, + replica_placement: Option, + ttl: Option, + preallocate: u64, + ) -> Result<(), VolumeError> { + let v = Volume::new( + &self.directory, + &self.idx_directory, + collection, + vid, + needle_map_kind, + replica_placement, + ttl, + preallocate, + Version::current(), + )?; + self.volumes.insert(vid, v); + Ok(()) + } + + /// Remove and close a volume. + pub fn unload_volume(&mut self, vid: VolumeId) -> Option { + if let Some(mut v) = self.volumes.remove(&vid) { + v.close(); + Some(v) + } else { + None + } + } + + /// Remove, close, and delete all files for a volume. + pub fn delete_volume(&mut self, vid: VolumeId) -> Result<(), VolumeError> { + if let Some(mut v) = self.volumes.remove(&vid) { + v.destroy()?; + Ok(()) + } else { + Err(VolumeError::NotFound) + } + } + + /// Delete all volumes in a collection. + pub fn delete_collection(&mut self, collection: &str) { + let vids: Vec = self + .volumes + .iter() + .filter(|(_, v)| v.collection == collection) + .map(|(vid, _)| *vid) + .collect(); + + for vid in vids { + if let Some(mut v) = self.volumes.remove(&vid) { + let _ = v.destroy(); + } + } + } + + // ---- Metrics ---- + + /// Number of volumes on this disk. + pub fn volumes_len(&self) -> usize { + self.volumes.len() + } + + /// Get all volume IDs, sorted. + pub fn volume_ids(&self) -> Vec { + let mut ids: Vec = self.volumes.keys().copied().collect(); + ids.sort(); + ids + } + + /// Number of free volume slots. + pub fn free_volume_count(&self) -> i32 { + let max = self.max_volume_count.load(Ordering::Relaxed); + let used = self.volumes.len() as i32; + if max > used { + max - used + } else { + 0 + } + } + + /// Iterate over all volumes. + pub fn volumes(&self) -> impl Iterator { + self.volumes.iter() + } + + /// Iterate over all volumes (mutable). + pub fn volumes_mut(&mut self) -> impl Iterator { + self.volumes.iter_mut() + } + + /// Close all volumes. + pub fn close(&mut self) { + for (_, v) in self.volumes.iter_mut() { + v.close(); + } + self.volumes.clear(); + } +} + +/// Parse a volume filename like "collection_42.dat" or "42.dat" into (collection, VolumeId). +fn parse_volume_filename(filename: &str) -> Option<(String, VolumeId)> { + let stem = filename.strip_suffix(".dat")?; + if let Some(pos) = stem.rfind('_') { + let collection = &stem[..pos]; + let id_str = &stem[pos + 1..]; + let id: u32 = id_str.parse().ok()?; + Some((collection.to_string(), VolumeId(id))) + } else { + let id: u32 = stem.parse().ok()?; + Some((String::new(), VolumeId(id))) + } +} + +// ============================================================================ +// Tests +// ============================================================================ + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + #[test] + fn test_parse_volume_filename() { + assert_eq!( + parse_volume_filename("42.dat"), + Some(("".to_string(), VolumeId(42))) + ); + assert_eq!( + parse_volume_filename("pics_7.dat"), + Some(("pics".to_string(), VolumeId(7))) + ); + assert_eq!(parse_volume_filename("notadat.idx"), None); + assert_eq!(parse_volume_filename("bad.dat"), None); + } + + #[test] + fn test_disk_location_create_volume() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + let mut loc = DiskLocation::new(dir, dir, 10, DiskType::HardDrive); + + loc.create_volume( + VolumeId(1), "", NeedleMapKind::InMemory, + None, None, 0, + ).unwrap(); + + assert_eq!(loc.volumes_len(), 1); + assert!(loc.find_volume(VolumeId(1)).is_some()); + assert!(loc.find_volume(VolumeId(99)).is_none()); + assert_eq!(loc.free_volume_count(), 9); + } + + #[test] + fn test_disk_location_load_existing() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + + // Create volumes + { + let mut loc = DiskLocation::new(dir, dir, 10, DiskType::HardDrive); + loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0).unwrap(); + loc.create_volume(VolumeId(2), "test", NeedleMapKind::InMemory, None, None, 0).unwrap(); + loc.close(); + } + + // Reload + let mut loc = DiskLocation::new(dir, dir, 10, DiskType::HardDrive); + loc.load_existing_volumes(NeedleMapKind::InMemory).unwrap(); + assert_eq!(loc.volumes_len(), 2); + + let ids = loc.volume_ids(); + assert!(ids.contains(&VolumeId(1))); + assert!(ids.contains(&VolumeId(2))); + } + + #[test] + fn test_disk_location_delete_volume() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + let mut loc = DiskLocation::new(dir, dir, 10, DiskType::HardDrive); + + loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0).unwrap(); + loc.create_volume(VolumeId(2), "", NeedleMapKind::InMemory, None, None, 0).unwrap(); + assert_eq!(loc.volumes_len(), 2); + + loc.delete_volume(VolumeId(1)).unwrap(); + assert_eq!(loc.volumes_len(), 1); + assert!(loc.find_volume(VolumeId(1)).is_none()); + } + + #[test] + fn test_disk_location_delete_collection() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + let mut loc = DiskLocation::new(dir, dir, 10, DiskType::HardDrive); + + loc.create_volume(VolumeId(1), "pics", NeedleMapKind::InMemory, None, None, 0).unwrap(); + loc.create_volume(VolumeId(2), "pics", NeedleMapKind::InMemory, None, None, 0).unwrap(); + loc.create_volume(VolumeId(3), "docs", NeedleMapKind::InMemory, None, None, 0).unwrap(); + assert_eq!(loc.volumes_len(), 3); + + loc.delete_collection("pics"); + assert_eq!(loc.volumes_len(), 1); + assert!(loc.find_volume(VolumeId(3)).is_some()); + } +} diff --git a/seaweed-volume/src/storage/mod.rs b/seaweed-volume/src/storage/mod.rs index e8c78f36d..151b6055c 100644 --- a/seaweed-volume/src/storage/mod.rs +++ b/seaweed-volume/src/storage/mod.rs @@ -4,3 +4,5 @@ pub mod super_block; pub mod idx; pub mod needle_map; pub mod volume; +pub mod disk_location; +pub mod store; diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs new file mode 100644 index 000000000..06c3dfac8 --- /dev/null +++ b/seaweed-volume/src/storage/store.rs @@ -0,0 +1,373 @@ +//! Store: the top-level storage manager for a volume server. +//! +//! A Store manages multiple DiskLocations (one per configured directory). +//! It coordinates volume placement, lookup, and lifecycle operations. +//! Matches Go's storage/store.go. + +use std::io; +use std::sync::atomic::{AtomicU64, Ordering}; + +use crate::storage::disk_location::DiskLocation; +use crate::storage::needle::needle::Needle; +use crate::storage::needle_map::NeedleMapKind; +use crate::storage::super_block::ReplicaPlacement; +use crate::storage::types::*; +use crate::storage::volume::VolumeError; + +/// Top-level storage manager containing all disk locations and their volumes. +pub struct Store { + pub locations: Vec, + pub needle_map_kind: NeedleMapKind, + pub volume_size_limit: AtomicU64, + pub ip: String, + pub port: u16, + pub grpc_port: u16, + pub public_url: String, + pub data_center: String, + pub rack: String, +} + +impl Store { + pub fn new(needle_map_kind: NeedleMapKind) -> Self { + Store { + locations: Vec::new(), + needle_map_kind, + volume_size_limit: AtomicU64::new(0), + ip: String::new(), + port: 0, + grpc_port: 0, + public_url: String::new(), + data_center: String::new(), + rack: String::new(), + } + } + + /// Add a disk location and load existing volumes from it. + pub fn add_location( + &mut self, + directory: &str, + idx_directory: &str, + max_volume_count: i32, + disk_type: DiskType, + ) -> io::Result<()> { + let mut loc = DiskLocation::new(directory, idx_directory, max_volume_count, disk_type); + loc.load_existing_volumes(self.needle_map_kind)?; + self.locations.push(loc); + Ok(()) + } + + // ---- Volume lookup ---- + + /// Find which location contains a volume. + pub fn find_volume(&self, vid: VolumeId) -> Option<(usize, &crate::storage::volume::Volume)> { + for (i, loc) in self.locations.iter().enumerate() { + if let Some(v) = loc.find_volume(vid) { + return Some((i, v)); + } + } + None + } + + /// Find which location contains a volume (mutable). + fn find_volume_mut(&mut self, vid: VolumeId) -> Option<(usize, &mut crate::storage::volume::Volume)> { + for (i, loc) in self.locations.iter_mut().enumerate() { + if let Some(v) = loc.find_volume_mut(vid) { + return Some((i, v)); + } + } + None + } + + /// Check if a volume exists. + pub fn has_volume(&self, vid: VolumeId) -> bool { + self.find_volume(vid).is_some() + } + + // ---- Volume lifecycle ---- + + /// Find the location with fewest volumes (load-balance) of the given disk type. + fn find_free_location(&self, disk_type: &DiskType) -> Option { + let mut best: Option<(usize, usize)> = None; // (index, volume_count) + for (i, loc) in self.locations.iter().enumerate() { + if &loc.disk_type != disk_type { + continue; + } + if loc.free_volume_count() <= 0 { + continue; + } + if loc.is_disk_space_low { + continue; + } + let count = loc.volumes_len(); + if best.is_none() || count < best.unwrap().1 { + best = Some((i, count)); + } + } + best.map(|(i, _)| i) + } + + /// Create a new volume, placing it on the location with the most free space. + pub fn add_volume( + &mut self, + vid: VolumeId, + collection: &str, + replica_placement: Option, + ttl: Option, + preallocate: u64, + disk_type: DiskType, + ) -> Result<(), VolumeError> { + let loc_idx = self.find_free_location(&disk_type).ok_or_else(|| { + VolumeError::Io(io::Error::new( + io::ErrorKind::Other, + format!("no free location for disk type {:?}", disk_type), + )) + })?; + + self.locations[loc_idx].create_volume( + vid, collection, self.needle_map_kind, + replica_placement, ttl, preallocate, + ) + } + + /// Delete a volume from any location. + pub fn delete_volume(&mut self, vid: VolumeId) -> Result<(), VolumeError> { + for loc in &mut self.locations { + if loc.find_volume(vid).is_some() { + return loc.delete_volume(vid); + } + } + Err(VolumeError::NotFound) + } + + /// Unload (unmount) a volume without deleting its files. + pub fn unmount_volume(&mut self, vid: VolumeId) -> bool { + for loc in &mut self.locations { + if loc.unload_volume(vid).is_some() { + return true; + } + } + false + } + + /// Mount a volume from an existing .dat file. + pub fn mount_volume( + &mut self, + vid: VolumeId, + collection: &str, + disk_type: DiskType, + ) -> Result<(), VolumeError> { + // Find the location where the .dat file exists + for loc in &mut self.locations { + if &loc.disk_type != &disk_type { + continue; + } + let base = crate::storage::volume::volume_file_name( + &loc.directory, collection, vid, + ); + let dat_path = format!("{}.dat", base); + if std::path::Path::new(&dat_path).exists() { + return loc.create_volume( + vid, collection, self.needle_map_kind, + None, None, 0, + ); + } + } + Err(VolumeError::Io(io::Error::new( + io::ErrorKind::NotFound, + format!("volume {} not found on any disk", vid), + ))) + } + + // ---- Read / Write / Delete ---- + + /// Read a needle from a volume. + pub fn read_volume_needle(&self, vid: VolumeId, n: &mut Needle) -> Result { + let (_, vol) = self.find_volume(vid).ok_or(VolumeError::NotFound)?; + vol.read_needle(n) + } + + /// Write a needle to a volume. + pub fn write_volume_needle( + &mut self, vid: VolumeId, n: &mut Needle, + ) -> Result<(u64, Size, bool), VolumeError> { + let (_, vol) = self.find_volume_mut(vid).ok_or(VolumeError::NotFound)?; + vol.write_needle(n, true) + } + + /// Delete a needle from a volume. + pub fn delete_volume_needle( + &mut self, vid: VolumeId, n: &mut Needle, + ) -> Result { + let (_, vol) = self.find_volume_mut(vid).ok_or(VolumeError::NotFound)?; + vol.delete_needle(n) + } + + // ---- Collection operations ---- + + /// Delete all volumes in a collection. + pub fn delete_collection(&mut self, collection: &str) { + for loc in &mut self.locations { + loc.delete_collection(collection); + } + } + + // ---- Metrics ---- + + /// Total volume count across all locations. + pub fn total_volume_count(&self) -> usize { + self.locations.iter().map(|loc| loc.volumes_len()).sum() + } + + /// Total max volumes across all locations. + pub fn max_volume_count(&self) -> i32 { + self.locations.iter() + .map(|loc| loc.max_volume_count.load(Ordering::Relaxed)) + .sum() + } + + /// Free volume slots across all locations. + pub fn free_volume_count(&self) -> i32 { + self.locations.iter() + .map(|loc| loc.free_volume_count()) + .sum() + } + + /// All volume IDs across all locations. + pub fn all_volume_ids(&self) -> Vec { + let mut ids: Vec = self.locations.iter() + .flat_map(|loc| loc.volume_ids()) + .collect(); + ids.sort(); + ids.dedup(); + ids + } + + /// Close all locations and their volumes. + pub fn close(&mut self) { + for loc in &mut self.locations { + loc.close(); + } + } +} + +// ============================================================================ +// Tests +// ============================================================================ + +#[cfg(test)] +mod tests { + use super::*; + use crate::storage::needle::needle::Needle; + use tempfile::TempDir; + + fn make_test_store(dirs: &[&str]) -> Store { + let mut store = Store::new(NeedleMapKind::InMemory); + for dir in dirs { + store.add_location(dir, dir, 10, DiskType::HardDrive).unwrap(); + } + store + } + + #[test] + fn test_store_add_location() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + + let mut store = Store::new(NeedleMapKind::InMemory); + store.add_location(dir, dir, 10, DiskType::HardDrive).unwrap(); + assert_eq!(store.locations.len(), 1); + assert_eq!(store.max_volume_count(), 10); + } + + #[test] + fn test_store_add_volume() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + let mut store = make_test_store(&[dir]); + + store.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive).unwrap(); + assert!(store.has_volume(VolumeId(1))); + assert!(!store.has_volume(VolumeId(2))); + assert_eq!(store.total_volume_count(), 1); + } + + #[test] + fn test_store_read_write_delete() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + let mut store = make_test_store(&[dir]); + store.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive).unwrap(); + + // Write + let mut n = Needle { + id: NeedleId(1), + cookie: Cookie(0xaa), + data: b"hello store".to_vec(), + data_size: 11, + ..Needle::default() + }; + let (offset, size, unchanged) = store.write_volume_needle(VolumeId(1), &mut n).unwrap(); + assert!(!unchanged); + assert!(offset > 0); + + // Read + let mut read_n = Needle { id: NeedleId(1), ..Needle::default() }; + let count = store.read_volume_needle(VolumeId(1), &mut read_n).unwrap(); + assert_eq!(count, 11); + assert_eq!(read_n.data, b"hello store"); + + // Delete + let mut del_n = Needle { id: NeedleId(1), cookie: Cookie(0xaa), ..Needle::default() }; + let deleted = store.delete_volume_needle(VolumeId(1), &mut del_n).unwrap(); + assert!(deleted.0 > 0); + } + + #[test] + fn test_store_multi_location() { + let tmp1 = TempDir::new().unwrap(); + let tmp2 = TempDir::new().unwrap(); + let dir1 = tmp1.path().to_str().unwrap(); + let dir2 = tmp2.path().to_str().unwrap(); + + let mut store = Store::new(NeedleMapKind::InMemory); + store.add_location(dir1, dir1, 5, DiskType::HardDrive).unwrap(); + store.add_location(dir2, dir2, 5, DiskType::HardDrive).unwrap(); + assert_eq!(store.max_volume_count(), 10); + + // Add volumes — should go to location with fewest volumes + store.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive).unwrap(); + store.add_volume(VolumeId(2), "", None, None, 0, DiskType::HardDrive).unwrap(); + + assert_eq!(store.total_volume_count(), 2); + // Both locations should have 1 volume each (load-balanced) + assert_eq!(store.locations[0].volumes_len(), 1); + assert_eq!(store.locations[1].volumes_len(), 1); + } + + #[test] + fn test_store_delete_collection() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + let mut store = make_test_store(&[dir]); + + store.add_volume(VolumeId(1), "pics", None, None, 0, DiskType::HardDrive).unwrap(); + store.add_volume(VolumeId(2), "pics", None, None, 0, DiskType::HardDrive).unwrap(); + store.add_volume(VolumeId(3), "docs", None, None, 0, DiskType::HardDrive).unwrap(); + assert_eq!(store.total_volume_count(), 3); + + store.delete_collection("pics"); + assert_eq!(store.total_volume_count(), 1); + assert!(store.has_volume(VolumeId(3))); + } + + #[test] + fn test_store_volume_not_found() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + let store = make_test_store(&[dir]); + + let mut n = Needle { id: NeedleId(1), ..Needle::default() }; + let err = store.read_volume_needle(VolumeId(99), &mut n); + assert!(matches!(err, Err(VolumeError::NotFound))); + } +}