Browse Source

Add DiskLocation and Store for multi-disk volume management

DiskLocation manages volumes in a single directory with load/create/
delete/collection operations and free-space tracking. Store coordinates
multiple DiskLocations with load-balanced volume placement, delegated
read/write/delete, and mount/unmount lifecycle. Matches Go's
disk_location.go and store.go. 11 unit tests covering volume CRUD,
multi-location balancing, and collection management.
rust-volume-server
Chris Lu 4 days ago
parent
commit
b72e68b99f
  1. 325
      seaweed-volume/src/storage/disk_location.rs
  2. 2
      seaweed-volume/src/storage/mod.rs
  3. 373
      seaweed-volume/src/storage/store.rs

325
seaweed-volume/src/storage/disk_location.rs

@ -0,0 +1,325 @@
//! DiskLocation: manages volumes on a single disk/directory.
//!
//! Each DiskLocation represents one storage directory containing .dat + .idx files.
//! A Store contains one or more DiskLocations (one per configured directory).
//! Matches Go's storage/disk_location.go.
use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::Path;
use std::sync::atomic::{AtomicI32, AtomicU64, Ordering};
use crate::storage::needle_map::NeedleMapKind;
use crate::storage::super_block::ReplicaPlacement;
use crate::storage::types::*;
use crate::storage::volume::{Volume, VolumeError};
/// A single disk location managing volumes in one directory.
///
/// The volume map is private; callers go through the `impl` methods to look up,
/// add, or remove volumes.
pub struct DiskLocation {
/// Directory that is scanned for `.dat` files and passed to `Volume::new` as the data directory.
pub directory: String,
/// Directory passed to `Volume::new` as the index directory.
/// `DiskLocation::new` sets this to `directory` when given an empty string.
pub idx_directory: String,
/// Disk type tag; `Store` compares it when choosing a location for a volume.
pub disk_type: DiskType,
/// Current slot limit, read by `free_volume_count`.
pub max_volume_count: AtomicI32,
/// The slot limit this location was constructed with.
pub original_max_volume_count: i32,
/// Volumes currently registered in this location, keyed by volume id.
volumes: HashMap<VolumeId, Volume>,
/// Initialised to `false`; `Store` skips locations where this is `true` when placing volumes.
// NOTE(review): nothing in this file ever sets it to `true` — presumably an external monitor does.
pub is_disk_space_low: bool,
/// Initialised to 0 and not updated anywhere in this file.
// NOTE(review): presumably free bytes on the underlying disk — confirm unit with the writer.
pub available_space: AtomicU64,
}
impl DiskLocation {
pub fn new(directory: &str, idx_directory: &str, max_volume_count: i32, disk_type: DiskType) -> Self {
let idx_dir = if idx_directory.is_empty() {
directory.to_string()
} else {
idx_directory.to_string()
};
DiskLocation {
directory: directory.to_string(),
idx_directory: idx_dir,
disk_type,
max_volume_count: AtomicI32::new(max_volume_count),
original_max_volume_count: max_volume_count,
volumes: HashMap::new(),
is_disk_space_low: false,
available_space: AtomicU64::new(0),
}
}
// ---- Volume management ----
/// Load existing volumes from the directory.
pub fn load_existing_volumes(&mut self, needle_map_kind: NeedleMapKind) -> io::Result<()> {
// Ensure directory exists
fs::create_dir_all(&self.directory)?;
if self.directory != self.idx_directory {
fs::create_dir_all(&self.idx_directory)?;
}
// Scan for .dat files
let entries = fs::read_dir(&self.directory)?;
let mut dat_files: Vec<(String, VolumeId)> = Vec::new();
for entry in entries {
let entry = entry?;
let name = entry.file_name().into_string().unwrap_or_default();
if name.ends_with(".dat") {
if let Some((collection, vid)) = parse_volume_filename(&name) {
dat_files.push((collection, vid));
}
}
}
for (collection, vid) in dat_files {
match Volume::new(
&self.directory,
&self.idx_directory,
&collection,
vid,
needle_map_kind,
None, // replica placement read from superblock
None, // TTL read from superblock
0, // no preallocate on load
Version::current(),
) {
Ok(v) => {
self.volumes.insert(vid, v);
}
Err(e) => {
eprintln!("Error loading volume {}: {}", vid, e);
}
}
}
Ok(())
}
/// Find a volume by ID.
pub fn find_volume(&self, vid: VolumeId) -> Option<&Volume> {
self.volumes.get(&vid)
}
/// Find a volume by ID (mutable).
pub fn find_volume_mut(&mut self, vid: VolumeId) -> Option<&mut Volume> {
self.volumes.get_mut(&vid)
}
/// Add a volume to this location.
pub fn set_volume(&mut self, vid: VolumeId, volume: Volume) {
self.volumes.insert(vid, volume);
}
/// Create a new volume in this location.
pub fn create_volume(
&mut self,
vid: VolumeId,
collection: &str,
needle_map_kind: NeedleMapKind,
replica_placement: Option<ReplicaPlacement>,
ttl: Option<crate::storage::needle::ttl::TTL>,
preallocate: u64,
) -> Result<(), VolumeError> {
let v = Volume::new(
&self.directory,
&self.idx_directory,
collection,
vid,
needle_map_kind,
replica_placement,
ttl,
preallocate,
Version::current(),
)?;
self.volumes.insert(vid, v);
Ok(())
}
/// Remove and close a volume.
pub fn unload_volume(&mut self, vid: VolumeId) -> Option<Volume> {
if let Some(mut v) = self.volumes.remove(&vid) {
v.close();
Some(v)
} else {
None
}
}
/// Remove, close, and delete all files for a volume.
pub fn delete_volume(&mut self, vid: VolumeId) -> Result<(), VolumeError> {
if let Some(mut v) = self.volumes.remove(&vid) {
v.destroy()?;
Ok(())
} else {
Err(VolumeError::NotFound)
}
}
/// Delete all volumes in a collection.
pub fn delete_collection(&mut self, collection: &str) {
let vids: Vec<VolumeId> = self
.volumes
.iter()
.filter(|(_, v)| v.collection == collection)
.map(|(vid, _)| *vid)
.collect();
for vid in vids {
if let Some(mut v) = self.volumes.remove(&vid) {
let _ = v.destroy();
}
}
}
// ---- Metrics ----
/// Number of volumes on this disk.
pub fn volumes_len(&self) -> usize {
self.volumes.len()
}
/// Get all volume IDs, sorted.
pub fn volume_ids(&self) -> Vec<VolumeId> {
let mut ids: Vec<VolumeId> = self.volumes.keys().copied().collect();
ids.sort();
ids
}
/// Number of free volume slots.
pub fn free_volume_count(&self) -> i32 {
let max = self.max_volume_count.load(Ordering::Relaxed);
let used = self.volumes.len() as i32;
if max > used {
max - used
} else {
0
}
}
/// Iterate over all volumes.
pub fn volumes(&self) -> impl Iterator<Item = (&VolumeId, &Volume)> {
self.volumes.iter()
}
/// Iterate over all volumes (mutable).
pub fn volumes_mut(&mut self) -> impl Iterator<Item = (&VolumeId, &mut Volume)> {
self.volumes.iter_mut()
}
/// Close all volumes.
pub fn close(&mut self) {
for (_, v) in self.volumes.iter_mut() {
v.close();
}
self.volumes.clear();
}
}
/// Parse a volume filename like "collection_42.dat" or "42.dat" into (collection, VolumeId).
///
/// The collection is everything before the last `_`; a name without `_` has
/// an empty collection. Returns `None` when:
///
/// * the name does not end in `.dat`,
/// * the id part is empty, contains anything other than ASCII digits, or does
///   not fit in a `u32`,
/// * the name starts with `_` (for example `"_5.dat"`).
///
/// The last two rules matter because the empty collection is spelled
/// `"5.dat"`. Accepting `"_5.dat"` or `"+5.dat"` (Rust's integer parser allows
/// a leading `+`) would yield `("", 5)`, which names a different file than the
/// one that was found.
fn parse_volume_filename(filename: &str) -> Option<(String, VolumeId)> {
    let stem = filename.strip_suffix(".dat")?;
    let (collection, id_str) = match stem.rfind('_') {
        // A leading underscore would produce an empty collection that aliases "<id>.dat".
        Some(0) => return None,
        Some(pos) => (&stem[..pos], &stem[pos + 1..]),
        None => ("", stem),
    };
    // Reject signs and other non-digit characters that `str::parse` would tolerate.
    if !id_str.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    let id: u32 = id_str.parse().ok()?;
    Some((collection.to_string(), VolumeId(id)))
}
// ============================================================================
// Tests
// ============================================================================
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a 10-slot hard-drive location whose data and index files share `dir`.
    fn location_in(dir: &str) -> DiskLocation {
        DiskLocation::new(dir, dir, 10, DiskType::HardDrive)
    }

    /// Creates volume `id` in `collection` with an in-memory needle map,
    /// no replica placement, no TTL and no preallocation.
    fn add_volume(loc: &mut DiskLocation, id: u32, collection: &str) {
        loc.create_volume(
            VolumeId(id),
            collection,
            NeedleMapKind::InMemory,
            None,
            None,
            0,
        )
        .unwrap();
    }

    #[test]
    fn test_parse_volume_filename() {
        // (file name, expected (collection, id) or None)
        let cases: [(&str, Option<(&str, u32)>); 4] = [
            ("42.dat", Some(("", 42))),
            ("pics_7.dat", Some(("pics", 7))),
            ("notadat.idx", None),
            ("bad.dat", None),
        ];
        for (name, expected) in cases.iter() {
            let expected = expected.map(|(collection, id)| (collection.to_string(), VolumeId(id)));
            assert_eq!(parse_volume_filename(name), expected);
        }
    }

    #[test]
    fn test_disk_location_create_volume() {
        let tmp = TempDir::new().unwrap();
        let mut loc = location_in(tmp.path().to_str().unwrap());

        add_volume(&mut loc, 1, "");

        assert_eq!(loc.volumes_len(), 1);
        assert!(loc.find_volume(VolumeId(1)).is_some());
        assert!(loc.find_volume(VolumeId(99)).is_none());
        assert_eq!(loc.free_volume_count(), 9);
    }

    #[test]
    fn test_disk_location_load_existing() {
        let tmp = TempDir::new().unwrap();
        let dir = tmp.path().to_str().unwrap();

        // First pass: write two volumes to disk, then close the location.
        {
            let mut writer = location_in(dir);
            add_volume(&mut writer, 1, "");
            add_volume(&mut writer, 2, "test");
            writer.close();
        }

        // Second pass: a fresh location must discover both volumes.
        let mut reloaded = location_in(dir);
        reloaded
            .load_existing_volumes(NeedleMapKind::InMemory)
            .unwrap();
        assert_eq!(reloaded.volumes_len(), 2);

        let ids = reloaded.volume_ids();
        assert!(ids.contains(&VolumeId(1)));
        assert!(ids.contains(&VolumeId(2)));
    }

    #[test]
    fn test_disk_location_delete_volume() {
        let tmp = TempDir::new().unwrap();
        let mut loc = location_in(tmp.path().to_str().unwrap());

        add_volume(&mut loc, 1, "");
        add_volume(&mut loc, 2, "");
        assert_eq!(loc.volumes_len(), 2);

        loc.delete_volume(VolumeId(1)).unwrap();
        assert_eq!(loc.volumes_len(), 1);
        assert!(loc.find_volume(VolumeId(1)).is_none());
    }

    #[test]
    fn test_disk_location_delete_collection() {
        let tmp = TempDir::new().unwrap();
        let mut loc = location_in(tmp.path().to_str().unwrap());

        add_volume(&mut loc, 1, "pics");
        add_volume(&mut loc, 2, "pics");
        add_volume(&mut loc, 3, "docs");
        assert_eq!(loc.volumes_len(), 3);

        loc.delete_collection("pics");
        assert_eq!(loc.volumes_len(), 1);
        assert!(loc.find_volume(VolumeId(3)).is_some());
    }
}

