Browse Source

feat: add -options flag, /metrics on admin port, and SIGHUP reload

Three operational improvements to match Go volume server behavior:

1. Options file (-options flag):
   Load CLI options from a file, one per line (key=value format).
   Supports comments (#), leading dashes stripped. CLI args override.

2. /metrics on admin port:
   Serve Prometheus metrics on the main admin HTTP port in addition
   to the separate metrics port, matching Go's behavior.

3. SIGHUP reload:
   On SIGHUP, reload security config (whitelist from security.toml)
   and scan disk locations for new volumes (LoadNewVolumes equivalent).
   Guard wrapped in RwLock for runtime whitelist updates.
rust-volume-server
Chris Lu 4 days ago
parent
commit
58aa5b172b
  1. 228
      seaweed-volume/src/config.rs
  2. 43
      seaweed-volume/src/main.rs
  3. 12
      seaweed-volume/src/server/handlers.rs
  4. 4
      seaweed-volume/src/server/heartbeat.rs
  5. 10
      seaweed-volume/src/server/volume_server.rs
  6. 4
      seaweed-volume/src/server/write_queue.rs
  7. 10
      seaweed-volume/src/storage/store.rs
  8. 8
      seaweed-volume/tests/http_integration.rs

228
seaweed-volume/src/config.rs

@ -181,6 +181,10 @@ pub struct Cli {
/// Path to security.toml configuration file for JWT signing keys.
#[arg(long = "securityFile", default_value = "")]
pub security_file: String,
/// A file of command line options, each line in optionName=optionValue format.
#[arg(long = "options", default_value = "")]
pub options: String,
}
/// Resolved configuration after applying defaults and validation.
@ -237,6 +241,8 @@ pub struct VolumeServerConfig {
pub grpc_ca_file: String,
/// Enable batched write queue for improved throughput under load.
pub enable_write_queue: bool,
/// Path to security.toml — stored for SIGHUP reload.
pub security_file: String,
}
pub use crate::storage::needle_map::NeedleMapKind;
@ -292,12 +298,124 @@ fn normalize_args_vec(args: Vec<String>) -> Vec<String> {
}
/// Parse CLI arguments and resolve all defaults — mirroring Go's `runVolume()` + `startVolumeServer()`.
///
/// Supports `-options <file>` to load defaults from a file (same format as Go's fla9).
/// CLI arguments take precedence over file values.
pub fn parse_cli() -> VolumeServerConfig {
let args: Vec<String> = std::env::args().collect();
let cli = Cli::parse_from(normalize_args_vec(args));
let normalized = normalize_args_vec(args);
let merged = merge_options_file(normalized);
let cli = Cli::parse_from(merged);
resolve_config(cli)
}
/// Find `-options`/`--options` in args, parse the referenced file, and inject
/// file-based defaults for any flags not already set on the command line.
///
/// File format (matching Go's fla9.ParseFile):
/// - One option per line: `key=value`, `key value`, or `key:value`
/// - Lines starting with `#` are comments; blank lines are ignored
/// - Leading `-` on key names is stripped
/// - CLI arguments take precedence over file values
fn merge_options_file(args: Vec<String>) -> Vec<String> {
// Find the options file path from the args
let options_path = find_options_arg(&args);
if options_path.is_empty() {
return args;
}
let content = match std::fs::read_to_string(&options_path) {
Ok(c) => c,
Err(e) => {
eprintln!("WARNING: could not read options file {}: {}", options_path, e);
return args;
}
};
// Collect which flags are already explicitly set on the command line.
let mut cli_flags: std::collections::HashSet<String> = std::collections::HashSet::new();
let mut i = 1; // skip binary name
while i < args.len() {
let arg = &args[i];
if arg == "--" {
break;
}
if arg.starts_with("--") {
let key = if let Some(eq) = arg.find('=') {
arg[2..eq].to_string()
} else {
arg[2..].to_string()
};
cli_flags.insert(key);
} else if arg.starts_with('-') && arg.len() > 2 {
// Single-dash long option (already normalized to -- at this point,
// but handle both for safety)
let without_dash = &arg[1..];
let key = if let Some(eq) = without_dash.find('=') {
without_dash[..eq].to_string()
} else {
without_dash.to_string()
};
cli_flags.insert(key);
}
i += 1;
}
// Parse file and append missing options
let mut extra_args: Vec<String> = Vec::new();
for line in content.lines() {
let trimmed = line.trim();
if trimmed.is_empty() || trimmed.starts_with('#') {
continue;
}
// Split on first `=`, ` `, or `:`
let (name, value) = if let Some(pos) = trimmed.find(|c: char| c == '=' || c == ' ' || c == ':') {
(trimmed[..pos].trim().to_string(), trimmed[pos + 1..].trim().to_string())
} else {
(trimmed.to_string(), String::new())
};
// Strip leading dashes from name
let name = name.trim_start_matches('-').to_string();
if name.is_empty() || name == "options" {
continue;
}
// Skip if already set on CLI
if cli_flags.contains(&name) {
continue;
}
extra_args.push(format!("--{}", name));
if !value.is_empty() {
extra_args.push(value);
}
}
let mut merged = args;
merged.extend(extra_args);
merged
}
/// Extract the options file path from args (looks for --options or -options).
fn find_options_arg(args: &[String]) -> String {
for i in 1..args.len() {
if args[i] == "--options" || args[i] == "-options" {
if i + 1 < args.len() {
return args[i + 1].clone();
}
}
if let Some(rest) = args[i].strip_prefix("--options=") {
return rest.to_string();
}
if let Some(rest) = args[i].strip_prefix("-options=") {
return rest.to_string();
}
}
String::new()
}
/// Parse a duration string like "60s", "5m", "1h" into a std::time::Duration.
fn parse_duration(s: &str) -> std::time::Duration {
let s = s.trim();
@ -648,6 +766,7 @@ fn resolve_config(cli: Cli) -> VolumeServerConfig {
enable_write_queue: std::env::var("SEAWEED_WRITE_QUEUE")
.map(|v| v == "1" || v == "true")
.unwrap_or(false),
security_file: cli.security_file,
}
}
@ -688,7 +807,7 @@ pub struct SecurityConfig {
/// cert = "/path/to/cert.pem"
/// key = "/path/to/key.pem"
/// ```
fn parse_security_config(path: &str) -> SecurityConfig {
pub fn parse_security_config(path: &str) -> SecurityConfig {
if path.is_empty() {
return SecurityConfig::default();
}
@ -936,4 +1055,109 @@ ui = true
assert_eq!(cfg.jwt_signing_key, b"secret");
assert!(cfg.access_ui);
}
#[test]
fn test_merge_options_file_basic() {
let tmp = tempfile::NamedTempFile::new().unwrap();
std::fs::write(
tmp.path(),
"port=9999\ndir=/data\nmaster=localhost:9333\n",
)
.unwrap();
let args = vec![
"bin".into(),
"--options".into(),
tmp.path().to_str().unwrap().into(),
];
let merged = merge_options_file(args);
// Should contain the original args plus the file-based ones
assert!(merged.contains(&"--port".to_string()));
assert!(merged.contains(&"9999".to_string()));
assert!(merged.contains(&"--dir".to_string()));
assert!(merged.contains(&"/data".to_string()));
}
#[test]
fn test_merge_options_file_cli_precedence() {
let tmp = tempfile::NamedTempFile::new().unwrap();
std::fs::write(
tmp.path(),
"port=9999\ndir=/data\n",
)
.unwrap();
let args = vec![
"bin".into(),
"--port".into(),
"8080".into(),
"--options".into(),
tmp.path().to_str().unwrap().into(),
];
let merged = merge_options_file(args);
// port should NOT be duplicated from file since CLI already set it
let port_count = merged.iter().filter(|a| *a == "--port").count();
assert_eq!(port_count, 1, "CLI port should take precedence, file port skipped");
// dir should be added from file
assert!(merged.contains(&"--dir".to_string()));
}
#[test]
fn test_merge_options_file_comments_and_blanks() {
let tmp = tempfile::NamedTempFile::new().unwrap();
std::fs::write(
tmp.path(),
"# this is a comment\n\nport=9999\n# another comment\ndir=/data\n",
)
.unwrap();
let args = vec![
"bin".into(),
"--options".into(),
tmp.path().to_str().unwrap().into(),
];
let merged = merge_options_file(args);
assert!(merged.contains(&"--port".to_string()));
assert!(merged.contains(&"--dir".to_string()));
}
#[test]
fn test_merge_options_file_with_dashes_in_key() {
let tmp = tempfile::NamedTempFile::new().unwrap();
std::fs::write(
tmp.path(),
"-port=9999\n--dir=/data\nip.bind=0.0.0.0\n",
)
.unwrap();
let args = vec![
"bin".into(),
"--options".into(),
tmp.path().to_str().unwrap().into(),
];
let merged = merge_options_file(args);
assert!(merged.contains(&"--port".to_string()));
assert!(merged.contains(&"--dir".to_string()));
assert!(merged.contains(&"--ip.bind".to_string()));
}
#[test]
fn test_find_options_arg() {
assert_eq!(
find_options_arg(&["bin".into(), "--options".into(), "/tmp/opts".into()]),
"/tmp/opts"
);
assert_eq!(
find_options_arg(&["bin".into(), "-options".into(), "/tmp/opts".into()]),
"/tmp/opts"
);
assert_eq!(
find_options_arg(&["bin".into(), "--options=/tmp/opts".into()]),
"/tmp/opts"
);
assert_eq!(
find_options_arg(&["bin".into(), "--port".into(), "8080".into()]),
""
);
}
}

43
seaweed-volume/src/main.rs

@ -132,9 +132,12 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
let master_url = config.masters.first().cloned().unwrap_or_default();
let self_url = format!("{}:{}", config.ip, config.port);
let security_file = config.security_file.clone();
let cli_white_list = config.white_list.clone();
let state = Arc::new(VolumeServerState {
store: RwLock::new(store),
guard,
guard: RwLock::new(guard),
is_stopping: RwLock::new(false),
maintenance: std::sync::atomic::AtomicBool::new(false),
state_version: std::sync::atomic::AtomicU32::new(0),
@ -165,6 +168,8 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
metrics_notify: tokio::sync::Notify::new(),
has_slow_read: config.has_slow_read,
read_buffer_size_bytes: (config.read_buffer_size_mb.max(1) as usize) * 1024 * 1024,
security_file,
cli_white_list,
});
// Initialize the batched write queue if enabled
@ -272,6 +277,42 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
let _ = shutdown_tx_clone.send(());
});
// Set up SIGHUP handler for config reload (mirrors Go's grace.OnReload)
#[cfg(unix)]
{
let state_reload = state.clone();
tokio::spawn(async move {
let mut sighup =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
.expect("Failed to install SIGHUP handler");
loop {
sighup.recv().await;
info!("Received SIGHUP, reloading...");
// 1. Load new volumes from disk (Go's LoadNewVolumes)
{
info!("Loading new volume ids...");
let mut store = state_reload.store.write().unwrap();
store.load_new_volumes();
}
// 2. Reload security config (Go's Reload)
{
info!("Reloading security config...");
let sec = config::parse_security_config(&state_reload.security_file);
let mut whitelist = state_reload.cli_white_list.clone();
whitelist.extend(sec.guard_white_list.iter().cloned());
let mut guard = state_reload.guard.write().unwrap();
guard.update_whitelist(&whitelist);
}
// Trigger heartbeat to report new volumes
state_reload.volume_state_notify.notify_one();
info!("SIGHUP reload complete");
}
});
}
// Build optional TLS acceptor for HTTPS
let https_tls_acceptor =
if !config.https_cert_file.is_empty() && !config.https_key_file.is_empty() {

12
seaweed-volume/src/server/handlers.rs

@ -663,6 +663,8 @@ async fn get_or_head_handler_inner(
let token = extract_jwt(&headers, request.uri());
if let Err(e) = state
.guard
.read()
.unwrap()
.check_jwt_for_file(token.as_deref(), &file_id, false)
{
return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
@ -1436,6 +1438,8 @@ pub async fn post_handler(
let token = extract_jwt(&headers, request.uri());
if let Err(e) = state
.guard
.read()
.unwrap()
.check_jwt_for_file(token.as_deref(), &file_id, true)
{
return json_error_with_query(
@ -1925,6 +1929,8 @@ pub async fn delete_handler(
let token = extract_jwt(&headers, request.uri());
if let Err(e) = state
.guard
.read()
.unwrap()
.check_jwt_for_file(token.as_deref(), &file_id, true)
{
return json_error_with_query(
@ -2323,11 +2329,13 @@ pub async fn ui_handler(
) -> Response {
// If JWT signing is enabled, require auth
let token = extract_jwt(&headers, &axum::http::Uri::from_static("/ui/index.html"));
if let Err(e) = state.guard.check_jwt(token.as_deref(), false) {
if state.guard.has_read_signing_key() {
let guard = state.guard.read().unwrap();
if let Err(e) = guard.check_jwt(token.as_deref(), false) {
if guard.has_read_signing_key() {
return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
}
}
drop(guard);
let html = r#"<!DOCTYPE html>
<html><head><title>SeaweedFS Volume Server</title></head>

4
seaweed-volume/src/server/heartbeat.rs

@ -632,7 +632,7 @@ mod tests {
fn test_state_with_store(store: Store) -> Arc<VolumeServerState> {
Arc::new(VolumeServerState {
store: RwLock::new(store),
guard: Guard::new(&[], SigningKey(vec![]), 0, SigningKey(vec![]), 0),
guard: RwLock::new(Guard::new(&[], SigningKey(vec![]), 0, SigningKey(vec![]), 0)),
is_stopping: RwLock::new(false),
maintenance: std::sync::atomic::AtomicBool::new(false),
state_version: std::sync::atomic::AtomicU32::new(0),
@ -661,6 +661,8 @@ mod tests {
metrics_notify: tokio::sync::Notify::new(),
has_slow_read: true,
read_buffer_size_bytes: 4 * 1024 * 1024,
security_file: String::new(),
cli_white_list: vec![],
})
}

10
seaweed-volume/src/server/volume_server.rs

@ -36,7 +36,7 @@ pub struct RuntimeMetricsConfig {
/// Shared state for the volume server.
pub struct VolumeServerState {
pub store: RwLock<Store>,
pub guard: Guard,
pub guard: RwLock<Guard>,
pub is_stopping: RwLock<bool>,
/// Maintenance mode flag.
pub maintenance: AtomicBool,
@ -85,6 +85,10 @@ pub struct VolumeServerState {
/// Read tuning flags for large-file streaming.
pub has_slow_read: bool,
pub read_buffer_size_bytes: usize,
/// Path to security.toml — stored for SIGHUP reload.
pub security_file: String,
/// Original CLI whitelist entries — stored for SIGHUP reload.
pub cli_white_list: Vec<String>,
}
impl VolumeServerState {
@ -229,7 +233,9 @@ fn public_options_response() -> Response {
/// UI route is only registered when no signing keys are configured,
/// matching Go's `if signingKey == "" || enableUiAccess` check.
pub fn build_admin_router(state: Arc<VolumeServerState>) -> Router {
let ui_enabled = state.guard.signing_key.0.is_empty() && !state.guard.has_read_signing_key();
let guard = state.guard.read().unwrap();
let ui_enabled = guard.signing_key.0.is_empty() && !guard.has_read_signing_key();
drop(guard);
build_admin_router_with_ui(state, ui_enabled)
}

4
seaweed-volume/src/server/write_queue.rs

@ -171,7 +171,7 @@ mod tests {
Arc::new(VolumeServerState {
store: RwLock::new(store),
guard,
guard: RwLock::new(guard),
is_stopping: RwLock::new(false),
maintenance: AtomicBool::new(false),
state_version: AtomicU32::new(0),
@ -202,6 +202,8 @@ mod tests {
metrics_notify: tokio::sync::Notify::new(),
has_slow_read: true,
read_buffer_size_bytes: 4 * 1024 * 1024,
security_file: String::new(),
cli_white_list: vec![],
})
}

10
seaweed-volume/src/storage/store.rs

@ -84,6 +84,16 @@ impl Store {
Ok(())
}
/// Scan disk locations for new volume files and load them.
/// Mirrors Go's `Store.LoadNewVolumes()`.
pub fn load_new_volumes(&mut self) {
for loc in &mut self.locations {
if let Err(e) = loc.load_existing_volumes(self.needle_map_kind) {
tracing::error!("load_new_volumes error in {}: {}", loc.directory, e);
}
}
}
// ---- Volume lookup ----
/// Find which location contains a volume.

8
seaweed-volume/tests/http_integration.rs

@ -48,7 +48,7 @@ fn test_state_with_signing_key(signing_key: Vec<u8>) -> (Arc<VolumeServerState>,
let guard = Guard::new(&[], SigningKey(signing_key), 0, SigningKey(vec![]), 0);
let state = Arc::new(VolumeServerState {
store: RwLock::new(store),
guard,
guard: RwLock::new(guard),
is_stopping: RwLock::new(false),
maintenance: std::sync::atomic::AtomicBool::new(false),
state_version: std::sync::atomic::AtomicU32::new(0),
@ -81,6 +81,8 @@ fn test_state_with_signing_key(signing_key: Vec<u8>) -> (Arc<VolumeServerState>,
metrics_notify: tokio::sync::Notify::new(),
has_slow_read: false,
read_buffer_size_bytes: 1024 * 1024,
security_file: String::new(),
cli_white_list: vec![],
});
(state, tmp)
}
@ -177,7 +179,7 @@ async fn status_returns_json_with_version_and_volumes() {
}
#[tokio::test]
async fn admin_router_does_not_expose_metrics() {
async fn admin_router_exposes_metrics() {
let (state, _tmp) = test_state();
let app = build_admin_router(state);
@ -191,7 +193,7 @@ async fn admin_router_does_not_expose_metrics() {
.await
.unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
assert_eq!(response.status(), StatusCode::OK);
}
#[tokio::test]

Loading…
Cancel
Save