From 181b826f946d7f53b11ff741ca04668fbc7ed89b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 6 Mar 2026 14:47:13 -0800 Subject: [PATCH] seaweed-volume: add CLI with all 40 flags and config resolution All CLI flags match Go volume server: --port, --port.grpc, --port.public, --ip, --id, --publicUrl, --ip.bind, --master, --mserver (deprecated compat), --preStopSeconds, --idleTimeout, --dataCenter, --rack, --index, --disk, --tags, --images.fix.orientation, --readMode, --cpuprofile, --memprofile, --compactionMBps, --maintenanceMBps, --fileSizeLimitMB, --concurrentUploadLimitMB, --concurrentDownloadLimitMB, --pprof, --metricsPort, --metricsIp, --dir, --dir.idx, --max, --whiteList, --minFreeSpacePercent, --minFreeSpace, --inflightUploadDataTimeout, --inflightDownloadDataTimeout, --hasSlowRead, --readBufferSizeMB, --index.leveldbTimeout, --debug, --debug.port. Includes config resolution logic matching Go's startVolumeServer(): - Comma-separated folder/max/disk/tags parsing with single-to-all expansion - MinFreeSpace parsing (percent or human-readable bytes) - IP auto-detection, port defaults, public URL resolution - 6 unit tests for duration, min-free-space, and tag parsing --- seaweed-volume/src/config.rs | 659 +++++++++++++++++++++++++++++++++++ seaweed-volume/src/main.rs | 7 + 2 files changed, 666 insertions(+) create mode 100644 seaweed-volume/src/config.rs create mode 100644 seaweed-volume/src/main.rs diff --git a/seaweed-volume/src/config.rs b/seaweed-volume/src/config.rs new file mode 100644 index 000000000..58461f51a --- /dev/null +++ b/seaweed-volume/src/config.rs @@ -0,0 +1,659 @@ +use clap::Parser; +use std::net::UdpSocket; + +/// SeaweedFS Volume Server (Rust implementation) +/// +/// Start a volume server to provide storage spaces. +#[derive(Parser, Debug)] +#[command(name = "seaweed-volume", version, about)] +pub struct Cli { + /// HTTP listen port + #[arg(long = "port", default_value_t = 8080)] + pub port: u16, + + /// gRPC listen port. If 0, defaults to port + 10000. + #[arg(long = "port.grpc", default_value_t = 0)] + pub port_grpc: u16, + + /// Port opened to public. If 0, defaults to same as --port. + #[arg(long = "port.public", default_value_t = 0)] + pub port_public: u16, + + /// IP or server name, also used as identifier. + /// If empty, auto-detected. + #[arg(long = "ip", default_value = "")] + pub ip: String, + + /// Volume server ID. If empty, defaults to ip:port. + #[arg(long = "id", default_value = "")] + pub id: String, + + /// Publicly accessible address. + #[arg(long = "publicUrl", default_value = "")] + pub public_url: String, + + /// IP address to bind to. If empty, defaults to same as --ip. + #[arg(long = "ip.bind", default_value = "")] + pub bind_ip: String, + + /// Comma-separated master server addresses. + #[arg(long = "master", default_value = "localhost:9333")] + pub master: String, + + /// Comma-separated master servers (deprecated, use --master instead). + #[arg(long = "mserver", default_value = "")] + pub mserver: String, + + /// Number of seconds between stop sending heartbeats and stopping the volume server. + #[arg(long = "preStopSeconds", default_value_t = 10)] + pub pre_stop_seconds: u32, + + /// Connection idle seconds. + #[arg(long = "idleTimeout", default_value_t = 30)] + pub idle_timeout: u32, + + /// Current volume server's data center name. + #[arg(long = "dataCenter", default_value = "")] + pub data_center: String, + + /// Current volume server's rack name. + #[arg(long = "rack", default_value = "")] + pub rack: String, + + /// Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance. + #[arg(long = "index", default_value = "memory")] + pub index: String, + + /// [hdd|ssd|] hard drive or solid state drive or any tag. + #[arg(long = "disk", default_value = "")] + pub disk: String, + + /// Comma-separated tag groups per data dir; each group uses ':' (e.g. fast:ssd,archive). + #[arg(long = "tags", default_value = "")] + pub tags: String, + + /// Adjust jpg orientation when uploading. + #[arg(long = "images.fix.orientation", default_value_t = false)] + pub fix_jpg_orientation: bool, + + /// [local|proxy|redirect] how to deal with non-local volume. + #[arg(long = "readMode", default_value = "proxy")] + pub read_mode: String, + + /// CPU profile output file. + #[arg(long = "cpuprofile", default_value = "")] + pub cpu_profile: String, + + /// Memory profile output file. + #[arg(long = "memprofile", default_value = "")] + pub mem_profile: String, + + /// Limit background compaction or copying speed in mega bytes per second. + #[arg(long = "compactionMBps", default_value_t = 0)] + pub compaction_mb_per_second: u32, + + /// Limit maintenance (replication/balance) IO rate in MB/s. 0 means no limit. + #[arg(long = "maintenanceMBps", default_value_t = 0)] + pub maintenance_mb_per_second: u32, + + /// Limit file size to avoid out of memory. + #[arg(long = "fileSizeLimitMB", default_value_t = 256)] + pub file_size_limit_mb: u32, + + /// Limit total concurrent upload size in MB, 0 means unlimited. + #[arg(long = "concurrentUploadLimitMB", default_value_t = 0)] + pub concurrent_upload_limit_mb: u32, + + /// Limit total concurrent download size in MB, 0 means unlimited. + #[arg(long = "concurrentDownloadLimitMB", default_value_t = 0)] + pub concurrent_download_limit_mb: u32, + + /// Enable pprof-equivalent HTTP handlers. Precludes --memprofile and --cpuprofile. + #[arg(long = "pprof", default_value_t = false)] + pub pprof: bool, + + /// Prometheus metrics listen port. + #[arg(long = "metricsPort", default_value_t = 0)] + pub metrics_port: u16, + + /// Metrics listen IP. If empty, defaults to same as --ip.bind. + #[arg(long = "metricsIp", default_value = "")] + pub metrics_ip: String, + + /// Directories to store data files. dir[,dir]... + #[arg(long = "dir", default_value = "/tmp")] + pub dir: String, + + /// Directory to store .idx files. + #[arg(long = "dir.idx", default_value = "")] + pub dir_idx: String, + + /// Maximum numbers of volumes, count[,count]... + /// If set to zero, the limit will be auto configured as free disk space divided by volume size. + #[arg(long = "max", default_value = "8")] + pub max: String, + + /// Comma separated IP addresses having write permission. No limit if empty. + #[arg(long = "whiteList", default_value = "")] + pub white_list: String, + + /// Minimum free disk space (default to 1%). Low disk space will mark all volumes as ReadOnly. + /// Deprecated: use --minFreeSpace instead. + #[arg(long = "minFreeSpacePercent", default_value = "1")] + pub min_free_space_percent: String, + + /// Min free disk space (value<=100 as percentage like 1, other as human readable bytes, like 10GiB). + /// Low disk space will mark all volumes as ReadOnly. + #[arg(long = "minFreeSpace", default_value = "")] + pub min_free_space: String, + + /// Inflight upload data wait timeout of volume servers. + #[arg(long = "inflightUploadDataTimeout", default_value = "60s")] + pub inflight_upload_data_timeout: String, + + /// Inflight download data wait timeout of volume servers. + #[arg(long = "inflightDownloadDataTimeout", default_value = "60s")] + pub inflight_download_data_timeout: String, + + /// if true, prevents slow reads from blocking other requests, + /// but large file read P99 latency will increase. + #[arg(long = "hasSlowRead", default_value_t = true)] + pub has_slow_read: bool, + + /// larger values can optimize query performance but will increase memory usage. + /// Use with hasSlowRead normally. + #[arg(long = "readBufferSizeMB", default_value_t = 4)] + pub read_buffer_size_mb: u32, + + /// Alive time for leveldb (default to 0). If leveldb of volume is not accessed in + /// ldbTimeout hours, it will be offloaded to reduce opened files and memory consumption. + #[arg(long = "index.leveldbTimeout", default_value_t = 0)] + pub ldb_timeout: i64, + + /// Serves runtime profiling data on the port specified by --debug.port. + #[arg(long = "debug", default_value_t = false)] + pub debug: bool, + + /// HTTP port for debugging. + #[arg(long = "debug.port", default_value_t = 6060)] + pub debug_port: u16, +} + +/// Resolved configuration after applying defaults and validation. +#[derive(Debug)] +pub struct VolumeServerConfig { + pub port: u16, + pub grpc_port: u16, + pub public_port: u16, + pub ip: String, + pub bind_ip: String, + pub public_url: String, + pub id: String, + pub masters: Vec, + pub pre_stop_seconds: u32, + pub idle_timeout: u32, + pub data_center: String, + pub rack: String, + pub index_type: NeedleMapKind, + pub disk_type: String, + pub folders: Vec, + pub folder_max_limits: Vec, + pub folder_tags: Vec>, + pub min_free_spaces: Vec, + pub disk_types: Vec, + pub idx_folder: String, + pub white_list: Vec, + pub fix_jpg_orientation: bool, + pub read_mode: ReadMode, + pub compaction_byte_per_second: i64, + pub maintenance_byte_per_second: i64, + pub file_size_limit_bytes: i64, + pub concurrent_upload_limit: i64, + pub concurrent_download_limit: i64, + pub inflight_upload_data_timeout: std::time::Duration, + pub inflight_download_data_timeout: std::time::Duration, + pub has_slow_read: bool, + pub read_buffer_size_mb: u32, + pub ldb_timeout: i64, + pub pprof: bool, + pub metrics_port: u16, + pub metrics_ip: String, + pub debug: bool, + pub debug_port: u16, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum NeedleMapKind { + InMemory, + LevelDb, + LevelDbMedium, + LevelDbLarge, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ReadMode { + Local, + Proxy, + Redirect, +} + +#[derive(Debug, Clone)] +pub enum MinFreeSpace { + Percent(f64), + Bytes(u64), +} + +/// Parse CLI arguments and resolve all defaults — mirroring Go's `runVolume()` + `startVolumeServer()`. +pub fn parse_cli() -> VolumeServerConfig { + let cli = Cli::parse(); + resolve_config(cli) +} + +/// Parse a duration string like "60s", "5m", "1h" into a std::time::Duration. +fn parse_duration(s: &str) -> std::time::Duration { + let s = s.trim(); + if s.is_empty() { + return std::time::Duration::from_secs(60); + } + if let Some(secs) = s.strip_suffix('s') { + if let Ok(v) = secs.parse::() { + return std::time::Duration::from_secs(v); + } + } + if let Some(mins) = s.strip_suffix('m') { + if let Ok(v) = mins.parse::() { + return std::time::Duration::from_secs(v * 60); + } + } + if let Some(hours) = s.strip_suffix('h') { + if let Ok(v) = hours.parse::() { + return std::time::Duration::from_secs(v * 3600); + } + } + // Fallback: try parsing as raw seconds + if let Ok(v) = s.parse::() { + return std::time::Duration::from_secs(v); + } + std::time::Duration::from_secs(60) +} + +/// Parse minFreeSpace / minFreeSpacePercent into MinFreeSpace values. +/// Mirrors Go's `util.MustParseMinFreeSpace()`. +fn parse_min_free_spaces(min_free_space: &str, min_free_space_percent: &str) -> Vec { + // If --minFreeSpace is provided, use it (takes precedence). + let source = if !min_free_space.is_empty() { + min_free_space + } else { + min_free_space_percent + }; + + source + .split(',') + .map(|s| { + let s = s.trim(); + // Try parsing as a percentage (value <= 100) + if let Ok(v) = s.parse::() { + if v <= 100.0 { + return MinFreeSpace::Percent(v); + } + // Treat as bytes if > 100 + return MinFreeSpace::Bytes(v as u64); + } + // Try parsing human-readable bytes: e.g. "10GiB", "500MiB", "1TiB" + let s_upper = s.to_uppercase(); + if let Some(rest) = s_upper.strip_suffix("TIB") { + if let Ok(v) = rest.trim().parse::() { + return MinFreeSpace::Bytes((v * 1024.0 * 1024.0 * 1024.0 * 1024.0) as u64); + } + } + if let Some(rest) = s_upper.strip_suffix("GIB") { + if let Ok(v) = rest.trim().parse::() { + return MinFreeSpace::Bytes((v * 1024.0 * 1024.0 * 1024.0) as u64); + } + } + if let Some(rest) = s_upper.strip_suffix("MIB") { + if let Ok(v) = rest.trim().parse::() { + return MinFreeSpace::Bytes((v * 1024.0 * 1024.0) as u64); + } + } + if let Some(rest) = s_upper.strip_suffix("KIB") { + if let Ok(v) = rest.trim().parse::() { + return MinFreeSpace::Bytes((v * 1024.0) as u64); + } + } + if let Some(rest) = s_upper.strip_suffix("TB") { + if let Ok(v) = rest.trim().parse::() { + return MinFreeSpace::Bytes((v * 1_000_000_000_000.0) as u64); + } + } + if let Some(rest) = s_upper.strip_suffix("GB") { + if let Ok(v) = rest.trim().parse::() { + return MinFreeSpace::Bytes((v * 1_000_000_000.0) as u64); + } + } + if let Some(rest) = s_upper.strip_suffix("MB") { + if let Ok(v) = rest.trim().parse::() { + return MinFreeSpace::Bytes((v * 1_000_000.0) as u64); + } + } + // Default: 1% + MinFreeSpace::Percent(1.0) + }) + .collect() +} + +/// Parse comma-separated tag groups like "fast:ssd,archive" into per-folder tag vectors. +/// Mirrors Go's `parseVolumeTags()`. +fn parse_volume_tags(tags_arg: &str, folder_count: usize) -> Vec> { + if folder_count == 0 { + return vec![]; + } + let tags_arg = tags_arg.trim(); + let tag_entries: Vec<&str> = if tags_arg.is_empty() { + vec![] + } else { + tags_arg.split(',').collect() + }; + + let mut folder_tags: Vec> = vec![vec![]; folder_count]; + + if tag_entries.len() == 1 && !tag_entries[0].is_empty() { + // Single entry: replicate to all folders + let normalized: Vec = tag_entries[0] + .split(':') + .map(|t| t.trim().to_lowercase()) + .filter(|t| !t.is_empty()) + .collect(); + for tags in folder_tags.iter_mut() { + *tags = normalized.clone(); + } + } else { + for (i, tags) in folder_tags.iter_mut().enumerate() { + if i < tag_entries.len() { + *tags = tag_entries[i] + .split(':') + .map(|t| t.trim().to_lowercase()) + .filter(|t| !t.is_empty()) + .collect(); + } + } + } + + folder_tags +} + +fn resolve_config(cli: Cli) -> VolumeServerConfig { + // Backward compatibility: --mserver overrides --master + let master_string = if !cli.mserver.is_empty() { + &cli.mserver + } else { + &cli.master + }; + let masters: Vec = master_string + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + + // Parse folders + let folders: Vec = cli + .dir + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + let folder_count = folders.len(); + + // Parse max volume counts + let mut folder_max_limits: Vec = cli + .max + .split(',') + .map(|s| { + s.trim() + .parse::() + .unwrap_or_else(|_| panic!("The max specified in --max is not a valid number: {}", s)) + }) + .collect(); + // Replicate single value to all folders + if folder_max_limits.len() == 1 && folder_count > 1 { + let v = folder_max_limits[0]; + folder_max_limits.resize(folder_count, v); + } + if folders.len() != folder_max_limits.len() { + panic!( + "{} directories by --dir, but only {} max is set by --max", + folders.len(), + folder_max_limits.len() + ); + } + + // Parse min free spaces + let mut min_free_spaces = parse_min_free_spaces(&cli.min_free_space, &cli.min_free_space_percent); + if min_free_spaces.len() == 1 && folder_count > 1 { + let v = min_free_spaces[0].clone(); + min_free_spaces.resize(folder_count, v); + } + if folders.len() != min_free_spaces.len() { + panic!( + "{} directories by --dir, but only {} minFreeSpace values", + folders.len(), + min_free_spaces.len() + ); + } + + // Parse disk types + let mut disk_types: Vec = cli + .disk + .split(',') + .map(|s| s.trim().to_string()) + .collect(); + if disk_types.len() == 1 && folder_count > 1 { + let v = disk_types[0].clone(); + disk_types.resize(folder_count, v); + } + if folders.len() != disk_types.len() { + panic!( + "{} directories by --dir, but only {} disk types by --disk", + folders.len(), + disk_types.len() + ); + } + + // Parse tags + let folder_tags = parse_volume_tags(&cli.tags, folder_count); + + // Resolve IP + let ip = if cli.ip.is_empty() { + detect_host_address() + } else { + cli.ip + }; + + // Resolve bind IP + let bind_ip = if cli.bind_ip.is_empty() { + ip.clone() + } else { + cli.bind_ip + }; + + // Resolve public port + let public_port = if cli.port_public == 0 { + cli.port + } else { + cli.port_public + }; + + // Resolve gRPC port + let grpc_port = if cli.port_grpc == 0 { + 10000 + cli.port + } else { + cli.port_grpc + }; + + // Resolve public URL + let public_url = if cli.public_url.is_empty() { + format!("{}:{}", ip, public_port) + } else { + cli.public_url + }; + + // Resolve volume server ID + let id = if cli.id.is_empty() { + format!("{}:{}", ip, cli.port) + } else { + cli.id + }; + + // Resolve metrics IP + let metrics_ip = if !cli.metrics_ip.is_empty() { + cli.metrics_ip + } else if !bind_ip.is_empty() { + bind_ip.clone() + } else { + ip.clone() + }; + + // Parse index type + let index_type = match cli.index.as_str() { + "memory" => NeedleMapKind::InMemory, + "leveldb" => NeedleMapKind::LevelDb, + "leveldbMedium" => NeedleMapKind::LevelDbMedium, + "leveldbLarge" => NeedleMapKind::LevelDbLarge, + other => panic!("Unknown index type: {}. Use memory|leveldb|leveldbMedium|leveldbLarge", other), + }; + + // Parse read mode + let read_mode = match cli.read_mode.as_str() { + "local" => ReadMode::Local, + "proxy" => ReadMode::Proxy, + "redirect" => ReadMode::Redirect, + other => panic!("Unknown readMode: {}. Use local|proxy|redirect", other), + }; + + // Parse whitelist + let white_list: Vec = cli + .white_list + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + + // Parse durations + let inflight_upload_data_timeout = parse_duration(&cli.inflight_upload_data_timeout); + let inflight_download_data_timeout = parse_duration(&cli.inflight_download_data_timeout); + + VolumeServerConfig { + port: cli.port, + grpc_port, + public_port, + ip, + bind_ip, + public_url, + id, + masters, + pre_stop_seconds: cli.pre_stop_seconds, + idle_timeout: cli.idle_timeout, + data_center: cli.data_center, + rack: cli.rack, + index_type, + disk_type: cli.disk, + folders, + folder_max_limits, + folder_tags, + min_free_spaces, + disk_types, + idx_folder: cli.dir_idx, + white_list, + fix_jpg_orientation: cli.fix_jpg_orientation, + read_mode, + compaction_byte_per_second: cli.compaction_mb_per_second as i64 * 1024 * 1024, + maintenance_byte_per_second: cli.maintenance_mb_per_second as i64 * 1024 * 1024, + file_size_limit_bytes: cli.file_size_limit_mb as i64 * 1024 * 1024, + concurrent_upload_limit: cli.concurrent_upload_limit_mb as i64 * 1024 * 1024, + concurrent_download_limit: cli.concurrent_download_limit_mb as i64 * 1024 * 1024, + inflight_upload_data_timeout, + inflight_download_data_timeout, + has_slow_read: cli.has_slow_read, + read_buffer_size_mb: cli.read_buffer_size_mb, + ldb_timeout: cli.ldb_timeout, + pprof: cli.pprof, + metrics_port: cli.metrics_port, + metrics_ip, + debug: cli.debug, + debug_port: cli.debug_port, + } +} + +/// Detect the host's IP address. +/// Mirrors Go's `util.DetectedHostAddress()`. +fn detect_host_address() -> String { + // Connect to a remote address to determine the local outbound IP + if let Ok(socket) = UdpSocket::bind("0.0.0.0:0") { + if socket.connect("8.8.8.8:80").is_ok() { + if let Ok(addr) = socket.local_addr() { + return addr.ip().to_string(); + } + } + } + "localhost".to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_duration() { + assert_eq!(parse_duration("60s"), std::time::Duration::from_secs(60)); + assert_eq!(parse_duration("5m"), std::time::Duration::from_secs(300)); + assert_eq!(parse_duration("1h"), std::time::Duration::from_secs(3600)); + assert_eq!(parse_duration("30"), std::time::Duration::from_secs(30)); + assert_eq!(parse_duration(""), std::time::Duration::from_secs(60)); + } + + #[test] + fn test_parse_min_free_spaces_percent() { + let result = parse_min_free_spaces("", "1"); + assert_eq!(result.len(), 1); + match &result[0] { + MinFreeSpace::Percent(v) => assert!((v - 1.0).abs() < f64::EPSILON), + _ => panic!("Expected Percent"), + } + } + + #[test] + fn test_parse_min_free_spaces_bytes() { + let result = parse_min_free_spaces("10GiB", ""); + assert_eq!(result.len(), 1); + match &result[0] { + MinFreeSpace::Bytes(v) => assert_eq!(*v, 10 * 1024 * 1024 * 1024), + _ => panic!("Expected Bytes"), + } + } + + #[test] + fn test_parse_volume_tags_single() { + let tags = parse_volume_tags("fast:ssd", 3); + assert_eq!(tags.len(), 3); + assert_eq!(tags[0], vec!["fast", "ssd"]); + assert_eq!(tags[1], vec!["fast", "ssd"]); + assert_eq!(tags[2], vec!["fast", "ssd"]); + } + + #[test] + fn test_parse_volume_tags_multi() { + let tags = parse_volume_tags("fast:ssd,archive", 3); + assert_eq!(tags.len(), 3); + assert_eq!(tags[0], vec!["fast", "ssd"]); + assert_eq!(tags[1], vec!["archive"]); + assert_eq!(tags[2], Vec::::new()); + } + + #[test] + fn test_parse_volume_tags_empty() { + let tags = parse_volume_tags("", 2); + assert_eq!(tags.len(), 2); + assert_eq!(tags[0], Vec::::new()); + assert_eq!(tags[1], Vec::::new()); + } +} diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs new file mode 100644 index 000000000..c1af3dfc3 --- /dev/null +++ b/seaweed-volume/src/main.rs @@ -0,0 +1,7 @@ +mod config; + +fn main() { + let cli = config::parse_cli(); + println!("SeaweedFS Volume Server (Rust)"); + println!("Configuration: {:#?}", cli); +}