2
seaweed-volume/src/storage/mod.rs

@ -4,3 +4,5 @@ pub mod super_block;
pub mod idx;
pub mod needle_map;
pub mod volume;
pub mod disk_location;
pub mod store;

373
seaweed-volume/src/storage/store.rs

@ -0,0 +1,373 @@
//! Store: the top-level storage manager for a volume server.
//!
//! A Store manages multiple DiskLocations (one per configured directory).
//! It coordinates volume placement, lookup, and lifecycle operations.
//! Matches Go's storage/store.go.
use std::io;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::storage::disk_location::DiskLocation;
use crate::storage::needle::needle::Needle;
use crate::storage::needle_map::NeedleMapKind;
use crate::storage::super_block::ReplicaPlacement;
use crate::storage::types::*;
use crate::storage::volume::VolumeError;
/// Top-level storage manager containing all disk locations and their volumes.
pub struct Store {
/// Disk locations in the order they were added; methods that return a location index refer to this vector.
pub locations: Vec<DiskLocation>,
/// Needle-map kind handed to every location when loading or creating volumes.
pub needle_map_kind: NeedleMapKind,
/// Initialised to 0 by `Store::new`; not read anywhere in this file.
// NOTE(review): presumably the per-volume size cap in bytes — confirm with the code that sets it.
pub volume_size_limit: AtomicU64,
// The fields below are initialised empty/zero by `Store::new` and are not read in this file.
// NOTE(review): presumably the server's network identity and topology labels — confirm against callers.
pub ip: String,
pub port: u16,
pub grpc_port: u16,
pub public_url: String,
pub data_center: String,
pub rack: String,
}
impl Store {
    /// Creates a store with no locations. All identity fields start empty or zero.
    pub fn new(needle_map_kind: NeedleMapKind) -> Self {
        Store {
            locations: Vec::new(),
            needle_map_kind,
            volume_size_limit: AtomicU64::new(0),
            ip: String::new(),
            port: 0,
            grpc_port: 0,
            public_url: String::new(),
            data_center: String::new(),
            rack: String::new(),
        }
    }

