From 7f0b8e7b03775182880da539f5671d467fe429c1 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 16 Mar 2026 21:37:45 -0700 Subject: [PATCH] Delete expired EC volumes during heartbeat --- seaweed-volume/src/server/heartbeat.rs | 116 +++++++++++++----- .../src/storage/erasure_coding/ec_volume.rs | 38 ++++++ seaweed-volume/src/storage/store.rs | 68 ++++++++++ 3 files changed, 191 insertions(+), 31 deletions(-) diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index 51df51fac..362363510 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -600,8 +600,14 @@ fn collect_heartbeat( config: &HeartbeatConfig, state: &Arc, ) -> master_pb::Heartbeat { - let store = state.store.read().unwrap(); - build_heartbeat(config, &store) + let mut store = state.store.write().unwrap(); + let (ec_shards, deleted_ec_shards) = store.delete_expired_ec_volumes(); + build_heartbeat_with_ec_status( + config, + &store, + deleted_ec_shards, + ec_shards.is_empty(), + ) } fn collect_location_metadata(store: &Store) -> (Vec, Vec) { @@ -622,7 +628,18 @@ fn collect_location_metadata(store: &Store) -> (Vec, Vec master_pb::Heartbeat { + let has_no_ec_shards = collect_live_ec_shards(store, false).is_empty(); + build_heartbeat_with_ec_status(config, store, Vec::new(), has_no_ec_shards) +} + +fn build_heartbeat_with_ec_status( + config: &HeartbeatConfig, + store: &Store, + deleted_ec_shards: Vec, + has_no_ec_shards: bool, +) -> master_pb::Heartbeat { #[derive(Default)] struct ReadOnlyCounts { is_read_only: u32, @@ -741,7 +758,9 @@ fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartb rack: config.rack.clone(), admin_port: config.port as u32, volumes, + deleted_ec_shards, has_no_volumes, + has_no_ec_shards, max_volume_counts, grpc_port: config.grpc_port as u32, location_uuids, @@ -750,45 +769,45 @@ fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartb } } -/// 
Collect EC shard information into a Heartbeat message. -fn collect_ec_heartbeat(config: &HeartbeatConfig, state: &Arc) -> master_pb::Heartbeat { - let store = state.store.read().unwrap(); - +fn collect_live_ec_shards( + store: &Store, + update_metrics: bool, +) -> Vec { let mut ec_shards = Vec::new(); let mut ec_sizes: HashMap = HashMap::new(); + for (disk_id, loc) in store.locations.iter().enumerate() { - for (vid, ec_vol) in loc.ec_volumes() { - let mut ec_index_bits: u32 = 0; - let mut shard_sizes = Vec::new(); - for shard_opt in &ec_vol.shards { - if let Some(shard) = shard_opt { - ec_index_bits |= 1u32 << shard.shard_id; - shard_sizes.push(shard.file_size()); - *ec_sizes.entry(ec_vol.collection.clone()).or_insert(0) += - shard.file_size().max(0) as u64; + for (_, ec_vol) in loc.ec_volumes() { + for message in ec_vol.to_volume_ec_shard_information_messages(disk_id as u32) { + if update_metrics { + let total_size: u64 = message + .shard_sizes + .iter() + .map(|size| (*size).max(0) as u64) + .sum(); + *ec_sizes.entry(message.collection.clone()).or_insert(0) += total_size; } - } - if ec_index_bits > 0 { - ec_shards.push(master_pb::VolumeEcShardInformationMessage { - id: vid.0, - collection: ec_vol.collection.clone(), - ec_index_bits, - shard_sizes, - disk_type: ec_vol.disk_type.to_string(), - expire_at_sec: ec_vol.expire_at_sec, - disk_id: disk_id as u32, - ..Default::default() - }); + ec_shards.push(message); } } } - for (col, size) in &ec_sizes { - crate::metrics::DISK_SIZE_GAUGE - .with_label_values(&[col, crate::metrics::DISK_SIZE_LABEL_EC]) - .set(*size as f64); + if update_metrics { + for (col, size) in &ec_sizes { + crate::metrics::DISK_SIZE_GAUGE + .with_label_values(&[col, crate::metrics::DISK_SIZE_LABEL_EC]) + .set(*size as f64); + } } + ec_shards +} + +/// Collect EC shard information into a Heartbeat message. 
+fn collect_ec_heartbeat(config: &HeartbeatConfig, state: &Arc) -> master_pb::Heartbeat { + let store = state.store.read().unwrap(); + let ec_shards = collect_live_ec_shards(&store, true); + let has_no = ec_shards.is_empty(); master_pb::Heartbeat { ip: config.ip.clone(), @@ -1077,6 +1096,41 @@ mod tests { ); } + #[test] + fn test_collect_heartbeat_deletes_expired_ec_volumes() { + let temp_dir = tempfile::tempdir().unwrap(); + let dir = temp_dir.path().to_str().unwrap(); + + let mut store = Store::new(NeedleMapKind::InMemory); + store + .add_location( + dir, + dir, + 8, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); + + std::fs::write(format!("{}/expired_heartbeat_ec_31.ec00", dir), b"expired").unwrap(); + store.locations[0] + .mount_ec_shards(VolumeId(31), "expired_heartbeat_ec", &[0]) + .unwrap(); + store + .find_ec_volume_mut(VolumeId(31)) + .unwrap() + .expire_at_sec = 1; + + let state = test_state_with_store(store); + let heartbeat = collect_heartbeat(&test_config(), &state); + + assert!(heartbeat.has_no_ec_shards); + assert_eq!(heartbeat.deleted_ec_shards.len(), 1); + assert_eq!(heartbeat.deleted_ec_shards[0].id, 31); + assert!(!state.store.read().unwrap().has_ec_volume(VolumeId(31))); + } + #[test] fn test_apply_storage_backends_registers_s3_default_aliases() { let state = test_state_with_store(Store::new(NeedleMapKind::InMemory)); diff --git a/seaweed-volume/src/storage/erasure_coding/ec_volume.rs b/seaweed-volume/src/storage/erasure_coding/ec_volume.rs index cf24ab10c..06b99ddba 100644 --- a/seaweed-volume/src/storage/erasure_coding/ec_volume.rs +++ b/seaweed-volume/src/storage/erasure_coding/ec_volume.rs @@ -6,7 +6,9 @@ use std::collections::HashMap; use std::fs::{self, File, OpenOptions}; use std::io::{self, Write}; +use std::time::{SystemTime, UNIX_EPOCH}; +use crate::pb::master_pb; use crate::storage::erasure_coding::ec_locate; use crate::storage::erasure_coding::ec_shard::*; use 
crate::storage::needle::needle::{get_actual_size, Needle}; @@ -208,6 +210,42 @@ impl EcVolume { self.shards.iter().filter(|s| s.is_some()).count() } + pub fn is_time_to_destroy(&self) -> bool { + self.expire_at_sec > 0 + && SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + > self.expire_at_sec + } + + pub fn to_volume_ec_shard_information_messages( + &self, + disk_id: u32, + ) -> Vec<master_pb::VolumeEcShardInformationMessage> { + let mut ec_index_bits: u32 = 0; + let mut shard_sizes = Vec::new(); + for shard in self.shards.iter().flatten() { + ec_index_bits |= 1u32 << shard.shard_id; + shard_sizes.push(shard.file_size()); + } + + if ec_index_bits == 0 { + return Vec::new(); + } + + vec![master_pb::VolumeEcShardInformationMessage { + id: self.volume_id.0, + collection: self.collection.clone(), + ec_index_bits, + shard_sizes, + disk_type: self.disk_type.to_string(), + expire_at_sec: self.expire_at_sec, + disk_id, + ..Default::default() + }] + } + // ---- Shard locations (distributed tracking) ---- /// Set the list of server addresses for a given shard ID. 
diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs index 76f69cd99..4c6c3caef 100644 --- a/seaweed-volume/src/storage/store.rs +++ b/seaweed-volume/src/storage/store.rs @@ -8,6 +8,7 @@ use std::io; use std::sync::atomic::{AtomicU64, Ordering}; use crate::config::MinFreeSpace; +use crate::pb::master_pb; use crate::storage::disk_location::DiskLocation; use crate::storage::erasure_coding::ec_shard::EcVolumeShard; use crate::storage::erasure_coding::ec_volume::EcVolume; @@ -569,6 +570,49 @@ impl Store { self.locations.iter().any(|loc| loc.has_ec_volume(vid)) } + pub fn delete_expired_ec_volumes( + &mut self, + ) -> ( + Vec<master_pb::VolumeEcShardInformationMessage>, + Vec<master_pb::VolumeEcShardInformationMessage>, + ) { + let mut ec_shards = Vec::new(); + let mut deleted = Vec::new(); + + for (disk_id, loc) in self.locations.iter_mut().enumerate() { + let mut expired_vids = Vec::new(); + for (vid, ec_vol) in loc.ec_volumes() { + if ec_vol.is_time_to_destroy() { + expired_vids.push(*vid); + } else { + ec_shards.extend( + ec_vol.to_volume_ec_shard_information_messages(disk_id as u32), + ); + } + } + + for vid in expired_vids { + let messages = loc + .find_ec_volume(vid) + .map(|ec_vol| ec_vol.to_volume_ec_shard_information_messages(disk_id as u32)) + .unwrap_or_default(); + if let Some(mut ec_vol) = loc.remove_ec_volume(vid) { + for _ in 0..ec_vol.shard_count() { + crate::metrics::VOLUME_GAUGE + .with_label_values(&[&ec_vol.collection, "ec_shards"]) + .dec(); + } + ec_vol.destroy(); + deleted.extend(messages); + } else { + ec_shards.extend(messages); + } + } + } + + (ec_shards, deleted) + } + /// Remove an EC volume from whichever location has it. 
pub fn remove_ec_volume(&mut self, vid: VolumeId) -> Option<EcVolume> { for loc in &mut self.locations { @@ -1043,6 +1087,30 @@ mod tests { assert!(store.has_volume(VolumeId(3))); } + #[test] + fn test_delete_expired_ec_volumes_removes_expired_entries() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + let mut store = make_test_store(&[dir]); + + std::fs::write(format!("{}/expired_ec_case_9.ec00", dir), b"expired").unwrap(); + store.locations[0] + .mount_ec_shards(VolumeId(9), "expired_ec_case", &[0]) + .unwrap(); + store + .find_ec_volume_mut(VolumeId(9)) + .unwrap() + .expire_at_sec = 1; + + let (ec_shards, deleted) = store.delete_expired_ec_volumes(); + + assert!(ec_shards.is_empty()); + assert_eq!(deleted.len(), 1); + assert_eq!(deleted[0].id, 9); + assert!(!store.has_ec_volume(VolumeId(9))); + assert!(!std::path::Path::new(&format!("{}/expired_ec_case_9.ec00", dir)).exists()); + } + #[test] fn test_store_volume_not_found() { let tmp = TempDir::new().unwrap();