From 88bfa5cc1892ad9715fb4d7729913b1b75099f6c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 6 Mar 2026 15:37:31 -0800 Subject: [PATCH] Wire up main.rs with async startup, HTTP + gRPC servers, and graceful shutdown Initializes Store from CLI config, creates DiskLocations, starts axum HTTP server (admin + optional public) and tonic gRPC server concurrently. Handles SIGINT/SIGTERM for graceful shutdown. Removes duplicate NeedleMapKind enum from config.rs in favor of the canonical one in storage::needle_map. --- seaweed-volume/src/config.rs | 8 +- seaweed-volume/src/main.rs | 169 ++++++++++++++++++++++++++++++++++- 2 files changed, 167 insertions(+), 10 deletions(-) diff --git a/seaweed-volume/src/config.rs b/seaweed-volume/src/config.rs index 58461f51a..31162086a 100644 --- a/seaweed-volume/src/config.rs +++ b/seaweed-volume/src/config.rs @@ -222,13 +222,7 @@ pub struct VolumeServerConfig { pub debug_port: u16, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum NeedleMapKind { - InMemory, - LevelDb, - LevelDbMedium, - LevelDbLarge, -} +pub use crate::storage::needle_map::NeedleMapKind; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ReadMode { diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs index d9c6a3f50..6a0d58c0d 100644 --- a/seaweed-volume/src/main.rs +++ b/seaweed-volume/src/main.rs @@ -16,8 +16,171 @@ pub mod pb { } } +use std::sync::{Arc, RwLock}; + +use tracing::{info, error}; + +use crate::config::VolumeServerConfig; +use crate::security::{Guard, SigningKey}; +use crate::server::grpc_server::VolumeGrpcService; +use crate::server::volume_server::VolumeServerState; +use crate::storage::store::Store; +use crate::storage::types::DiskType; +use crate::pb::volume_server_pb::volume_server_server::VolumeServerServer; + fn main() { - let cli = config::parse_cli(); - println!("SeaweedFS Volume Server (Rust)"); - println!("Configuration: {:#?}", cli); + // Initialize tracing + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .init(); + + let config = config::parse_cli(); + info!("SeaweedFS Volume Server (Rust) v{}", env!("CARGO_PKG_VERSION")); + + // Build the tokio runtime and run the async entry point + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("Failed to build tokio runtime"); + + rt.block_on(run(config)); +} + +async fn run(config: VolumeServerConfig) { + // Initialize the store + let mut store = Store::new(config.index_type); + store.ip = config.ip.clone(); + store.port = config.port; + store.grpc_port = config.grpc_port; + store.public_url = config.public_url.clone(); + store.data_center = config.data_center.clone(); + store.rack = config.rack.clone(); + + // Add disk locations + for (i, dir) in config.folders.iter().enumerate() { + let idx_dir = if config.idx_folder.is_empty() { + dir.as_str() + } else { + config.idx_folder.as_str() + }; + let max_volumes = config.folder_max_limits[i]; + let disk_type = DiskType::from_string(&config.disk_types[i]); + + info!( + "Adding storage location: {} (max_volumes={}, disk_type={:?})", + dir, max_volumes, disk_type + ); + if let Err(e) = store.add_location(dir, idx_dir, max_volumes, disk_type) { + error!("Failed to add storage location {}: {}", dir, e); + return; + } + } + + // Build shared state + let guard = Guard::new( + &config.white_list, + SigningKey(vec![]), + 0, + SigningKey(vec![]), + 0, + ); + let state = Arc::new(VolumeServerState { + store: RwLock::new(store), + guard, + is_stopping: RwLock::new(false), + }); + + // Build HTTP routers + let admin_router = server::volume_server::build_admin_router(state.clone()); + let admin_addr = format!("{}:{}", config.bind_ip, config.port); + + let public_port = config.public_port; + let needs_public = public_port != config.port; + + // Build gRPC service + let grpc_service = VolumeGrpcService { + state: state.clone(), + }; + let grpc_addr = format!("{}:{}", config.bind_ip, config.grpc_port); + + info!("Starting HTTP server on {}", admin_addr); + info!("Starting gRPC server on {}", grpc_addr); + if needs_public { + info!("Starting public HTTP server on {}:{}", config.bind_ip, public_port); + } + + // Set up graceful shutdown via SIGINT/SIGTERM + let state_shutdown = state.clone(); + let shutdown = async move { + let ctrl_c = tokio::signal::ctrl_c(); + #[cfg(unix)] + { + let mut sigterm = + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .expect("Failed to install SIGTERM handler"); + tokio::select! { + _ = ctrl_c => { info!("Received SIGINT, shutting down..."); } + _ = sigterm.recv() => { info!("Received SIGTERM, shutting down..."); } + } + } + #[cfg(not(unix))] + { + ctrl_c.await.ok(); + info!("Received shutdown signal..."); + } + *state_shutdown.is_stopping.write().unwrap() = true; + }; + + // Spawn all servers concurrently + let admin_listener = tokio::net::TcpListener::bind(&admin_addr) + .await + .unwrap_or_else(|e| panic!("Failed to bind HTTP to {}: {}", admin_addr, e)); + info!("HTTP server listening on {}", admin_addr); + + let http_handle = tokio::spawn(async move { + if let Err(e) = axum::serve(admin_listener, admin_router) + .with_graceful_shutdown(shutdown) + .await + { + error!("HTTP server error: {}", e); + } + }); + + let grpc_handle = tokio::spawn(async move { + let addr = grpc_addr.parse().expect("Invalid gRPC address"); + info!("gRPC server listening on {}", addr); + tonic::transport::Server::builder() + .add_service(VolumeServerServer::new(grpc_service)) + .serve(addr) + .await + .unwrap_or_else(|e| error!("gRPC server error: {}", e)); + }); + + let public_handle = if needs_public { + let public_router = server::volume_server::build_public_router(state.clone()); + let public_addr = format!("{}:{}", config.bind_ip, public_port); + let listener = tokio::net::TcpListener::bind(&public_addr) + .await + .unwrap_or_else(|e| panic!("Failed to bind public HTTP to {}: {}", public_addr, e)); + info!("Public HTTP server listening on {}", public_addr); + Some(tokio::spawn(async move { + if let Err(e) = axum::serve(listener, public_router).await { + error!("Public HTTP server error: {}", e); + } + })) + } else { + None + }; + + // Wait for all servers + let _ = http_handle.await; + let _ = grpc_handle.await; + if let Some(h) = public_handle { + let _ = h.await; + } + + info!("Volume server stopped."); }