    /// Adds a disk location and loads the volumes already present in it.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from `DiskLocation::load_existing_volumes`; in
    /// that case the location is not added.
    pub fn add_location(
        &mut self,
        directory: &str,
        idx_directory: &str,
        max_volume_count: i32,
        disk_type: DiskType,
    ) -> io::Result<()> {
        let mut loc = DiskLocation::new(directory, idx_directory, max_volume_count, disk_type);
        loc.load_existing_volumes(self.needle_map_kind)?;
        self.locations.push(loc);
        Ok(())
    }

    // ---- Volume lookup ----

    /// Returns the index of the first location holding `vid` together with the volume.
    pub fn find_volume(&self, vid: VolumeId) -> Option<(usize, &crate::storage::volume::Volume)> {
        self.locations
            .iter()
            .enumerate()
            .find_map(|(i, loc)| loc.find_volume(vid).map(|v| (i, v)))
    }

    /// Mutable counterpart of [`find_volume`](Self::find_volume).
    fn find_volume_mut(
        &mut self,
        vid: VolumeId,
    ) -> Option<(usize, &mut crate::storage::volume::Volume)> {
        self.locations
            .iter_mut()
            .enumerate()
            .find_map(|(i, loc)| loc.find_volume_mut(vid).map(|v| (i, v)))
    }

