Browse Source

fix: add runtime metrics and pprof parity

rust-volume-server
Chris Lu 2 days ago
parent
commit
fbe0e5829c
  1. 283
      seaweed-volume/Cargo.lock
  2. 1
      seaweed-volume/Cargo.toml
  3. 111
      seaweed-volume/src/main.rs
  4. 151
      seaweed-volume/src/metrics.rs
  5. 159
      seaweed-volume/src/server/debug.rs
  6. 78
      seaweed-volume/src/server/heartbeat.rs
  7. 5
      seaweed-volume/src/server/mod.rs
  8. 112
      seaweed-volume/src/server/volume_server.rs
  9. 34
      seaweed-volume/src/server/write_queue.rs
  10. 45
      seaweed-volume/tests/http_integration.rs

283
seaweed-volume/Cargo.lock

@ -2,6 +2,15 @@
# It is not intended for manual editing.
version = 4
[[package]]
name = "addr2line"
version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b5d307320b3181d6d7954e663bd7c774a838b8220fe0593c86d9fb09f498b4b"
dependencies = [
"gimli",
]
[[package]]
name = "adler2"
version = "2.0.1"
@ -28,6 +37,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "aligned-vec"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc890384c8602f339876ded803c97ad529f3842aba97f6392b3dba0dd171769b"
dependencies = [
"equator",
]
[[package]]
name = "allocator-api2"
version = "0.2.21"
@ -670,6 +688,21 @@ dependencies = [
"tracing",
]
[[package]]
name = "backtrace"
version = "0.3.76"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb531853791a215d7c62a30daf0dde835f381ab5de4589cfe7c649d2cbe92bd6"
dependencies = [
"addr2line",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
"windows-link",
]
[[package]]
name = "base16ct"
version = "0.1.1"
@ -889,6 +922,15 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "cpp_demangle"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0667304c32ea56cb4cd6d2d7c0cfe9a2f8041229db8c033af7f8d69492429def"
dependencies = [
"cfg-if",
]
[[package]]
name = "cpufeatures"
version = "0.2.17"
@ -1044,6 +1086,15 @@ dependencies = [
"parking_lot_core 0.9.12",
]
[[package]]
name = "debugid"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d"
dependencies = [
"uuid",
]
[[package]]
name = "der"
version = "0.6.1"
@ -1209,6 +1260,26 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "equator"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4711b213838dfee0117e3be6ac926007d7f433d7bbe33595975d4190cb07e6fc"
dependencies = [
"equator-macro",
]
[[package]]
name = "equator-macro"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "equivalent"
version = "1.0.2"
@ -1293,6 +1364,24 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582"
[[package]]
name = "findshlibs"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40b9e59cd0f7e0806cca4be089683ecb6434e602038df21fe6bf6711b2f07f64"
dependencies = [
"cc",
"lazy_static",
"libc",
"winapi",
]
[[package]]
name = "fixedbitset"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "fixedbitset"
version = "0.5.7"
@ -1510,6 +1599,12 @@ dependencies = [
"weezl",
]
[[package]]
name = "gimli"
version = "0.32.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7"
[[package]]
name = "group"
version = "0.12.1"
@ -2041,6 +2136,15 @@ version = "1.70.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695"
[[package]]
name = "itertools"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569"
dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.14.0"
@ -2114,7 +2218,7 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
dependencies = [
"spin",
"spin 0.9.8",
]
[[package]]
@ -2292,7 +2396,7 @@ dependencies = [
"httparse",
"memchr",
"mime",
"spin",
"spin 0.9.8",
"version_check",
]
@ -2325,6 +2429,17 @@ dependencies = [
"tempfile",
]
[[package]]
name = "nix"
version = "0.26.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b"
dependencies = [
"bitflags 1.3.2",
"cfg-if",
"libc",
]
[[package]]
name = "ntapi"
version = "0.4.3"
@ -2405,6 +2520,15 @@ dependencies = [
"libm",
]
[[package]]
name = "object"
version = "0.37.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff76201f031d8863c38aa7f905eca4f53abbfa15f609db4277d44cd8938f33fe"
dependencies = [
"memchr",
]
[[package]]
name = "once_cell"
version = "1.21.3"
@ -2575,13 +2699,23 @@ version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
[[package]]
name = "petgraph"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db"
dependencies = [
"fixedbitset 0.4.2",
"indexmap 2.13.0",
]
[[package]]
name = "petgraph"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772"
dependencies = [
"fixedbitset",
"fixedbitset 0.5.7",
"indexmap 2.13.0",
]
@ -2682,6 +2816,31 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "pprof"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38a01da47675efa7673b032bf8efd8214f1917d89685e07e395ab125ea42b187"
dependencies = [
"aligned-vec",
"backtrace",
"cfg-if",
"findshlibs",
"libc",
"log",
"nix",
"once_cell",
"prost 0.12.6",
"prost-build 0.12.6",
"prost-derive 0.12.6",
"sha2",
"smallvec",
"spin 0.10.0",
"symbolic-demangle",
"tempfile",
"thiserror 2.0.18",
]
[[package]]
name = "ppv-lite86"
version = "0.2.21"
@ -2758,6 +2917,16 @@ dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "prost"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29"
dependencies = [
"bytes",
"prost-derive 0.12.6",
]
[[package]]
name = "prost"
version = "0.13.5"
@ -2765,7 +2934,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
dependencies = [
"bytes",
"prost-derive",
"prost-derive 0.13.5",
]
[[package]]
name = "prost-build"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4"
dependencies = [
"bytes",
"heck",
"itertools 0.12.1",
"log",
"multimap",
"once_cell",
"petgraph 0.6.5",
"prettyplease",
"prost 0.12.6",
"prost-types 0.12.6",
"regex",
"syn",
"tempfile",
]
[[package]]
@ -2775,19 +2965,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf"
dependencies = [
"heck",
"itertools",
"itertools 0.14.0",
"log",
"multimap",
"once_cell",
"petgraph",
"petgraph 0.7.1",
"prettyplease",
"prost",
"prost-types",
"prost 0.13.5",
"prost-types 0.13.5",
"regex",
"syn",
"tempfile",
]
[[package]]
name = "prost-derive"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1"
dependencies = [
"anyhow",
"itertools 0.12.1",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-derive"
version = "0.13.5"
@ -2795,19 +2998,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
dependencies = [
"anyhow",
"itertools",
"itertools 0.14.0",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-types"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0"
dependencies = [
"prost 0.12.6",
]
[[package]]
name = "prost-types"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16"
dependencies = [
"prost",
"prost 0.13.5",
]
[[package]]
@ -3008,7 +3220,7 @@ dependencies = [
"lru 0.7.8",
"parking_lot 0.11.2",
"smallvec",
"spin",
"spin 0.9.8",
]
[[package]]
@ -3149,6 +3361,12 @@ dependencies = [
"zeroize",
]
[[package]]
name = "rustc-demangle"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b50b8869d9fc858ce7266cce0194bd74df58b9d0e3f6df3a9fc8eb470d95c09d"
[[package]]
name = "rustc-hash"
version = "2.1.1"
@ -3354,9 +3572,10 @@ dependencies = [
"md-5",
"memmap2",
"parking_lot 0.12.5",
"pprof",
"prometheus",
"prost",
"prost-types",
"prost 0.13.5",
"prost-types 0.13.5",
"rand 0.8.5",
"redb",
"reed-solomon-erasure",
@ -3644,6 +3863,15 @@ version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
[[package]]
name = "spin"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5fe4ccb98d9c292d56fec89a5e07da7fc4cf0dc11e156b41793132775d3e591"
dependencies = [
"lock_api",
]
[[package]]
name = "spki"
version = "0.6.0"
@ -3682,6 +3910,29 @@ version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "symbolic-common"
version = "12.17.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "751a2823d606b5d0a7616499e4130a516ebd01a44f39811be2b9600936509c23"
dependencies = [
"debugid",
"memmap2",
"stable_deref_trait",
"uuid",
]
[[package]]
name = "symbolic-demangle"
version = "12.17.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79b237cfbe320601dd24b4ac817a5b68bb28f5508e33f08d42be0682cadc8ac9"
dependencies = [
"cpp_demangle",
"rustc-demangle",
"symbolic-common",
]
[[package]]
name = "syn"
version = "2.0.117"
@ -4009,7 +4260,7 @@ dependencies = [
"hyper-util",
"percent-encoding",
"pin-project",
"prost",
"prost 0.13.5",
"rustls-pemfile",
"socket2 0.5.10",
"tokio",
@ -4029,8 +4280,8 @@ checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"prost-types",
"prost-build 0.13.5",
"prost-types 0.13.5",
"quote",
"syn",
]

1
seaweed-volume/Cargo.toml

@ -50,6 +50,7 @@ reed-solomon-erasure = "6"
# Logging
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
pprof = { version = "0.15", features = ["prost-codec"] }
# Config
toml = "0.8"

111
seaweed-volume/src/main.rs

@ -6,8 +6,11 @@ use seaweed_volume::config::{self, VolumeServerConfig};
use seaweed_volume::metrics;
use seaweed_volume::pb::volume_server_pb::volume_server_server::VolumeServerServer;
use seaweed_volume::security::{Guard, SigningKey};
use seaweed_volume::server::debug::build_debug_router;
use seaweed_volume::server::grpc_server::VolumeGrpcService;
use seaweed_volume::server::volume_server::VolumeServerState;
use seaweed_volume::server::volume_server::{
build_metrics_router, RuntimeMetricsConfig, VolumeServerState,
};
use seaweed_volume::server::write_queue::WriteQueue;
use seaweed_volume::storage::store::Store;
use seaweed_volume::storage::types::DiskType;
@ -136,6 +139,10 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
master_url,
self_url,
http_client: reqwest::Client::new(),
metrics_runtime: std::sync::RwLock::new(RuntimeMetricsConfig::default()),
metrics_notify: tokio::sync::Notify::new(),
has_slow_read: config.has_slow_read,
read_buffer_size_bytes: (config.read_buffer_size_mb.max(1) as usize) * 1024 * 1024,
});
// Initialize the batched write queue if enabled
@ -170,7 +177,10 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
}
// Build HTTP routers
let admin_router = seaweed_volume::server::volume_server::build_admin_router(state.clone());
let mut admin_router = seaweed_volume::server::volume_server::build_admin_router(state.clone());
if config.pprof {
admin_router = admin_router.merge(build_debug_router());
}
let admin_addr = format!("{}:{}", config.bind_ip, config.port);
let public_port = config.public_port;
@ -382,20 +392,117 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
None
};
let metrics_handle = if config.metrics_port > 0 {
let metrics_router = build_metrics_router();
let metrics_addr = format!("{}:{}", config.metrics_ip, config.metrics_port);
info!("Metrics server listening on {}", metrics_addr);
let listener = tokio::net::TcpListener::bind(&metrics_addr)
.await
.unwrap_or_else(|e| panic!("Failed to bind metrics HTTP to {}: {}", metrics_addr, e));
let mut shutdown_rx = shutdown_tx.subscribe();
Some(tokio::spawn(async move {
if let Err(e) = axum::serve(listener, metrics_router)
.with_graceful_shutdown(async move {
let _ = shutdown_rx.recv().await;
})
.await
{
error!("Metrics HTTP server error: {}", e);
}
}))
} else {
None
};
let debug_handle = if config.debug {
let debug_addr = format!("0.0.0.0:{}", config.debug_port);
info!("Debug pprof server listening on {}", debug_addr);
let listener = tokio::net::TcpListener::bind(&debug_addr)
.await
.unwrap_or_else(|e| panic!("Failed to bind debug HTTP to {}: {}", debug_addr, e));
let debug_router = build_debug_router();
let mut shutdown_rx = shutdown_tx.subscribe();
Some(tokio::spawn(async move {
if let Err(e) = axum::serve(listener, debug_router)
.with_graceful_shutdown(async move {
let _ = shutdown_rx.recv().await;
})
.await
{
error!("Debug HTTP server error: {}", e);
}
}))
} else {
None
};
let metrics_push_handle = {
let push_state = state.clone();
let push_instance = format!("{}:{}", config.ip, config.port);
let push_shutdown = shutdown_tx.subscribe();
Some(tokio::spawn(async move {
run_metrics_push_loop(push_state, push_instance, push_shutdown).await;
}))
};
// Wait for all servers
let _ = http_handle.await;
let _ = grpc_handle.await;
if let Some(h) = public_handle {
let _ = h.await;
}
if let Some(h) = metrics_handle {
let _ = h.await;
}
if let Some(h) = debug_handle {
let _ = h.await;
}
if let Some(h) = heartbeat_handle {
let _ = h.await;
}
if let Some(h) = metrics_push_handle {
let _ = h.await;
}
info!("Volume server stopped.");
Ok(())
}
async fn run_metrics_push_loop(
state: Arc<VolumeServerState>,
instance: String,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) {
loop {
let push_cfg = { state.metrics_runtime.read().unwrap().push_gateway.clone() };
if push_cfg.address.is_empty() || push_cfg.interval_seconds == 0 {
tokio::select! {
_ = state.metrics_notify.notified() => continue,
_ = shutdown_rx.recv() => return,
}
}
if let Err(e) = metrics::push_metrics_once(
&state.http_client,
&push_cfg.address,
"volumeServer",
&instance,
)
.await
{
info!("could not push metrics to {}: {}", push_cfg.address, e);
}
let interval = std::time::Duration::from_secs(push_cfg.interval_seconds.max(1) as u64);
tokio::select! {
_ = tokio::time::sleep(interval) => {}
_ = state.metrics_notify.notified() => {}
_ = shutdown_rx.recv() => return,
}
}
}
/// Serve an axum Router over TLS using tokio-rustls.
/// Accepts TCP connections, performs TLS handshake, then serves HTTP over the encrypted stream.
async fn serve_https<F>(

151
seaweed-volume/src/metrics.rs

@ -6,6 +6,13 @@ use prometheus::{
self, Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Opts,
Registry, TextEncoder,
};
use std::sync::Once;
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct PushGatewayConfig {
pub address: String,
pub interval_seconds: u32,
}
lazy_static::lazy_static! {
pub static ref REGISTRY: Registry = Registry::new();
@ -59,33 +66,37 @@ lazy_static::lazy_static! {
).expect("metric can be created");
}
static REGISTER_METRICS: Once = Once::new();
/// Register all metrics with the custom registry.
/// Call this once at startup.
pub fn register_metrics() {
REGISTRY
.register(Box::new(REQUEST_COUNTER.clone()))
.expect("REQUEST_COUNTER registered");
REGISTRY
.register(Box::new(REQUEST_DURATION.clone()))
.expect("REQUEST_DURATION registered");
REGISTRY
.register(Box::new(VOLUMES_TOTAL.clone()))
.expect("VOLUMES_TOTAL registered");
REGISTRY
.register(Box::new(MAX_VOLUMES.clone()))
.expect("MAX_VOLUMES registered");
REGISTRY
.register(Box::new(DISK_SIZE_BYTES.clone()))
.expect("DISK_SIZE_BYTES registered");
REGISTRY
.register(Box::new(DISK_FREE_BYTES.clone()))
.expect("DISK_FREE_BYTES registered");
REGISTRY
.register(Box::new(INFLIGHT_REQUESTS.clone()))
.expect("INFLIGHT_REQUESTS registered");
REGISTRY
.register(Box::new(VOLUME_FILE_COUNT.clone()))
.expect("VOLUME_FILE_COUNT registered");
REGISTER_METRICS.call_once(|| {
REGISTRY
.register(Box::new(REQUEST_COUNTER.clone()))
.expect("REQUEST_COUNTER registered");
REGISTRY
.register(Box::new(REQUEST_DURATION.clone()))
.expect("REQUEST_DURATION registered");
REGISTRY
.register(Box::new(VOLUMES_TOTAL.clone()))
.expect("VOLUMES_TOTAL registered");
REGISTRY
.register(Box::new(MAX_VOLUMES.clone()))
.expect("MAX_VOLUMES registered");
REGISTRY
.register(Box::new(DISK_SIZE_BYTES.clone()))
.expect("DISK_SIZE_BYTES registered");
REGISTRY
.register(Box::new(DISK_FREE_BYTES.clone()))
.expect("DISK_FREE_BYTES registered");
REGISTRY
.register(Box::new(INFLIGHT_REQUESTS.clone()))
.expect("INFLIGHT_REQUESTS registered");
REGISTRY
.register(Box::new(VOLUME_FILE_COUNT.clone()))
.expect("VOLUME_FILE_COUNT registered");
});
}
/// Gather all metrics and encode them in Prometheus text exposition format.
@ -99,9 +110,49 @@ pub fn gather_metrics() -> String {
String::from_utf8(buffer).expect("metrics are valid UTF-8")
}
pub fn build_pushgateway_url(address: &str, job: &str, instance: &str) -> String {
let base = if address.starts_with("http://") || address.starts_with("https://") {
address.to_string()
} else {
format!("http://{}", address)
};
let base = base.trim_end_matches('/');
format!("{}/metrics/job/{}/instance/{}", base, job, instance)
}
pub async fn push_metrics_once(
client: &reqwest::Client,
address: &str,
job: &str,
instance: &str,
) -> Result<(), String> {
let url = build_pushgateway_url(address, job, instance);
let response = client
.put(&url)
.header(
reqwest::header::CONTENT_TYPE,
"text/plain; version=0.0.4; charset=utf-8",
)
.body(gather_metrics())
.send()
.await
.map_err(|e| format!("push metrics request failed: {}", e))?;
if response.status().is_success() {
Ok(())
} else {
Err(format!(
"push metrics failed with status {}",
response.status()
))
}
}
#[cfg(test)]
mod tests {
use super::*;
use axum::{routing::put, Router};
use std::sync::{Arc, Mutex};
#[test]
fn test_gather_metrics_returns_text() {
@ -110,4 +161,56 @@ mod tests {
let output = gather_metrics();
assert!(output.contains("volume_server_request_counter"));
}
#[test]
fn test_build_pushgateway_url() {
assert_eq!(
build_pushgateway_url("localhost:9091", "volumeServer", "test-instance"),
"http://localhost:9091/metrics/job/volumeServer/instance/test-instance"
);
assert_eq!(
build_pushgateway_url("https://push.example", "volumeServer", "node-a"),
"https://push.example/metrics/job/volumeServer/instance/node-a"
);
}
#[tokio::test]
async fn test_push_metrics_once() {
register_metrics();
let captured = Arc::new(Mutex::new(None::<String>));
let captured_clone = captured.clone();
let app = Router::new().route(
"/metrics/job/volumeServer/instance/test-instance",
put(move |body: String| {
let captured = captured_clone.clone();
async move {
*captured.lock().unwrap() = Some(body);
"ok"
}
}),
);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let server = tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
let client = reqwest::Client::new();
push_metrics_once(
&client,
&format!("127.0.0.1:{}", addr.port()),
"volumeServer",
"test-instance",
)
.await
.unwrap();
let body = captured.lock().unwrap().clone().unwrap();
assert!(body.contains("volume_server_request_counter"));
server.abort();
}
}

159
seaweed-volume/src/server/debug.rs

@ -0,0 +1,159 @@
use axum::body::Body;
use axum::extract::Query;
use axum::http::{header, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::routing::{any, get};
use axum::Router;
use pprof::protos::Message;
use serde::Deserialize;
#[derive(Deserialize, Default)]
struct ProfileQuery {
seconds: Option<u64>,
}
pub fn build_debug_router() -> Router {
Router::new()
.route("/debug/pprof/", get(pprof_index_handler))
.route("/debug/pprof/cmdline", get(pprof_cmdline_handler))
.route("/debug/pprof/profile", get(pprof_profile_handler))
.route("/debug/pprof/symbol", any(pprof_symbol_handler))
.route("/debug/pprof/trace", get(pprof_trace_handler))
}
async fn pprof_index_handler() -> Response {
let body = concat!(
"<html><head><title>/debug/pprof/</title></head><body>",
"<a href=\"cmdline\">cmdline</a><br>",
"<a href=\"profile\">profile</a><br>",
"<a href=\"symbol\">symbol</a><br>",
"<a href=\"trace\">trace</a><br>",
"</body></html>",
);
(
StatusCode::OK,
[(header::CONTENT_TYPE, "text/html; charset=utf-8")],
body,
)
.into_response()
}
async fn pprof_cmdline_handler() -> Response {
let body = std::env::args().collect::<Vec<_>>().join("\0");
(
StatusCode::OK,
[(header::CONTENT_TYPE, "text/plain; charset=utf-8")],
body,
)
.into_response()
}
async fn pprof_profile_handler(Query(query): Query<ProfileQuery>) -> Response {
let seconds = query.seconds.unwrap_or(30).clamp(1, 300);
let guard = match pprof::ProfilerGuard::new(100) {
Ok(guard) => guard,
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("failed to start profiler: {}", e),
)
.into_response();
}
};
tokio::time::sleep(std::time::Duration::from_secs(seconds)).await;
let report = match guard.report().build() {
Ok(report) => report,
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("failed to build profile report: {}", e),
)
.into_response();
}
};
let profile = match report.pprof() {
Ok(profile) => profile,
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("failed to encode profile: {}", e),
)
.into_response();
}
};
let mut bytes = Vec::new();
if let Err(e) = profile.encode(&mut bytes) {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("failed to serialize profile: {}", e),
)
.into_response();
}
(
StatusCode::OK,
[(header::CONTENT_TYPE, "application/octet-stream")],
bytes,
)
.into_response()
}
async fn pprof_symbol_handler() -> Response {
(
StatusCode::OK,
[(header::CONTENT_TYPE, "text/plain; charset=utf-8")],
"num_symbols: 0\n",
)
.into_response()
}
async fn pprof_trace_handler(Query(query): Query<ProfileQuery>) -> Response {
let seconds = query.seconds.unwrap_or(1).clamp(1, 30);
tokio::time::sleep(std::time::Duration::from_secs(seconds)).await;
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/octet-stream")
.body(Body::from(Vec::<u8>::new()))
.unwrap()
}
#[cfg(test)]
mod tests {
use super::*;
use axum::http::Request;
use tower::ServiceExt;
#[tokio::test]
async fn test_debug_index_route() {
let app = build_debug_router();
let response = app
.oneshot(
Request::builder()
.uri("/debug/pprof/")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
#[tokio::test]
async fn test_debug_cmdline_route() {
let app = build_debug_router();
let response = app
.oneshot(
Request::builder()
.uri("/debug/pprof/cmdline")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
}

78
seaweed-volume/src/server/heartbeat.rs

@ -136,6 +136,14 @@ async fn do_heartbeat(
std::sync::atomic::Ordering::Relaxed,
);
}
let metrics_changed = apply_metrics_push_settings(
state,
&hb_resp.metrics_address,
hb_resp.metrics_interval_seconds,
);
if metrics_changed {
state.metrics_notify.notify_waiters();
}
if !hb_resp.leader.is_empty() {
return Ok(Some(hb_resp.leader));
}
@ -196,6 +204,22 @@ async fn do_heartbeat(
}
}
fn apply_metrics_push_settings(
state: &VolumeServerState,
address: &str,
interval_seconds: u32,
) -> bool {
let mut runtime = state.metrics_runtime.write().unwrap();
if runtime.push_gateway.address == address
&& runtime.push_gateway.interval_seconds == interval_seconds
{
return false;
}
runtime.push_gateway.address = address.to_string();
runtime.push_gateway.interval_seconds = interval_seconds;
true
}
/// Collect volume information into a Heartbeat message.
fn collect_heartbeat(
config: &HeartbeatConfig,
@ -315,8 +339,12 @@ fn collect_ec_heartbeat(state: &Arc<VolumeServerState>) -> master_pb::Heartbeat
mod tests {
use super::*;
use crate::config::MinFreeSpace;
use crate::config::ReadMode;
use crate::remote_storage::s3_tier::S3TierRegistry;
use crate::security::{Guard, SigningKey};
use crate::storage::needle_map::NeedleMapKind;
use crate::storage::types::{DiskType, VolumeId};
use std::sync::RwLock;
fn test_config() -> HeartbeatConfig {
HeartbeatConfig {
@ -331,6 +359,41 @@ mod tests {
}
}
fn test_state_with_store(store: Store) -> Arc<VolumeServerState> {
Arc::new(VolumeServerState {
store: RwLock::new(store),
guard: Guard::new(&[], SigningKey(vec![]), 0, SigningKey(vec![]), 0),
is_stopping: RwLock::new(false),
maintenance: std::sync::atomic::AtomicBool::new(false),
state_version: std::sync::atomic::AtomicU32::new(0),
concurrent_upload_limit: 0,
concurrent_download_limit: 0,
inflight_upload_data_timeout: std::time::Duration::from_secs(60),
inflight_download_data_timeout: std::time::Duration::from_secs(60),
inflight_upload_bytes: std::sync::atomic::AtomicI64::new(0),
inflight_download_bytes: std::sync::atomic::AtomicI64::new(0),
upload_notify: tokio::sync::Notify::new(),
download_notify: tokio::sync::Notify::new(),
data_center: String::new(),
rack: String::new(),
file_size_limit_bytes: 0,
is_heartbeating: std::sync::atomic::AtomicBool::new(false),
has_master: true,
pre_stop_seconds: 0,
volume_state_notify: tokio::sync::Notify::new(),
write_queue: std::sync::OnceLock::new(),
s3_tier_registry: std::sync::RwLock::new(S3TierRegistry::new()),
read_mode: ReadMode::Local,
master_url: String::new(),
self_url: String::new(),
http_client: reqwest::Client::new(),
metrics_runtime: std::sync::RwLock::new(Default::default()),
metrics_notify: tokio::sync::Notify::new(),
has_slow_read: true,
read_buffer_size_bytes: 4 * 1024 * 1024,
})
}
#[test]
fn test_build_heartbeat_includes_store_identity_and_disk_metadata() {
let temp_dir = tempfile::tempdir().unwrap();
@ -392,4 +455,19 @@ mod tests {
assert!(heartbeat.volumes.is_empty());
assert!(heartbeat.has_no_volumes);
}
#[test]
fn test_apply_metrics_push_settings_updates_runtime_state() {
let store = Store::new(NeedleMapKind::InMemory);
let state = test_state_with_store(store);
assert!(apply_metrics_push_settings(&state, "pushgateway:9091", 15,));
{
let runtime = state.metrics_runtime.read().unwrap();
assert_eq!(runtime.push_gateway.address, "pushgateway:9091");
assert_eq!(runtime.push_gateway.interval_seconds, 15);
}
assert!(!apply_metrics_push_settings(&state, "pushgateway:9091", 15,));
}
}

5
seaweed-volume/src/server/mod.rs

@ -1,5 +1,6 @@
pub mod volume_server;
pub mod handlers;
pub mod debug;
pub mod grpc_server;
pub mod handlers;
pub mod heartbeat;
pub mod volume_server;
pub mod write_queue;

112
seaweed-volume/src/server/volume_server.rs

@ -9,16 +9,16 @@
//!
//! Matches Go's server/volume_server.go.
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering};
use std::sync::{Arc, RwLock};
use axum::{
Router,
routing::{get, any},
middleware::{self, Next},
extract::{Request, State},
http::{HeaderValue, Method, StatusCode},
middleware::{self, Next},
response::{IntoResponse, Response},
http::{StatusCode, HeaderValue, Method},
routing::{any, get},
Router,
};
use crate::config::ReadMode;
@ -28,6 +28,11 @@ use crate::storage::store::Store;
use super::handlers;
use super::write_queue::WriteQueue;
#[derive(Clone, Debug, Default)]
pub struct RuntimeMetricsConfig {
pub push_gateway: crate::metrics::PushGatewayConfig,
}
/// Shared state for the volume server.
pub struct VolumeServerState {
pub store: RwLock<Store>,
@ -74,6 +79,12 @@ pub struct VolumeServerState {
pub self_url: String,
/// HTTP client for proxy requests and master lookups.
pub http_client: reqwest::Client,
/// Metrics push settings learned from master heartbeat responses.
pub metrics_runtime: std::sync::RwLock<RuntimeMetricsConfig>,
pub metrics_notify: tokio::sync::Notify,
/// Read tuning flags for large-file streaming.
pub has_slow_read: bool,
pub read_buffer_size_bytes: usize,
}
impl VolumeServerState {
@ -86,6 +97,10 @@ impl VolumeServerState {
}
}
pub fn build_metrics_router() -> Router {
Router::new().route("/metrics", get(handlers::metrics_handler))
}
/// Middleware: set Server header, echo x-amz-request-id, set CORS if Origin present.
async fn common_headers_middleware(request: Request, next: Next) -> Response {
let origin = request.headers().get("origin").cloned();
@ -94,10 +109,7 @@ async fn common_headers_middleware(request: Request, next: Next) -> Response {
let mut response = next.run(request).await;
let headers = response.headers_mut();
headers.insert(
"Server",
HeaderValue::from_static("SeaweedFS Volume 0.1.0"),
);
headers.insert("Server", HeaderValue::from_static("SeaweedFS Volume 0.1.0"));
if let Some(rid) = request_id {
headers.insert("x-amz-request-id", rid);
@ -110,7 +122,10 @@ async fn common_headers_middleware(request: Request, next: Next) -> Response {
if origin.is_some() {
headers.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
headers.insert("Access-Control-Allow-Credentials", HeaderValue::from_static("true"));
headers.insert(
"Access-Control-Allow-Credentials",
HeaderValue::from_static("true"),
);
}
response
@ -119,28 +134,30 @@ async fn common_headers_middleware(request: Request, next: Next) -> Response {
/// Admin store handler — dispatches based on HTTP method.
/// Matches Go's privateStoreHandler: GET/HEAD → read, POST/PUT → write,
/// DELETE → delete, OPTIONS → CORS headers, anything else → 400.
async fn admin_store_handler(
state: State<Arc<VolumeServerState>>,
request: Request,
) -> Response {
async fn admin_store_handler(state: State<Arc<VolumeServerState>>, request: Request) -> Response {
match request.method().clone() {
Method::GET | Method::HEAD => handlers::get_or_head_handler_from_request(state, request).await,
Method::GET | Method::HEAD => {
handlers::get_or_head_handler_from_request(state, request).await
}
Method::POST | Method::PUT => handlers::post_handler(state, request).await,
Method::DELETE => handlers::delete_handler(state, request).await,
Method::OPTIONS => admin_options_response(),
_ => (StatusCode::BAD_REQUEST, format!("{{\"error\":\"unsupported method {}\"}}", request.method())).into_response(),
_ => (
StatusCode::BAD_REQUEST,
format!("{{\"error\":\"unsupported method {}\"}}", request.method()),
)
.into_response(),
}
}
/// Public store handler — dispatches based on HTTP method.
/// Matches Go's publicReadOnlyHandler: GET/HEAD → read, OPTIONS → CORS,
/// anything else → 200 (passthrough no-op).
async fn public_store_handler(
state: State<Arc<VolumeServerState>>,
request: Request,
) -> Response {
async fn public_store_handler(state: State<Arc<VolumeServerState>>, request: Request) -> Response {
match request.method().clone() {
Method::GET | Method::HEAD => handlers::get_or_head_handler_from_request(state, request).await,
Method::GET | Method::HEAD => {
handlers::get_or_head_handler_from_request(state, request).await
}
Method::OPTIONS => public_options_response(),
_ => StatusCode::OK.into_response(),
}
@ -181,20 +198,31 @@ pub fn build_admin_router(state: Arc<VolumeServerState>) -> Router {
Router::new()
.route("/status", get(handlers::status_handler))
.route("/healthz", get(handlers::healthz_handler))
.route("/metrics", get(handlers::metrics_handler))
.route("/stats/counter", get(handlers::stats_counter_handler))
.route("/stats/memory", get(handlers::stats_memory_handler))
.route("/stats/disk", get(handlers::stats_disk_handler))
.route("/favicon.ico", get(handlers::favicon_handler))
.route("/seaweedfsstatic/*path", get(handlers::static_asset_handler))
.route(
"/seaweedfsstatic/*path",
get(handlers::static_asset_handler),
)
.route("/ui/index.html", get(handlers::ui_handler))
.route("/", any(|_state: State<Arc<VolumeServerState>>, request: Request| async move {
match request.method().clone() {
Method::OPTIONS => admin_options_response(),
Method::GET => StatusCode::OK.into_response(),
_ => (StatusCode::BAD_REQUEST, format!("{{\"error\":\"unsupported method {}\"}}", request.method())).into_response(),
}
}))
.route(
"/",
any(
|_state: State<Arc<VolumeServerState>>, request: Request| async move {
match request.method().clone() {
Method::OPTIONS => admin_options_response(),
Method::GET => StatusCode::OK.into_response(),
_ => (
StatusCode::BAD_REQUEST,
format!("{{\"error\":\"unsupported method {}\"}}", request.method()),
)
.into_response(),
}
},
),
)
.route("/:path", any(admin_store_handler))
.route("/:vid/:fid", any(admin_store_handler))
.route("/:vid/:fid/:filename", any(admin_store_handler))
@ -207,14 +235,22 @@ pub fn build_public_router(state: Arc<VolumeServerState>) -> Router {
Router::new()
.route("/healthz", get(handlers::healthz_handler))
.route("/favicon.ico", get(handlers::favicon_handler))
.route("/seaweedfsstatic/*path", get(handlers::static_asset_handler))
.route("/", any(|_state: State<Arc<VolumeServerState>>, request: Request| async move {
match request.method().clone() {
Method::OPTIONS => public_options_response(),
Method::GET => StatusCode::OK.into_response(),
_ => StatusCode::OK.into_response(),
}
}))
.route(
"/seaweedfsstatic/*path",
get(handlers::static_asset_handler),
)
.route(
"/",
any(
|_state: State<Arc<VolumeServerState>>, request: Request| async move {
match request.method().clone() {
Method::OPTIONS => public_options_response(),
Method::GET => StatusCode::OK.into_response(),
_ => StatusCode::OK.into_response(),
}
},
),
)
.route("/:path", any(public_store_handler))
.route("/:vid/:fid", any(public_store_handler))
.route("/:vid/:fid/:filename", any(public_store_handler))

34
seaweed-volume/src/server/write_queue.rs

@ -42,10 +42,7 @@ impl WriteQueue {
/// The worker holds a reference to `state` for accessing the store.
pub fn new(state: Arc<VolumeServerState>, capacity: usize) -> Self {
let (tx, rx) = mpsc::channel(capacity);
let worker = WriteQueueWorker {
rx,
state,
};
let worker = WriteQueueWorker { rx, state };
tokio::spawn(worker.run());
WriteQueue { tx }
}
@ -53,11 +50,7 @@ impl WriteQueue {
/// Submit a write request and wait for the result.
///
/// Returns `Err` if the worker has shut down or the response channel was dropped.
pub async fn submit(
&self,
volume_id: VolumeId,
needle: Needle,
) -> WriteResult {
pub async fn submit(&self, volume_id: VolumeId, needle: Needle) -> WriteResult {
let (response_tx, response_rx) = oneshot::channel();
let request = WriteRequest {
volume_id,
@ -166,20 +159,15 @@ mod tests {
/// Helper to create a minimal VolumeServerState for testing.
fn make_test_state() -> Arc<VolumeServerState> {
use std::sync::RwLock;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32};
use crate::security::{Guard, SigningKey};
use crate::storage::store::Store;
use crate::server::volume_server::RuntimeMetricsConfig;
use crate::storage::needle_map::NeedleMapKind;
use crate::storage::store::Store;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32};
use std::sync::RwLock;
let store = Store::new(NeedleMapKind::InMemory);
let guard = Guard::new(
&[],
SigningKey(vec![]),
0,
SigningKey(vec![]),
0,
);
let guard = Guard::new(&[], SigningKey(vec![]), 0, SigningKey(vec![]), 0);
Arc::new(VolumeServerState {
store: RwLock::new(store),
@ -203,11 +191,17 @@ mod tests {
pre_stop_seconds: 0,
volume_state_notify: tokio::sync::Notify::new(),
write_queue: std::sync::OnceLock::new(),
s3_tier_registry: std::sync::RwLock::new(crate::remote_storage::s3_tier::S3TierRegistry::new()),
s3_tier_registry: std::sync::RwLock::new(
crate::remote_storage::s3_tier::S3TierRegistry::new(),
),
read_mode: crate::config::ReadMode::Local,
master_url: String::new(),
self_url: String::new(),
http_client: reqwest::Client::new(),
metrics_runtime: std::sync::RwLock::new(RuntimeMetricsConfig::default()),
metrics_notify: tokio::sync::Notify::new(),
has_slow_read: true,
read_buffer_size_bytes: 4 * 1024 * 1024,
})
}

45
seaweed-volume/tests/http_integration.rs

@ -10,7 +10,9 @@ use axum::http::{Request, StatusCode};
use tower::ServiceExt; // for `oneshot`
use seaweed_volume::security::{Guard, SigningKey};
use seaweed_volume::server::volume_server::{build_admin_router, VolumeServerState};
use seaweed_volume::server::volume_server::{
build_admin_router, build_metrics_router, VolumeServerState,
};
use seaweed_volume::storage::needle_map::NeedleMapKind;
use seaweed_volume::storage::store::Store;
use seaweed_volume::storage::types::{DiskType, VolumeId};
@ -68,6 +70,12 @@ fn test_state() -> (Arc<VolumeServerState>, TempDir) {
master_url: String::new(),
self_url: String::new(),
http_client: reqwest::Client::new(),
metrics_runtime: std::sync::RwLock::new(
seaweed_volume::server::volume_server::RuntimeMetricsConfig::default(),
),
metrics_notify: tokio::sync::Notify::new(),
has_slow_read: false,
read_buffer_size_bytes: 1024 * 1024,
});
(state, tmp)
}
@ -163,6 +171,41 @@ async fn status_returns_json_with_version_and_volumes() {
assert_eq!(volumes[0]["Id"], 1);
}
#[tokio::test]
async fn admin_router_does_not_expose_metrics() {
let (state, _tmp) = test_state();
let app = build_admin_router(state);
let response = app
.oneshot(
Request::builder()
.uri("/metrics")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn metrics_router_serves_metrics() {
let app = build_metrics_router();
let response = app
.oneshot(
Request::builder()
.uri("/metrics")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
// ============================================================================
// 4. POST writes data, then GET reads it back
// ============================================================================

Loading…
Cancel
Save