Browse Source

Match Go heartbeat metric accounting

rust-volume-server
Chris Lu 4 days ago
parent
commit
c67fba4b9d
  1. 9
      seaweed-volume/src/metrics.rs
  2. 213
      seaweed-volume/src/server/heartbeat.rs
  3. 1
      seaweed-volume/src/storage/disk_location.rs
  4. 8
      seaweed-volume/src/storage/volume.rs

9
seaweed-volume/src/metrics.rs

@ -198,6 +198,15 @@ pub const ERROR_WRITE_TO_LOCAL_DISK: &str = "errorWriteToLocalDisk";
pub const ERROR_UNMARSHAL_PAIRS: &str = "errorUnmarshalPairs"; pub const ERROR_UNMARSHAL_PAIRS: &str = "errorUnmarshalPairs";
pub const ERROR_WRITE_TO_REPLICAS: &str = "errorWriteToReplicas"; pub const ERROR_WRITE_TO_REPLICAS: &str = "errorWriteToReplicas";
// Go volume heartbeat metric label values.
pub const READ_ONLY_LABEL_IS_READ_ONLY: &str = "IsReadOnly";
pub const READ_ONLY_LABEL_NO_WRITE_OR_DELETE: &str = "noWriteOrDelete";
pub const READ_ONLY_LABEL_NO_WRITE_CAN_DELETE: &str = "noWriteCanDelete";
pub const READ_ONLY_LABEL_IS_DISK_SPACE_LOW: &str = "isDiskSpaceLow";
pub const DISK_SIZE_LABEL_NORMAL: &str = "normal";
pub const DISK_SIZE_LABEL_DELETED_BYTES: &str = "deleted_bytes";
pub const DISK_SIZE_LABEL_EC: &str = "ec";
static REGISTER_METRICS: Once = Once::new(); static REGISTER_METRICS: Once = Once::new();
/// Register all metrics with the custom registry. /// Register all metrics with the custom registry.

213
seaweed-volume/src/server/heartbeat.rs