    /// Returns `true` when any location holds `vid`.
    pub fn has_volume(&self, vid: VolumeId) -> bool {
        self.find_volume(vid).is_some()
    }

    // ---- Volume lifecycle ----

    /// Picks the location of the given disk type that holds the fewest volumes.
    ///
    /// Locations with no free slot or with `is_disk_space_low` set are not
    /// eligible. On a tie the earliest-added location wins.
    fn find_free_location(&self, disk_type: &DiskType) -> Option<usize> {
        self.locations
            .iter()
            .enumerate()
            .filter(|(_, loc)| {
                &loc.disk_type == disk_type
                    && loc.free_volume_count() > 0
                    && !loc.is_disk_space_low
            })
            // `min_by_key` returns the first of several equal minima.
            .min_by_key(|(_, loc)| loc.volumes_len())
            .map(|(i, _)| i)
    }

    /// Creates a new volume on the eligible location that currently holds the
    /// fewest volumes (see `find_free_location`).
    ///
    /// # Errors
    ///
    /// * `VolumeError::Io` with kind `AlreadyExists` when `vid` is already
    ///   present in any location. Without this check the same id could be
    ///   created on a second disk.
    /// * `VolumeError::Io` with kind `Other` when no location of `disk_type`
    ///   is eligible.
    /// * Whatever `DiskLocation::create_volume` returns on failure.
    pub fn add_volume(
        &mut self,
        vid: VolumeId,
        collection: &str,
        replica_placement: Option<ReplicaPlacement>,
        ttl: Option<crate::storage::needle::ttl::TTL>,
        preallocate: u64,
        disk_type: DiskType,
    ) -> Result<(), VolumeError> {
        if self.has_volume(vid) {
            return Err(VolumeError::Io(io::Error::new(
                io::ErrorKind::AlreadyExists,
                format!("volume {} already exists", vid),
            )));
        }
        let loc_idx = self.find_free_location(&disk_type).ok_or_else(|| {
            VolumeError::Io(io::Error::new(
                io::ErrorKind::Other,
                format!("no free location for disk type {:?}", disk_type),
            ))
        })?;
        let needle_map_kind = self.needle_map_kind;
        self.locations[loc_idx].create_volume(
            vid,
            collection,
            needle_map_kind,
            replica_placement,
            ttl,
            preallocate,
        )
    }

