From f189844776d1f29548d82646b971b4329cc648c0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 8 Mar 2026 23:28:05 -0700 Subject: [PATCH] feat: add Go-compatible Prometheus metrics to Rust volume server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the metrics gap between Rust and Go volume servers (8 → 23 metrics). Adds handler counters, vacuuming histograms, volume/disk gauges, inflight request tracking, and concurrent limit gauges. Centralizes request counting in store handlers instead of per-handler. --- seaweed-volume/src/main.rs | 12 ++ seaweed-volume/src/metrics.rs | 207 ++++++++++++++++---- seaweed-volume/src/server/grpc_server.rs | 19 +- seaweed-volume/src/server/handlers.rs | 95 ++++----- seaweed-volume/src/server/heartbeat.rs | 33 ++++ seaweed-volume/src/server/volume_server.rs | 38 +++- seaweed-volume/src/storage/disk_location.rs | 32 +++ seaweed-volume/src/storage/store.rs | 7 + 8 files changed, 354 insertions(+), 89 deletions(-) diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs index f53b894f8..f1d3c5756 100644 --- a/seaweed-volume/src/main.rs +++ b/seaweed-volume/src/main.rs @@ -152,6 +152,18 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box Vec { + let mut buckets = Vec::with_capacity(count); + let mut val = start; + for _ in 0..count { + buckets.push(val); + val *= factor; + } + buckets +} + +// Handler counter type constants (matches Go's metrics_names.go). +pub const WRITE_TO_LOCAL_DISK: &str = "writeToLocalDisk"; +pub const ERROR_WRITE_TO_LOCAL_DISK: &str = "errorWriteToLocalDisk"; +pub const ERROR_WRITE_TO_REPLICAS: &str = "errorWriteToReplicas"; +pub const ERROR_GET_NOT_FOUND: &str = "errorGetNotFound"; +pub const ERROR_GET_INTERNAL: &str = "errorGetInternal"; +pub const ERROR_CRC: &str = "errorCRC"; +pub const ERROR_SIZE_MISMATCH: &str = "errorSizeMismatch"; +pub const ERROR_INDEX_OUT_OF_RANGE: &str = "errorIndexOutOfRange"; +pub const DOWNLOAD_LIMIT_COND: &str = "downloadLimitCondition"; +pub const UPLOAD_LIMIT_COND: &str = "uploadLimitCondition"; + static REGISTER_METRICS: Once = Once::new(); /// Register all metrics with the custom registry. /// Call this once at startup. pub fn register_metrics() { REGISTER_METRICS.call_once(|| { - REGISTRY - .register(Box::new(REQUEST_COUNTER.clone())) - .expect("REQUEST_COUNTER registered"); - REGISTRY - .register(Box::new(REQUEST_DURATION.clone())) - .expect("REQUEST_DURATION registered"); - REGISTRY - .register(Box::new(VOLUMES_TOTAL.clone())) - .expect("VOLUMES_TOTAL registered"); - REGISTRY - .register(Box::new(MAX_VOLUMES.clone())) - .expect("MAX_VOLUMES registered"); - REGISTRY - .register(Box::new(DISK_SIZE_BYTES.clone())) - .expect("DISK_SIZE_BYTES registered"); - REGISTRY - .register(Box::new(DISK_FREE_BYTES.clone())) - .expect("DISK_FREE_BYTES registered"); - REGISTRY - .register(Box::new(INFLIGHT_REQUESTS.clone())) - .expect("INFLIGHT_REQUESTS registered"); - REGISTRY - .register(Box::new(VOLUME_FILE_COUNT.clone())) - .expect("VOLUME_FILE_COUNT registered"); + let metrics: Vec> = vec![ + // New Go-compatible metrics + Box::new(REQUEST_COUNTER.clone()), + Box::new(REQUEST_DURATION.clone()), + Box::new(HANDLER_COUNTER.clone()), + Box::new(VACUUMING_COMPACT_COUNTER.clone()), + Box::new(VACUUMING_COMMIT_COUNTER.clone()), + Box::new(VACUUMING_HISTOGRAM.clone()), + Box::new(VOLUME_GAUGE.clone()), + Box::new(READ_ONLY_VOLUME_GAUGE.clone()), + Box::new(MAX_VOLUMES.clone()), + Box::new(DISK_SIZE_GAUGE.clone()), + Box::new(RESOURCE_GAUGE.clone()), + Box::new(INFLIGHT_REQUESTS_GAUGE.clone()), + Box::new(CONCURRENT_DOWNLOAD_LIMIT.clone()), + Box::new(CONCURRENT_UPLOAD_LIMIT.clone()), + Box::new(INFLIGHT_DOWNLOAD_SIZE.clone()), + Box::new(INFLIGHT_UPLOAD_SIZE.clone()), + // Legacy metrics + Box::new(VOLUMES_TOTAL.clone()), + Box::new(DISK_SIZE_BYTES.clone()), + Box::new(DISK_FREE_BYTES.clone()), + Box::new(INFLIGHT_REQUESTS.clone()), + Box::new(VOLUME_FILE_COUNT.clone()), + ]; + for m in metrics { + REGISTRY.register(m).expect("metric registered"); + } }); } @@ -157,9 +284,9 @@ mod tests { #[test] fn test_gather_metrics_returns_text() { register_metrics(); - REQUEST_COUNTER.with_label_values(&["read"]).inc(); + REQUEST_COUNTER.with_label_values(&["GET", "200"]).inc(); let output = gather_metrics(); - assert!(output.contains("volume_server_request_counter")); + assert!(output.contains("SeaweedFS_volumeServer_request_total")); } #[test] @@ -209,7 +336,7 @@ mod tests { .unwrap(); let body = captured.lock().unwrap().clone().unwrap(); - assert!(body.contains("volume_server_request_counter")); + assert!(body.contains("SeaweedFS_volumeServer_request_total")); server.abort(); } diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 386eb4ed2..9b115f16d 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -314,6 +314,7 @@ impl VolumeServer for VolumeGrpcService { let (tx, rx) = tokio::sync::mpsc::channel(16); tokio::task::spawn_blocking(move || { + let compact_start = std::time::Instant::now(); let report_interval: i64 = 128 * 1024 * 1024; let next_report = std::sync::atomic::AtomicI64::new(report_interval); @@ -337,6 +338,14 @@ impl VolumeServer for VolumeGrpcService { }) }; + let success = result.is_ok(); + crate::metrics::VACUUMING_HISTOGRAM + .with_label_values(&["compact"]) + .observe(compact_start.elapsed().as_secs_f64()); + crate::metrics::VACUUMING_COMPACT_COUNTER + .with_label_values(&[if success { "true" } else { "false" }]) + .inc(); + if let Err(e) = result { let _ = tx.blocking_send(Err(Status::internal(e))); } @@ -354,8 +363,16 @@ impl VolumeServer for VolumeGrpcService { ) -> Result, Status> { self.state.check_maintenance()?; let vid = VolumeId(request.into_inner().volume_id); + let commit_start = std::time::Instant::now(); let mut store = self.state.store.write().unwrap(); - match store.commit_compact_volume(vid) { + let result = store.commit_compact_volume(vid); + crate::metrics::VACUUMING_HISTOGRAM + .with_label_values(&["commit"]) + .observe(commit_start.elapsed().as_secs_f64()); + crate::metrics::VACUUMING_COMMIT_COUNTER + .with_label_values(&[if result.is_ok() { "true" } else { "false" }]) + .inc(); + match result { Ok((is_read_only, volume_size)) => Ok(Response::new( volume_server_pb::VacuumVolumeCommitResponse { is_read_only, diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs index 985bc9936..ea3742369 100644 --- a/seaweed-volume/src/server/handlers.rs +++ b/seaweed-volume/src/server/handlers.rs @@ -29,11 +29,13 @@ struct InflightGuard<'a> { counter: &'a std::sync::atomic::AtomicI64, bytes: i64, notify: &'a tokio::sync::Notify, + metric: &'a prometheus::IntGauge, } impl<'a> Drop for InflightGuard<'a> { fn drop(&mut self) { - self.counter.fetch_sub(self.bytes, Ordering::Relaxed); + let new_val = self.counter.fetch_sub(self.bytes, Ordering::Relaxed) - self.bytes; + self.metric.set(new_val); self.notify.notify_waiters(); } } @@ -63,9 +65,12 @@ impl http_body::Body for TrackedBody { impl Drop for TrackedBody { fn drop(&mut self) { - self.state + let new_val = self + .state .inflight_download_bytes - .fetch_sub(self.bytes, Ordering::Relaxed); + .fetch_sub(self.bytes, Ordering::Relaxed) + - self.bytes; + metrics::INFLIGHT_DOWNLOAD_SIZE.set(new_val); self.state.download_notify.notify_waiters(); } } @@ -177,8 +182,11 @@ impl http_body::Body for StreamingBody { impl Drop for StreamingBody { fn drop(&mut self) { if let Some(ref st) = self.state { - st.inflight_download_bytes - .fetch_sub(self.tracked_bytes, Ordering::Relaxed); + let new_val = st + .inflight_download_bytes + .fetch_sub(self.tracked_bytes, Ordering::Relaxed) + - self.tracked_bytes; + metrics::INFLIGHT_DOWNLOAD_SIZE.set(new_val); st.download_notify.notify_waiters(); } } @@ -606,9 +614,6 @@ async fn get_or_head_handler_inner( query: ReadQueryParams, request: Request, ) -> Response { - let start = std::time::Instant::now(); - metrics::REQUEST_COUNTER.with_label_values(&["read"]).inc(); - let path = request.uri().path().to_string(); let method = request.method().clone(); @@ -699,6 +704,9 @@ async fn get_or_head_handler_inner( .await .is_err() { + metrics::HANDLER_COUNTER + .with_label_values(&[metrics::DOWNLOAD_LIMIT_COND]) + .inc(); return (StatusCode::TOO_MANY_REQUESTS, "download limit exceeded").into_response(); } } @@ -730,12 +738,21 @@ async fn get_or_head_handler_inner( let stream_info = match stream_info { Ok(info) => Some(info), Err(crate::storage::volume::VolumeError::NotFound) => { + metrics::HANDLER_COUNTER + .with_label_values(&[metrics::ERROR_GET_NOT_FOUND]) + .inc(); return StatusCode::NOT_FOUND.into_response(); } Err(crate::storage::volume::VolumeError::Deleted) => { + metrics::HANDLER_COUNTER + .with_label_values(&[metrics::ERROR_GET_NOT_FOUND]) + .inc(); return StatusCode::NOT_FOUND.into_response(); } Err(e) => { + metrics::HANDLER_COUNTER + .with_label_values(&[metrics::ERROR_GET_INTERNAL]) + .inc(); return ( StatusCode::INTERNAL_SERVER_ERROR, format!("read error: {}", e), @@ -990,15 +1007,13 @@ async fn get_or_head_handler_inner( info.data_size.to_string().parse().unwrap(), ); - metrics::REQUEST_DURATION - .with_label_values(&["read"]) - .observe(start.elapsed().as_secs_f64()); - let tracked_bytes = info.data_size as i64; let tracking_state = if download_guard.is_some() { - state + let new_val = state .inflight_download_bytes - .fetch_add(tracked_bytes, Ordering::Relaxed); + .fetch_add(tracked_bytes, Ordering::Relaxed) + + tracked_bytes; + metrics::INFLIGHT_DOWNLOAD_SIZE.set(new_val); Some(state.clone()) } else { None @@ -1097,16 +1112,14 @@ async fn get_or_head_handler_inner( return (StatusCode::OK, response_headers).into_response(); } - metrics::REQUEST_DURATION - .with_label_values(&["read"]) - .observe(start.elapsed().as_secs_f64()); - // If download throttling is active, wrap the body so we track when it's fully sent if download_guard.is_some() { let data_len = data.len() as i64; - state + let new_val = state .inflight_download_bytes - .fetch_add(data_len, Ordering::Relaxed); + .fetch_add(data_len, Ordering::Relaxed) + + data_len; + metrics::INFLIGHT_DOWNLOAD_SIZE.set(new_val); let tracked_body = TrackedBody { data, state: state.clone(), @@ -1364,9 +1377,6 @@ pub async fn post_handler( State(state): State>, request: Request, ) -> Response { - let start = std::time::Instant::now(); - metrics::REQUEST_COUNTER.with_label_values(&["write"]).inc(); - let path = request.uri().path().to_string(); let query = request.uri().query().unwrap_or("").to_string(); let headers = request.headers().clone(); @@ -1419,6 +1429,9 @@ pub async fn post_handler( .await .is_err() { + metrics::HANDLER_COUNTER + .with_label_values(&[metrics::UPLOAD_LIMIT_COND]) + .inc(); return json_error_with_query( StatusCode::TOO_MANY_REQUESTS, "upload limit exceeded", @@ -1426,9 +1439,11 @@ pub async fn post_handler( ); } } - state + let new_val = state .inflight_upload_bytes - .fetch_add(content_length, Ordering::Relaxed); + .fetch_add(content_length, Ordering::Relaxed) + + content_length; + metrics::INFLIGHT_UPLOAD_SIZE.set(new_val); } // RAII guard to release upload throttle on any exit path @@ -1437,6 +1452,7 @@ pub async fn post_handler( counter: &state.inflight_upload_bytes, bytes: content_length, notify: &state.upload_notify, + metric: &metrics::INFLIGHT_UPLOAD_SIZE, }) } else { None @@ -1797,9 +1813,6 @@ pub async fn post_handler( }, content_md5: Some(content_md5_value.clone()), }; - metrics::REQUEST_DURATION - .with_label_values(&["write"]) - .observe(start.elapsed().as_secs_f64()); let etag = n.etag(); let etag_header = if etag.starts_with('"') { etag.clone() @@ -1820,11 +1833,16 @@ pub async fn post_handler( Err(crate::storage::volume::VolumeError::ReadOnly) => { json_error_with_query(StatusCode::FORBIDDEN, "volume is read-only", Some(&query)) } - Err(e) => json_error_with_query( - StatusCode::INTERNAL_SERVER_ERROR, - format!("write error: {}", e), - Some(&query), - ), + Err(e) => { + metrics::HANDLER_COUNTER + .with_label_values(&[metrics::ERROR_WRITE_TO_LOCAL_DISK]) + .inc(); + json_error_with_query( + StatusCode::INTERNAL_SERVER_ERROR, + format!("write error: {}", e), + Some(&query), + ) + } }; // _upload_guard drops here, releasing inflight bytes @@ -1844,11 +1862,6 @@ pub async fn delete_handler( State(state): State>, request: Request, ) -> Response { - let start = std::time::Instant::now(); - metrics::REQUEST_COUNTER - .with_label_values(&["delete"]) - .inc(); - let path = request.uri().path().to_string(); let del_query = request.uri().query().unwrap_or("").to_string(); let headers = request.headers().clone(); @@ -1991,9 +2004,6 @@ pub async fn delete_handler( Some(&del_query), ); } - metrics::REQUEST_DURATION - .with_label_values(&["delete"]) - .observe(start.elapsed().as_secs_f64()); // Return the manifest's declared size (matches Go behavior) let result = DeleteResult { size: manifest.size as i64, @@ -2039,9 +2049,6 @@ pub async fn delete_handler( match delete_result { Ok(size) => { - metrics::REQUEST_DURATION - .with_label_values(&["delete"]) - .observe(start.elapsed().as_secs_f64()); let result = DeleteResult { size: size.0 as i64, }; diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index b84651ad1..48d1a9355 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -422,6 +422,10 @@ fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartb let mut max_file_key = NeedleId(0); let mut max_volume_counts: HashMap = HashMap::new(); + // Collect per-collection disk size and read-only counts for metrics + let mut disk_sizes: HashMap = HashMap::new(); // (normal, deleted) + let mut ro_counts: HashMap = HashMap::new(); + for loc in &store.locations { let disk_type_str = loc.disk_type.to_string(); let max_count = loc @@ -435,6 +439,16 @@ fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartb max_file_key = cur_max; } + // Track disk size by collection + let entry = disk_sizes.entry(vol.collection.clone()).or_insert((0, 0)); + entry.0 += vol.content_size(); + entry.1 += vol.deleted_size(); + + // Track read-only count by collection + if vol.is_read_only() { + *ro_counts.entry(vol.collection.clone()).or_insert(0) += 1; + } + volumes.push(master_pb::VolumeInformationMessage { id: vol.id.0, size: vol.content_size(), @@ -453,6 +467,25 @@ fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartb }); } } + + // Update disk size and read-only gauges + for (col, (normal, deleted)) in &disk_sizes { + crate::metrics::DISK_SIZE_GAUGE + .with_label_values(&[col, "normal"]) + .set(*normal as f64); + crate::metrics::DISK_SIZE_GAUGE + .with_label_values(&[col, "deleted_bytes"]) + .set(*deleted as f64); + } + for (col, count) in &ro_counts { + crate::metrics::READ_ONLY_VOLUME_GAUGE + .with_label_values(&[col, "volume"]) + .set(*count as f64); + } + // Update max volumes gauge + let total_max: i64 = max_volume_counts.values().map(|v| *v as i64).sum(); + crate::metrics::MAX_VOLUMES.set(total_max); + let has_no_volumes = volumes.is_empty(); let (location_uuids, disk_tags) = collect_location_metadata(store); diff --git a/seaweed-volume/src/server/volume_server.rs b/seaweed-volume/src/server/volume_server.rs index a3c043c26..1c4fc37e8 100644 --- a/seaweed-volume/src/server/volume_server.rs +++ b/seaweed-volume/src/server/volume_server.rs @@ -137,7 +137,12 @@ async fn common_headers_middleware(request: Request, next: Next) -> Response { /// Matches Go's privateStoreHandler: GET/HEAD → read, POST/PUT → write, /// DELETE → delete, OPTIONS → CORS headers, anything else → 400. async fn admin_store_handler(state: State>, request: Request) -> Response { - match request.method().clone() { + let start = std::time::Instant::now(); + let method_str = request.method().as_str().to_string(); + crate::metrics::INFLIGHT_REQUESTS_GAUGE + .with_label_values(&[&method_str]) + .inc(); + let response = match request.method().clone() { Method::GET | Method::HEAD => { handlers::get_or_head_handler_from_request(state, request).await } @@ -149,20 +154,45 @@ async fn admin_store_handler(state: State>, request: Requ format!("{{\"error\":\"unsupported method {}\"}}", request.method()), ) .into_response(), - } + }; + crate::metrics::INFLIGHT_REQUESTS_GAUGE + .with_label_values(&[&method_str]) + .dec(); + crate::metrics::REQUEST_COUNTER + .with_label_values(&[&method_str, response.status().as_str()]) + .inc(); + crate::metrics::REQUEST_DURATION + .with_label_values(&[&method_str]) + .observe(start.elapsed().as_secs_f64()); + response } /// Public store handler — dispatches based on HTTP method. /// Matches Go's publicReadOnlyHandler: GET/HEAD → read, OPTIONS → CORS, /// anything else → 200 (passthrough no-op). async fn public_store_handler(state: State>, request: Request) -> Response { - match request.method().clone() { + let start = std::time::Instant::now(); + let method_str = request.method().as_str().to_string(); + crate::metrics::INFLIGHT_REQUESTS_GAUGE + .with_label_values(&[&method_str]) + .inc(); + let response = match request.method().clone() { Method::GET | Method::HEAD => { handlers::get_or_head_handler_from_request(state, request).await } Method::OPTIONS => public_options_response(), _ => StatusCode::OK.into_response(), - } + }; + crate::metrics::INFLIGHT_REQUESTS_GAUGE + .with_label_values(&[&method_str]) + .dec(); + crate::metrics::REQUEST_COUNTER + .with_label_values(&[&method_str, response.status().as_str()]) + .inc(); + crate::metrics::REQUEST_DURATION + .with_label_values(&[&method_str]) + .observe(start.elapsed().as_secs_f64()); + response } /// Build OPTIONS response for admin port. diff --git a/seaweed-volume/src/storage/disk_location.rs b/seaweed-volume/src/storage/disk_location.rs index 7fa89248d..cee6b4fdb 100644 --- a/seaweed-volume/src/storage/disk_location.rs +++ b/seaweed-volume/src/storage/disk_location.rs @@ -175,6 +175,9 @@ impl DiskLocation { Version::current(), ) { Ok(v) => { + crate::metrics::VOLUME_GAUGE + .with_label_values(&[&collection, "volume"]) + .inc(); self.volumes.insert(vid, v); } Err(e) => { @@ -303,7 +306,11 @@ impl DiskLocation { /// Add a volume to this location. pub fn set_volume(&mut self, vid: VolumeId, volume: Volume) { + let collection = volume.collection.clone(); self.volumes.insert(vid, volume); + crate::metrics::VOLUME_GAUGE + .with_label_values(&[&collection, "volume"]) + .inc(); } /// Create a new volume in this location. @@ -328,6 +335,9 @@ impl DiskLocation { preallocate, version, )?; + crate::metrics::VOLUME_GAUGE + .with_label_values(&[collection, "volume"]) + .inc(); self.volumes.insert(vid, v); Ok(()) } @@ -335,6 +345,9 @@ impl DiskLocation { /// Remove and close a volume. pub fn unload_volume(&mut self, vid: VolumeId) -> Option { if let Some(mut v) = self.volumes.remove(&vid) { + crate::metrics::VOLUME_GAUGE + .with_label_values(&[&v.collection, "volume"]) + .dec(); v.close(); Some(v) } else { @@ -345,6 +358,9 @@ impl DiskLocation { /// Remove, close, and delete all files for a volume. pub fn delete_volume(&mut self, vid: VolumeId) -> Result<(), VolumeError> { if let Some(mut v) = self.volumes.remove(&vid) { + crate::metrics::VOLUME_GAUGE + .with_label_values(&[&v.collection, "volume"]) + .dec(); v.destroy()?; Ok(()) } else { @@ -414,6 +430,7 @@ impl DiskLocation { if total == 0 { return; } + let used = total.saturating_sub(free); let is_low = match &self.min_free_space { MinFreeSpace::Percent(pct) => { let free_pct = (free as f64 / total as f64) * 100.0; @@ -423,6 +440,21 @@ impl DiskLocation { }; self.is_disk_space_low.store(is_low, Ordering::Relaxed); self.available_space.store(free, Ordering::Relaxed); + + // Update resource gauges + crate::metrics::RESOURCE_GAUGE + .with_label_values(&[&self.directory, "all"]) + .set(total as f64); + crate::metrics::RESOURCE_GAUGE + .with_label_values(&[&self.directory, "used"]) + .set(used as f64); + crate::metrics::RESOURCE_GAUGE + .with_label_values(&[&self.directory, "free"]) + .set(free as f64); + // "avail" is same as "free" for us (Go subtracts reserved blocks but we use statvfs f_bavail) + crate::metrics::RESOURCE_GAUGE + .with_label_values(&[&self.directory, "avail"]) + .set(free as f64); } /// Close all volumes. diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs index 54c86bdef..cad54bc33 100644 --- a/seaweed-volume/src/storage/store.rs +++ b/seaweed-volume/src/storage/store.rs @@ -418,6 +418,9 @@ impl Store { for &shard_id in shard_ids { let shard = EcVolumeShard::new(&dir, collection, vid, shard_id as u8); ec_vol.add_shard(shard).map_err(|e| VolumeError::Io(e))?; + crate::metrics::VOLUME_GAUGE + .with_label_values(&[collection, "ec_shards"]) + .inc(); } Ok(()) @@ -426,8 +429,12 @@ impl Store { /// Unmount EC shards for a volume. pub fn unmount_ec_shards(&mut self, vid: VolumeId, shard_ids: &[u32]) { if let Some(ec_vol) = self.ec_volumes.get_mut(&vid) { + let collection = ec_vol.collection.clone(); for &shard_id in shard_ids { ec_vol.remove_shard(shard_id as u8); + crate::metrics::VOLUME_GAUGE + .with_label_values(&[&collection, "ec_shards"]) + .dec(); } if ec_vol.shard_count() == 0 { let mut vol = self.ec_volumes.remove(&vid).unwrap();