From 9ac1b54bfb139f19496d0df3049d72bcf4dbd687 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 16 Mar 2026 14:35:12 -0700 Subject: [PATCH] Match Go stats endpoint payloads --- seaweed-volume/src/server/handlers.rs | 217 ++++++++++--------- seaweed-volume/src/server/mod.rs | 1 + seaweed-volume/src/server/server_stats.rs | 241 +++++++++++++++++++++ seaweed-volume/src/server/volume_server.rs | 70 +++++- 4 files changed, 421 insertions(+), 108 deletions(-) create mode 100644 seaweed-volume/src/server/server_stats.rs diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs index 654172dc4..42d4671d3 100644 --- a/seaweed-volume/src/server/handlers.rs +++ b/seaweed-volume/src/server/handlers.rs @@ -2312,79 +2312,17 @@ pub async fn status_handler( left.cmp(&right) }); - // Build disk statuses - let mut disk_statuses = Vec::new(); - for loc in &store.locations { - let dir = &loc.directory; - let resolved_dir = std::path::Path::new(dir) - .canonicalize() - .map(|path| path.to_string_lossy().to_string()) - .unwrap_or_else(|_| dir.clone()); - let (all, free) = crate::storage::disk_location::get_disk_stats(&resolved_dir); - let used = all.saturating_sub(free); - let percent_free = if all > 0 { - (free as f64 / all as f64) * 100.0 - } else { - 0.0 - }; - let percent_used = if all > 0 { - (used as f64 / all as f64) * 100.0 - } else { - 0.0 - }; - - let mut ds = serde_json::Map::new(); - ds.insert("dir".to_string(), serde_json::Value::from(resolved_dir)); - ds.insert("all".to_string(), serde_json::Value::from(all)); - ds.insert("used".to_string(), serde_json::Value::from(used)); - ds.insert("free".to_string(), serde_json::Value::from(free)); - ds.insert( - "percent_free".to_string(), - serde_json::Value::from(percent_free), - ); - ds.insert( - "percent_used".to_string(), - serde_json::Value::from(percent_used), - ); - ds.insert( - "disk_type".to_string(), - serde_json::Value::from(loc.disk_type.to_string()), - ); - disk_statuses.push(serde_json::Value::Object(ds)); - } - let mut m = serde_json::Map::new(); m.insert( "Version".to_string(), - serde_json::Value::from(crate::version::full_version()), + serde_json::Value::from(crate::version::version()), ); m.insert("Volumes".to_string(), serde_json::Value::Array(volumes)); m.insert( "DiskStatuses".to_string(), - serde_json::Value::Array(disk_statuses), + serde_json::Value::Array(build_disk_statuses(&store)), ); - - let json_value = serde_json::Value::Object(m); - - let is_pretty = params.pretty.as_ref().is_some_and(|s| !s.is_empty()); - let json_body = if is_pretty { - serde_json::to_string_pretty(&json_value).unwrap() - } else { - serde_json::to_string(&json_value).unwrap() - }; - - if let Some(ref cb) = params.callback { - let jsonp = format!("{}({})", cb, json_body); - Response::builder() - .header(header::CONTENT_TYPE, "application/javascript") - .body(Body::from(jsonp)) - .unwrap() - } else { - Response::builder() - .header(header::CONTENT_TYPE, "application/json") - .body(Body::from(json_body)) - .unwrap() - } + json_response_with_params(StatusCode::OK, &serde_json::Value::Object(m), Some(¶ms)) } // ============================================================================ @@ -2424,15 +2362,18 @@ pub async fn metrics_handler() -> Response { // Stats Handlers // ============================================================================ -pub async fn stats_counter_handler() -> Response { - let body = metrics::gather_metrics(); - (StatusCode::OK, [(header::CONTENT_TYPE, "text/plain")], body).into_response() +pub async fn stats_counter_handler(Query(params): Query) -> Response { + let payload = serde_json::json!({ + "Version": crate::version::version(), + "Counters": super::server_stats::snapshot(), + }); + json_response_with_params(StatusCode::OK, &payload, Some(¶ms)) } -pub async fn stats_memory_handler() -> Response { +pub async fn stats_memory_handler(Query(params): Query) -> Response { let mem = super::memory_status::collect_mem_status(); - let info = serde_json::json!({ - "Version": crate::version::full_version(), + let payload = serde_json::json!({ + "Version": crate::version::version(), "Memory": { "goroutines": mem.goroutines, "all": mem.all, @@ -2443,37 +2384,19 @@ pub async fn stats_memory_handler() -> Response { "stack": mem.stack, }, }); - ( - StatusCode::OK, - [(header::CONTENT_TYPE, "application/json")], - info.to_string(), - ) - .into_response() + json_response_with_params(StatusCode::OK, &payload, Some(¶ms)) } -pub async fn stats_disk_handler(State(state): State>) -> Response { +pub async fn stats_disk_handler( + Query(params): Query, + State(state): State>, +) -> Response { let store = state.store.read().unwrap(); - let mut ds = Vec::new(); - for loc in &store.locations { - let dir = loc.directory.clone(); - let (all, free) = crate::storage::disk_location::get_disk_stats(&dir); - ds.push(serde_json::json!({ - "dir": dir, - "all": all, - "used": all - free, - "free": free, - })); - } - let info = serde_json::json!({ - "Version": crate::version::full_version(), - "DiskStatuses": ds, + let payload = serde_json::json!({ + "Version": crate::version::version(), + "DiskStatuses": build_disk_statuses(&store), }); - ( - StatusCode::OK, - [(header::CONTENT_TYPE, "application/json")], - info.to_string(), - ) - .into_response() + json_response_with_params(StatusCode::OK, &payload, Some(¶ms)) } // ============================================================================ @@ -2657,6 +2580,80 @@ fn try_expand_chunk_manifest( // Helpers // ============================================================================ +fn absolute_display_path(path: &str) -> String { + let p = std::path::Path::new(path); + if p.is_absolute() { + return path.to_string(); + } + std::env::current_dir() + .map(|cwd| cwd.join(p).to_string_lossy().to_string()) + .unwrap_or_else(|_| path.to_string()) +} + +fn build_disk_statuses(store: &crate::storage::store::Store) -> Vec { + let mut disk_statuses = Vec::new(); + for loc in &store.locations { + let resolved_dir = absolute_display_path(&loc.directory); + let (all, free) = crate::storage::disk_location::get_disk_stats(&resolved_dir); + let used = all.saturating_sub(free); + let percent_free = if all > 0 { + (free as f64 / all as f64) * 100.0 + } else { + 0.0 + }; + let percent_used = if all > 0 { + (used as f64 / all as f64) * 100.0 + } else { + 0.0 + }; + + disk_statuses.push(serde_json::json!({ + "dir": resolved_dir, + "all": all, + "used": used, + "free": free, + "percent_free": percent_free, + "percent_used": percent_used, + "disk_type": loc.disk_type.to_string(), + })); + } + disk_statuses +} + +fn json_response_with_params( + status: StatusCode, + body: &T, + params: Option<&ReadQueryParams>, +) -> Response { + let is_pretty = params + .and_then(|params| params.pretty.as_ref()) + .is_some_and(|value| !value.is_empty()); + let callback = params + .and_then(|params| params.callback.as_ref()) + .filter(|value| !value.is_empty()) + .cloned(); + + let json_body = if is_pretty { + serde_json::to_string_pretty(body).unwrap() + } else { + serde_json::to_string(body).unwrap() + }; + + if let Some(callback) = callback { + Response::builder() + .status(status) + .header(header::CONTENT_TYPE, "application/javascript") + .body(Body::from(format!("{}({})", callback, json_body))) + .unwrap() + } else { + Response::builder() + .status(status) + .header(header::CONTENT_TYPE, "application/json") + .body(Body::from(json_body)) + .unwrap() + } +} + /// Return a JSON error response with optional query string for pretty/JSONP support. /// Supports `?pretty=` for pretty-printed JSON and `?callback=fn` for JSONP, /// matching Go's writeJsonError behavior. @@ -2897,7 +2894,7 @@ mod tests { #[tokio::test] async fn test_stats_memory_handler_matches_go_memstatus_shape() { - let response = stats_memory_handler().await; + let response = stats_memory_handler(Query(ReadQueryParams::default())).await; assert_eq!(response.status(), StatusCode::OK); let body = axum::body::to_bytes(response.into_body(), usize::MAX) @@ -2911,6 +2908,28 @@ mod tests { } } + #[tokio::test] + async fn test_stats_counter_handler_matches_go_json_shape() { + super::super::server_stats::reset_for_tests(); + super::super::server_stats::record_read_request(); + + let response = stats_counter_handler(Query(ReadQueryParams::default())).await; + assert_eq!(response.status(), StatusCode::OK); + + let body = axum::body::to_bytes(response.into_body(), usize::MAX) + .await + .unwrap(); + let payload: serde_json::Value = serde_json::from_slice(&body).unwrap(); + + assert_eq!( + payload.get("Version").and_then(|value| value.as_str()), + Some(crate::version::version()) + ); + let counters = payload.get("Counters").unwrap(); + assert!(counters.get("ReadRequests").is_some()); + assert!(counters.get("Requests").is_some()); + } + #[test] fn test_is_compressible_file_type() { // Text types diff --git a/seaweed-volume/src/server/mod.rs b/seaweed-volume/src/server/mod.rs index e0b21f1f9..637a4f426 100644 --- a/seaweed-volume/src/server/mod.rs +++ b/seaweed-volume/src/server/mod.rs @@ -6,5 +6,6 @@ pub mod heartbeat; pub mod memory_status; pub mod profiling; pub mod request_id; +pub mod server_stats; pub mod volume_server; pub mod write_queue; diff --git a/seaweed-volume/src/server/server_stats.rs b/seaweed-volume/src/server/server_stats.rs new file mode 100644 index 000000000..ae42848b7 --- /dev/null +++ b/seaweed-volume/src/server/server_stats.rs @@ -0,0 +1,241 @@ +use chrono::{Datelike, Local, Timelike}; +use serde::Serialize; +use std::sync::{LazyLock, Mutex}; +use std::time::Instant; + +static START_TIME: LazyLock = LazyLock::new(Instant::now); +static SERVER_STATS: LazyLock = LazyLock::new(ServerStats::default); + +#[derive(Default)] +pub struct ServerStats { + inner: Mutex, +} + +#[derive(Default)] +struct ServerStatsInner { + requests: DurationCounter, + connections: DurationCounter, + assign_requests: DurationCounter, + read_requests: DurationCounter, + write_requests: DurationCounter, + delete_requests: DurationCounter, + bytes_in: DurationCounter, + bytes_out: DurationCounter, +} + +#[derive(Clone, Serialize)] +#[serde(rename_all = "PascalCase")] +pub struct ServerStatsSnapshot { + pub requests: DurationCounterSnapshot, + pub connections: DurationCounterSnapshot, + pub assign_requests: DurationCounterSnapshot, + pub read_requests: DurationCounterSnapshot, + pub write_requests: DurationCounterSnapshot, + pub delete_requests: DurationCounterSnapshot, + pub bytes_in: DurationCounterSnapshot, + pub bytes_out: DurationCounterSnapshot, +} + +#[derive(Clone, Serialize)] +#[serde(rename_all = "PascalCase")] +pub struct DurationCounterSnapshot { + pub minute_counter: RoundRobinCounterSnapshot, + pub hour_counter: RoundRobinCounterSnapshot, + pub day_counter: RoundRobinCounterSnapshot, + pub week_counter: RoundRobinCounterSnapshot, +} + +#[derive(Clone, Serialize)] +#[serde(rename_all = "PascalCase")] +pub struct RoundRobinCounterSnapshot { + pub last_index: i32, + pub values: Vec, + pub counts: Vec, +} + +#[derive(Clone)] +struct DurationCounter { + minute_counter: RoundRobinCounter, + hour_counter: RoundRobinCounter, + day_counter: RoundRobinCounter, + week_counter: RoundRobinCounter, +} + +#[derive(Clone)] +struct RoundRobinCounter { + last_index: i32, + values: Vec, + counts: Vec, +} + +impl Default for DurationCounter { + fn default() -> Self { + Self { + minute_counter: RoundRobinCounter::new(60), + hour_counter: RoundRobinCounter::new(60), + day_counter: RoundRobinCounter::new(24), + week_counter: RoundRobinCounter::new(7), + } + } +} + +impl RoundRobinCounter { + fn new(slots: usize) -> Self { + Self { + last_index: -1, + values: vec![0; slots], + counts: vec![0; slots], + } + } + + fn add(&mut self, index: usize, val: i64) { + if index >= self.values.len() { + return; + } + while self.last_index != index as i32 { + self.last_index = (self.last_index + 1).rem_euclid(self.values.len() as i32); + self.values[self.last_index as usize] = 0; + self.counts[self.last_index as usize] = 0; + } + self.values[index] += val; + self.counts[index] += 1; + } + + fn snapshot(&self) -> RoundRobinCounterSnapshot { + RoundRobinCounterSnapshot { + last_index: self.last_index, + values: self.values.clone(), + counts: self.counts.clone(), + } + } +} + +impl DurationCounter { + fn add_now(&mut self, val: i64) { + let now = Local::now(); + self.minute_counter.add(now.second() as usize, val); + self.hour_counter.add(now.minute() as usize, val); + self.day_counter.add(now.hour() as usize, val); + self.week_counter + .add(now.weekday().num_days_from_sunday() as usize, val); + } + + fn snapshot(&self) -> DurationCounterSnapshot { + DurationCounterSnapshot { + minute_counter: self.minute_counter.snapshot(), + hour_counter: self.hour_counter.snapshot(), + day_counter: self.day_counter.snapshot(), + week_counter: self.week_counter.snapshot(), + } + } +} + +impl ServerStatsInner { + fn snapshot(&self) -> ServerStatsSnapshot { + ServerStatsSnapshot { + requests: self.requests.snapshot(), + connections: self.connections.snapshot(), + assign_requests: self.assign_requests.snapshot(), + read_requests: self.read_requests.snapshot(), + write_requests: self.write_requests.snapshot(), + delete_requests: self.delete_requests.snapshot(), + bytes_in: self.bytes_in.snapshot(), + bytes_out: self.bytes_out.snapshot(), + } + } +} + +impl ServerStats { + fn update(&self, update: F) + where + F: FnOnce(&mut ServerStatsInner), + { + let mut inner = self.inner.lock().unwrap(); + update(&mut inner); + } + + fn snapshot(&self) -> ServerStatsSnapshot { + self.inner.lock().unwrap().snapshot() + } +} + +impl RoundRobinCounterSnapshot { + pub fn to_list(&self) -> Vec { + if self.values.is_empty() { + return Vec::new(); + } + let mut ret = Vec::with_capacity(self.values.len()); + let mut index = self.last_index; + let mut step = self.values.len(); + while step > 0 { + step -= 1; + index += 1; + if index >= self.values.len() as i32 { + index = 0; + } + ret.push(self.values[index as usize]); + } + ret + } +} + +pub fn init_process_start() { + LazyLock::force(&START_TIME); + LazyLock::force(&SERVER_STATS); +} + +pub fn uptime_string() -> String { + let secs = START_TIME.elapsed().as_secs(); + let hours = secs / 3600; + let minutes = (secs % 3600) / 60; + let seconds = secs % 60; + let mut out = String::new(); + if hours > 0 { + out.push_str(&format!("{}h", hours)); + } + if hours > 0 || minutes > 0 { + out.push_str(&format!("{}m", minutes)); + } + out.push_str(&format!("{}s", seconds)); + out +} + +pub fn snapshot() -> ServerStatsSnapshot { + SERVER_STATS.snapshot() +} + +pub fn record_request_open() { + SERVER_STATS.update(|inner| inner.requests.add_now(1)); +} + +pub fn record_request_close() { + SERVER_STATS.update(|inner| inner.requests.add_now(-1)); +} + +pub fn record_read_request() { + SERVER_STATS.update(|inner| inner.read_requests.add_now(1)); +} + +pub fn record_write_request() { + SERVER_STATS.update(|inner| inner.write_requests.add_now(1)); +} + +pub fn record_delete_request() { + SERVER_STATS.update(|inner| inner.delete_requests.add_now(1)); +} + +pub fn record_bytes_in(bytes: i64) { + SERVER_STATS.update(|inner| inner.bytes_in.add_now(bytes)); +} + +pub fn record_bytes_out(bytes: i64) { + SERVER_STATS.update(|inner| inner.bytes_out.add_now(bytes)); +} + +#[cfg(test)] +pub fn reset_for_tests() { + LazyLock::force(&START_TIME); + let mut inner = SERVER_STATS.inner.lock().unwrap(); + *inner = ServerStatsInner::default(); +} + diff --git a/seaweed-volume/src/server/volume_server.rs b/seaweed-volume/src/server/volume_server.rs index a6e057014..fc72ae79f 100644 --- a/seaweed-volume/src/server/volume_server.rs +++ b/seaweed-volume/src/server/volume_server.rs @@ -14,7 +14,7 @@ use std::sync::{Arc, RwLock}; use axum::{ extract::{Request, State}, - http::{HeaderValue, Method, StatusCode}, + http::{header, HeaderValue, Method, StatusCode}, middleware::{self, Next}, response::{IntoResponse, Response}, routing::{any, get}, @@ -160,23 +160,57 @@ async fn common_headers_middleware(request: Request, next: Next) -> Response { /// DELETE → delete, OPTIONS → CORS headers, anything else → 400. async fn admin_store_handler(state: State>, request: Request) -> Response { let start = std::time::Instant::now(); - let method_str = request.method().as_str().to_string(); + let method = request.method().clone(); + let method_str = method.as_str().to_string(); + let request_bytes = request + .headers() + .get(header::CONTENT_LENGTH) + .and_then(|value| value.to_str().ok()) + .and_then(|value| value.parse::().ok()) + .filter(|value| *value > 0) + .unwrap_or(0); + super::server_stats::record_request_open(); crate::metrics::INFLIGHT_REQUESTS_GAUGE .with_label_values(&[&method_str]) .inc(); - let response = match request.method().clone() { + let response = match method.clone() { Method::GET | Method::HEAD => { + super::server_stats::record_read_request(); handlers::get_or_head_handler_from_request(state, request).await } - Method::POST | Method::PUT => handlers::post_handler(state, request).await, - Method::DELETE => handlers::delete_handler(state, request).await, - Method::OPTIONS => admin_options_response(), + Method::POST | Method::PUT => { + super::server_stats::record_write_request(); + if request_bytes > 0 { + super::server_stats::record_bytes_in(request_bytes); + } + handlers::post_handler(state, request).await + } + Method::DELETE => { + super::server_stats::record_delete_request(); + handlers::delete_handler(state, request).await + } + Method::OPTIONS => { + super::server_stats::record_read_request(); + admin_options_response() + } _ => ( StatusCode::BAD_REQUEST, format!("{{\"error\":\"unsupported method {}\"}}", request.method()), ) .into_response(), }; + if method == Method::GET { + if let Some(response_bytes) = response + .headers() + .get(header::CONTENT_LENGTH) + .and_then(|value| value.to_str().ok()) + .and_then(|value| value.parse::().ok()) + .filter(|value| *value > 0) + { + super::server_stats::record_bytes_out(response_bytes); + } + } + super::server_stats::record_request_close(); crate::metrics::INFLIGHT_REQUESTS_GAUGE .with_label_values(&[&method_str]) .dec(); @@ -194,17 +228,35 @@ async fn admin_store_handler(state: State>, request: Requ /// anything else → 200 (passthrough no-op). async fn public_store_handler(state: State>, request: Request) -> Response { let start = std::time::Instant::now(); - let method_str = request.method().as_str().to_string(); + let method = request.method().clone(); + let method_str = method.as_str().to_string(); + super::server_stats::record_request_open(); crate::metrics::INFLIGHT_REQUESTS_GAUGE .with_label_values(&[&method_str]) .inc(); - let response = match request.method().clone() { + let response = match method.clone() { Method::GET | Method::HEAD => { + super::server_stats::record_read_request(); handlers::get_or_head_handler_from_request(state, request).await } - Method::OPTIONS => public_options_response(), + Method::OPTIONS => { + super::server_stats::record_read_request(); + public_options_response() + } _ => StatusCode::OK.into_response(), }; + if method == Method::GET { + if let Some(response_bytes) = response + .headers() + .get(header::CONTENT_LENGTH) + .and_then(|value| value.to_str().ok()) + .and_then(|value| value.parse::().ok()) + .filter(|value| *value > 0) + { + super::server_stats::record_bytes_out(response_bytes); + } + } + super::server_stats::record_request_close(); crate::metrics::INFLIGHT_REQUESTS_GAUGE .with_label_values(&[&method_str]) .dec();