    /// Deletes the volume from the first location that holds it.
    ///
    /// # Errors
    ///
    /// `VolumeError::NotFound` when no location holds `vid`, otherwise the
    /// result of `DiskLocation::delete_volume`.
    pub fn delete_volume(&mut self, vid: VolumeId) -> Result<(), VolumeError> {
        self.locations
            .iter_mut()
            .find(|loc| loc.find_volume(vid).is_some())
            .ok_or(VolumeError::NotFound)?
            .delete_volume(vid)
    }

    /// Unloads (unmounts) a volume without deleting its files.
    ///
    /// Returns `true` when a location held the volume and unloaded it.
    pub fn unmount_volume(&mut self, vid: VolumeId) -> bool {
        // `any` stops at the first location that reports success.
        self.locations
            .iter_mut()
            .any(|loc| loc.unload_volume(vid).is_some())
    }

    /// Mounts a volume from an existing `.dat` file.
    ///
    /// Locations of `disk_type` are searched in order for the volume's `.dat`
    /// file; the first one that has it opens the volume. If the volume is
    /// already mounted the call is a no-op, so the live handle is not replaced
    /// by a second one on the same files.
    ///
    /// # Errors
    ///
    /// `VolumeError::Io` with kind `NotFound` when no matching location
    /// contains the file, otherwise the result of `DiskLocation::create_volume`.
    pub fn mount_volume(
        &mut self,
        vid: VolumeId,
        collection: &str,
        disk_type: DiskType,
    ) -> Result<(), VolumeError> {
        if self.has_volume(vid) {
            return Ok(());
        }
        let needle_map_kind = self.needle_map_kind;
        for loc in &mut self.locations {
            if loc.disk_type != disk_type {
                continue;
            }
            let base = crate::storage::volume::volume_file_name(&loc.directory, collection, vid);
            let dat_path = format!("{}.dat", base);
            if std::path::Path::new(&dat_path).exists() {
                return loc.create_volume(vid, collection, needle_map_kind, None, None, 0);
            }
        }
        Err(VolumeError::Io(io::Error::new(
            io::ErrorKind::NotFound,
            format!("volume {} not found on any disk", vid),
        )))
    }