@ -591,20 +591,38 @@ fn collect_location_metadata(store: &Store) -> (Vec<String>, Vec<master_pb::Disk
} }
fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartbeat { fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartbeat {
#[derive(Default)]
struct ReadOnlyCounts {
is_read_only: u32,
no_write_or_delete: u32,
no_write_can_delete: u32,
is_disk_space_low: u32,
}
let mut volumes = Vec::new(); let mut volumes = Vec::new();
let mut max_file_key = NeedleId(0); let mut max_file_key = NeedleId(0);
let mut max_volume_counts: HashMap<String, u32> = HashMap::new(); let mut max_volume_counts: HashMap<String, u32> = HashMap::new();
// Collect per-collection disk size and read-only counts for metrics // Collect per-collection disk size and read-only counts for metrics
let mut disk_sizes: HashMap<String, (u64, u64)> = HashMap::new(); // (normal, deleted) let mut disk_sizes: HashMap<String, (u64, u64)> = HashMap::new(); // (normal, deleted)
let mut ro_counts: HashMap<String, u32> = HashMap::new();
let mut ro_counts: HashMap<String, ReadOnlyCounts> = HashMap::new();
for loc in &store.locations {
for (disk_id, loc) in store.locations.iter().enumerate() {
let disk_type_str = loc.disk_type.to_string(); let disk_type_str = loc.disk_type.to_string();
let max_count = loc
.max_volume_count
.load(std::sync::atomic::Ordering::Relaxed);
*max_volume_counts.entry(disk_type_str).or_insert(0) += max_count as u32;
let mut effective_max_count = loc.max_volume_count.load(Ordering::Relaxed);
if loc.is_disk_space_low.load(Ordering::Relaxed) {
let used_slots = loc.volumes_len() as i32
+ ((loc.ec_shard_count()
+ crate::storage::erasure_coding::ec_shard::DATA_SHARDS_COUNT
- 1)
/ crate::storage::erasure_coding::ec_shard::DATA_SHARDS_COUNT)
as i32;
effective_max_count = used_slots;
}
if effective_max_count < 0 {
effective_max_count = 0;
}
*max_volume_counts.entry(disk_type_str).or_insert(0) += effective_max_count as u32;
for (_, vol) in loc.iter_volumes() { for (_, vol) in loc.iter_volumes() {
let cur_max = vol.max_file_key(); let cur_max = vol.max_file_key();
@ -617,9 +635,18 @@ fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartb
entry.0 += vol.content_size(); entry.0 += vol.content_size();
entry.1 += vol.deleted_size(); entry.1 += vol.deleted_size();
// Track read-only count by collection
let read_only = ro_counts.entry(vol.collection.clone()).or_default();
if vol.is_read_only() { if vol.is_read_only() {
*ro_counts.entry(vol.collection.clone()).or_insert(0) += 1;
read_only.is_read_only += 1;
if vol.is_no_write_or_delete() {
read_only.no_write_or_delete += 1;
}
if vol.is_no_write_can_delete() {
read_only.no_write_can_delete += 1;
}
if loc.is_disk_space_low.load(Ordering::Relaxed) {
read_only.is_disk_space_low += 1;
}
} }
volumes.push(master_pb::VolumeInformationMessage { volumes.push(master_pb::VolumeInformationMessage {
@ -636,6 +663,7 @@ fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartb
compact_revision: vol.last_compact_revision() as u32, compact_revision: vol.last_compact_revision() as u32,
modified_at_second: vol.last_modified_ts() as i64, modified_at_second: vol.last_modified_ts() as i64,
disk_type: loc.disk_type.to_string(), disk_type: loc.disk_type.to_string(),
disk_id: disk_id as u32,
..Default::default() ..Default::default()
}); });
} }
@ -644,16 +672,25 @@ fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartb
// Update disk size and read-only gauges // Update disk size and read-only gauges
for (col, (normal, deleted)) in &disk_sizes { for (col, (normal, deleted)) in &disk_sizes {
crate::metrics::DISK_SIZE_GAUGE crate::metrics::DISK_SIZE_GAUGE
.with_label_values(&[col, "normal"])
.with_label_values(&[col, crate::metrics::DISK_SIZE_LABEL_NORMAL])
.set(*normal as f64); .set(*normal as f64);
crate::metrics::DISK_SIZE_GAUGE crate::metrics::DISK_SIZE_GAUGE
.with_label_values(&[col, "deleted_bytes"])
.with_label_values(&[col, crate::metrics::DISK_SIZE_LABEL_DELETED_BYTES])
.set(*deleted as f64); .set(*deleted as f64);
} }
for (col, count) in &ro_counts {
for (col, counts) in &ro_counts {
crate::metrics::READ_ONLY_VOLUME_GAUGE
.with_label_values(&[col, crate::metrics::READ_ONLY_LABEL_IS_READ_ONLY])
.set(counts.is_read_only as f64);
crate::metrics::READ_ONLY_VOLUME_GAUGE
.with_label_values(&[col, crate::metrics::READ_ONLY_LABEL_NO_WRITE_OR_DELETE])
.set(counts.no_write_or_delete as f64);
crate::metrics::READ_ONLY_VOLUME_GAUGE crate::metrics::READ_ONLY_VOLUME_GAUGE
.with_label_values(&[col, "volume"])
.set(*count as f64);
.with_label_values(&[col, crate::metrics::READ_ONLY_LABEL_NO_WRITE_CAN_DELETE])
.set(counts.no_write_can_delete as f64);
crate::metrics::READ_ONLY_VOLUME_GAUGE
.with_label_values(&[col, crate::metrics::READ_ONLY_LABEL_IS_DISK_SPACE_LOW])
.set(counts.is_disk_space_low as f64);
} }
// Update max volumes gauge // Update max volumes gauge
let total_max: i64 = max_volume_counts.values().map(|v| *v as i64).sum(); let total_max: i64 = max_volume_counts.values().map(|v| *v as i64).sum();
@ -686,12 +723,17 @@ fn collect_ec_heartbeat(config: &HeartbeatConfig, state: &Arc<VolumeServerState>
let store = state.store.read().unwrap(); let store = state.store.read().unwrap();
let mut ec_shards = Vec::new(); let mut ec_shards = Vec::new();
for loc in &store.locations {
let mut ec_sizes: HashMap<String, u64> = HashMap::new();
for (disk_id, loc) in store.locations.iter().enumerate() {
for (vid, ec_vol) in loc.ec_volumes() { for (vid, ec_vol) in loc.ec_volumes() {
let mut ec_index_bits: u32 = 0; let mut ec_index_bits: u32 = 0;
let mut shard_sizes = Vec::new();
for shard_opt in &ec_vol.shards { for shard_opt in &ec_vol.shards {
if let Some(shard) = shard_opt { if let Some(shard) = shard_opt {
ec_index_bits |= 1u32 << shard.shard_id; ec_index_bits |= 1u32 << shard.shard_id;
shard_sizes.push(shard.file_size());
*ec_sizes.entry(ec_vol.collection.clone()).or_insert(0) +=
shard.file_size().max(0) as u64;
} }
} }
if ec_index_bits > 0 { if ec_index_bits > 0 {
@ -699,12 +741,22 @@ fn collect_ec_heartbeat(config: &HeartbeatConfig, state: &Arc<VolumeServerState>
id: vid.0, id: vid.0,
collection: ec_vol.collection.clone(), collection: ec_vol.collection.clone(),
ec_index_bits, ec_index_bits,
shard_sizes,
disk_type: ec_vol.disk_type.to_string(),
expire_at_sec: ec_vol.expire_at_sec,
disk_id: disk_id as u32,
..Default::default() ..Default::default()
}); });
} }
} }
} }
for (col, size) in &ec_sizes {
crate::metrics::DISK_SIZE_GAUGE
.with_label_values(&[col, crate::metrics::DISK_SIZE_LABEL_EC])
.set(*size as f64);
}
let has_no = ec_shards.is_empty(); let has_no = ec_shards.is_empty();
master_pb::Heartbeat { master_pb::Heartbeat {
ip: config.ip.clone(), ip: config.ip.clone(),
@ -723,10 +775,17 @@ mod tests {
use super::*; use super::*;
use crate::config::MinFreeSpace; use crate::config::MinFreeSpace;
use crate::config::ReadMode; use crate::config::ReadMode;
use crate::metrics::{
DISK_SIZE_GAUGE, DISK_SIZE_LABEL_DELETED_BYTES, DISK_SIZE_LABEL_EC,
DISK_SIZE_LABEL_NORMAL, READ_ONLY_LABEL_IS_DISK_SPACE_LOW,
READ_ONLY_LABEL_IS_READ_ONLY, READ_ONLY_LABEL_NO_WRITE_CAN_DELETE,
READ_ONLY_LABEL_NO_WRITE_OR_DELETE, READ_ONLY_VOLUME_GAUGE,
};
use crate::remote_storage::s3_tier::S3TierRegistry; use crate::remote_storage::s3_tier::S3TierRegistry;
use crate::security::{Guard, SigningKey}; use crate::security::{Guard, SigningKey};
use crate::storage::needle_map::NeedleMapKind; use crate::storage::needle_map::NeedleMapKind;
use crate::storage::types::{DiskType, Version, VolumeId}; use crate::storage::types::{DiskType, Version, VolumeId};
use std::sync::atomic::Ordering;
use std::sync::RwLock; use std::sync::RwLock;
fn test_config() -> HeartbeatConfig { fn test_config() -> HeartbeatConfig {
@ -860,6 +919,132 @@ mod tests {
assert!(heartbeat.has_no_volumes); assert!(heartbeat.has_no_volumes);
} }
#[test]
fn test_build_heartbeat_tracks_go_read_only_labels_and_disk_id() {
let temp_dir = tempfile::tempdir().unwrap();
let dir = temp_dir.path().to_str().unwrap();
let mut store = Store::new(NeedleMapKind::InMemory);
store
.add_location(
dir,
dir,
8,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
Vec::new(),
)
.unwrap();
store
.add_volume(
VolumeId(17),
"heartbeat_metrics_case",
None,
None,
0,
DiskType::HardDrive,
Version::current(),
)
.unwrap();
store.locations[0]
.is_disk_space_low
.store(true, Ordering::Relaxed);
{
let (_, volume) = store.find_volume_mut(VolumeId(17)).unwrap();
volume.set_read_only();
volume.volume_info.files.push(Default::default());
volume.refresh_remote_write_mode();
}
let heartbeat = build_heartbeat(&test_config(), &store);
let collection = "heartbeat_metrics_case";
let disk_type = store.locations[0].disk_type.to_string();
assert_eq!(heartbeat.volumes.len(), 1);
assert_eq!(heartbeat.volumes[0].disk_id, 0);
assert_eq!(heartbeat.max_volume_counts[&disk_type], 1);
assert_eq!(
READ_ONLY_VOLUME_GAUGE
.with_label_values(&[collection, READ_ONLY_LABEL_IS_READ_ONLY])
.get(),
1.0
);
assert_eq!(
READ_ONLY_VOLUME_GAUGE
.with_label_values(&[collection, READ_ONLY_LABEL_NO_WRITE_OR_DELETE])
.get(),
0.0
);
assert_eq!(
READ_ONLY_VOLUME_GAUGE
.with_label_values(&[collection, READ_ONLY_LABEL_NO_WRITE_CAN_DELETE])
.get(),
1.0
);
assert_eq!(
READ_ONLY_VOLUME_GAUGE
.with_label_values(&[collection, READ_ONLY_LABEL_IS_DISK_SPACE_LOW])
.get(),
1.0
);
assert_eq!(
DISK_SIZE_GAUGE
.with_label_values(&[collection, DISK_SIZE_LABEL_NORMAL])
.get(),
0.0
);
assert_eq!(
DISK_SIZE_GAUGE
.with_label_values(&[collection, DISK_SIZE_LABEL_DELETED_BYTES])
.get(),
0.0
);
}
#[test]
fn test_collect_ec_heartbeat_sets_go_metadata_and_ec_metrics() {
let temp_dir = tempfile::tempdir().unwrap();
let dir = temp_dir.path().to_str().unwrap();
let mut store = Store::new(NeedleMapKind::InMemory);
store
.add_location(
dir,
dir,
8,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
Vec::new(),
)
.unwrap();
let shard_path = format!("{}/ec_metrics_case_27.ec00", dir);
std::fs::write(&shard_path, b"ec-shard").unwrap();
store.locations[0]
.mount_ec_shards(VolumeId(27), "ec_metrics_case", &[0])
.unwrap();
let state = test_state_with_store(store);
let heartbeat = collect_ec_heartbeat(&test_config(), &state);
assert_eq!(heartbeat.ec_shards.len(), 1);
assert!(!heartbeat.has_no_ec_shards);
assert_eq!(heartbeat.ec_shards[0].disk_id, 0);
assert_eq!(
heartbeat.ec_shards[0].disk_type,
state.store.read().unwrap().locations[0].disk_type.to_string()
);
assert_eq!(heartbeat.ec_shards[0].ec_index_bits, 1);
assert_eq!(heartbeat.ec_shards[0].shard_sizes, vec![8]);
assert_eq!(
DISK_SIZE_GAUGE
.with_label_values(&["ec_metrics_case", DISK_SIZE_LABEL_EC])
.get(),
8.0
);
}
#[test] #[test]
fn test_apply_storage_backends_registers_s3_default_aliases() { fn test_apply_storage_backends_registers_s3_default_aliases() {
let state = test_state_with_store(Store::new(NeedleMapKind::InMemory)); let state = test_state_with_store(Store::new(NeedleMapKind::InMemory));

1
seaweed-volume/src/storage/disk_location.rs

@ -551,6 +551,7 @@ impl DiskLocation {
.ec_volumes .ec_volumes
.entry(vid) .entry(vid)
.or_insert_with(|| EcVolume::new(&dir, &idx_dir, collection, vid).unwrap()); .or_insert_with(|| EcVolume::new(&dir, &idx_dir, collection, vid).unwrap());
ec_vol.disk_type = self.disk_type.clone();
for &shard_id in shard_ids { for &shard_id in shard_ids {
let shard = EcVolumeShard::new(&dir, collection, vid, shard_id as u8); let shard = EcVolumeShard::new(&dir, collection, vid, shard_id as u8);

8
seaweed-volume/src/storage/volume.rs

@ -1563,6 +1563,14 @@ impl Volume {
self.no_write_or_delete || self.no_write_can_delete self.no_write_or_delete || self.no_write_can_delete
} }
pub fn is_no_write_or_delete(&self) -> bool {
self.no_write_or_delete
}
pub fn is_no_write_can_delete(&self) -> bool {
self.no_write_can_delete
}
pub fn last_compact_revision(&self) -> u16 { pub fn last_compact_revision(&self) -> u16 {
self.last_compact_revision self.last_compact_revision
} }

Loading…
Cancel
Save