Browse Source

feat: add Go-compatible Prometheus metrics to Rust volume server

Closes the metrics gap between Rust and Go volume servers (8 → 23
metrics). Adds handler counters, vacuuming histograms, volume/disk
gauges, inflight request tracking, and concurrent limit gauges.
Centralizes request counting in store handlers instead of per-handler.
rust-volume-server
Chris Lu 20 hours ago
parent
commit
f189844776
  1. 12
      seaweed-volume/src/main.rs
  2. 207
      seaweed-volume/src/metrics.rs
  3. 19
      seaweed-volume/src/server/grpc_server.rs
  4. 95
      seaweed-volume/src/server/handlers.rs
  5. 33
      seaweed-volume/src/server/heartbeat.rs
  6. 38
      seaweed-volume/src/server/volume_server.rs
  7. 32
      seaweed-volume/src/storage/disk_location.rs
  8. 7
      seaweed-volume/src/storage/store.rs

12
seaweed-volume/src/main.rs

@@ -152,6 +152,18 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
let _ = state.write_queue.set(wq);
}
// Set initial metric gauges for concurrent limits and max volumes
metrics::CONCURRENT_UPLOAD_LIMIT.set(state.concurrent_upload_limit);
metrics::CONCURRENT_DOWNLOAD_LIMIT.set(state.concurrent_download_limit);
{
let store = state.store.read().unwrap();
let mut max_vols: i64 = 0;
for loc in &store.locations {
max_vols += loc.max_volume_count.load(std::sync::atomic::Ordering::Relaxed) as i64;
}
metrics::MAX_VOLUMES.set(max_vols);
}
// Run initial disk space check
{
let store = state.store.read().unwrap();

207
seaweed-volume/src/metrics.rs

@@ -3,8 +3,8 @@
//! Mirrors the Go SeaweedFS volume server metrics.
use prometheus::{
self, Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Opts,
Registry, TextEncoder,
self, Encoder, GaugeVec, HistogramOpts, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec,
Opts, Registry, TextEncoder,
};
use std::sync::Once;
@@ -17,30 +17,130 @@ pub struct PushGatewayConfig {
lazy_static::lazy_static! {
pub static ref REGISTRY: Registry = Registry::new();
/// Request counter with label `type` = read | write | delete.
// ---- Request metrics (Go: VolumeServerRequestCounter, VolumeServerRequestHistogram) ----
/// Request counter with labels `type` (HTTP method) and `code` (HTTP status).
pub static ref REQUEST_COUNTER: IntCounterVec = IntCounterVec::new(
Opts::new("volume_server_request_counter", "Volume server request counter"),
&["type"],
Opts::new("SeaweedFS_volumeServer_request_total", "Volume server requests"),
&["type", "code"],
).expect("metric can be created");
/// Request duration histogram with label `type` = read | write | delete.
/// Request duration histogram with label `type` (HTTP method).
pub static ref REQUEST_DURATION: HistogramVec = HistogramVec::new(
HistogramOpts::new("volume_server_request_duration", "Volume server request duration in seconds"),
HistogramOpts::new(
"SeaweedFS_volumeServer_request_seconds",
"Volume server request duration in seconds",
).buckets(exponential_buckets(0.0001, 2.0, 24)),
&["type"],
).expect("metric can be created");
/// Total number of volumes on this server.
pub static ref VOLUMES_TOTAL: IntGauge = IntGauge::new(
"volume_server_volumes_total",
"Total number of volumes",
// ---- Handler counters (Go: VolumeServerHandlerCounter) ----
/// Handler-level operation counter with label `type`.
pub static ref HANDLER_COUNTER: IntCounterVec = IntCounterVec::new(
Opts::new("SeaweedFS_volumeServer_handler_total", "Volume server handler counters"),
&["type"],
).expect("metric can be created");
// ---- Vacuuming metrics (Go: VolumeServerVacuuming*) ----
/// Vacuuming compact counter with label `success` (true/false).
pub static ref VACUUMING_COMPACT_COUNTER: IntCounterVec = IntCounterVec::new(
Opts::new("SeaweedFS_volumeServer_vacuuming_compact_total", "Volume vacuuming compact operations"),
&["success"],
).expect("metric can be created");
/// Vacuuming commit counter with label `success` (true/false).
pub static ref VACUUMING_COMMIT_COUNTER: IntCounterVec = IntCounterVec::new(
Opts::new("SeaweedFS_volumeServer_vacuuming_commit_total", "Volume vacuuming commit operations"),
&["success"],
).expect("metric can be created");
/// Vacuuming duration histogram with label `type` (compact/commit).
pub static ref VACUUMING_HISTOGRAM: HistogramVec = HistogramVec::new(
HistogramOpts::new(
"SeaweedFS_volumeServer_vacuuming_seconds",
"Volume vacuuming duration in seconds",
).buckets(exponential_buckets(0.0001, 2.0, 24)),
&["type"],
).expect("metric can be created");
// ---- Volume gauges (Go: VolumeServerVolumeGauge, VolumeServerReadOnlyVolumeGauge) ----
/// Volumes per collection and type (volume/ec_shards).
pub static ref VOLUME_GAUGE: GaugeVec = GaugeVec::new(
Opts::new("SeaweedFS_volumeServer_volumes", "Number of volumes"),
&["collection", "type"],
).expect("metric can be created");
/// Read-only volumes per collection and type.
pub static ref READ_ONLY_VOLUME_GAUGE: GaugeVec = GaugeVec::new(
Opts::new("SeaweedFS_volumeServer_readOnly_volumes", "Number of read-only volumes"),
&["collection", "type"],
).expect("metric can be created");
/// Maximum number of volumes this server can hold.
pub static ref MAX_VOLUMES: IntGauge = IntGauge::new(
"volume_server_max_volumes",
"SeaweedFS_volumeServer_max_volumes",
"Maximum number of volumes",
).expect("metric can be created");
// ---- Disk size gauges (Go: VolumeServerDiskSizeGauge) ----
/// Actual disk size used by volumes per collection and type (normal/deleted_bytes/ec).
pub static ref DISK_SIZE_GAUGE: GaugeVec = GaugeVec::new(
Opts::new("SeaweedFS_volumeServer_total_disk_size", "Actual disk size used by volumes"),
&["collection", "type"],
).expect("metric can be created");
// ---- Resource gauges (Go: VolumeServerResourceGauge) ----
/// Disk resource usage per directory and type (all/used/free/avail).
pub static ref RESOURCE_GAUGE: GaugeVec = GaugeVec::new(
Opts::new("SeaweedFS_volumeServer_resource", "Server resource usage"),
&["name", "type"],
).expect("metric can be created");
// ---- In-flight gauges (Go: VolumeServerInFlightRequestsGauge, InFlightDownload/UploadSize) ----
/// In-flight requests per HTTP method.
pub static ref INFLIGHT_REQUESTS_GAUGE: IntGaugeVec = IntGaugeVec::new(
Opts::new("SeaweedFS_volumeServer_inFlight_requests", "In-flight requests by type"),
&["type"],
).expect("metric can be created");
/// Concurrent download limit in bytes.
pub static ref CONCURRENT_DOWNLOAD_LIMIT: IntGauge = IntGauge::new(
"SeaweedFS_volumeServer_concurrent_download_limit",
"Limit for total concurrent download size in bytes",
).expect("metric can be created");
/// Concurrent upload limit in bytes.
pub static ref CONCURRENT_UPLOAD_LIMIT: IntGauge = IntGauge::new(
"SeaweedFS_volumeServer_concurrent_upload_limit",
"Limit for total concurrent upload size in bytes",
).expect("metric can be created");
/// Current in-flight download bytes.
pub static ref INFLIGHT_DOWNLOAD_SIZE: IntGauge = IntGauge::new(
"SeaweedFS_volumeServer_inFlight_download_size",
"Current in-flight total download size in bytes",
).expect("metric can be created");
/// Current in-flight upload bytes.
pub static ref INFLIGHT_UPLOAD_SIZE: IntGauge = IntGauge::new(
"SeaweedFS_volumeServer_inFlight_upload_size",
"Current in-flight total upload size in bytes",
).expect("metric can be created");
// ---- Legacy aliases for backward compat with existing code ----
/// Total number of volumes on this server (flat gauge).
pub static ref VOLUMES_TOTAL: IntGauge = IntGauge::new(
"volume_server_volumes_total",
"Total number of volumes",
).expect("metric can be created");
/// Disk size in bytes per directory.
pub static ref DISK_SIZE_BYTES: IntGaugeVec = IntGaugeVec::new(
Opts::new("volume_server_disk_size_bytes", "Disk size in bytes"),
@ -53,7 +153,7 @@ lazy_static::lazy_static! {
&["dir"],
).expect("metric can be created");
/// Current number of in-flight requests.
/// Current number of in-flight requests (flat gauge).
pub static ref INFLIGHT_REQUESTS: IntGauge = IntGauge::new(
"volume_server_inflight_requests",
"Current number of in-flight requests",
@ -66,36 +166,63 @@ lazy_static::lazy_static! {
).expect("metric can be created");
}
/// Generate exponential bucket boundaries for histograms.
///
/// Produces `count` boundaries beginning at `start`, each boundary
/// `factor` times the previous one (mirrors Prometheus' Go
/// `ExponentialBuckets` helper used by the volume-server metrics).
fn exponential_buckets(start: f64, factor: f64, count: usize) -> Vec<f64> {
    std::iter::successors(Some(start), |prev| Some(prev * factor))
        .take(count)
        .collect()
}
// Handler counter type constants (matches Go's metrics_names.go).
// These are the `type` label values recorded on `HANDLER_COUNTER`.
pub const WRITE_TO_LOCAL_DISK: &str = "writeToLocalDisk";
pub const ERROR_WRITE_TO_LOCAL_DISK: &str = "errorWriteToLocalDisk"; // local needle write returned Err
pub const ERROR_WRITE_TO_REPLICAS: &str = "errorWriteToReplicas";
pub const ERROR_GET_NOT_FOUND: &str = "errorGetNotFound"; // read hit a NotFound or Deleted needle
pub const ERROR_GET_INTERNAL: &str = "errorGetInternal"; // read failed with an unexpected error
pub const ERROR_CRC: &str = "errorCRC";
pub const ERROR_SIZE_MISMATCH: &str = "errorSizeMismatch";
pub const ERROR_INDEX_OUT_OF_RANGE: &str = "errorIndexOutOfRange";
pub const DOWNLOAD_LIMIT_COND: &str = "downloadLimitCondition"; // concurrent-download throttle rejected the request
pub const UPLOAD_LIMIT_COND: &str = "uploadLimitCondition"; // concurrent-upload throttle rejected the request
// NOTE(review): WRITE_TO_LOCAL_DISK, ERROR_WRITE_TO_REPLICAS, ERROR_CRC,
// ERROR_SIZE_MISMATCH and ERROR_INDEX_OUT_OF_RANGE have no increment sites
// in this change — confirm they are emitted elsewhere or wire them up.
static REGISTER_METRICS: Once = Once::new();
/// Register all metrics with the custom registry.
/// Call this once at startup.
pub fn register_metrics() {
REGISTER_METRICS.call_once(|| {
REGISTRY
.register(Box::new(REQUEST_COUNTER.clone()))
.expect("REQUEST_COUNTER registered");
REGISTRY
.register(Box::new(REQUEST_DURATION.clone()))
.expect("REQUEST_DURATION registered");
REGISTRY
.register(Box::new(VOLUMES_TOTAL.clone()))
.expect("VOLUMES_TOTAL registered");
REGISTRY
.register(Box::new(MAX_VOLUMES.clone()))
.expect("MAX_VOLUMES registered");
REGISTRY
.register(Box::new(DISK_SIZE_BYTES.clone()))
.expect("DISK_SIZE_BYTES registered");
REGISTRY
.register(Box::new(DISK_FREE_BYTES.clone()))
.expect("DISK_FREE_BYTES registered");
REGISTRY
.register(Box::new(INFLIGHT_REQUESTS.clone()))
.expect("INFLIGHT_REQUESTS registered");
REGISTRY
.register(Box::new(VOLUME_FILE_COUNT.clone()))
.expect("VOLUME_FILE_COUNT registered");
let metrics: Vec<Box<dyn prometheus::core::Collector>> = vec![
// New Go-compatible metrics
Box::new(REQUEST_COUNTER.clone()),
Box::new(REQUEST_DURATION.clone()),
Box::new(HANDLER_COUNTER.clone()),
Box::new(VACUUMING_COMPACT_COUNTER.clone()),
Box::new(VACUUMING_COMMIT_COUNTER.clone()),
Box::new(VACUUMING_HISTOGRAM.clone()),
Box::new(VOLUME_GAUGE.clone()),
Box::new(READ_ONLY_VOLUME_GAUGE.clone()),
Box::new(MAX_VOLUMES.clone()),
Box::new(DISK_SIZE_GAUGE.clone()),
Box::new(RESOURCE_GAUGE.clone()),
Box::new(INFLIGHT_REQUESTS_GAUGE.clone()),
Box::new(CONCURRENT_DOWNLOAD_LIMIT.clone()),
Box::new(CONCURRENT_UPLOAD_LIMIT.clone()),
Box::new(INFLIGHT_DOWNLOAD_SIZE.clone()),
Box::new(INFLIGHT_UPLOAD_SIZE.clone()),
// Legacy metrics
Box::new(VOLUMES_TOTAL.clone()),
Box::new(DISK_SIZE_BYTES.clone()),
Box::new(DISK_FREE_BYTES.clone()),
Box::new(INFLIGHT_REQUESTS.clone()),
Box::new(VOLUME_FILE_COUNT.clone()),
];
for m in metrics {
REGISTRY.register(m).expect("metric registered");
}
});
}
@ -157,9 +284,9 @@ mod tests {
#[test]
fn test_gather_metrics_returns_text() {
register_metrics();
REQUEST_COUNTER.with_label_values(&["read"]).inc();
REQUEST_COUNTER.with_label_values(&["GET", "200"]).inc();
let output = gather_metrics();
assert!(output.contains("volume_server_request_counter"));
assert!(output.contains("SeaweedFS_volumeServer_request_total"));
}
#[test]
@ -209,7 +336,7 @@ mod tests {
.unwrap();
let body = captured.lock().unwrap().clone().unwrap();
assert!(body.contains("volume_server_request_counter"));
assert!(body.contains("SeaweedFS_volumeServer_request_total"));
server.abort();
}

19
seaweed-volume/src/server/grpc_server.rs

@ -314,6 +314,7 @@ impl VolumeServer for VolumeGrpcService {
let (tx, rx) = tokio::sync::mpsc::channel(16);
tokio::task::spawn_blocking(move || {
let compact_start = std::time::Instant::now();
let report_interval: i64 = 128 * 1024 * 1024;
let next_report = std::sync::atomic::AtomicI64::new(report_interval);
@ -337,6 +338,14 @@ impl VolumeServer for VolumeGrpcService {
})
};
let success = result.is_ok();
crate::metrics::VACUUMING_HISTOGRAM
.with_label_values(&["compact"])
.observe(compact_start.elapsed().as_secs_f64());
crate::metrics::VACUUMING_COMPACT_COUNTER
.with_label_values(&[if success { "true" } else { "false" }])
.inc();
if let Err(e) = result {
let _ = tx.blocking_send(Err(Status::internal(e)));
}
@ -354,8 +363,16 @@ impl VolumeServer for VolumeGrpcService {
) -> Result<Response<volume_server_pb::VacuumVolumeCommitResponse>, Status> {
self.state.check_maintenance()?;
let vid = VolumeId(request.into_inner().volume_id);
let commit_start = std::time::Instant::now();
let mut store = self.state.store.write().unwrap();
match store.commit_compact_volume(vid) {
let result = store.commit_compact_volume(vid);
crate::metrics::VACUUMING_HISTOGRAM
.with_label_values(&["commit"])
.observe(commit_start.elapsed().as_secs_f64());
crate::metrics::VACUUMING_COMMIT_COUNTER
.with_label_values(&[if result.is_ok() { "true" } else { "false" }])
.inc();
match result {
Ok((is_read_only, volume_size)) => Ok(Response::new(
volume_server_pb::VacuumVolumeCommitResponse {
is_read_only,

95
seaweed-volume/src/server/handlers.rs

@ -29,11 +29,13 @@ struct InflightGuard<'a> {
counter: &'a std::sync::atomic::AtomicI64,
bytes: i64,
notify: &'a tokio::sync::Notify,
metric: &'a prometheus::IntGauge,
}
impl<'a> Drop for InflightGuard<'a> {
fn drop(&mut self) {
self.counter.fetch_sub(self.bytes, Ordering::Relaxed);
let new_val = self.counter.fetch_sub(self.bytes, Ordering::Relaxed) - self.bytes;
self.metric.set(new_val);
self.notify.notify_waiters();
}
}
@ -63,9 +65,12 @@ impl http_body::Body for TrackedBody {
impl Drop for TrackedBody {
fn drop(&mut self) {
self.state
let new_val = self
.state
.inflight_download_bytes
.fetch_sub(self.bytes, Ordering::Relaxed);
.fetch_sub(self.bytes, Ordering::Relaxed)
- self.bytes;
metrics::INFLIGHT_DOWNLOAD_SIZE.set(new_val);
self.state.download_notify.notify_waiters();
}
}
@ -177,8 +182,11 @@ impl http_body::Body for StreamingBody {
impl Drop for StreamingBody {
fn drop(&mut self) {
if let Some(ref st) = self.state {
st.inflight_download_bytes
.fetch_sub(self.tracked_bytes, Ordering::Relaxed);
let new_val = st
.inflight_download_bytes
.fetch_sub(self.tracked_bytes, Ordering::Relaxed)
- self.tracked_bytes;
metrics::INFLIGHT_DOWNLOAD_SIZE.set(new_val);
st.download_notify.notify_waiters();
}
}
@ -606,9 +614,6 @@ async fn get_or_head_handler_inner(
query: ReadQueryParams,
request: Request<Body>,
) -> Response {
let start = std::time::Instant::now();
metrics::REQUEST_COUNTER.with_label_values(&["read"]).inc();
let path = request.uri().path().to_string();
let method = request.method().clone();
@ -699,6 +704,9 @@ async fn get_or_head_handler_inner(
.await
.is_err()
{
metrics::HANDLER_COUNTER
.with_label_values(&[metrics::DOWNLOAD_LIMIT_COND])
.inc();
return (StatusCode::TOO_MANY_REQUESTS, "download limit exceeded").into_response();
}
}
@ -730,12 +738,21 @@ async fn get_or_head_handler_inner(
let stream_info = match stream_info {
Ok(info) => Some(info),
Err(crate::storage::volume::VolumeError::NotFound) => {
metrics::HANDLER_COUNTER
.with_label_values(&[metrics::ERROR_GET_NOT_FOUND])
.inc();
return StatusCode::NOT_FOUND.into_response();
}
Err(crate::storage::volume::VolumeError::Deleted) => {
metrics::HANDLER_COUNTER
.with_label_values(&[metrics::ERROR_GET_NOT_FOUND])
.inc();
return StatusCode::NOT_FOUND.into_response();
}
Err(e) => {
metrics::HANDLER_COUNTER
.with_label_values(&[metrics::ERROR_GET_INTERNAL])
.inc();
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("read error: {}", e),
@ -990,15 +1007,13 @@ async fn get_or_head_handler_inner(
info.data_size.to_string().parse().unwrap(),
);
metrics::REQUEST_DURATION
.with_label_values(&["read"])
.observe(start.elapsed().as_secs_f64());
let tracked_bytes = info.data_size as i64;
let tracking_state = if download_guard.is_some() {
state
let new_val = state
.inflight_download_bytes
.fetch_add(tracked_bytes, Ordering::Relaxed);
.fetch_add(tracked_bytes, Ordering::Relaxed)
+ tracked_bytes;
metrics::INFLIGHT_DOWNLOAD_SIZE.set(new_val);
Some(state.clone())
} else {
None
@ -1097,16 +1112,14 @@ async fn get_or_head_handler_inner(
return (StatusCode::OK, response_headers).into_response();
}
metrics::REQUEST_DURATION
.with_label_values(&["read"])
.observe(start.elapsed().as_secs_f64());
// If download throttling is active, wrap the body so we track when it's fully sent
if download_guard.is_some() {
let data_len = data.len() as i64;
state
let new_val = state
.inflight_download_bytes
.fetch_add(data_len, Ordering::Relaxed);
.fetch_add(data_len, Ordering::Relaxed)
+ data_len;
metrics::INFLIGHT_DOWNLOAD_SIZE.set(new_val);
let tracked_body = TrackedBody {
data,
state: state.clone(),
@ -1364,9 +1377,6 @@ pub async fn post_handler(
State(state): State<Arc<VolumeServerState>>,
request: Request<Body>,
) -> Response {
let start = std::time::Instant::now();
metrics::REQUEST_COUNTER.with_label_values(&["write"]).inc();
let path = request.uri().path().to_string();
let query = request.uri().query().unwrap_or("").to_string();
let headers = request.headers().clone();
@ -1419,6 +1429,9 @@ pub async fn post_handler(
.await
.is_err()
{
metrics::HANDLER_COUNTER
.with_label_values(&[metrics::UPLOAD_LIMIT_COND])
.inc();
return json_error_with_query(
StatusCode::TOO_MANY_REQUESTS,
"upload limit exceeded",
@ -1426,9 +1439,11 @@ pub async fn post_handler(
);
}
}
state
let new_val = state
.inflight_upload_bytes
.fetch_add(content_length, Ordering::Relaxed);
.fetch_add(content_length, Ordering::Relaxed)
+ content_length;
metrics::INFLIGHT_UPLOAD_SIZE.set(new_val);
}
// RAII guard to release upload throttle on any exit path
@ -1437,6 +1452,7 @@ pub async fn post_handler(
counter: &state.inflight_upload_bytes,
bytes: content_length,
notify: &state.upload_notify,
metric: &metrics::INFLIGHT_UPLOAD_SIZE,
})
} else {
None
@ -1797,9 +1813,6 @@ pub async fn post_handler(
},
content_md5: Some(content_md5_value.clone()),
};
metrics::REQUEST_DURATION
.with_label_values(&["write"])
.observe(start.elapsed().as_secs_f64());
let etag = n.etag();
let etag_header = if etag.starts_with('"') {
etag.clone()
@ -1820,11 +1833,16 @@ pub async fn post_handler(
Err(crate::storage::volume::VolumeError::ReadOnly) => {
json_error_with_query(StatusCode::FORBIDDEN, "volume is read-only", Some(&query))
}
Err(e) => json_error_with_query(
StatusCode::INTERNAL_SERVER_ERROR,
format!("write error: {}", e),
Some(&query),
),
Err(e) => {
metrics::HANDLER_COUNTER
.with_label_values(&[metrics::ERROR_WRITE_TO_LOCAL_DISK])
.inc();
json_error_with_query(
StatusCode::INTERNAL_SERVER_ERROR,
format!("write error: {}", e),
Some(&query),
)
}
};
// _upload_guard drops here, releasing inflight bytes
@ -1844,11 +1862,6 @@ pub async fn delete_handler(
State(state): State<Arc<VolumeServerState>>,
request: Request<Body>,
) -> Response {
let start = std::time::Instant::now();
metrics::REQUEST_COUNTER
.with_label_values(&["delete"])
.inc();
let path = request.uri().path().to_string();
let del_query = request.uri().query().unwrap_or("").to_string();
let headers = request.headers().clone();
@ -1991,9 +2004,6 @@ pub async fn delete_handler(
Some(&del_query),
);
}
metrics::REQUEST_DURATION
.with_label_values(&["delete"])
.observe(start.elapsed().as_secs_f64());
// Return the manifest's declared size (matches Go behavior)
let result = DeleteResult {
size: manifest.size as i64,
@ -2039,9 +2049,6 @@ pub async fn delete_handler(
match delete_result {
Ok(size) => {
metrics::REQUEST_DURATION
.with_label_values(&["delete"])
.observe(start.elapsed().as_secs_f64());
let result = DeleteResult {
size: size.0 as i64,
};

33
seaweed-volume/src/server/heartbeat.rs

@ -422,6 +422,10 @@ fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartb
let mut max_file_key = NeedleId(0);
let mut max_volume_counts: HashMap<String, u32> = HashMap::new();
// Collect per-collection disk size and read-only counts for metrics
let mut disk_sizes: HashMap<String, (u64, u64)> = HashMap::new(); // (normal, deleted)
let mut ro_counts: HashMap<String, u32> = HashMap::new();
for loc in &store.locations {
let disk_type_str = loc.disk_type.to_string();
let max_count = loc
@ -435,6 +439,16 @@ fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartb
max_file_key = cur_max;
}
// Track disk size by collection
let entry = disk_sizes.entry(vol.collection.clone()).or_insert((0, 0));
entry.0 += vol.content_size();
entry.1 += vol.deleted_size();
// Track read-only count by collection
if vol.is_read_only() {
*ro_counts.entry(vol.collection.clone()).or_insert(0) += 1;
}
volumes.push(master_pb::VolumeInformationMessage {
id: vol.id.0,
size: vol.content_size(),
@ -453,6 +467,25 @@ fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartb
});
}
}
// Update disk size and read-only gauges
for (col, (normal, deleted)) in &disk_sizes {
crate::metrics::DISK_SIZE_GAUGE
.with_label_values(&[col, "normal"])
.set(*normal as f64);
crate::metrics::DISK_SIZE_GAUGE
.with_label_values(&[col, "deleted_bytes"])
.set(*deleted as f64);
}
for (col, count) in &ro_counts {
crate::metrics::READ_ONLY_VOLUME_GAUGE
.with_label_values(&[col, "volume"])
.set(*count as f64);
}
// Update max volumes gauge
let total_max: i64 = max_volume_counts.values().map(|v| *v as i64).sum();
crate::metrics::MAX_VOLUMES.set(total_max);
let has_no_volumes = volumes.is_empty();
let (location_uuids, disk_tags) = collect_location_metadata(store);

38
seaweed-volume/src/server/volume_server.rs

@ -137,7 +137,12 @@ async fn common_headers_middleware(request: Request, next: Next) -> Response {
/// Matches Go's privateStoreHandler: GET/HEAD → read, POST/PUT → write,
/// DELETE → delete, OPTIONS → CORS headers, anything else → 400.
async fn admin_store_handler(state: State<Arc<VolumeServerState>>, request: Request) -> Response {
match request.method().clone() {
let start = std::time::Instant::now();
let method_str = request.method().as_str().to_string();
crate::metrics::INFLIGHT_REQUESTS_GAUGE
.with_label_values(&[&method_str])
.inc();
let response = match request.method().clone() {
Method::GET | Method::HEAD => {
handlers::get_or_head_handler_from_request(state, request).await
}
@ -149,20 +154,45 @@ async fn admin_store_handler(state: State<Arc<VolumeServerState>>, request: Requ
format!("{{\"error\":\"unsupported method {}\"}}", request.method()),
)
.into_response(),
}
};
crate::metrics::INFLIGHT_REQUESTS_GAUGE
.with_label_values(&[&method_str])
.dec();
crate::metrics::REQUEST_COUNTER
.with_label_values(&[&method_str, response.status().as_str()])
.inc();
crate::metrics::REQUEST_DURATION
.with_label_values(&[&method_str])
.observe(start.elapsed().as_secs_f64());
response
}
/// Public store handler — dispatches based on HTTP method.
/// Matches Go's publicReadOnlyHandler: GET/HEAD → read, OPTIONS → CORS,
/// anything else → 200 (passthrough no-op).
async fn public_store_handler(state: State<Arc<VolumeServerState>>, request: Request) -> Response {
match request.method().clone() {
let start = std::time::Instant::now();
let method_str = request.method().as_str().to_string();
crate::metrics::INFLIGHT_REQUESTS_GAUGE
.with_label_values(&[&method_str])
.inc();
let response = match request.method().clone() {
Method::GET | Method::HEAD => {
handlers::get_or_head_handler_from_request(state, request).await
}
Method::OPTIONS => public_options_response(),
_ => StatusCode::OK.into_response(),
}
};
crate::metrics::INFLIGHT_REQUESTS_GAUGE
.with_label_values(&[&method_str])
.dec();
crate::metrics::REQUEST_COUNTER
.with_label_values(&[&method_str, response.status().as_str()])
.inc();
crate::metrics::REQUEST_DURATION
.with_label_values(&[&method_str])
.observe(start.elapsed().as_secs_f64());
response
}
/// Build OPTIONS response for admin port.

32
seaweed-volume/src/storage/disk_location.rs

@ -175,6 +175,9 @@ impl DiskLocation {
Version::current(),
) {
Ok(v) => {
crate::metrics::VOLUME_GAUGE
.with_label_values(&[&collection, "volume"])
.inc();
self.volumes.insert(vid, v);
}
Err(e) => {
@ -303,7 +306,11 @@ impl DiskLocation {
/// Add a volume to this location.
///
/// Bumps the per-collection volume gauge only when `vid` was not already
/// present: `HashMap::insert` replaces and returns the old value, and
/// incrementing unconditionally on a replacement would inflate
/// `VOLUME_GAUGE` (the matching `dec()` in unload/delete runs once per id).
pub fn set_volume(&mut self, vid: VolumeId, volume: Volume) {
    let collection = volume.collection.clone();
    if self.volumes.insert(vid, volume).is_none() {
        crate::metrics::VOLUME_GAUGE
            .with_label_values(&[&collection, "volume"])
            .inc();
    }
}
/// Create a new volume in this location.
@ -328,6 +335,9 @@ impl DiskLocation {
preallocate,
version,
)?;
crate::metrics::VOLUME_GAUGE
.with_label_values(&[collection, "volume"])
.inc();
self.volumes.insert(vid, v);
Ok(())
}
@ -335,6 +345,9 @@ impl DiskLocation {
/// Remove and close a volume.
pub fn unload_volume(&mut self, vid: VolumeId) -> Option<Volume> {
if let Some(mut v) = self.volumes.remove(&vid) {
crate::metrics::VOLUME_GAUGE
.with_label_values(&[&v.collection, "volume"])
.dec();
v.close();
Some(v)
} else {
@ -345,6 +358,9 @@ impl DiskLocation {
/// Remove, close, and delete all files for a volume.
pub fn delete_volume(&mut self, vid: VolumeId) -> Result<(), VolumeError> {
if let Some(mut v) = self.volumes.remove(&vid) {
crate::metrics::VOLUME_GAUGE
.with_label_values(&[&v.collection, "volume"])
.dec();
v.destroy()?;
Ok(())
} else {
@ -414,6 +430,7 @@ impl DiskLocation {
if total == 0 {
return;
}
let used = total.saturating_sub(free);
let is_low = match &self.min_free_space {
MinFreeSpace::Percent(pct) => {
let free_pct = (free as f64 / total as f64) * 100.0;
@ -423,6 +440,21 @@ impl DiskLocation {
};
self.is_disk_space_low.store(is_low, Ordering::Relaxed);
self.available_space.store(free, Ordering::Relaxed);
// Update resource gauges
crate::metrics::RESOURCE_GAUGE
.with_label_values(&[&self.directory, "all"])
.set(total as f64);
crate::metrics::RESOURCE_GAUGE
.with_label_values(&[&self.directory, "used"])
.set(used as f64);
crate::metrics::RESOURCE_GAUGE
.with_label_values(&[&self.directory, "free"])
.set(free as f64);
// "avail" is same as "free" for us (Go subtracts reserved blocks but we use statvfs f_bavail)
crate::metrics::RESOURCE_GAUGE
.with_label_values(&[&self.directory, "avail"])
.set(free as f64);
}
/// Close all volumes.

7
seaweed-volume/src/storage/store.rs

@ -418,6 +418,9 @@ impl Store {
for &shard_id in shard_ids {
let shard = EcVolumeShard::new(&dir, collection, vid, shard_id as u8);
ec_vol.add_shard(shard).map_err(|e| VolumeError::Io(e))?;
crate::metrics::VOLUME_GAUGE
.with_label_values(&[collection, "ec_shards"])
.inc();
}
Ok(())
@ -426,8 +429,12 @@ impl Store {
/// Unmount EC shards for a volume.
pub fn unmount_ec_shards(&mut self, vid: VolumeId, shard_ids: &[u32]) {
if let Some(ec_vol) = self.ec_volumes.get_mut(&vid) {
let collection = ec_vol.collection.clone();
for &shard_id in shard_ids {
ec_vol.remove_shard(shard_id as u8);
crate::metrics::VOLUME_GAUGE
.with_label_values(&[&collection, "ec_shards"])
.dec();
}
if ec_vol.shard_count() == 0 {
let mut vol = self.ec_volumes.remove(&vid).unwrap();

Loading…
Cancel
Save