    // ---- Read / Write / Delete ----

    /// Reads a needle by delegating to the volume's `read_needle`.
    ///
    /// # Errors
    ///
    /// `VolumeError::NotFound` when no location holds `vid`.
    pub fn read_volume_needle(&self, vid: VolumeId, n: &mut Needle) -> Result<i32, VolumeError> {
        let (_, vol) = self.find_volume(vid).ok_or(VolumeError::NotFound)?;
        vol.read_needle(n)
    }

    /// Writes a needle by delegating to the volume's `write_needle`.
    ///
    /// # Errors
    ///
    /// `VolumeError::NotFound` when no location holds `vid`.
    pub fn write_volume_needle(
        &mut self,
        vid: VolumeId,
        n: &mut Needle,
    ) -> Result<(u64, Size, bool), VolumeError> {
        let (_, vol) = self.find_volume_mut(vid).ok_or(VolumeError::NotFound)?;
        // NOTE(review): the meaning of the `true` flag is defined by `Volume::write_needle`.
        vol.write_needle(n, true)
    }

    /// Deletes a needle by delegating to the volume's `delete_needle`.
    ///
    /// # Errors
    ///
    /// `VolumeError::NotFound` when no location holds `vid`.
    pub fn delete_volume_needle(
        &mut self,
        vid: VolumeId,
        n: &mut Needle,
    ) -> Result<Size, VolumeError> {
        let (_, vol) = self.find_volume_mut(vid).ok_or(VolumeError::NotFound)?;
        vol.delete_needle(n)
    }

    // ---- Collection operations ----

    /// Deletes all volumes of `collection` in every location.
    pub fn delete_collection(&mut self, collection: &str) {
        for loc in &mut self.locations {
            loc.delete_collection(collection);
        }
    }

    // ---- Metrics ----

    /// Total number of volumes across all locations.
    pub fn total_volume_count(&self) -> usize {
        self.locations.iter().map(DiskLocation::volumes_len).sum()
    }

    /// Sum of the current slot limits of all locations.
    pub fn max_volume_count(&self) -> i32 {
        self.locations
            .iter()
            .map(|loc| loc.max_volume_count.load(Ordering::Relaxed))
            .sum()
    }

    /// Sum of the free volume slots of all locations.
    pub fn free_volume_count(&self) -> i32 {
        self.locations
            .iter()
            .map(DiskLocation::free_volume_count)
            .sum()
    }

    /// All volume ids across all locations, ascending and without duplicates.
    pub fn all_volume_ids(&self) -> Vec<VolumeId> {
        let mut ids: Vec<VolumeId> = self
            .locations
            .iter()
            .flat_map(DiskLocation::volume_ids)
            .collect();
        ids.sort_unstable();
        ids.dedup();
        ids
    }

