diff --git a/seaweed-volume/src/metrics.rs b/seaweed-volume/src/metrics.rs index c857fe3d6..95ddd7309 100644 --- a/seaweed-volume/src/metrics.rs +++ b/seaweed-volume/src/metrics.rs @@ -198,6 +198,15 @@ pub const ERROR_WRITE_TO_LOCAL_DISK: &str = "errorWriteToLocalDisk"; pub const ERROR_UNMARSHAL_PAIRS: &str = "errorUnmarshalPairs"; pub const ERROR_WRITE_TO_REPLICAS: &str = "errorWriteToReplicas"; +// Go volume heartbeat metric label values. +pub const READ_ONLY_LABEL_IS_READ_ONLY: &str = "IsReadOnly"; +pub const READ_ONLY_LABEL_NO_WRITE_OR_DELETE: &str = "noWriteOrDelete"; +pub const READ_ONLY_LABEL_NO_WRITE_CAN_DELETE: &str = "noWriteCanDelete"; +pub const READ_ONLY_LABEL_IS_DISK_SPACE_LOW: &str = "isDiskSpaceLow"; +pub const DISK_SIZE_LABEL_NORMAL: &str = "normal"; +pub const DISK_SIZE_LABEL_DELETED_BYTES: &str = "deleted_bytes"; +pub const DISK_SIZE_LABEL_EC: &str = "ec"; + static REGISTER_METRICS: Once = Once::new(); /// Register all metrics with the custom registry. diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index 1228c7c6c..33cb5d67c 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -591,20 +591,38 @@ fn collect_location_metadata(store: &Store) -> (Vec, Vec master_pb::Heartbeat { + #[derive(Default)] + struct ReadOnlyCounts { + is_read_only: u32, + no_write_or_delete: u32, + no_write_can_delete: u32, + is_disk_space_low: u32, + } + let mut volumes = Vec::new(); let mut max_file_key = NeedleId(0); let mut max_volume_counts: HashMap = HashMap::new(); // Collect per-collection disk size and read-only counts for metrics let mut disk_sizes: HashMap = HashMap::new(); // (normal, deleted) - let mut ro_counts: HashMap = HashMap::new(); + let mut ro_counts: HashMap = HashMap::new(); - for loc in &store.locations { + for (disk_id, loc) in store.locations.iter().enumerate() { let disk_type_str = loc.disk_type.to_string(); - let max_count = loc - .max_volume_count - .load(std::sync::atomic::Ordering::Relaxed); - *max_volume_counts.entry(disk_type_str).or_insert(0) += max_count as u32; + let mut effective_max_count = loc.max_volume_count.load(Ordering::Relaxed); + if loc.is_disk_space_low.load(Ordering::Relaxed) { + let used_slots = loc.volumes_len() as i32 + + ((loc.ec_shard_count() + + crate::storage::erasure_coding::ec_shard::DATA_SHARDS_COUNT + - 1) + / crate::storage::erasure_coding::ec_shard::DATA_SHARDS_COUNT) + as i32; + effective_max_count = used_slots; + } + if effective_max_count < 0 { + effective_max_count = 0; + } + *max_volume_counts.entry(disk_type_str).or_insert(0) += effective_max_count as u32; for (_, vol) in loc.iter_volumes() { let cur_max = vol.max_file_key(); @@ -617,9 +635,18 @@ fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartb entry.0 += vol.content_size(); entry.1 += vol.deleted_size(); - // Track read-only count by collection + let read_only = ro_counts.entry(vol.collection.clone()).or_default(); if vol.is_read_only() { - *ro_counts.entry(vol.collection.clone()).or_insert(0) += 1; + read_only.is_read_only += 1; + if vol.is_no_write_or_delete() { + read_only.no_write_or_delete += 1; + } + if vol.is_no_write_can_delete() { + read_only.no_write_can_delete += 1; + } + if loc.is_disk_space_low.load(Ordering::Relaxed) { + read_only.is_disk_space_low += 1; + } } volumes.push(master_pb::VolumeInformationMessage { @@ -636,6 +663,7 @@ fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartb compact_revision: vol.last_compact_revision() as u32, modified_at_second: vol.last_modified_ts() as i64, disk_type: loc.disk_type.to_string(), + disk_id: disk_id as u32, ..Default::default() }); } @@ -644,16 +672,25 @@ fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartb // Update disk size and read-only gauges for (col, (normal, deleted)) in &disk_sizes { crate::metrics::DISK_SIZE_GAUGE - .with_label_values(&[col, "normal"]) + .with_label_values(&[col, crate::metrics::DISK_SIZE_LABEL_NORMAL]) .set(*normal as f64); crate::metrics::DISK_SIZE_GAUGE - .with_label_values(&[col, "deleted_bytes"]) + .with_label_values(&[col, crate::metrics::DISK_SIZE_LABEL_DELETED_BYTES]) .set(*deleted as f64); } - for (col, count) in &ro_counts { + for (col, counts) in &ro_counts { + crate::metrics::READ_ONLY_VOLUME_GAUGE + .with_label_values(&[col, crate::metrics::READ_ONLY_LABEL_IS_READ_ONLY]) + .set(counts.is_read_only as f64); + crate::metrics::READ_ONLY_VOLUME_GAUGE + .with_label_values(&[col, crate::metrics::READ_ONLY_LABEL_NO_WRITE_OR_DELETE]) + .set(counts.no_write_or_delete as f64); crate::metrics::READ_ONLY_VOLUME_GAUGE - .with_label_values(&[col, "volume"]) - .set(*count as f64); + .with_label_values(&[col, crate::metrics::READ_ONLY_LABEL_NO_WRITE_CAN_DELETE]) + .set(counts.no_write_can_delete as f64); + crate::metrics::READ_ONLY_VOLUME_GAUGE + .with_label_values(&[col, crate::metrics::READ_ONLY_LABEL_IS_DISK_SPACE_LOW]) + .set(counts.is_disk_space_low as f64); } // Update max volumes gauge let total_max: i64 = max_volume_counts.values().map(|v| *v as i64).sum(); @@ -686,12 +723,17 @@ fn collect_ec_heartbeat(config: &HeartbeatConfig, state: &Arc let store = state.store.read().unwrap(); let mut ec_shards = Vec::new(); - for loc in &store.locations { + let mut ec_sizes: HashMap = HashMap::new(); + for (disk_id, loc) in store.locations.iter().enumerate() { for (vid, ec_vol) in loc.ec_volumes() { let mut ec_index_bits: u32 = 0; + let mut shard_sizes = Vec::new(); for shard_opt in &ec_vol.shards { if let Some(shard) = shard_opt { ec_index_bits |= 1u32 << shard.shard_id; + shard_sizes.push(shard.file_size()); + *ec_sizes.entry(ec_vol.collection.clone()).or_insert(0) += + shard.file_size().max(0) as u64; } } if ec_index_bits > 0 { @@ -699,12 +741,22 @@ fn collect_ec_heartbeat(config: &HeartbeatConfig, state: &Arc id: vid.0, collection: ec_vol.collection.clone(), ec_index_bits, + shard_sizes, + disk_type: ec_vol.disk_type.to_string(), + expire_at_sec: ec_vol.expire_at_sec, + disk_id: disk_id as u32, ..Default::default() }); } } } + for (col, size) in &ec_sizes { + crate::metrics::DISK_SIZE_GAUGE + .with_label_values(&[col, crate::metrics::DISK_SIZE_LABEL_EC]) + .set(*size as f64); + } + let has_no = ec_shards.is_empty(); master_pb::Heartbeat { ip: config.ip.clone(), @@ -723,10 +775,17 @@ mod tests { use super::*; use crate::config::MinFreeSpace; use crate::config::ReadMode; + use crate::metrics::{ + DISK_SIZE_GAUGE, DISK_SIZE_LABEL_DELETED_BYTES, DISK_SIZE_LABEL_EC, + DISK_SIZE_LABEL_NORMAL, READ_ONLY_LABEL_IS_DISK_SPACE_LOW, + READ_ONLY_LABEL_IS_READ_ONLY, READ_ONLY_LABEL_NO_WRITE_CAN_DELETE, + READ_ONLY_LABEL_NO_WRITE_OR_DELETE, READ_ONLY_VOLUME_GAUGE, + }; use crate::remote_storage::s3_tier::S3TierRegistry; use crate::security::{Guard, SigningKey}; use crate::storage::needle_map::NeedleMapKind; use crate::storage::types::{DiskType, Version, VolumeId}; + use std::sync::atomic::Ordering; use std::sync::RwLock; fn test_config() -> HeartbeatConfig { @@ -860,6 +919,132 @@ mod tests { assert!(heartbeat.has_no_volumes); } + #[test] + fn test_build_heartbeat_tracks_go_read_only_labels_and_disk_id() { + let temp_dir = tempfile::tempdir().unwrap(); + let dir = temp_dir.path().to_str().unwrap(); + + let mut store = Store::new(NeedleMapKind::InMemory); + store + .add_location( + dir, + dir, + 8, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); + store + .add_volume( + VolumeId(17), + "heartbeat_metrics_case", + None, + None, + 0, + DiskType::HardDrive, + Version::current(), + ) + .unwrap(); + store.locations[0] + .is_disk_space_low + .store(true, Ordering::Relaxed); + + { + let (_, volume) = store.find_volume_mut(VolumeId(17)).unwrap(); + volume.set_read_only(); + volume.volume_info.files.push(Default::default()); + volume.refresh_remote_write_mode(); + } + + let heartbeat = build_heartbeat(&test_config(), &store); + let collection = "heartbeat_metrics_case"; + let disk_type = store.locations[0].disk_type.to_string(); + + assert_eq!(heartbeat.volumes.len(), 1); + assert_eq!(heartbeat.volumes[0].disk_id, 0); + assert_eq!(heartbeat.max_volume_counts[&disk_type], 1); + assert_eq!( + READ_ONLY_VOLUME_GAUGE + .with_label_values(&[collection, READ_ONLY_LABEL_IS_READ_ONLY]) + .get(), + 1.0 + ); + assert_eq!( + READ_ONLY_VOLUME_GAUGE + .with_label_values(&[collection, READ_ONLY_LABEL_NO_WRITE_OR_DELETE]) + .get(), + 0.0 + ); + assert_eq!( + READ_ONLY_VOLUME_GAUGE + .with_label_values(&[collection, READ_ONLY_LABEL_NO_WRITE_CAN_DELETE]) + .get(), + 1.0 + ); + assert_eq!( + READ_ONLY_VOLUME_GAUGE + .with_label_values(&[collection, READ_ONLY_LABEL_IS_DISK_SPACE_LOW]) + .get(), + 1.0 + ); + assert_eq!( + DISK_SIZE_GAUGE + .with_label_values(&[collection, DISK_SIZE_LABEL_NORMAL]) + .get(), + 0.0 + ); + assert_eq!( + DISK_SIZE_GAUGE + .with_label_values(&[collection, DISK_SIZE_LABEL_DELETED_BYTES]) + .get(), + 0.0 + ); + } + + #[test] + fn test_collect_ec_heartbeat_sets_go_metadata_and_ec_metrics() { + let temp_dir = tempfile::tempdir().unwrap(); + let dir = temp_dir.path().to_str().unwrap(); + + let mut store = Store::new(NeedleMapKind::InMemory); + store + .add_location( + dir, + dir, + 8, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); + + let shard_path = format!("{}/ec_metrics_case_27.ec00", dir); + std::fs::write(&shard_path, b"ec-shard").unwrap(); + store.locations[0] + .mount_ec_shards(VolumeId(27), "ec_metrics_case", &[0]) + .unwrap(); + + let state = test_state_with_store(store); + let heartbeat = collect_ec_heartbeat(&test_config(), &state); + + assert_eq!(heartbeat.ec_shards.len(), 1); + assert!(!heartbeat.has_no_ec_shards); + assert_eq!(heartbeat.ec_shards[0].disk_id, 0); + assert_eq!( + heartbeat.ec_shards[0].disk_type, + state.store.read().unwrap().locations[0].disk_type.to_string() + ); + assert_eq!(heartbeat.ec_shards[0].ec_index_bits, 1); + assert_eq!(heartbeat.ec_shards[0].shard_sizes, vec![8]); + assert_eq!( + DISK_SIZE_GAUGE + .with_label_values(&["ec_metrics_case", DISK_SIZE_LABEL_EC]) + .get(), + 8.0 + ); + } + #[test] fn test_apply_storage_backends_registers_s3_default_aliases() { let state = test_state_with_store(Store::new(NeedleMapKind::InMemory)); diff --git a/seaweed-volume/src/storage/disk_location.rs b/seaweed-volume/src/storage/disk_location.rs index 70ff6538e..6585e2555 100644 --- a/seaweed-volume/src/storage/disk_location.rs +++ b/seaweed-volume/src/storage/disk_location.rs @@ -551,6 +551,7 @@ impl DiskLocation { .ec_volumes .entry(vid) .or_insert_with(|| EcVolume::new(&dir, &idx_dir, collection, vid).unwrap()); + ec_vol.disk_type = self.disk_type.clone(); for &shard_id in shard_ids { let shard = EcVolumeShard::new(&dir, collection, vid, shard_id as u8); diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 08d2156d8..a7c8fc090 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -1563,6 +1563,14 @@ impl Volume { self.no_write_or_delete || self.no_write_can_delete } + pub fn is_no_write_or_delete(&self) -> bool { + self.no_write_or_delete + } + + pub fn is_no_write_can_delete(&self) -> bool { + self.no_write_can_delete + } + pub fn last_compact_revision(&self) -> u16 { self.last_compact_revision }