diff --git a/seaweed-volume/Cargo.lock b/seaweed-volume/Cargo.lock index df5fe6c84..9742a2e14 100644 --- a/seaweed-volume/Cargo.lock +++ b/seaweed-volume/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "addr2line" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b5d307320b3181d6d7954e663bd7c774a838b8220fe0593c86d9fb09f498b4b" +dependencies = [ + "gimli", +] + [[package]] name = "adler2" version = "2.0.1" @@ -28,6 +37,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "aligned-vec" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc890384c8602f339876ded803c97ad529f3842aba97f6392b3dba0dd171769b" +dependencies = [ + "equator", +] + [[package]] name = "allocator-api2" version = "0.2.21" @@ -670,6 +688,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "backtrace" +version = "0.3.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb531853791a215d7c62a30daf0dde835f381ab5de4589cfe7c649d2cbe92bd6" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-link", +] + [[package]] name = "base16ct" version = "0.1.1" @@ -889,6 +922,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpp_demangle" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0667304c32ea56cb4cd6d2d7c0cfe9a2f8041229db8c033af7f8d69492429def" +dependencies = [ + "cfg-if", +] + [[package]] name = "cpufeatures" version = "0.2.17" @@ -1044,6 +1086,15 @@ dependencies = [ "parking_lot_core 0.9.12", ] +[[package]] +name = "debugid" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d" +dependencies = [ + "uuid", +] + [[package]] name = "der" version = "0.6.1" @@ -1209,6 +1260,26 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "equator" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4711b213838dfee0117e3be6ac926007d7f433d7bbe33595975d4190cb07e6fc" +dependencies = [ + "equator-macro", +] + +[[package]] +name = "equator-macro" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -1293,6 +1364,24 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" +[[package]] +name = "findshlibs" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40b9e59cd0f7e0806cca4be089683ecb6434e602038df21fe6bf6711b2f07f64" +dependencies = [ + "cc", + "lazy_static", + "libc", + "winapi", +] + +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "fixedbitset" version = "0.5.7" @@ -1510,6 +1599,12 @@ dependencies = [ "weezl", ] +[[package]] +name = "gimli" +version = "0.32.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" + [[package]] name = "group" version = "0.12.1" @@ -2041,6 +2136,15 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.14.0" @@ -2114,7 +2218,7 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" dependencies = [ - "spin", + "spin 0.9.8", ] [[package]] @@ -2292,7 +2396,7 @@ dependencies = [ "httparse", "memchr", "mime", - "spin", + "spin 0.9.8", "version_check", ] @@ -2325,6 +2429,17 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nix" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" +dependencies = [ + "bitflags 1.3.2", + "cfg-if", + "libc", +] + [[package]] name = "ntapi" version = "0.4.3" @@ -2405,6 +2520,15 @@ dependencies = [ "libm", ] +[[package]] +name = "object" +version = "0.37.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff76201f031d8863c38aa7f905eca4f53abbfa15f609db4277d44cd8938f33fe" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -2575,13 +2699,23 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "petgraph" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +dependencies = [ + "fixedbitset 0.4.2", + "indexmap 2.13.0", +] + [[package]] name = "petgraph" version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" dependencies = [ - "fixedbitset", + "fixedbitset 0.5.7", "indexmap 2.13.0", ] @@ -2682,6 +2816,31 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" +[[package]] +name = "pprof" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38a01da47675efa7673b032bf8efd8214f1917d89685e07e395ab125ea42b187" +dependencies = [ + "aligned-vec", + "backtrace", + "cfg-if", + "findshlibs", + "libc", + "log", + "nix", + "once_cell", + "prost 0.12.6", + "prost-build 0.12.6", + "prost-derive 0.12.6", + "sha2", + "smallvec", + "spin 0.10.0", + "symbolic-demangle", + "tempfile", + "thiserror 2.0.18", +] + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -2758,6 +2917,16 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "prost" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +dependencies = [ + "bytes", + "prost-derive 0.12.6", +] + [[package]] name = "prost" version = "0.13.5" @@ -2765,7 +2934,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.13.5", +] + +[[package]] +name = "prost-build" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" +dependencies = [ + "bytes", + "heck", + "itertools 0.12.1", + "log", + "multimap", + "once_cell", + "petgraph 0.6.5", + "prettyplease", + "prost 0.12.6", + "prost-types 0.12.6", + "regex", + "syn", + "tempfile", ] [[package]] @@ -2775,19 +2965,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck", - "itertools", + "itertools 0.14.0", "log", "multimap", "once_cell", - "petgraph", + "petgraph 0.7.1", "prettyplease", - "prost", - "prost-types", + "prost 0.13.5", + "prost-types 0.13.5", "regex", "syn", "tempfile", ] +[[package]] +name = "prost-derive" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +dependencies = [ + "anyhow", + "itertools 0.12.1", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "prost-derive" version = "0.13.5" @@ -2795,19 +2998,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools", + "itertools 0.14.0", "proc-macro2", "quote", "syn", ] +[[package]] +name = "prost-types" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" +dependencies = [ + "prost 0.12.6", +] + [[package]] name = "prost-types" version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" dependencies = [ - "prost", + "prost 0.13.5", ] [[package]] @@ -3008,7 +3220,7 @@ dependencies = [ "lru 0.7.8", "parking_lot 0.11.2", "smallvec", - "spin", + "spin 0.9.8", ] [[package]] @@ -3149,6 +3361,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustc-demangle" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b50b8869d9fc858ce7266cce0194bd74df58b9d0e3f6df3a9fc8eb470d95c09d" + [[package]] name = "rustc-hash" version = "2.1.1" @@ -3354,9 +3572,10 @@ dependencies = [ "md-5", "memmap2", "parking_lot 0.12.5", + "pprof", "prometheus", - "prost", - "prost-types", + "prost 0.13.5", + "prost-types 0.13.5", "rand 0.8.5", "redb", "reed-solomon-erasure", @@ -3644,6 +3863,15 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spin" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5fe4ccb98d9c292d56fec89a5e07da7fc4cf0dc11e156b41793132775d3e591" +dependencies = [ + "lock_api", +] + [[package]] name = "spki" version = "0.6.0" @@ -3682,6 +3910,29 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "symbolic-common" +version = "12.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "751a2823d606b5d0a7616499e4130a516ebd01a44f39811be2b9600936509c23" +dependencies = [ + "debugid", + "memmap2", + "stable_deref_trait", + "uuid", +] + +[[package]] +name = "symbolic-demangle" +version = "12.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79b237cfbe320601dd24b4ac817a5b68bb28f5508e33f08d42be0682cadc8ac9" +dependencies = [ + "cpp_demangle", + "rustc-demangle", + "symbolic-common", +] + [[package]] name = "syn" version = "2.0.117" @@ -4009,7 +4260,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", + "prost 0.13.5", "rustls-pemfile", "socket2 0.5.10", "tokio", @@ -4029,8 +4280,8 @@ checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11" dependencies = [ "prettyplease", "proc-macro2", - "prost-build", - "prost-types", + "prost-build 0.13.5", + "prost-types 0.13.5", "quote", "syn", ] diff --git a/seaweed-volume/Cargo.toml b/seaweed-volume/Cargo.toml index e255286df..46fdc86ee 100644 --- a/seaweed-volume/Cargo.toml +++ b/seaweed-volume/Cargo.toml @@ -50,6 +50,7 @@ reed-solomon-erasure = "6" # Logging tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +pprof = { version = "0.15", features = ["prost-codec"] } # Config toml = "0.8" diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs index 17a6d875f..da5d4bc85 100644 --- a/seaweed-volume/src/main.rs +++ b/seaweed-volume/src/main.rs @@ -6,8 +6,11 @@ use seaweed_volume::config::{self, VolumeServerConfig}; use seaweed_volume::metrics; use seaweed_volume::pb::volume_server_pb::volume_server_server::VolumeServerServer; use seaweed_volume::security::{Guard, SigningKey}; +use seaweed_volume::server::debug::build_debug_router; use seaweed_volume::server::grpc_server::VolumeGrpcService; -use seaweed_volume::server::volume_server::VolumeServerState; +use seaweed_volume::server::volume_server::{ + build_metrics_router, RuntimeMetricsConfig, VolumeServerState, +}; use seaweed_volume::server::write_queue::WriteQueue; use seaweed_volume::storage::store::Store; use seaweed_volume::storage::types::DiskType; @@ -136,6 +139,10 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box Result<(), Box Result<(), Box 0 { + let metrics_router = build_metrics_router(); + let metrics_addr = format!("{}:{}", config.metrics_ip, config.metrics_port); + info!("Metrics server listening on {}", metrics_addr); + let listener = tokio::net::TcpListener::bind(&metrics_addr) + .await + .unwrap_or_else(|e| panic!("Failed to bind metrics HTTP to {}: {}", metrics_addr, e)); + let mut shutdown_rx = shutdown_tx.subscribe(); + Some(tokio::spawn(async move { + if let Err(e) = axum::serve(listener, metrics_router) + .with_graceful_shutdown(async move { + let _ = shutdown_rx.recv().await; + }) + .await + { + error!("Metrics HTTP server error: {}", e); + } + })) + } else { + None + }; + + let debug_handle = if config.debug { + let debug_addr = format!("0.0.0.0:{}", config.debug_port); + info!("Debug pprof server listening on {}", debug_addr); + let listener = tokio::net::TcpListener::bind(&debug_addr) + .await + .unwrap_or_else(|e| panic!("Failed to bind debug HTTP to {}: {}", debug_addr, e)); + let debug_router = build_debug_router(); + let mut shutdown_rx = shutdown_tx.subscribe(); + Some(tokio::spawn(async move { + if let Err(e) = axum::serve(listener, debug_router) + .with_graceful_shutdown(async move { + let _ = shutdown_rx.recv().await; + }) + .await + { + error!("Debug HTTP server error: {}", e); + } + })) + } else { + None + }; + + let metrics_push_handle = { + let push_state = state.clone(); + let push_instance = format!("{}:{}", config.ip, config.port); + let push_shutdown = shutdown_tx.subscribe(); + Some(tokio::spawn(async move { + run_metrics_push_loop(push_state, push_instance, push_shutdown).await; + })) + }; + // Wait for all servers let _ = http_handle.await; let _ = grpc_handle.await; if let Some(h) = public_handle { let _ = h.await; } + if let Some(h) = metrics_handle { + let _ = h.await; + } + if let Some(h) = debug_handle { + let _ = h.await; + } if let Some(h) = heartbeat_handle { let _ = h.await; } + if let Some(h) = metrics_push_handle { + let _ = h.await; + } info!("Volume server stopped."); Ok(()) } +async fn run_metrics_push_loop( + state: Arc, + instance: String, + mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, +) { + loop { + let push_cfg = { state.metrics_runtime.read().unwrap().push_gateway.clone() }; + + if push_cfg.address.is_empty() || push_cfg.interval_seconds == 0 { + tokio::select! { + _ = state.metrics_notify.notified() => continue, + _ = shutdown_rx.recv() => return, + } + } + + if let Err(e) = metrics::push_metrics_once( + &state.http_client, + &push_cfg.address, + "volumeServer", + &instance, + ) + .await + { + info!("could not push metrics to {}: {}", push_cfg.address, e); + } + + let interval = std::time::Duration::from_secs(push_cfg.interval_seconds.max(1) as u64); + tokio::select! { + _ = tokio::time::sleep(interval) => {} + _ = state.metrics_notify.notified() => {} + _ = shutdown_rx.recv() => return, + } + } +} + /// Serve an axum Router over TLS using tokio-rustls. /// Accepts TCP connections, performs TLS handshake, then serves HTTP over the encrypted stream. async fn serve_https( diff --git a/seaweed-volume/src/metrics.rs b/seaweed-volume/src/metrics.rs index 711a3e76c..41444c418 100644 --- a/seaweed-volume/src/metrics.rs +++ b/seaweed-volume/src/metrics.rs @@ -6,6 +6,13 @@ use prometheus::{ self, Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder, }; +use std::sync::Once; + +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct PushGatewayConfig { + pub address: String, + pub interval_seconds: u32, +} lazy_static::lazy_static! { pub static ref REGISTRY: Registry = Registry::new(); @@ -59,33 +66,37 @@ lazy_static::lazy_static! { ).expect("metric can be created"); } +static REGISTER_METRICS: Once = Once::new(); + /// Register all metrics with the custom registry. /// Call this once at startup. pub fn register_metrics() { - REGISTRY - .register(Box::new(REQUEST_COUNTER.clone())) - .expect("REQUEST_COUNTER registered"); - REGISTRY - .register(Box::new(REQUEST_DURATION.clone())) - .expect("REQUEST_DURATION registered"); - REGISTRY - .register(Box::new(VOLUMES_TOTAL.clone())) - .expect("VOLUMES_TOTAL registered"); - REGISTRY - .register(Box::new(MAX_VOLUMES.clone())) - .expect("MAX_VOLUMES registered"); - REGISTRY - .register(Box::new(DISK_SIZE_BYTES.clone())) - .expect("DISK_SIZE_BYTES registered"); - REGISTRY - .register(Box::new(DISK_FREE_BYTES.clone())) - .expect("DISK_FREE_BYTES registered"); - REGISTRY - .register(Box::new(INFLIGHT_REQUESTS.clone())) - .expect("INFLIGHT_REQUESTS registered"); - REGISTRY - .register(Box::new(VOLUME_FILE_COUNT.clone())) - .expect("VOLUME_FILE_COUNT registered"); + REGISTER_METRICS.call_once(|| { + REGISTRY + .register(Box::new(REQUEST_COUNTER.clone())) + .expect("REQUEST_COUNTER registered"); + REGISTRY + .register(Box::new(REQUEST_DURATION.clone())) + .expect("REQUEST_DURATION registered"); + REGISTRY + .register(Box::new(VOLUMES_TOTAL.clone())) + .expect("VOLUMES_TOTAL registered"); + REGISTRY + .register(Box::new(MAX_VOLUMES.clone())) + .expect("MAX_VOLUMES registered"); + REGISTRY + .register(Box::new(DISK_SIZE_BYTES.clone())) + .expect("DISK_SIZE_BYTES registered"); + REGISTRY + .register(Box::new(DISK_FREE_BYTES.clone())) + .expect("DISK_FREE_BYTES registered"); + REGISTRY + .register(Box::new(INFLIGHT_REQUESTS.clone())) + .expect("INFLIGHT_REQUESTS registered"); + REGISTRY + .register(Box::new(VOLUME_FILE_COUNT.clone())) + .expect("VOLUME_FILE_COUNT registered"); + }); } /// Gather all metrics and encode them in Prometheus text exposition format. @@ -99,9 +110,49 @@ pub fn gather_metrics() -> String { String::from_utf8(buffer).expect("metrics are valid UTF-8") } +pub fn build_pushgateway_url(address: &str, job: &str, instance: &str) -> String { + let base = if address.starts_with("http://") || address.starts_with("https://") { + address.to_string() + } else { + format!("http://{}", address) + }; + let base = base.trim_end_matches('/'); + format!("{}/metrics/job/{}/instance/{}", base, job, instance) +} + +pub async fn push_metrics_once( + client: &reqwest::Client, + address: &str, + job: &str, + instance: &str, +) -> Result<(), String> { + let url = build_pushgateway_url(address, job, instance); + let response = client + .put(&url) + .header( + reqwest::header::CONTENT_TYPE, + "text/plain; version=0.0.4; charset=utf-8", + ) + .body(gather_metrics()) + .send() + .await + .map_err(|e| format!("push metrics request failed: {}", e))?; + + if response.status().is_success() { + Ok(()) + } else { + Err(format!( + "push metrics failed with status {}", + response.status() + )) + } +} + #[cfg(test)] mod tests { use super::*; + use axum::{routing::put, Router}; + use std::sync::{Arc, Mutex}; #[test] fn test_gather_metrics_returns_text() { @@ -110,4 +161,56 @@ mod tests { let output = gather_metrics(); assert!(output.contains("volume_server_request_counter")); } + + #[test] + fn test_build_pushgateway_url() { + assert_eq!( + build_pushgateway_url("localhost:9091", "volumeServer", "test-instance"), + "http://localhost:9091/metrics/job/volumeServer/instance/test-instance" + ); + assert_eq!( + build_pushgateway_url("https://push.example", "volumeServer", "node-a"), + "https://push.example/metrics/job/volumeServer/instance/node-a" + ); + } + + #[tokio::test] + async fn test_push_metrics_once() { + register_metrics(); + + let captured = Arc::new(Mutex::new(None::)); + let captured_clone = captured.clone(); + + let app = Router::new().route( + "/metrics/job/volumeServer/instance/test-instance", + put(move |body: String| { + let captured = captured_clone.clone(); + async move { + *captured.lock().unwrap() = Some(body); + "ok" + } + }), + ); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let server = tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + + let client = reqwest::Client::new(); + push_metrics_once( + &client, + &format!("127.0.0.1:{}", addr.port()), + "volumeServer", + "test-instance", + ) + .await + .unwrap(); + + let body = captured.lock().unwrap().clone().unwrap(); + assert!(body.contains("volume_server_request_counter")); + + server.abort(); + } } diff --git a/seaweed-volume/src/server/debug.rs b/seaweed-volume/src/server/debug.rs new file mode 100644 index 000000000..dd1b69cf1 --- /dev/null +++ b/seaweed-volume/src/server/debug.rs @@ -0,0 +1,159 @@ +use axum::body::Body; +use axum::extract::Query; +use axum::http::{header, StatusCode}; +use axum::response::{IntoResponse, Response}; +use axum::routing::{any, get}; +use axum::Router; +use pprof::protos::Message; +use serde::Deserialize; + +#[derive(Deserialize, Default)] +struct ProfileQuery { + seconds: Option, +} + +pub fn build_debug_router() -> Router { + Router::new() + .route("/debug/pprof/", get(pprof_index_handler)) + .route("/debug/pprof/cmdline", get(pprof_cmdline_handler)) + .route("/debug/pprof/profile", get(pprof_profile_handler)) + .route("/debug/pprof/symbol", any(pprof_symbol_handler)) + .route("/debug/pprof/trace", get(pprof_trace_handler)) +} + +async fn pprof_index_handler() -> Response { + let body = concat!( + "/debug/pprof/", + "cmdline
", + "profile
", + "symbol
", + "trace
", + "", + ); + ( + StatusCode::OK, + [(header::CONTENT_TYPE, "text/html; charset=utf-8")], + body, + ) + .into_response() +} + +async fn pprof_cmdline_handler() -> Response { + let body = std::env::args().collect::>().join("\0"); + ( + StatusCode::OK, + [(header::CONTENT_TYPE, "text/plain; charset=utf-8")], + body, + ) + .into_response() +} + +async fn pprof_profile_handler(Query(query): Query) -> Response { + let seconds = query.seconds.unwrap_or(30).clamp(1, 300); + let guard = match pprof::ProfilerGuard::new(100) { + Ok(guard) => guard, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("failed to start profiler: {}", e), + ) + .into_response(); + } + }; + + tokio::time::sleep(std::time::Duration::from_secs(seconds)).await; + + let report = match guard.report().build() { + Ok(report) => report, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("failed to build profile report: {}", e), + ) + .into_response(); + } + }; + + let profile = match report.pprof() { + Ok(profile) => profile, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("failed to encode profile: {}", e), + ) + .into_response(); + } + }; + + let mut bytes = Vec::new(); + if let Err(e) = profile.encode(&mut bytes) { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("failed to serialize profile: {}", e), + ) + .into_response(); + } + + ( + StatusCode::OK, + [(header::CONTENT_TYPE, "application/octet-stream")], + bytes, + ) + .into_response() +} + +async fn pprof_symbol_handler() -> Response { + ( + StatusCode::OK, + [(header::CONTENT_TYPE, "text/plain; charset=utf-8")], + "num_symbols: 0\n", + ) + .into_response() +} + +async fn pprof_trace_handler(Query(query): Query) -> Response { + let seconds = query.seconds.unwrap_or(1).clamp(1, 30); + tokio::time::sleep(std::time::Duration::from_secs(seconds)).await; + Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "application/octet-stream") + .body(Body::from(Vec::::new())) + .unwrap() +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::http::Request; + use tower::ServiceExt; + + #[tokio::test] + async fn test_debug_index_route() { + let app = build_debug_router(); + let response = app + .oneshot( + Request::builder() + .uri("/debug/pprof/") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + } + + #[tokio::test] + async fn test_debug_cmdline_route() { + let app = build_debug_router(); + let response = app + .oneshot( + Request::builder() + .uri("/debug/pprof/cmdline") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + } +} diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index e215ec8f5..f4c111736 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -136,6 +136,14 @@ async fn do_heartbeat( std::sync::atomic::Ordering::Relaxed, ); } + let metrics_changed = apply_metrics_push_settings( + state, + &hb_resp.metrics_address, + hb_resp.metrics_interval_seconds, + ); + if metrics_changed { + state.metrics_notify.notify_waiters(); + } if !hb_resp.leader.is_empty() { return Ok(Some(hb_resp.leader)); } @@ -196,6 +204,22 @@ async fn do_heartbeat( } } +fn apply_metrics_push_settings( + state: &VolumeServerState, + address: &str, + interval_seconds: u32, +) -> bool { + let mut runtime = state.metrics_runtime.write().unwrap(); + if runtime.push_gateway.address == address + && runtime.push_gateway.interval_seconds == interval_seconds + { + return false; + } + runtime.push_gateway.address = address.to_string(); + runtime.push_gateway.interval_seconds = interval_seconds; + true +} + /// Collect volume information into a Heartbeat message. fn collect_heartbeat( config: &HeartbeatConfig, @@ -315,8 +339,12 @@ fn collect_ec_heartbeat(state: &Arc) -> master_pb::Heartbeat mod tests { use super::*; use crate::config::MinFreeSpace; + use crate::config::ReadMode; + use crate::remote_storage::s3_tier::S3TierRegistry; + use crate::security::{Guard, SigningKey}; use crate::storage::needle_map::NeedleMapKind; use crate::storage::types::{DiskType, VolumeId}; + use std::sync::RwLock; fn test_config() -> HeartbeatConfig { HeartbeatConfig { @@ -331,6 +359,41 @@ mod tests { } } + fn test_state_with_store(store: Store) -> Arc { + Arc::new(VolumeServerState { + store: RwLock::new(store), + guard: Guard::new(&[], SigningKey(vec![]), 0, SigningKey(vec![]), 0), + is_stopping: RwLock::new(false), + maintenance: std::sync::atomic::AtomicBool::new(false), + state_version: std::sync::atomic::AtomicU32::new(0), + concurrent_upload_limit: 0, + concurrent_download_limit: 0, + inflight_upload_data_timeout: std::time::Duration::from_secs(60), + inflight_download_data_timeout: std::time::Duration::from_secs(60), + inflight_upload_bytes: std::sync::atomic::AtomicI64::new(0), + inflight_download_bytes: std::sync::atomic::AtomicI64::new(0), + upload_notify: tokio::sync::Notify::new(), + download_notify: tokio::sync::Notify::new(), + data_center: String::new(), + rack: String::new(), + file_size_limit_bytes: 0, + is_heartbeating: std::sync::atomic::AtomicBool::new(false), + has_master: true, + pre_stop_seconds: 0, + volume_state_notify: tokio::sync::Notify::new(), + write_queue: std::sync::OnceLock::new(), + s3_tier_registry: std::sync::RwLock::new(S3TierRegistry::new()), + read_mode: ReadMode::Local, + master_url: String::new(), + self_url: String::new(), + http_client: reqwest::Client::new(), + metrics_runtime: std::sync::RwLock::new(Default::default()), + metrics_notify: tokio::sync::Notify::new(), + has_slow_read: true, + read_buffer_size_bytes: 4 * 1024 * 1024, + }) + } + #[test] fn test_build_heartbeat_includes_store_identity_and_disk_metadata() { let temp_dir = tempfile::tempdir().unwrap(); @@ -392,4 +455,19 @@ mod tests { assert!(heartbeat.volumes.is_empty()); assert!(heartbeat.has_no_volumes); } + + #[test] + fn test_apply_metrics_push_settings_updates_runtime_state() { + let store = Store::new(NeedleMapKind::InMemory); + let state = test_state_with_store(store); + + assert!(apply_metrics_push_settings(&state, "pushgateway:9091", 15,)); + { + let runtime = state.metrics_runtime.read().unwrap(); + assert_eq!(runtime.push_gateway.address, "pushgateway:9091"); + assert_eq!(runtime.push_gateway.interval_seconds, 15); + } + + assert!(!apply_metrics_push_settings(&state, "pushgateway:9091", 15,)); + } } diff --git a/seaweed-volume/src/server/mod.rs b/seaweed-volume/src/server/mod.rs index 85a02500e..6c15dd112 100644 --- a/seaweed-volume/src/server/mod.rs +++ b/seaweed-volume/src/server/mod.rs @@ -1,5 +1,6 @@ -pub mod volume_server; -pub mod handlers; +pub mod debug; pub mod grpc_server; +pub mod handlers; pub mod heartbeat; +pub mod volume_server; pub mod write_queue; diff --git a/seaweed-volume/src/server/volume_server.rs b/seaweed-volume/src/server/volume_server.rs index e8eed2f52..5fb136d35 100644 --- a/seaweed-volume/src/server/volume_server.rs +++ b/seaweed-volume/src/server/volume_server.rs @@ -9,16 +9,16 @@ //! //! Matches Go's server/volume_server.go. -use std::sync::{Arc, RwLock}; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering}; +use std::sync::{Arc, RwLock}; use axum::{ - Router, - routing::{get, any}, - middleware::{self, Next}, extract::{Request, State}, + http::{HeaderValue, Method, StatusCode}, + middleware::{self, Next}, response::{IntoResponse, Response}, - http::{StatusCode, HeaderValue, Method}, + routing::{any, get}, + Router, }; use crate::config::ReadMode; @@ -28,6 +28,11 @@ use crate::storage::store::Store; use super::handlers; use super::write_queue::WriteQueue; +#[derive(Clone, Debug, Default)] +pub struct RuntimeMetricsConfig { + pub push_gateway: crate::metrics::PushGatewayConfig, +} + /// Shared state for the volume server. pub struct VolumeServerState { pub store: RwLock, @@ -74,6 +79,12 @@ pub struct VolumeServerState { pub self_url: String, /// HTTP client for proxy requests and master lookups. pub http_client: reqwest::Client, + /// Metrics push settings learned from master heartbeat responses. + pub metrics_runtime: std::sync::RwLock, + pub metrics_notify: tokio::sync::Notify, + /// Read tuning flags for large-file streaming. + pub has_slow_read: bool, + pub read_buffer_size_bytes: usize, } impl VolumeServerState { @@ -86,6 +97,10 @@ impl VolumeServerState { } } +pub fn build_metrics_router() -> Router { + Router::new().route("/metrics", get(handlers::metrics_handler)) +} + /// Middleware: set Server header, echo x-amz-request-id, set CORS if Origin present. async fn common_headers_middleware(request: Request, next: Next) -> Response { let origin = request.headers().get("origin").cloned(); @@ -94,10 +109,7 @@ async fn common_headers_middleware(request: Request, next: Next) -> Response { let mut response = next.run(request).await; let headers = response.headers_mut(); - headers.insert( - "Server", - HeaderValue::from_static("SeaweedFS Volume 0.1.0"), - ); + headers.insert("Server", HeaderValue::from_static("SeaweedFS Volume 0.1.0")); if let Some(rid) = request_id { headers.insert("x-amz-request-id", rid); @@ -110,7 +122,10 @@ async fn common_headers_middleware(request: Request, next: Next) -> Response { if origin.is_some() { headers.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*")); - headers.insert("Access-Control-Allow-Credentials", HeaderValue::from_static("true")); + headers.insert( + "Access-Control-Allow-Credentials", + HeaderValue::from_static("true"), + ); } response @@ -119,28 +134,30 @@ async fn common_headers_middleware(request: Request, next: Next) -> Response { /// Admin store handler — dispatches based on HTTP method. /// Matches Go's privateStoreHandler: GET/HEAD → read, POST/PUT → write, /// DELETE → delete, OPTIONS → CORS headers, anything else → 400. -async fn admin_store_handler( - state: State>, - request: Request, -) -> Response { +async fn admin_store_handler(state: State>, request: Request) -> Response { match request.method().clone() { - Method::GET | Method::HEAD => handlers::get_or_head_handler_from_request(state, request).await, + Method::GET | Method::HEAD => { + handlers::get_or_head_handler_from_request(state, request).await + } Method::POST | Method::PUT => handlers::post_handler(state, request).await, Method::DELETE => handlers::delete_handler(state, request).await, Method::OPTIONS => admin_options_response(), - _ => (StatusCode::BAD_REQUEST, format!("{{\"error\":\"unsupported method {}\"}}", request.method())).into_response(), + _ => ( + StatusCode::BAD_REQUEST, + format!("{{\"error\":\"unsupported method {}\"}}", request.method()), + ) + .into_response(), } } /// Public store handler — dispatches based on HTTP method. /// Matches Go's publicReadOnlyHandler: GET/HEAD → read, OPTIONS → CORS, /// anything else → 200 (passthrough no-op). -async fn public_store_handler( - state: State>, - request: Request, -) -> Response { +async fn public_store_handler(state: State>, request: Request) -> Response { match request.method().clone() { - Method::GET | Method::HEAD => handlers::get_or_head_handler_from_request(state, request).await, + Method::GET | Method::HEAD => { + handlers::get_or_head_handler_from_request(state, request).await + } Method::OPTIONS => public_options_response(), _ => StatusCode::OK.into_response(), } @@ -181,20 +198,31 @@ pub fn build_admin_router(state: Arc) -> Router { Router::new() .route("/status", get(handlers::status_handler)) .route("/healthz", get(handlers::healthz_handler)) - .route("/metrics", get(handlers::metrics_handler)) .route("/stats/counter", get(handlers::stats_counter_handler)) .route("/stats/memory", get(handlers::stats_memory_handler)) .route("/stats/disk", get(handlers::stats_disk_handler)) .route("/favicon.ico", get(handlers::favicon_handler)) - .route("/seaweedfsstatic/*path", get(handlers::static_asset_handler)) + .route( + "/seaweedfsstatic/*path", + get(handlers::static_asset_handler), + ) .route("/ui/index.html", get(handlers::ui_handler)) - .route("/", any(|_state: State>, request: Request| async move { - match request.method().clone() { - Method::OPTIONS => admin_options_response(), - Method::GET => StatusCode::OK.into_response(), - _ => (StatusCode::BAD_REQUEST, format!("{{\"error\":\"unsupported method {}\"}}", request.method())).into_response(), - } - })) + .route( + "/", + any( + |_state: State>, request: Request| async move { + match request.method().clone() { + Method::OPTIONS => admin_options_response(), + Method::GET => StatusCode::OK.into_response(), + _ => ( + StatusCode::BAD_REQUEST, + format!("{{\"error\":\"unsupported method {}\"}}", request.method()), + ) + .into_response(), + } + }, + ), + ) .route("/:path", any(admin_store_handler)) .route("/:vid/:fid", any(admin_store_handler)) .route("/:vid/:fid/:filename", any(admin_store_handler)) @@ -207,14 +235,22 @@ pub fn build_public_router(state: Arc) -> Router { Router::new() .route("/healthz", get(handlers::healthz_handler)) .route("/favicon.ico", get(handlers::favicon_handler)) - .route("/seaweedfsstatic/*path", get(handlers::static_asset_handler)) - .route("/", any(|_state: State>, request: Request| async move { - match request.method().clone() { - Method::OPTIONS => public_options_response(), - Method::GET => StatusCode::OK.into_response(), - _ => StatusCode::OK.into_response(), - } - })) + .route( + "/seaweedfsstatic/*path", + get(handlers::static_asset_handler), + ) + .route( + "/", + any( + |_state: State>, request: Request| async move { + match request.method().clone() { + Method::OPTIONS => public_options_response(), + Method::GET => StatusCode::OK.into_response(), + _ => StatusCode::OK.into_response(), + } + }, + ), + ) .route("/:path", any(public_store_handler)) .route("/:vid/:fid", any(public_store_handler)) .route("/:vid/:fid/:filename", any(public_store_handler)) diff --git a/seaweed-volume/src/server/write_queue.rs b/seaweed-volume/src/server/write_queue.rs index 6fd67eb07..b42c5921f 100644 --- a/seaweed-volume/src/server/write_queue.rs +++ b/seaweed-volume/src/server/write_queue.rs @@ -42,10 +42,7 @@ impl WriteQueue { /// The worker holds a reference to `state` for accessing the store. pub fn new(state: Arc, capacity: usize) -> Self { let (tx, rx) = mpsc::channel(capacity); - let worker = WriteQueueWorker { - rx, - state, - }; + let worker = WriteQueueWorker { rx, state }; tokio::spawn(worker.run()); WriteQueue { tx } } @@ -53,11 +50,7 @@ impl WriteQueue { /// Submit a write request and wait for the result. /// /// Returns `Err` if the worker has shut down or the response channel was dropped. - pub async fn submit( - &self, - volume_id: VolumeId, - needle: Needle, - ) -> WriteResult { + pub async fn submit(&self, volume_id: VolumeId, needle: Needle) -> WriteResult { let (response_tx, response_rx) = oneshot::channel(); let request = WriteRequest { volume_id, @@ -166,20 +159,15 @@ mod tests { /// Helper to create a minimal VolumeServerState for testing. fn make_test_state() -> Arc { - use std::sync::RwLock; - use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32}; use crate::security::{Guard, SigningKey}; - use crate::storage::store::Store; + use crate::server::volume_server::RuntimeMetricsConfig; use crate::storage::needle_map::NeedleMapKind; + use crate::storage::store::Store; + use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32}; + use std::sync::RwLock; let store = Store::new(NeedleMapKind::InMemory); - let guard = Guard::new( - &[], - SigningKey(vec![]), - 0, - SigningKey(vec![]), - 0, - ); + let guard = Guard::new(&[], SigningKey(vec![]), 0, SigningKey(vec![]), 0); Arc::new(VolumeServerState { store: RwLock::new(store), @@ -203,11 +191,17 @@ mod tests { pre_stop_seconds: 0, volume_state_notify: tokio::sync::Notify::new(), write_queue: std::sync::OnceLock::new(), - s3_tier_registry: std::sync::RwLock::new(crate::remote_storage::s3_tier::S3TierRegistry::new()), + s3_tier_registry: std::sync::RwLock::new( + crate::remote_storage::s3_tier::S3TierRegistry::new(), + ), read_mode: crate::config::ReadMode::Local, master_url: String::new(), self_url: String::new(), http_client: reqwest::Client::new(), + metrics_runtime: std::sync::RwLock::new(RuntimeMetricsConfig::default()), + metrics_notify: tokio::sync::Notify::new(), + has_slow_read: true, + read_buffer_size_bytes: 4 * 1024 * 1024, }) } diff --git a/seaweed-volume/tests/http_integration.rs b/seaweed-volume/tests/http_integration.rs index 6130248ed..9b0bddbb5 100644 --- a/seaweed-volume/tests/http_integration.rs +++ b/seaweed-volume/tests/http_integration.rs @@ -10,7 +10,9 @@ use axum::http::{Request, StatusCode}; use tower::ServiceExt; // for `oneshot` use seaweed_volume::security::{Guard, SigningKey}; -use seaweed_volume::server::volume_server::{build_admin_router, VolumeServerState}; +use seaweed_volume::server::volume_server::{ + build_admin_router, build_metrics_router, VolumeServerState, +}; use seaweed_volume::storage::needle_map::NeedleMapKind; use seaweed_volume::storage::store::Store; use seaweed_volume::storage::types::{DiskType, VolumeId}; @@ -68,6 +70,12 @@ fn test_state() -> (Arc, TempDir) { master_url: String::new(), self_url: String::new(), http_client: reqwest::Client::new(), + metrics_runtime: std::sync::RwLock::new( + seaweed_volume::server::volume_server::RuntimeMetricsConfig::default(), + ), + metrics_notify: tokio::sync::Notify::new(), + has_slow_read: false, + read_buffer_size_bytes: 1024 * 1024, }); (state, tmp) } @@ -163,6 +171,41 @@ async fn status_returns_json_with_version_and_volumes() { assert_eq!(volumes[0]["Id"], 1); } +#[tokio::test] +async fn admin_router_does_not_expose_metrics() { + let (state, _tmp) = test_state(); + let app = build_admin_router(state); + + let response = app + .oneshot( + Request::builder() + .uri("/metrics") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::BAD_REQUEST); +} + +#[tokio::test] +async fn metrics_router_serves_metrics() { + let app = build_metrics_router(); + + let response = app + .oneshot( + Request::builder() + .uri("/metrics") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); +} + // ============================================================================ // 4. POST writes data, then GET reads it back // ============================================================================