    /// Closes every location and the volumes it holds.
    pub fn close(&mut self) {
        for loc in &mut self.locations {
            loc.close();
        }
    }
}
// ============================================================================
// Tests
// ============================================================================
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::needle::needle::Needle;
    use tempfile::TempDir;

    /// Builds a store with one 10-slot hard-drive location per entry in `dirs`.
    fn make_test_store(dirs: &[&str]) -> Store {
        let mut store = Store::new(NeedleMapKind::InMemory);
        dirs.iter().for_each(|dir| {
            store
                .add_location(dir, dir, 10, DiskType::HardDrive)
                .unwrap()
        });
        store
    }

    /// Adds a hard-drive volume with no replica placement, TTL or preallocation.
    fn add_hdd_volume(store: &mut Store, id: u32, collection: &str) {
        store
            .add_volume(VolumeId(id), collection, None, None, 0, DiskType::HardDrive)
            .unwrap();
    }

    #[test]
    fn test_store_add_location() {
        let tmp = TempDir::new().unwrap();
        let dir = tmp.path().to_str().unwrap();

        let mut store = Store::new(NeedleMapKind::InMemory);
        store
            .add_location(dir, dir, 10, DiskType::HardDrive)
            .unwrap();

        assert_eq!(store.locations.len(), 1);
        assert_eq!(store.max_volume_count(), 10);
    }

    #[test]
    fn test_store_add_volume() {
        let tmp = TempDir::new().unwrap();
        let mut store = make_test_store(&[tmp.path().to_str().unwrap()]);

        add_hdd_volume(&mut store, 1, "");

        assert!(store.has_volume(VolumeId(1)));
        assert!(!store.has_volume(VolumeId(2)));
        assert_eq!(store.total_volume_count(), 1);
    }

    #[test]
    fn test_store_read_write_delete() {
        let tmp = TempDir::new().unwrap();
        let mut store = make_test_store(&[tmp.path().to_str().unwrap()]);
        add_hdd_volume(&mut store, 1, "");
        let vid = VolumeId(1);

        // Write
        let mut written = Needle {
            id: NeedleId(1),
            cookie: Cookie(0xaa),
            data: b"hello store".to_vec(),
            data_size: 11,
            ..Needle::default()
        };
        let (offset, _size, unchanged) = store.write_volume_needle(vid, &mut written).unwrap();
        assert!(!unchanged);
        assert!(offset > 0);

        // Read
        let mut fetched = Needle {
            id: NeedleId(1),
            ..Needle::default()
        };
        let count = store.read_volume_needle(vid, &mut fetched).unwrap();
        assert_eq!(count, 11);
        assert_eq!(fetched.data, b"hello store");

        // Delete
        let mut doomed = Needle {
            id: NeedleId(1),
            cookie: Cookie(0xaa),
            ..Needle::default()
        };
        let deleted = store.delete_volume_needle(vid, &mut doomed).unwrap();
        assert!(deleted.0 > 0);
    }

    #[test]
    fn test_store_multi_location() {
        let tmp1 = TempDir::new().unwrap();
        let tmp2 = TempDir::new().unwrap();

        let mut store = Store::new(NeedleMapKind::InMemory);
        for tmp in [&tmp1, &tmp2].iter() {
            let dir = tmp.path().to_str().unwrap();
            store
                .add_location(dir, dir, 5, DiskType::HardDrive)
                .unwrap();
        }
        assert_eq!(store.max_volume_count(), 10);

        // Each new volume should land on the location with the fewest volumes.
        add_hdd_volume(&mut store, 1, "");
        add_hdd_volume(&mut store, 2, "");
        assert_eq!(store.total_volume_count(), 2);

        // Load balancing therefore leaves one volume on each location.
        assert_eq!(store.locations[0].volumes_len(), 1);
        assert_eq!(store.locations[1].volumes_len(), 1);
    }

    #[test]
    fn test_store_delete_collection() {
        let tmp = TempDir::new().unwrap();
        let mut store = make_test_store(&[tmp.path().to_str().unwrap()]);

        add_hdd_volume(&mut store, 1, "pics");
        add_hdd_volume(&mut store, 2, "pics");
        add_hdd_volume(&mut store, 3, "docs");
        assert_eq!(store.total_volume_count(), 3);

        store.delete_collection("pics");
        assert_eq!(store.total_volume_count(), 1);
        assert!(store.has_volume(VolumeId(3)));
    }

    #[test]
    fn test_store_volume_not_found() {
        let tmp = TempDir::new().unwrap();
        let store = make_test_store(&[tmp.path().to_str().unwrap()]);

        let mut probe = Needle {
            id: NeedleId(1),
            ..Needle::default()
        };
        let outcome = store.read_volume_needle(VolumeId(99), &mut probe);
        assert!(matches!(outcome, Err(VolumeError::NotFound)));
    }
}
Loading…
Cancel
Save