Browse Source

Match Go stats endpoint payloads

rust-volume-server
Chris Lu 4 days ago
parent
commit
9ac1b54bfb
  1. 217
      seaweed-volume/src/server/handlers.rs
  2. 1
      seaweed-volume/src/server/mod.rs
  3. 241
      seaweed-volume/src/server/server_stats.rs
  4. 70
      seaweed-volume/src/server/volume_server.rs

217
seaweed-volume/src/server/handlers.rs

@ -2312,79 +2312,17 @@ pub async fn status_handler(
left.cmp(&right)
});
// Build disk statuses
let mut disk_statuses = Vec::new();
for loc in &store.locations {
let dir = &loc.directory;
let resolved_dir = std::path::Path::new(dir)
.canonicalize()
.map(|path| path.to_string_lossy().to_string())
.unwrap_or_else(|_| dir.clone());
let (all, free) = crate::storage::disk_location::get_disk_stats(&resolved_dir);
let used = all.saturating_sub(free);
let percent_free = if all > 0 {
(free as f64 / all as f64) * 100.0
} else {
0.0
};
let percent_used = if all > 0 {
(used as f64 / all as f64) * 100.0
} else {
0.0
};
let mut ds = serde_json::Map::new();
ds.insert("dir".to_string(), serde_json::Value::from(resolved_dir));
ds.insert("all".to_string(), serde_json::Value::from(all));
ds.insert("used".to_string(), serde_json::Value::from(used));
ds.insert("free".to_string(), serde_json::Value::from(free));
ds.insert(
"percent_free".to_string(),
serde_json::Value::from(percent_free),
);
ds.insert(
"percent_used".to_string(),
serde_json::Value::from(percent_used),
);
ds.insert(
"disk_type".to_string(),
serde_json::Value::from(loc.disk_type.to_string()),
);
disk_statuses.push(serde_json::Value::Object(ds));
}
let mut m = serde_json::Map::new();
m.insert(
"Version".to_string(),
serde_json::Value::from(crate::version::full_version()),
serde_json::Value::from(crate::version::version()),
);
m.insert("Volumes".to_string(), serde_json::Value::Array(volumes));
m.insert(
"DiskStatuses".to_string(),
serde_json::Value::Array(disk_statuses),
serde_json::Value::Array(build_disk_statuses(&store)),
);
let json_value = serde_json::Value::Object(m);
let is_pretty = params.pretty.as_ref().is_some_and(|s| !s.is_empty());
let json_body = if is_pretty {
serde_json::to_string_pretty(&json_value).unwrap()
} else {
serde_json::to_string(&json_value).unwrap()
};
if let Some(ref cb) = params.callback {
let jsonp = format!("{}({})", cb, json_body);
Response::builder()
.header(header::CONTENT_TYPE, "application/javascript")
.body(Body::from(jsonp))
.unwrap()
} else {
Response::builder()
.header(header::CONTENT_TYPE, "application/json")
.body(Body::from(json_body))
.unwrap()
}
json_response_with_params(StatusCode::OK, &serde_json::Value::Object(m), Some(&params))
}
// ============================================================================
@ -2424,15 +2362,18 @@ pub async fn metrics_handler() -> Response {
// Stats Handlers
// ============================================================================
pub async fn stats_counter_handler() -> Response {
let body = metrics::gather_metrics();
(StatusCode::OK, [(header::CONTENT_TYPE, "text/plain")], body).into_response()
pub async fn stats_counter_handler(Query(params): Query<ReadQueryParams>) -> Response {
let payload = serde_json::json!({
"Version": crate::version::version(),
"Counters": super::server_stats::snapshot(),
});
json_response_with_params(StatusCode::OK, &payload, Some(&params))
}
pub async fn stats_memory_handler() -> Response {
pub async fn stats_memory_handler(Query(params): Query<ReadQueryParams>) -> Response {
let mem = super::memory_status::collect_mem_status();
let info = serde_json::json!({
"Version": crate::version::full_version(),
let payload = serde_json::json!({
"Version": crate::version::version(),
"Memory": {
"goroutines": mem.goroutines,
"all": mem.all,
@ -2443,37 +2384,19 @@ pub async fn stats_memory_handler() -> Response {
"stack": mem.stack,
},
});
(
StatusCode::OK,
[(header::CONTENT_TYPE, "application/json")],
info.to_string(),
)
.into_response()
json_response_with_params(StatusCode::OK, &payload, Some(&params))
}
pub async fn stats_disk_handler(State(state): State<Arc<VolumeServerState>>) -> Response {
pub async fn stats_disk_handler(
Query(params): Query<ReadQueryParams>,
State(state): State<Arc<VolumeServerState>>,
) -> Response {
let store = state.store.read().unwrap();
let mut ds = Vec::new();
for loc in &store.locations {
let dir = loc.directory.clone();
let (all, free) = crate::storage::disk_location::get_disk_stats(&dir);
ds.push(serde_json::json!({
"dir": dir,
"all": all,
"used": all - free,
"free": free,
}));
}
let info = serde_json::json!({
"Version": crate::version::full_version(),
"DiskStatuses": ds,
let payload = serde_json::json!({
"Version": crate::version::version(),
"DiskStatuses": build_disk_statuses(&store),
});
(
StatusCode::OK,
[(header::CONTENT_TYPE, "application/json")],
info.to_string(),
)
.into_response()
json_response_with_params(StatusCode::OK, &payload, Some(&params))
}
// ============================================================================
@ -2657,6 +2580,80 @@ fn try_expand_chunk_manifest(
// Helpers
// ============================================================================
fn absolute_display_path(path: &str) -> String {
let p = std::path::Path::new(path);
if p.is_absolute() {
return path.to_string();
}
std::env::current_dir()
.map(|cwd| cwd.join(p).to_string_lossy().to_string())
.unwrap_or_else(|_| path.to_string())
}
fn build_disk_statuses(store: &crate::storage::store::Store) -> Vec<serde_json::Value> {
let mut disk_statuses = Vec::new();
for loc in &store.locations {
let resolved_dir = absolute_display_path(&loc.directory);
let (all, free) = crate::storage::disk_location::get_disk_stats(&resolved_dir);
let used = all.saturating_sub(free);
let percent_free = if all > 0 {
(free as f64 / all as f64) * 100.0
} else {
0.0
};
let percent_used = if all > 0 {
(used as f64 / all as f64) * 100.0
} else {
0.0
};
disk_statuses.push(serde_json::json!({
"dir": resolved_dir,
"all": all,
"used": used,
"free": free,
"percent_free": percent_free,
"percent_used": percent_used,
"disk_type": loc.disk_type.to_string(),
}));
}
disk_statuses
}
fn json_response_with_params<T: Serialize>(
status: StatusCode,
body: &T,
params: Option<&ReadQueryParams>,
) -> Response {
let is_pretty = params
.and_then(|params| params.pretty.as_ref())
.is_some_and(|value| !value.is_empty());
let callback = params
.and_then(|params| params.callback.as_ref())
.filter(|value| !value.is_empty())
.cloned();
let json_body = if is_pretty {
serde_json::to_string_pretty(body).unwrap()
} else {
serde_json::to_string(body).unwrap()
};
if let Some(callback) = callback {
Response::builder()
.status(status)
.header(header::CONTENT_TYPE, "application/javascript")
.body(Body::from(format!("{}({})", callback, json_body)))
.unwrap()
} else {
Response::builder()
.status(status)
.header(header::CONTENT_TYPE, "application/json")
.body(Body::from(json_body))
.unwrap()
}
}
/// Return a JSON error response with optional query string for pretty/JSONP support.
/// Supports `?pretty=<any non-empty value>` for pretty-printed JSON and `?callback=fn` for JSONP,
/// matching Go's writeJsonError behavior.
@ -2897,7 +2894,7 @@ mod tests {
#[tokio::test]
async fn test_stats_memory_handler_matches_go_memstatus_shape() {
let response = stats_memory_handler().await;
let response = stats_memory_handler(Query(ReadQueryParams::default())).await;
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
@ -2911,6 +2908,28 @@ mod tests {
}
}
#[tokio::test]
async fn test_stats_counter_handler_matches_go_json_shape() {
super::super::server_stats::reset_for_tests();
super::super::server_stats::record_read_request();
let response = stats_counter_handler(Query(ReadQueryParams::default())).await;
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let payload: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(
payload.get("Version").and_then(|value| value.as_str()),
Some(crate::version::version())
);
let counters = payload.get("Counters").unwrap();
assert!(counters.get("ReadRequests").is_some());
assert!(counters.get("Requests").is_some());
}
#[test]
fn test_is_compressible_file_type() {
// Text types

1
seaweed-volume/src/server/mod.rs

@ -6,5 +6,6 @@ pub mod heartbeat;
pub mod memory_status;
pub mod profiling;
pub mod request_id;
pub mod server_stats;
pub mod volume_server;
pub mod write_queue;

241
seaweed-volume/src/server/server_stats.rs

@ -0,0 +1,241 @@
use chrono::{Datelike, Local, Timelike};
use serde::Serialize;
use std::sync::{LazyLock, Mutex};
use std::time::Instant;
static START_TIME: LazyLock<Instant> = LazyLock::new(Instant::now);
static SERVER_STATS: LazyLock<ServerStats> = LazyLock::new(ServerStats::default);
#[derive(Default)]
pub struct ServerStats {
inner: Mutex<ServerStatsInner>,
}
#[derive(Default)]
struct ServerStatsInner {
requests: DurationCounter,
connections: DurationCounter,
assign_requests: DurationCounter,
read_requests: DurationCounter,
write_requests: DurationCounter,
delete_requests: DurationCounter,
bytes_in: DurationCounter,
bytes_out: DurationCounter,
}
#[derive(Clone, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct ServerStatsSnapshot {
pub requests: DurationCounterSnapshot,
pub connections: DurationCounterSnapshot,
pub assign_requests: DurationCounterSnapshot,
pub read_requests: DurationCounterSnapshot,
pub write_requests: DurationCounterSnapshot,
pub delete_requests: DurationCounterSnapshot,
pub bytes_in: DurationCounterSnapshot,
pub bytes_out: DurationCounterSnapshot,
}
#[derive(Clone, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct DurationCounterSnapshot {
pub minute_counter: RoundRobinCounterSnapshot,
pub hour_counter: RoundRobinCounterSnapshot,
pub day_counter: RoundRobinCounterSnapshot,
pub week_counter: RoundRobinCounterSnapshot,
}
#[derive(Clone, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct RoundRobinCounterSnapshot {
pub last_index: i32,
pub values: Vec<i64>,
pub counts: Vec<i64>,
}
#[derive(Clone)]
struct DurationCounter {
minute_counter: RoundRobinCounter,
hour_counter: RoundRobinCounter,
day_counter: RoundRobinCounter,
week_counter: RoundRobinCounter,
}
#[derive(Clone)]
struct RoundRobinCounter {
last_index: i32,
values: Vec<i64>,
counts: Vec<i64>,
}
impl Default for DurationCounter {
fn default() -> Self {
Self {
minute_counter: RoundRobinCounter::new(60),
hour_counter: RoundRobinCounter::new(60),
day_counter: RoundRobinCounter::new(24),
week_counter: RoundRobinCounter::new(7),
}
}
}
impl RoundRobinCounter {
fn new(slots: usize) -> Self {
Self {
last_index: -1,
values: vec![0; slots],
counts: vec![0; slots],
}
}
fn add(&mut self, index: usize, val: i64) {
if index >= self.values.len() {
return;
}
while self.last_index != index as i32 {
self.last_index = (self.last_index + 1).rem_euclid(self.values.len() as i32);
self.values[self.last_index as usize] = 0;
self.counts[self.last_index as usize] = 0;
}
self.values[index] += val;
self.counts[index] += 1;
}
fn snapshot(&self) -> RoundRobinCounterSnapshot {
RoundRobinCounterSnapshot {
last_index: self.last_index,
values: self.values.clone(),
counts: self.counts.clone(),
}
}
}
impl DurationCounter {
fn add_now(&mut self, val: i64) {
let now = Local::now();
self.minute_counter.add(now.second() as usize, val);
self.hour_counter.add(now.minute() as usize, val);
self.day_counter.add(now.hour() as usize, val);
self.week_counter
.add(now.weekday().num_days_from_sunday() as usize, val);
}
fn snapshot(&self) -> DurationCounterSnapshot {
DurationCounterSnapshot {
minute_counter: self.minute_counter.snapshot(),
hour_counter: self.hour_counter.snapshot(),
day_counter: self.day_counter.snapshot(),
week_counter: self.week_counter.snapshot(),
}
}
}
impl ServerStatsInner {
fn snapshot(&self) -> ServerStatsSnapshot {
ServerStatsSnapshot {
requests: self.requests.snapshot(),
connections: self.connections.snapshot(),
assign_requests: self.assign_requests.snapshot(),
read_requests: self.read_requests.snapshot(),
write_requests: self.write_requests.snapshot(),
delete_requests: self.delete_requests.snapshot(),
bytes_in: self.bytes_in.snapshot(),
bytes_out: self.bytes_out.snapshot(),
}
}
}
impl ServerStats {
fn update<F>(&self, update: F)
where
F: FnOnce(&mut ServerStatsInner),
{
let mut inner = self.inner.lock().unwrap();
update(&mut inner);
}
fn snapshot(&self) -> ServerStatsSnapshot {
self.inner.lock().unwrap().snapshot()
}
}
impl RoundRobinCounterSnapshot {
pub fn to_list(&self) -> Vec<i64> {
if self.values.is_empty() {
return Vec::new();
}
let mut ret = Vec::with_capacity(self.values.len());
let mut index = self.last_index;
let mut step = self.values.len();
while step > 0 {
step -= 1;
index += 1;
if index >= self.values.len() as i32 {
index = 0;
}
ret.push(self.values[index as usize]);
}
ret
}
}
pub fn init_process_start() {
LazyLock::force(&START_TIME);
LazyLock::force(&SERVER_STATS);
}
pub fn uptime_string() -> String {
let secs = START_TIME.elapsed().as_secs();
let hours = secs / 3600;
let minutes = (secs % 3600) / 60;
let seconds = secs % 60;
let mut out = String::new();
if hours > 0 {
out.push_str(&format!("{}h", hours));
}
if hours > 0 || minutes > 0 {
out.push_str(&format!("{}m", minutes));
}
out.push_str(&format!("{}s", seconds));
out
}
pub fn snapshot() -> ServerStatsSnapshot {
SERVER_STATS.snapshot()
}
pub fn record_request_open() {
SERVER_STATS.update(|inner| inner.requests.add_now(1));
}
pub fn record_request_close() {
SERVER_STATS.update(|inner| inner.requests.add_now(-1));
}
pub fn record_read_request() {
SERVER_STATS.update(|inner| inner.read_requests.add_now(1));
}
pub fn record_write_request() {
SERVER_STATS.update(|inner| inner.write_requests.add_now(1));
}
pub fn record_delete_request() {
SERVER_STATS.update(|inner| inner.delete_requests.add_now(1));
}
pub fn record_bytes_in(bytes: i64) {
SERVER_STATS.update(|inner| inner.bytes_in.add_now(bytes));
}
pub fn record_bytes_out(bytes: i64) {
SERVER_STATS.update(|inner| inner.bytes_out.add_now(bytes));
}
#[cfg(test)]
pub fn reset_for_tests() {
LazyLock::force(&START_TIME);
let mut inner = SERVER_STATS.inner.lock().unwrap();
*inner = ServerStatsInner::default();
}

70
seaweed-volume/src/server/volume_server.rs

@ -14,7 +14,7 @@ use std::sync::{Arc, RwLock};
use axum::{
extract::{Request, State},
http::{HeaderValue, Method, StatusCode},
http::{header, HeaderValue, Method, StatusCode},
middleware::{self, Next},
response::{IntoResponse, Response},
routing::{any, get},
@ -160,23 +160,57 @@ async fn common_headers_middleware(request: Request, next: Next) -> Response {
/// DELETE → delete, OPTIONS → CORS headers, anything else → 400.
async fn admin_store_handler(state: State<Arc<VolumeServerState>>, request: Request) -> Response {
let start = std::time::Instant::now();
let method_str = request.method().as_str().to_string();
let method = request.method().clone();
let method_str = method.as_str().to_string();
let request_bytes = request
.headers()
.get(header::CONTENT_LENGTH)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<i64>().ok())
.filter(|value| *value > 0)
.unwrap_or(0);
super::server_stats::record_request_open();
crate::metrics::INFLIGHT_REQUESTS_GAUGE
.with_label_values(&[&method_str])
.inc();
let response = match request.method().clone() {
let response = match method.clone() {
Method::GET | Method::HEAD => {
super::server_stats::record_read_request();
handlers::get_or_head_handler_from_request(state, request).await
}
Method::POST | Method::PUT => handlers::post_handler(state, request).await,
Method::DELETE => handlers::delete_handler(state, request).await,
Method::OPTIONS => admin_options_response(),
Method::POST | Method::PUT => {
super::server_stats::record_write_request();
if request_bytes > 0 {
super::server_stats::record_bytes_in(request_bytes);
}
handlers::post_handler(state, request).await
}
Method::DELETE => {
super::server_stats::record_delete_request();
handlers::delete_handler(state, request).await
}
Method::OPTIONS => {
super::server_stats::record_read_request();
admin_options_response()
}
_ => (
StatusCode::BAD_REQUEST,
format!("{{\"error\":\"unsupported method {}\"}}", request.method()),
)
.into_response(),
};
if method == Method::GET {
if let Some(response_bytes) = response
.headers()
.get(header::CONTENT_LENGTH)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<i64>().ok())
.filter(|value| *value > 0)
{
super::server_stats::record_bytes_out(response_bytes);
}
}
super::server_stats::record_request_close();
crate::metrics::INFLIGHT_REQUESTS_GAUGE
.with_label_values(&[&method_str])
.dec();
@ -194,17 +228,35 @@ async fn admin_store_handler(state: State<Arc<VolumeServerState>>, request: Requ
/// anything else → 200 (passthrough no-op).
async fn public_store_handler(state: State<Arc<VolumeServerState>>, request: Request) -> Response {
let start = std::time::Instant::now();
let method_str = request.method().as_str().to_string();
let method = request.method().clone();
let method_str = method.as_str().to_string();
super::server_stats::record_request_open();
crate::metrics::INFLIGHT_REQUESTS_GAUGE
.with_label_values(&[&method_str])
.inc();
let response = match request.method().clone() {
let response = match method.clone() {
Method::GET | Method::HEAD => {
super::server_stats::record_read_request();
handlers::get_or_head_handler_from_request(state, request).await
}
Method::OPTIONS => public_options_response(),
Method::OPTIONS => {
super::server_stats::record_read_request();
public_options_response()
}
_ => StatusCode::OK.into_response(),
};
if method == Method::GET {
if let Some(response_bytes) = response
.headers()
.get(header::CONTENT_LENGTH)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<i64>().ok())
.filter(|value| *value > 0)
{
super::server_stats::record_bytes_out(response_bytes);
}
}
super::server_stats::record_request_close();
crate::metrics::INFLIGHT_REQUESTS_GAUGE
.with_label_values(&[&method_str])
.dec();

Loading…
Cancel
Save