Browse Source

fix: send go-compatible heartbeat metadata

rust-volume-server
Chris Lu 6 days ago
parent
commit
8ade1c51d4
  1. 91
      seaweed-volume/src/main.rs
  2. 161
      seaweed-volume/src/server/heartbeat.rs
  3. 142
      seaweed-volume/src/storage/disk_location.rs
  4. 221
      seaweed-volume/src/storage/store.rs
  5. 41
      seaweed-volume/tests/http_integration.rs

91
seaweed-volume/src/main.rs

@ -1,16 +1,16 @@
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use tracing::{info, error};
use tracing::{error, info};
use seaweed_volume::config::{self, VolumeServerConfig}; use seaweed_volume::config::{self, VolumeServerConfig};
use seaweed_volume::metrics; use seaweed_volume::metrics;
use seaweed_volume::pb::volume_server_pb::volume_server_server::VolumeServerServer;
use seaweed_volume::security::{Guard, SigningKey}; use seaweed_volume::security::{Guard, SigningKey};
use seaweed_volume::server::grpc_server::VolumeGrpcService; use seaweed_volume::server::grpc_server::VolumeGrpcService;
use seaweed_volume::server::volume_server::VolumeServerState; use seaweed_volume::server::volume_server::VolumeServerState;
use seaweed_volume::server::write_queue::WriteQueue; use seaweed_volume::server::write_queue::WriteQueue;
use seaweed_volume::storage::store::Store; use seaweed_volume::storage::store::Store;
use seaweed_volume::storage::types::DiskType; use seaweed_volume::storage::types::DiskType;
use seaweed_volume::pb::volume_server_pb::volume_server_server::VolumeServerServer;
use tokio_rustls::TlsAcceptor; use tokio_rustls::TlsAcceptor;
@ -24,7 +24,10 @@ fn main() {
.init(); .init();
let config = config::parse_cli(); let config = config::parse_cli();
info!("SeaweedFS Volume Server (Rust) v{}", env!("CARGO_PKG_VERSION"));
info!(
"SeaweedFS Volume Server (Rust) v{}",
env!("CARGO_PKG_VERSION")
);
// Register Prometheus metrics // Register Prometheus metrics
metrics::register_metrics(); metrics::register_metrics();
@ -64,6 +67,7 @@ fn load_rustls_config(cert_path: &str, key_path: &str) -> rustls::ServerConfig {
async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error>> { async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error>> {
// Initialize the store // Initialize the store
let mut store = Store::new(config.index_type); let mut store = Store::new(config.index_type);
store.id = config.id.clone();
store.ip = config.ip.clone(); store.ip = config.ip.clone();
store.port = config.port; store.port = config.port;
store.grpc_port = config.grpc_port; store.grpc_port = config.grpc_port;
@ -80,13 +84,15 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
}; };
let max_volumes = config.folder_max_limits[i]; let max_volumes = config.folder_max_limits[i];
let disk_type = DiskType::from_string(&config.disk_types[i]); let disk_type = DiskType::from_string(&config.disk_types[i]);
let tags = config.folder_tags.get(i).cloned().unwrap_or_default();
info!( info!(
"Adding storage location: {} (max_volumes={}, disk_type={:?})", "Adding storage location: {} (max_volumes={}, disk_type={:?})",
dir, max_volumes, disk_type dir, max_volumes, disk_type
); );
let min_free_space = config.min_free_spaces[i].clone(); let min_free_space = config.min_free_spaces[i].clone();
store.add_location(dir, idx_dir, max_volumes, disk_type, min_free_space)
store
.add_location(dir, idx_dir, max_volumes, disk_type, min_free_space, tags)
.map_err(|e| format!("Failed to add storage location {}: {}", dir, e))?; .map_err(|e| format!("Failed to add storage location {}: {}", dir, e))?;
} }
@ -179,7 +185,10 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
info!("Starting HTTP server on {}", admin_addr); info!("Starting HTTP server on {}", admin_addr);
info!("Starting gRPC server on {}", grpc_addr); info!("Starting gRPC server on {}", grpc_addr);
if needs_public { if needs_public {
info!("Starting public HTTP server on {}:{}", config.bind_ip, public_port);
info!(
"Starting public HTTP server on {}:{}",
config.bind_ip, public_port
);
} }
// Set up graceful shutdown via SIGINT/SIGTERM using broadcast channel // Set up graceful shutdown via SIGINT/SIGTERM using broadcast channel
@ -217,19 +226,27 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
}); });
// Build optional TLS acceptor for HTTPS // Build optional TLS acceptor for HTTPS
let https_tls_acceptor = if !config.https_cert_file.is_empty() && !config.https_key_file.is_empty() {
info!("TLS enabled for HTTP server (cert={}, key={})", config.https_cert_file, config.https_key_file);
let tls_config = load_rustls_config(&config.https_cert_file, &config.https_key_file);
Some(TlsAcceptor::from(Arc::new(tls_config)))
} else {
None
};
let https_tls_acceptor =
if !config.https_cert_file.is_empty() && !config.https_key_file.is_empty() {
info!(
"TLS enabled for HTTP server (cert={}, key={})",
config.https_cert_file, config.https_key_file
);
let tls_config = load_rustls_config(&config.https_cert_file, &config.https_key_file);
Some(TlsAcceptor::from(Arc::new(tls_config)))
} else {
None
};
// Spawn all servers concurrently // Spawn all servers concurrently
let admin_listener = tokio::net::TcpListener::bind(&admin_addr) let admin_listener = tokio::net::TcpListener::bind(&admin_addr)
.await .await
.unwrap_or_else(|e| panic!("Failed to bind HTTP to {}: {}", admin_addr, e)); .unwrap_or_else(|e| panic!("Failed to bind HTTP to {}: {}", admin_addr, e));
let scheme = if https_tls_acceptor.is_some() { "HTTPS" } else { "HTTP" };
let scheme = if https_tls_acceptor.is_some() {
"HTTPS"
} else {
"HTTP"
};
info!("{} server listening on {}", scheme, admin_addr); info!("{} server listening on {}", scheme, admin_addr);
let http_handle = if let Some(tls_acceptor) = https_tls_acceptor.clone() { let http_handle = if let Some(tls_acceptor) = https_tls_acceptor.clone() {
@ -237,13 +254,16 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
tokio::spawn(async move { tokio::spawn(async move {
serve_https(admin_listener, admin_router, tls_acceptor, async move { serve_https(admin_listener, admin_router, tls_acceptor, async move {
let _ = shutdown_rx.recv().await; let _ = shutdown_rx.recv().await;
}).await;
})
.await;
}) })
} else { } else {
let mut shutdown_rx = shutdown_tx.subscribe(); let mut shutdown_rx = shutdown_tx.subscribe();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = axum::serve(admin_listener, admin_router) if let Err(e) = axum::serve(admin_listener, admin_router)
.with_graceful_shutdown(async move { let _ = shutdown_rx.recv().await; })
.with_graceful_shutdown(async move {
let _ = shutdown_rx.recv().await;
})
.await .await
{ {
error!("HTTP server error: {}", e); error!("HTTP server error: {}", e);
@ -260,17 +280,21 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
let use_tls = !grpc_cert_file.is_empty() && !grpc_key_file.is_empty(); let use_tls = !grpc_cert_file.is_empty() && !grpc_key_file.is_empty();
if use_tls { if use_tls {
info!("gRPC server listening on {} (TLS enabled)", addr); info!("gRPC server listening on {} (TLS enabled)", addr);
let cert = std::fs::read_to_string(&grpc_cert_file)
.unwrap_or_else(|e| panic!("Failed to read gRPC cert '{}': {}", grpc_cert_file, e));
let key = std::fs::read_to_string(&grpc_key_file)
.unwrap_or_else(|e| panic!("Failed to read gRPC key '{}': {}", grpc_key_file, e));
let cert = std::fs::read_to_string(&grpc_cert_file).unwrap_or_else(|e| {
panic!("Failed to read gRPC cert '{}': {}", grpc_cert_file, e)
});
let key = std::fs::read_to_string(&grpc_key_file).unwrap_or_else(|e| {
panic!("Failed to read gRPC key '{}': {}", grpc_key_file, e)
});
let identity = tonic::transport::Identity::from_pem(cert, key); let identity = tonic::transport::Identity::from_pem(cert, key);
let tls_config = tonic::transport::ServerTlsConfig::new().identity(identity); let tls_config = tonic::transport::ServerTlsConfig::new().identity(identity);
if let Err(e) = tonic::transport::Server::builder() if let Err(e) = tonic::transport::Server::builder()
.tls_config(tls_config) .tls_config(tls_config)
.expect("Failed to configure gRPC TLS") .expect("Failed to configure gRPC TLS")
.add_service(VolumeServerServer::new(grpc_service)) .add_service(VolumeServerServer::new(grpc_service))
.serve_with_shutdown(addr, async move { let _ = shutdown_rx.recv().await; })
.serve_with_shutdown(addr, async move {
let _ = shutdown_rx.recv().await;
})
.await .await
{ {
error!("gRPC server error: {}", e); error!("gRPC server error: {}", e);
@ -279,7 +303,9 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
info!("gRPC server listening on {}", addr); info!("gRPC server listening on {}", addr);
if let Err(e) = tonic::transport::Server::builder() if let Err(e) = tonic::transport::Server::builder()
.add_service(VolumeServerServer::new(grpc_service)) .add_service(VolumeServerServer::new(grpc_service))
.serve_with_shutdown(addr, async move { let _ = shutdown_rx.recv().await; })
.serve_with_shutdown(addr, async move {
let _ = shutdown_rx.recv().await;
})
.await .await
{ {
error!("gRPC server error: {}", e); error!("gRPC server error: {}", e);
@ -307,8 +333,11 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
info!("Will send heartbeats to master: {:?}", master_addrs); info!("Will send heartbeats to master: {:?}", master_addrs);
Some(tokio::spawn(async move { Some(tokio::spawn(async move {
seaweed_volume::server::heartbeat::run_heartbeat_with_state( seaweed_volume::server::heartbeat::run_heartbeat_with_state(
hb_config, hb_state, hb_shutdown
).await;
hb_config,
hb_state,
hb_shutdown,
)
.await;
})) }))
} else { } else {
None None
@ -316,25 +345,33 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
}; };
let public_handle = if needs_public { let public_handle = if needs_public {
let public_router = seaweed_volume::server::volume_server::build_public_router(state.clone());
let public_router =
seaweed_volume::server::volume_server::build_public_router(state.clone());
let public_addr = format!("{}:{}", config.bind_ip, public_port); let public_addr = format!("{}:{}", config.bind_ip, public_port);
let listener = tokio::net::TcpListener::bind(&public_addr) let listener = tokio::net::TcpListener::bind(&public_addr)
.await .await
.unwrap_or_else(|e| panic!("Failed to bind public HTTP to {}: {}", public_addr, e)); .unwrap_or_else(|e| panic!("Failed to bind public HTTP to {}: {}", public_addr, e));
let pub_scheme = if https_tls_acceptor.is_some() { "HTTPS" } else { "HTTP" };
let pub_scheme = if https_tls_acceptor.is_some() {
"HTTPS"
} else {
"HTTP"
};
info!("Public {} server listening on {}", pub_scheme, public_addr); info!("Public {} server listening on {}", pub_scheme, public_addr);
if let Some(tls_acceptor) = https_tls_acceptor { if let Some(tls_acceptor) = https_tls_acceptor {
let mut shutdown_rx = shutdown_tx.subscribe(); let mut shutdown_rx = shutdown_tx.subscribe();
Some(tokio::spawn(async move { Some(tokio::spawn(async move {
serve_https(listener, public_router, tls_acceptor, async move { serve_https(listener, public_router, tls_acceptor, async move {
let _ = shutdown_rx.recv().await; let _ = shutdown_rx.recv().await;
}).await;
})
.await;
})) }))
} else { } else {
let mut shutdown_rx = shutdown_tx.subscribe(); let mut shutdown_rx = shutdown_tx.subscribe();
Some(tokio::spawn(async move { Some(tokio::spawn(async move {
if let Err(e) = axum::serve(listener, public_router) if let Err(e) = axum::serve(listener, public_router)
.with_graceful_shutdown(async move { let _ = shutdown_rx.recv().await; })
.with_graceful_shutdown(async move {
let _ = shutdown_rx.recv().await;
})
.await .await
{ {
error!("Public HTTP server error: {}", e); error!("Public HTTP server error: {}", e);

161
seaweed-volume/src/server/heartbeat.rs

@ -4,18 +4,19 @@
//! matching Go's `server/volume_grpc_client_to_master.go`. //! matching Go's `server/volume_grpc_client_to_master.go`.
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tonic::transport::Channel; use tonic::transport::Channel;
use tracing::{info, warn, error};
use tracing::{error, info, warn};
use super::volume_server::VolumeServerState;
use crate::pb::master_pb; use crate::pb::master_pb;
use crate::pb::master_pb::seaweed_client::SeaweedClient; use crate::pb::master_pb::seaweed_client::SeaweedClient;
use crate::storage::store::Store;
use crate::storage::types::NeedleId; use crate::storage::types::NeedleId;
use super::volume_server::VolumeServerState;
/// Configuration for the heartbeat client. /// Configuration for the heartbeat client.
pub struct HeartbeatConfig { pub struct HeartbeatConfig {
@ -35,7 +36,10 @@ pub async fn run_heartbeat_with_state(
state: Arc<VolumeServerState>, state: Arc<VolumeServerState>,
mut shutdown_rx: broadcast::Receiver<()>, mut shutdown_rx: broadcast::Receiver<()>,
) { ) {
info!("Starting heartbeat to master nodes: {:?}", config.master_addresses);
info!(
"Starting heartbeat to master nodes: {:?}",
config.master_addresses
);
let pulse = Duration::from_secs(config.pulse_seconds.max(1)); let pulse = Duration::from_secs(config.pulse_seconds.max(1));
@ -164,17 +168,24 @@ async fn do_heartbeat(
_ = shutdown_rx.recv() => { _ = shutdown_rx.recv() => {
state.is_heartbeating.store(false, Ordering::Relaxed); state.is_heartbeating.store(false, Ordering::Relaxed);
let empty = master_pb::Heartbeat {
ip: config.ip.clone(),
port: config.port as u32,
public_url: config.public_url.clone(),
max_file_key: 0,
data_center: config.data_center.clone(),
rack: config.rack.clone(),
has_no_volumes: true,
has_no_ec_shards: true,
grpc_port: config.grpc_port as u32,
..Default::default()
let empty = {
let store = state.store.read().unwrap();
let (location_uuids, disk_tags) = collect_location_metadata(&store);
master_pb::Heartbeat {
id: store.id.clone(),
ip: config.ip.clone(),
port: config.port as u32,
public_url: config.public_url.clone(),
max_file_key: 0,
data_center: config.data_center.clone(),
rack: config.rack.clone(),
has_no_volumes: true,
has_no_ec_shards: true,
grpc_port: config.grpc_port as u32,
location_uuids,
disk_tags,
..Default::default()
}
}; };
let _ = tx.send(empty).await; let _ = tx.send(empty).await;
tokio::time::sleep(Duration::from_millis(200)).await; tokio::time::sleep(Duration::from_millis(200)).await;
@ -186,16 +197,42 @@ async fn do_heartbeat(
} }
/// Collect volume information into a Heartbeat message. /// Collect volume information into a Heartbeat message.
fn collect_heartbeat(config: &HeartbeatConfig, state: &Arc<VolumeServerState>) -> master_pb::Heartbeat {
fn collect_heartbeat(
config: &HeartbeatConfig,
state: &Arc<VolumeServerState>,
) -> master_pb::Heartbeat {
let store = state.store.read().unwrap(); let store = state.store.read().unwrap();
build_heartbeat(config, &store)
}
fn collect_location_metadata(store: &Store) -> (Vec<String>, Vec<master_pb::DiskTag>) {
let location_uuids = store
.locations
.iter()
.map(|loc| loc.directory_uuid.clone())
.collect();
let disk_tags = store
.locations
.iter()
.enumerate()
.map(|(disk_id, loc)| master_pb::DiskTag {
disk_id: disk_id as u32,
tags: loc.tags.clone(),
})
.collect();
(location_uuids, disk_tags)
}
fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartbeat {
let mut volumes = Vec::new(); let mut volumes = Vec::new();
let mut max_file_key = NeedleId(0); let mut max_file_key = NeedleId(0);
let mut max_volume_counts: HashMap<String, u32> = HashMap::new(); let mut max_volume_counts: HashMap<String, u32> = HashMap::new();
for loc in &store.locations { for loc in &store.locations {
let disk_type_str = loc.disk_type.to_string(); let disk_type_str = loc.disk_type.to_string();
let max_count = loc.max_volume_count.load(std::sync::atomic::Ordering::Relaxed);
let max_count = loc
.max_volume_count
.load(std::sync::atomic::Ordering::Relaxed);
*max_volume_counts.entry(disk_type_str).or_insert(0) += max_count as u32; *max_volume_counts.entry(disk_type_str).or_insert(0) += max_count as u32;
for (_, vol) in loc.iter_volumes() { for (_, vol) in loc.iter_volumes() {
@ -222,8 +259,11 @@ fn collect_heartbeat(config: &HeartbeatConfig, state: &Arc<VolumeServerState>) -
}); });
} }
} }
let has_no_volumes = volumes.is_empty();
let (location_uuids, disk_tags) = collect_location_metadata(store);
master_pb::Heartbeat { master_pb::Heartbeat {
id: store.id.clone(),
ip: config.ip.clone(), ip: config.ip.clone(),
port: config.port as u32, port: config.port as u32,
public_url: config.public_url.clone(), public_url: config.public_url.clone(),
@ -232,9 +272,11 @@ fn collect_heartbeat(config: &HeartbeatConfig, state: &Arc<VolumeServerState>) -
rack: config.rack.clone(), rack: config.rack.clone(),
admin_port: config.port as u32, admin_port: config.port as u32,
volumes, volumes,
has_no_volumes: false,
has_no_volumes,
max_volume_counts, max_volume_counts,
grpc_port: config.grpc_port as u32, grpc_port: config.grpc_port as u32,
location_uuids,
disk_tags,
..Default::default() ..Default::default()
} }
} }
@ -268,3 +310,86 @@ fn collect_ec_heartbeat(state: &Arc<VolumeServerState>) -> master_pb::Heartbeat
..Default::default() ..Default::default()
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use crate::config::MinFreeSpace;
use crate::storage::needle_map::NeedleMapKind;
use crate::storage::types::{DiskType, VolumeId};
fn test_config() -> HeartbeatConfig {
HeartbeatConfig {
ip: "127.0.0.1".to_string(),
port: 8080,
grpc_port: 18080,
public_url: "127.0.0.1:8080".to_string(),
data_center: "dc1".to_string(),
rack: "rack1".to_string(),
master_addresses: Vec::new(),
pulse_seconds: 5,
}
}
#[test]
fn test_build_heartbeat_includes_store_identity_and_disk_metadata() {
let temp_dir = tempfile::tempdir().unwrap();
let dir = temp_dir.path().to_str().unwrap();
let mut store = Store::new(NeedleMapKind::InMemory);
store.id = "volume-node-a".to_string();
store
.add_location(
dir,
dir,
3,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
vec!["fast".to_string(), "ssd".to_string()],
)
.unwrap();
store
.add_volume(VolumeId(7), "pics", None, None, 0, DiskType::HardDrive)
.unwrap();
let heartbeat = build_heartbeat(&test_config(), &store);
assert_eq!(heartbeat.id, "volume-node-a");
assert_eq!(heartbeat.volumes.len(), 1);
assert!(!heartbeat.has_no_volumes);
assert_eq!(
heartbeat.location_uuids,
vec![store.locations[0].directory_uuid.clone()]
);
assert_eq!(heartbeat.disk_tags.len(), 1);
assert_eq!(heartbeat.disk_tags[0].disk_id, 0);
assert_eq!(
heartbeat.disk_tags[0].tags,
vec!["fast".to_string(), "ssd".to_string()]
);
}
#[test]
fn test_build_heartbeat_marks_empty_store_as_has_no_volumes() {
let temp_dir = tempfile::tempdir().unwrap();
let dir = temp_dir.path().to_str().unwrap();
let mut store = Store::new(NeedleMapKind::InMemory);
store.id = "volume-node-b".to_string();
store
.add_location(
dir,
dir,
2,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
Vec::new(),
)
.unwrap();
let heartbeat = build_heartbeat(&test_config(), &store);
assert!(heartbeat.volumes.is_empty());
assert!(heartbeat.has_no_volumes);
}
}

142
seaweed-volume/src/storage/disk_location.rs

@ -19,7 +19,9 @@ use crate::storage::volume::{Volume, VolumeError};
pub struct DiskLocation { pub struct DiskLocation {
pub directory: String, pub directory: String,
pub idx_directory: String, pub idx_directory: String,
pub directory_uuid: String,
pub disk_type: DiskType, pub disk_type: DiskType,
pub tags: Vec<String>,
pub max_volume_count: AtomicI32, pub max_volume_count: AtomicI32,
pub original_max_volume_count: i32, pub original_max_volume_count: i32,
volumes: HashMap<VolumeId, Volume>, volumes: HashMap<VolumeId, Volume>,
@ -29,30 +31,53 @@ pub struct DiskLocation {
} }
impl DiskLocation { impl DiskLocation {
const UUID_FILE_NAME: &'static str = "vol_dir.uuid";
pub fn new( pub fn new(
directory: &str, directory: &str,
idx_directory: &str, idx_directory: &str,
max_volume_count: i32, max_volume_count: i32,
disk_type: DiskType, disk_type: DiskType,
min_free_space: MinFreeSpace, min_free_space: MinFreeSpace,
) -> Self {
tags: Vec<String>,
) -> io::Result<Self> {
fs::create_dir_all(directory)?;
let idx_dir = if idx_directory.is_empty() { let idx_dir = if idx_directory.is_empty() {
directory.to_string() directory.to_string()
} else { } else {
fs::create_dir_all(idx_directory)?;
idx_directory.to_string() idx_directory.to_string()
}; };
let directory_uuid = Self::generate_directory_uuid(directory)?;
DiskLocation {
Ok(DiskLocation {
directory: directory.to_string(), directory: directory.to_string(),
idx_directory: idx_dir, idx_directory: idx_dir,
directory_uuid,
disk_type, disk_type,
tags,
max_volume_count: AtomicI32::new(max_volume_count), max_volume_count: AtomicI32::new(max_volume_count),
original_max_volume_count: max_volume_count, original_max_volume_count: max_volume_count,
volumes: HashMap::new(), volumes: HashMap::new(),
is_disk_space_low: AtomicBool::new(false), is_disk_space_low: AtomicBool::new(false),
available_space: AtomicU64::new(0), available_space: AtomicU64::new(0),
min_free_space, min_free_space,
})
}
fn generate_directory_uuid(directory: &str) -> io::Result<String> {
let path = std::path::Path::new(directory).join(Self::UUID_FILE_NAME);
if path.exists() {
let existing = fs::read_to_string(&path)?;
if !existing.trim().is_empty() {
return Ok(existing);
}
} }
let dir_uuid = uuid::Uuid::new_v4().to_string();
fs::write(path, &dir_uuid)?;
Ok(dir_uuid)
} }
// ---- Volume management ---- // ---- Volume management ----
@ -313,12 +338,18 @@ mod tests {
fn test_disk_location_create_volume() { fn test_disk_location_create_volume() {
let tmp = TempDir::new().unwrap(); let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap(); let dir = tmp.path().to_str().unwrap();
let mut loc = DiskLocation::new(dir, dir, 10, DiskType::HardDrive, MinFreeSpace::Percent(1.0));
loc.create_volume(
VolumeId(1), "", NeedleMapKind::InMemory,
None, None, 0,
).unwrap();
let mut loc = DiskLocation::new(
dir,
dir,
10,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
Vec::new(),
)
.unwrap();
loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0)
.unwrap();
assert_eq!(loc.volumes_len(), 1); assert_eq!(loc.volumes_len(), 1);
assert!(loc.find_volume(VolumeId(1)).is_some()); assert!(loc.find_volume(VolumeId(1)).is_some());
@ -333,14 +364,32 @@ mod tests {
// Create volumes // Create volumes
{ {
let mut loc = DiskLocation::new(dir, dir, 10, DiskType::HardDrive, MinFreeSpace::Percent(1.0));
loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0).unwrap();
loc.create_volume(VolumeId(2), "test", NeedleMapKind::InMemory, None, None, 0).unwrap();
let mut loc = DiskLocation::new(
dir,
dir,
10,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
Vec::new(),
)
.unwrap();
loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0)
.unwrap();
loc.create_volume(VolumeId(2), "test", NeedleMapKind::InMemory, None, None, 0)
.unwrap();
loc.close(); loc.close();
} }
// Reload // Reload
let mut loc = DiskLocation::new(dir, dir, 10, DiskType::HardDrive, MinFreeSpace::Percent(1.0));
let mut loc = DiskLocation::new(
dir,
dir,
10,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
Vec::new(),
)
.unwrap();
loc.load_existing_volumes(NeedleMapKind::InMemory).unwrap(); loc.load_existing_volumes(NeedleMapKind::InMemory).unwrap();
assert_eq!(loc.volumes_len(), 2); assert_eq!(loc.volumes_len(), 2);
@ -353,10 +402,20 @@ mod tests {
fn test_disk_location_delete_volume() { fn test_disk_location_delete_volume() {
let tmp = TempDir::new().unwrap(); let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap(); let dir = tmp.path().to_str().unwrap();
let mut loc = DiskLocation::new(dir, dir, 10, DiskType::HardDrive, MinFreeSpace::Percent(1.0));
loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0).unwrap();
loc.create_volume(VolumeId(2), "", NeedleMapKind::InMemory, None, None, 0).unwrap();
let mut loc = DiskLocation::new(
dir,
dir,
10,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
Vec::new(),
)
.unwrap();
loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0)
.unwrap();
loc.create_volume(VolumeId(2), "", NeedleMapKind::InMemory, None, None, 0)
.unwrap();
assert_eq!(loc.volumes_len(), 2); assert_eq!(loc.volumes_len(), 2);
loc.delete_volume(VolumeId(1)).unwrap(); loc.delete_volume(VolumeId(1)).unwrap();
@ -368,15 +427,56 @@ mod tests {
fn test_disk_location_delete_collection() { fn test_disk_location_delete_collection() {
let tmp = TempDir::new().unwrap(); let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap(); let dir = tmp.path().to_str().unwrap();
let mut loc = DiskLocation::new(dir, dir, 10, DiskType::HardDrive, MinFreeSpace::Percent(1.0));
loc.create_volume(VolumeId(1), "pics", NeedleMapKind::InMemory, None, None, 0).unwrap();
loc.create_volume(VolumeId(2), "pics", NeedleMapKind::InMemory, None, None, 0).unwrap();
loc.create_volume(VolumeId(3), "docs", NeedleMapKind::InMemory, None, None, 0).unwrap();
let mut loc = DiskLocation::new(
dir,
dir,
10,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
Vec::new(),
)
.unwrap();
loc.create_volume(VolumeId(1), "pics", NeedleMapKind::InMemory, None, None, 0)
.unwrap();
loc.create_volume(VolumeId(2), "pics", NeedleMapKind::InMemory, None, None, 0)
.unwrap();
loc.create_volume(VolumeId(3), "docs", NeedleMapKind::InMemory, None, None, 0)
.unwrap();
assert_eq!(loc.volumes_len(), 3); assert_eq!(loc.volumes_len(), 3);
loc.delete_collection("pics"); loc.delete_collection("pics");
assert_eq!(loc.volumes_len(), 1); assert_eq!(loc.volumes_len(), 1);
assert!(loc.find_volume(VolumeId(3)).is_some()); assert!(loc.find_volume(VolumeId(3)).is_some());
} }
#[test]
fn test_disk_location_persists_directory_uuid_and_tags() {
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
let loc = DiskLocation::new(
dir,
dir,
10,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
vec!["fast".to_string(), "ssd".to_string()],
)
.unwrap();
let directory_uuid = loc.directory_uuid.clone();
assert_eq!(loc.tags, vec!["fast".to_string(), "ssd".to_string()]);
drop(loc);
let reloaded = DiskLocation::new(
dir,
dir,
10,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
Vec::new(),
)
.unwrap();
assert_eq!(reloaded.directory_uuid, directory_uuid);
}
} }

221
seaweed-volume/src/storage/store.rs

@ -10,8 +10,8 @@ use std::sync::atomic::{AtomicU64, Ordering};
use crate::config::MinFreeSpace; use crate::config::MinFreeSpace;
use crate::storage::disk_location::DiskLocation; use crate::storage::disk_location::DiskLocation;
use crate::storage::erasure_coding::ec_volume::EcVolume;
use crate::storage::erasure_coding::ec_shard::EcVolumeShard; use crate::storage::erasure_coding::ec_shard::EcVolumeShard;
use crate::storage::erasure_coding::ec_volume::EcVolume;
use crate::storage::needle::needle::Needle; use crate::storage::needle::needle::Needle;
use crate::storage::needle_map::NeedleMapKind; use crate::storage::needle_map::NeedleMapKind;
use crate::storage::super_block::ReplicaPlacement; use crate::storage::super_block::ReplicaPlacement;
@ -23,6 +23,7 @@ pub struct Store {
pub locations: Vec<DiskLocation>, pub locations: Vec<DiskLocation>,
pub needle_map_kind: NeedleMapKind, pub needle_map_kind: NeedleMapKind,
pub volume_size_limit: AtomicU64, pub volume_size_limit: AtomicU64,
pub id: String,
pub ip: String, pub ip: String,
pub port: u16, pub port: u16,
pub grpc_port: u16, pub grpc_port: u16,
@ -38,6 +39,7 @@ impl Store {
locations: Vec::new(), locations: Vec::new(),
needle_map_kind, needle_map_kind,
volume_size_limit: AtomicU64::new(0), volume_size_limit: AtomicU64::new(0),
id: String::new(),
ip: String::new(), ip: String::new(),
port: 0, port: 0,
grpc_port: 0, grpc_port: 0,
@ -56,8 +58,16 @@ impl Store {
max_volume_count: i32, max_volume_count: i32,
disk_type: DiskType, disk_type: DiskType,
min_free_space: MinFreeSpace, min_free_space: MinFreeSpace,
tags: Vec<String>,
) -> io::Result<()> { ) -> io::Result<()> {
let mut loc = DiskLocation::new(directory, idx_directory, max_volume_count, disk_type, min_free_space);
let mut loc = DiskLocation::new(
directory,
idx_directory,
max_volume_count,
disk_type,
min_free_space,
tags,
)?;
loc.load_existing_volumes(self.needle_map_kind)?; loc.load_existing_volumes(self.needle_map_kind)?;
// Check for duplicate volume IDs across existing locations // Check for duplicate volume IDs across existing locations
@ -65,7 +75,10 @@ impl Store {
if self.find_volume(vid).is_some() { if self.find_volume(vid).is_some() {
return Err(io::Error::new( return Err(io::Error::new(
io::ErrorKind::AlreadyExists, io::ErrorKind::AlreadyExists,
format!("volume {} already exists in another location, conflicting dir: {}", vid, directory),
format!(
"volume {} already exists in another location, conflicting dir: {}",
vid, directory
),
)); ));
} }
} }
@ -87,7 +100,10 @@ impl Store {
} }
/// Find which location contains a volume (mutable). /// Find which location contains a volume (mutable).
pub fn find_volume_mut(&mut self, vid: VolumeId) -> Option<(usize, &mut crate::storage::volume::Volume)> {
pub fn find_volume_mut(
&mut self,
vid: VolumeId,
) -> Option<(usize, &mut crate::storage::volume::Volume)> {
for (i, loc) in self.locations.iter_mut().enumerate() { for (i, loc) in self.locations.iter_mut().enumerate() {
if let Some(v) = loc.find_volume_mut(vid) { if let Some(v) = loc.find_volume_mut(vid) {
return Some((i, v)); return Some((i, v));
@ -145,8 +161,12 @@ impl Store {
})?; })?;
self.locations[loc_idx].create_volume( self.locations[loc_idx].create_volume(
vid, collection, self.needle_map_kind,
replica_placement, ttl, preallocate,
vid,
collection,
self.needle_map_kind,
replica_placement,
ttl,
preallocate,
) )
} }
@ -185,15 +205,10 @@ impl Store {
if &loc.disk_type != &disk_type { if &loc.disk_type != &disk_type {
continue; continue;
} }
let base = crate::storage::volume::volume_file_name(
&loc.directory, collection, vid,
);
let base = crate::storage::volume::volume_file_name(&loc.directory, collection, vid);
let dat_path = format!("{}.dat", base); let dat_path = format!("{}.dat", base);
if std::path::Path::new(&dat_path).exists() { if std::path::Path::new(&dat_path).exists() {
return loc.create_volume(
vid, collection, self.needle_map_kind,
None, None, 0,
);
return loc.create_volume(vid, collection, self.needle_map_kind, None, None, 0);
} }
} }
Err(VolumeError::Io(io::Error::new( Err(VolumeError::Io(io::Error::new(
@ -211,14 +226,22 @@ impl Store {
} }
/// Read a needle from a volume, optionally reading deleted needles. /// Read a needle from a volume, optionally reading deleted needles.
pub fn read_volume_needle_opt(&self, vid: VolumeId, n: &mut Needle, read_deleted: bool) -> Result<i32, VolumeError> {
pub fn read_volume_needle_opt(
&self,
vid: VolumeId,
n: &mut Needle,
read_deleted: bool,
) -> Result<i32, VolumeError> {
let (_, vol) = self.find_volume(vid).ok_or(VolumeError::NotFound)?; let (_, vol) = self.find_volume(vid).ok_or(VolumeError::NotFound)?;
vol.read_needle_opt(n, read_deleted) vol.read_needle_opt(n, read_deleted)
} }
/// Read needle metadata and return streaming info for large file reads. /// Read needle metadata and return streaming info for large file reads.
pub fn read_volume_needle_stream_info( pub fn read_volume_needle_stream_info(
&self, vid: VolumeId, n: &mut Needle, read_deleted: bool,
&self,
vid: VolumeId,
n: &mut Needle,
read_deleted: bool,
) -> Result<crate::storage::volume::NeedleStreamInfo, VolumeError> { ) -> Result<crate::storage::volume::NeedleStreamInfo, VolumeError> {
let (_, vol) = self.find_volume(vid).ok_or(VolumeError::NotFound)?; let (_, vol) = self.find_volume(vid).ok_or(VolumeError::NotFound)?;
vol.read_needle_stream_info(n, read_deleted) vol.read_needle_stream_info(n, read_deleted)
@ -226,12 +249,20 @@ impl Store {
/// Write a needle to a volume. /// Write a needle to a volume.
pub fn write_volume_needle( pub fn write_volume_needle(
&mut self, vid: VolumeId, n: &mut Needle,
&mut self,
vid: VolumeId,
n: &mut Needle,
) -> Result<(u64, Size, bool), VolumeError> { ) -> Result<(u64, Size, bool), VolumeError> {
// Check disk space on the location containing this volume. // Check disk space on the location containing this volume.
// We do this before the mutable borrow to avoid borrow conflicts. // We do this before the mutable borrow to avoid borrow conflicts.
let loc_idx = self.find_volume(vid).map(|(i, _)| i).ok_or(VolumeError::NotFound)?;
if self.locations[loc_idx].is_disk_space_low.load(Ordering::Relaxed) {
let loc_idx = self
.find_volume(vid)
.map(|(i, _)| i)
.ok_or(VolumeError::NotFound)?;
if self.locations[loc_idx]
.is_disk_space_low
.load(Ordering::Relaxed)
{
return Err(VolumeError::ReadOnly); return Err(VolumeError::ReadOnly);
} }
@ -241,7 +272,9 @@ impl Store {
/// Delete a needle from a volume. /// Delete a needle from a volume.
pub fn delete_volume_needle( pub fn delete_volume_needle(
&mut self, vid: VolumeId, n: &mut Needle,
&mut self,
vid: VolumeId,
n: &mut Needle,
) -> Result<Size, VolumeError> { ) -> Result<Size, VolumeError> {
let (_, vol) = self.find_volume_mut(vid).ok_or(VolumeError::NotFound)?; let (_, vol) = self.find_volume_mut(vid).ok_or(VolumeError::NotFound)?;
vol.delete_needle(n) vol.delete_needle(n)
@ -265,21 +298,25 @@ impl Store {
/// Total max volumes across all locations. /// Total max volumes across all locations.
pub fn max_volume_count(&self) -> i32 { pub fn max_volume_count(&self) -> i32 {
self.locations.iter()
self.locations
.iter()
.map(|loc| loc.max_volume_count.load(Ordering::Relaxed)) .map(|loc| loc.max_volume_count.load(Ordering::Relaxed))
.sum() .sum()
} }
/// Free volume slots across all locations. /// Free volume slots across all locations.
pub fn free_volume_count(&self) -> i32 { pub fn free_volume_count(&self) -> i32 {
self.locations.iter()
self.locations
.iter()
.map(|loc| loc.free_volume_count()) .map(|loc| loc.free_volume_count())
.sum() .sum()
} }
/// All volume IDs across all locations. /// All volume IDs across all locations.
pub fn all_volume_ids(&self) -> Vec<VolumeId> { pub fn all_volume_ids(&self) -> Vec<VolumeId> {
let mut ids: Vec<VolumeId> = self.locations.iter()
let mut ids: Vec<VolumeId> = self
.locations
.iter()
.flat_map(|loc| loc.volume_ids()) .flat_map(|loc| loc.volume_ids())
.collect(); .collect();
ids.sort(); ids.sort();
@ -297,15 +334,17 @@ impl Store {
shard_ids: &[u32], shard_ids: &[u32],
) -> Result<(), VolumeError> { ) -> Result<(), VolumeError> {
// Find the directory where the EC files live // Find the directory where the EC files live
let dir = self.find_ec_dir(vid, collection)
.ok_or_else(|| VolumeError::Io(io::Error::new(
let dir = self.find_ec_dir(vid, collection).ok_or_else(|| {
VolumeError::Io(io::Error::new(
io::ErrorKind::NotFound, io::ErrorKind::NotFound,
format!("ec volume {} shards not found on disk", vid), format!("ec volume {} shards not found on disk", vid),
)))?;
))
})?;
let ec_vol = self.ec_volumes.entry(vid).or_insert_with(|| {
EcVolume::new(&dir, &dir, collection, vid).unwrap()
});
let ec_vol = self
.ec_volumes
.entry(vid)
.or_insert_with(|| EcVolume::new(&dir, &dir, collection, vid).unwrap());
for &shard_id in shard_ids { for &shard_id in shard_ids {
let shard = EcVolumeShard::new(&dir, collection, vid, shard_id as u8); let shard = EcVolumeShard::new(&dir, collection, vid, shard_id as u8);
@ -316,11 +355,7 @@ impl Store {
} }
/// Unmount EC shards for a volume. /// Unmount EC shards for a volume.
pub fn unmount_ec_shards(
&mut self,
vid: VolumeId,
shard_ids: &[u32],
) {
pub fn unmount_ec_shards(&mut self, vid: VolumeId, shard_ids: &[u32]) {
if let Some(ec_vol) = self.ec_volumes.get_mut(&vid) { if let Some(ec_vol) = self.ec_volumes.get_mut(&vid) {
for &shard_id in shard_ids { for &shard_id in shard_ids {
ec_vol.remove_shard(shard_id as u8); ec_vol.remove_shard(shard_id as u8);
@ -333,12 +368,7 @@ impl Store {
} }
/// Delete EC shard files from disk. /// Delete EC shard files from disk.
pub fn delete_ec_shards(
&mut self,
vid: VolumeId,
collection: &str,
shard_ids: &[u32],
) {
pub fn delete_ec_shards(&mut self, vid: VolumeId, collection: &str, shard_ids: &[u32]) {
// Delete shard files from disk // Delete shard files from disk
for loc in &self.locations { for loc in &self.locations {
for &shard_id in shard_ids { for &shard_id in shard_ids {
@ -355,7 +385,8 @@ impl Store {
let all_gone = self.check_all_ec_shards_deleted(vid, collection); let all_gone = self.check_all_ec_shards_deleted(vid, collection);
if all_gone { if all_gone {
for loc in &self.locations { for loc in &self.locations {
let base = crate::storage::volume::volume_file_name(&loc.directory, collection, vid);
let base =
crate::storage::volume::volume_file_name(&loc.directory, collection, vid);
let _ = std::fs::remove_file(format!("{}.ecx", base)); let _ = std::fs::remove_file(format!("{}.ecx", base));
let _ = std::fs::remove_file(format!("{}.ecj", base)); let _ = std::fs::remove_file(format!("{}.ecj", base));
} }
@ -388,7 +419,12 @@ impl Store {
} }
/// Find the directory containing a specific EC shard file. /// Find the directory containing a specific EC shard file.
pub fn find_ec_shard_dir(&self, vid: VolumeId, collection: &str, shard_id: u8) -> Option<String> {
pub fn find_ec_shard_dir(
&self,
vid: VolumeId,
collection: &str,
shard_id: u8,
) -> Option<String> {
for loc in &self.locations { for loc in &self.locations {
let shard = EcVolumeShard::new(&loc.directory, collection, vid, shard_id); let shard = EcVolumeShard::new(&loc.directory, collection, vid, shard_id);
if std::path::Path::new(&shard.file_name()).exists() { if std::path::Path::new(&shard.file_name()).exists() {
@ -405,7 +441,10 @@ impl Store {
if let Some((_, v)) = self.find_volume(vid) { if let Some((_, v)) = self.find_volume(vid) {
Ok(v.garbage_level()) Ok(v.garbage_level())
} else { } else {
Err(format!("volume id {} is not found during check compact", vid.0))
Err(format!(
"volume id {} is not found during check compact",
vid.0
))
} }
} }
@ -437,7 +476,10 @@ impl Store {
let volume_size = v.dat_file_size().unwrap_or(0); let volume_size = v.dat_file_size().unwrap_or(0);
Ok((is_read_only, volume_size)) Ok((is_read_only, volume_size))
} else { } else {
Err(format!("volume id {} is not found during commit compact", vid.0))
Err(format!(
"volume id {} is not found during commit compact",
vid.0
))
} }
} }
@ -447,7 +489,10 @@ impl Store {
v.cleanup_compact() v.cleanup_compact()
.map_err(|e| format!("cleanup volume {}: {}", vid.0, e)) .map_err(|e| format!("cleanup volume {}: {}", vid.0, e))
} else { } else {
Err(format!("volume id {} is not found during cleaning up", vid.0))
Err(format!(
"volume id {} is not found during cleaning up",
vid.0
))
} }
} }
@ -475,7 +520,16 @@ mod tests {
fn make_test_store(dirs: &[&str]) -> Store { fn make_test_store(dirs: &[&str]) -> Store {
let mut store = Store::new(NeedleMapKind::InMemory); let mut store = Store::new(NeedleMapKind::InMemory);
for dir in dirs { for dir in dirs {
store.add_location(dir, dir, 10, DiskType::HardDrive, MinFreeSpace::Percent(1.0)).unwrap();
store
.add_location(
dir,
dir,
10,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
Vec::new(),
)
.unwrap();
} }
store store
} }
@ -486,7 +540,16 @@ mod tests {
let dir = tmp.path().to_str().unwrap(); let dir = tmp.path().to_str().unwrap();
let mut store = Store::new(NeedleMapKind::InMemory); let mut store = Store::new(NeedleMapKind::InMemory);
store.add_location(dir, dir, 10, DiskType::HardDrive, MinFreeSpace::Percent(1.0)).unwrap();
store
.add_location(
dir,
dir,
10,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
Vec::new(),
)
.unwrap();
assert_eq!(store.locations.len(), 1); assert_eq!(store.locations.len(), 1);
assert_eq!(store.max_volume_count(), 10); assert_eq!(store.max_volume_count(), 10);
} }
@ -497,7 +560,9 @@ mod tests {
let dir = tmp.path().to_str().unwrap(); let dir = tmp.path().to_str().unwrap();
let mut store = make_test_store(&[dir]); let mut store = make_test_store(&[dir]);
store.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive).unwrap();
store
.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive)
.unwrap();
assert!(store.has_volume(VolumeId(1))); assert!(store.has_volume(VolumeId(1)));
assert!(!store.has_volume(VolumeId(2))); assert!(!store.has_volume(VolumeId(2)));
assert_eq!(store.total_volume_count(), 1); assert_eq!(store.total_volume_count(), 1);
@ -508,7 +573,9 @@ mod tests {
let tmp = TempDir::new().unwrap(); let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap(); let dir = tmp.path().to_str().unwrap();
let mut store = make_test_store(&[dir]); let mut store = make_test_store(&[dir]);
store.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive).unwrap();
store
.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive)
.unwrap();
// Write // Write
let mut n = Needle { let mut n = Needle {
@ -523,13 +590,20 @@ mod tests {
assert!(offset > 0); assert!(offset > 0);
// Read // Read
let mut read_n = Needle { id: NeedleId(1), ..Needle::default() };
let mut read_n = Needle {
id: NeedleId(1),
..Needle::default()
};
let count = store.read_volume_needle(VolumeId(1), &mut read_n).unwrap(); let count = store.read_volume_needle(VolumeId(1), &mut read_n).unwrap();
assert_eq!(count, 11); assert_eq!(count, 11);
assert_eq!(read_n.data, b"hello store"); assert_eq!(read_n.data, b"hello store");
// Delete // Delete
let mut del_n = Needle { id: NeedleId(1), cookie: Cookie(0xaa), ..Needle::default() };
let mut del_n = Needle {
id: NeedleId(1),
cookie: Cookie(0xaa),
..Needle::default()
};
let deleted = store.delete_volume_needle(VolumeId(1), &mut del_n).unwrap(); let deleted = store.delete_volume_needle(VolumeId(1), &mut del_n).unwrap();
assert!(deleted.0 > 0); assert!(deleted.0 > 0);
} }
@ -542,13 +616,35 @@ mod tests {
let dir2 = tmp2.path().to_str().unwrap(); let dir2 = tmp2.path().to_str().unwrap();
let mut store = Store::new(NeedleMapKind::InMemory); let mut store = Store::new(NeedleMapKind::InMemory);
store.add_location(dir1, dir1, 5, DiskType::HardDrive, MinFreeSpace::Percent(1.0)).unwrap();
store.add_location(dir2, dir2, 5, DiskType::HardDrive, MinFreeSpace::Percent(1.0)).unwrap();
store
.add_location(
dir1,
dir1,
5,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
Vec::new(),
)
.unwrap();
store
.add_location(
dir2,
dir2,
5,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
Vec::new(),
)
.unwrap();
assert_eq!(store.max_volume_count(), 10); assert_eq!(store.max_volume_count(), 10);
// Add volumes — should go to location with fewest volumes // Add volumes — should go to location with fewest volumes
store.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive).unwrap();
store.add_volume(VolumeId(2), "", None, None, 0, DiskType::HardDrive).unwrap();
store
.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive)
.unwrap();
store
.add_volume(VolumeId(2), "", None, None, 0, DiskType::HardDrive)
.unwrap();
assert_eq!(store.total_volume_count(), 2); assert_eq!(store.total_volume_count(), 2);
// Both locations should have 1 volume each (load-balanced) // Both locations should have 1 volume each (load-balanced)
@ -562,9 +658,15 @@ mod tests {
let dir = tmp.path().to_str().unwrap(); let dir = tmp.path().to_str().unwrap();
let mut store = make_test_store(&[dir]); let mut store = make_test_store(&[dir]);
store.add_volume(VolumeId(1), "pics", None, None, 0, DiskType::HardDrive).unwrap();
store.add_volume(VolumeId(2), "pics", None, None, 0, DiskType::HardDrive).unwrap();
store.add_volume(VolumeId(3), "docs", None, None, 0, DiskType::HardDrive).unwrap();
store
.add_volume(VolumeId(1), "pics", None, None, 0, DiskType::HardDrive)
.unwrap();
store
.add_volume(VolumeId(2), "pics", None, None, 0, DiskType::HardDrive)
.unwrap();
store
.add_volume(VolumeId(3), "docs", None, None, 0, DiskType::HardDrive)
.unwrap();
assert_eq!(store.total_volume_count(), 3); assert_eq!(store.total_volume_count(), 3);
store.delete_collection("pics"); store.delete_collection("pics");
@ -578,7 +680,10 @@ mod tests {
let dir = tmp.path().to_str().unwrap(); let dir = tmp.path().to_str().unwrap();
let store = make_test_store(&[dir]); let store = make_test_store(&[dir]);
let mut n = Needle { id: NeedleId(1), ..Needle::default() };
let mut n = Needle {
id: NeedleId(1),
..Needle::default()
};
let err = store.read_volume_needle(VolumeId(99), &mut n); let err = store.read_volume_needle(VolumeId(99), &mut n);
assert!(matches!(err, Err(VolumeError::NotFound))); assert!(matches!(err, Err(VolumeError::NotFound)));
} }

41
seaweed-volume/tests/http_integration.rs

@ -10,7 +10,7 @@ use axum::http::{Request, StatusCode};
use tower::ServiceExt; // for `oneshot` use tower::ServiceExt; // for `oneshot`
use seaweed_volume::security::{Guard, SigningKey}; use seaweed_volume::security::{Guard, SigningKey};
use seaweed_volume::server::volume_server::{VolumeServerState, build_admin_router};
use seaweed_volume::server::volume_server::{build_admin_router, VolumeServerState};
use seaweed_volume::storage::needle_map::NeedleMapKind; use seaweed_volume::storage::needle_map::NeedleMapKind;
use seaweed_volume::storage::store::Store; use seaweed_volume::storage::store::Store;
use seaweed_volume::storage::types::{DiskType, VolumeId}; use seaweed_volume::storage::types::{DiskType, VolumeId};
@ -25,7 +25,14 @@ fn test_state() -> (Arc<VolumeServerState>, TempDir) {
let mut store = Store::new(NeedleMapKind::InMemory); let mut store = Store::new(NeedleMapKind::InMemory);
store store
.add_location(dir, dir, 10, DiskType::HardDrive, seaweed_volume::config::MinFreeSpace::Percent(1.0))
.add_location(
dir,
dir,
10,
DiskType::HardDrive,
seaweed_volume::config::MinFreeSpace::Percent(1.0),
Vec::new(),
)
.expect("failed to add location"); .expect("failed to add location");
store store
.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive) .add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive)
@ -145,16 +152,10 @@ async fn status_returns_json_with_version_and_volumes() {
serde_json::from_slice(&body).expect("response is not valid JSON"); serde_json::from_slice(&body).expect("response is not valid JSON");
assert!(json.get("Version").is_some(), "missing 'Version' field"); assert!(json.get("Version").is_some(), "missing 'Version' field");
assert!(
json["Version"].is_string(),
"'Version' should be a string"
);
assert!(json["Version"].is_string(), "'Version' should be a string");
assert!(json.get("Volumes").is_some(), "missing 'Volumes' field"); assert!(json.get("Volumes").is_some(), "missing 'Volumes' field");
assert!(
json["Volumes"].is_array(),
"'Volumes' should be an array"
);
assert!(json["Volumes"].is_array(), "'Volumes' should be an array");
// We created one volume in test_state, so the array should have one entry // We created one volume in test_state, so the array should have one entry
let volumes = json["Volumes"].as_array().unwrap(); let volumes = json["Volumes"].as_array().unwrap();
@ -201,12 +202,7 @@ async fn write_then_read_needle() {
// --- GET (read back) --- // --- GET (read back) ---
let app = build_admin_router(state.clone()); let app = build_admin_router(state.clone());
let response = app let response = app
.oneshot(
Request::builder()
.uri(uri)
.body(Body::empty())
.unwrap(),
)
.oneshot(Request::builder().uri(uri).body(Body::empty()).unwrap())
.await .await
.unwrap(); .unwrap();
@ -261,12 +257,7 @@ async fn delete_then_get_returns_404() {
// GET should now return 404 // GET should now return 404
let app = build_admin_router(state.clone()); let app = build_admin_router(state.clone());
let response = app let response = app
.oneshot(
Request::builder()
.uri(uri)
.body(Body::empty())
.unwrap(),
)
.oneshot(Request::builder().uri(uri).body(Body::empty()).unwrap())
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@ -325,7 +316,11 @@ async fn head_returns_headers_without_body() {
.unwrap() .unwrap()
.parse() .parse()
.expect("Content-Length should be a number"); .expect("Content-Length should be a number");
assert_eq!(len, payload.len(), "Content-Length should match payload size");
assert_eq!(
len,
payload.len(),
"Content-Length should match payload size"
);
// Body should be empty for HEAD // Body should be empty for HEAD
let body = body_bytes(response).await; let body = body_bytes(response).await;

Loading…
Cancel
Save