diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index 362363510..e52c9b235 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -604,7 +604,7 @@ fn collect_heartbeat( let (ec_shards, deleted_ec_shards) = store.delete_expired_ec_volumes(); build_heartbeat_with_ec_status( config, - &store, + &mut store, deleted_ec_shards, ec_shards.is_empty(), ) @@ -629,17 +629,19 @@ fn collect_location_metadata(store: &Store) -> (Vec, Vec master_pb::Heartbeat { +fn build_heartbeat(config: &HeartbeatConfig, store: &mut Store) -> master_pb::Heartbeat { let has_no_ec_shards = collect_live_ec_shards(store, false).is_empty(); build_heartbeat_with_ec_status(config, store, Vec::new(), has_no_ec_shards) } fn build_heartbeat_with_ec_status( config: &HeartbeatConfig, - store: &Store, + store: &mut Store, deleted_ec_shards: Vec, has_no_ec_shards: bool, ) -> master_pb::Heartbeat { + const MAX_TTL_VOLUME_REMOVAL_DELAY: u32 = 10; + #[derive(Default)] struct ReadOnlyCounts { is_read_only: u32, @@ -656,7 +658,9 @@ fn build_heartbeat_with_ec_status( let mut disk_sizes: HashMap = HashMap::new(); // (normal, deleted) let mut ro_counts: HashMap = HashMap::new(); - for (disk_id, loc) in store.locations.iter().enumerate() { + let volume_size_limit = store.volume_size_limit.load(Ordering::Relaxed); + + for (disk_id, loc) in store.locations.iter_mut().enumerate() { let disk_type_str = loc.disk_type.to_string(); let mut effective_max_count = loc.max_volume_count.load(Ordering::Relaxed); if loc.is_disk_space_low.load(Ordering::Relaxed) { @@ -673,19 +677,51 @@ fn build_heartbeat_with_ec_status( } *max_volume_counts.entry(disk_type_str).or_insert(0) += effective_max_count as u32; + let mut delete_vids = Vec::new(); for (_, vol) in loc.iter_volumes() { let cur_max = vol.max_file_key(); if cur_max > max_file_key { max_file_key = cur_max; } + let volume_size = vol.dat_file_size().unwrap_or(0); + let mut 
should_delete_volume = false; + + if vol.last_io_error().is_some() { + delete_vids.push(vol.id); + should_delete_volume = true; + } else if !vol.is_expired(volume_size, volume_size_limit) { + volumes.push(master_pb::VolumeInformationMessage { + id: vol.id.0, + size: volume_size, + collection: vol.collection.clone(), + file_count: vol.file_count() as u64, + delete_count: vol.deleted_count() as u64, + deleted_byte_count: vol.deleted_size(), + read_only: vol.is_read_only(), + replica_placement: vol.super_block.replica_placement.to_byte() as u32, + version: vol.super_block.version.0 as u32, + ttl: vol.super_block.ttl.to_u32(), + compact_revision: vol.last_compact_revision() as u32, + modified_at_second: vol.last_modified_ts() as i64, + disk_type: loc.disk_type.to_string(), + disk_id: disk_id as u32, + ..Default::default() + }); + } else if vol.is_expired_long_enough(MAX_TTL_VOLUME_REMOVAL_DELAY) { + delete_vids.push(vol.id); + should_delete_volume = true; + } + // Track disk size by collection let entry = disk_sizes.entry(vol.collection.clone()).or_insert((0, 0)); - entry.0 += vol.content_size(); - entry.1 += vol.deleted_size(); + if !should_delete_volume { + entry.0 += volume_size; + entry.1 += vol.deleted_size(); + } let read_only = ro_counts.entry(vol.collection.clone()).or_default(); - if vol.is_read_only() { + if !should_delete_volume && vol.is_read_only() { read_only.is_read_only += 1; if vol.is_no_write_or_delete() { read_only.no_write_or_delete += 1; @@ -698,23 +734,10 @@ fn build_heartbeat_with_ec_status( } } - volumes.push(master_pb::VolumeInformationMessage { - id: vol.id.0, - size: vol.content_size(), - collection: vol.collection.clone(), - file_count: vol.file_count() as u64, - delete_count: vol.deleted_count() as u64, - deleted_byte_count: vol.deleted_size(), - read_only: vol.is_read_only(), - replica_placement: vol.super_block.replica_placement.to_byte() as u32, - version: vol.super_block.version.0 as u32, - ttl: vol.super_block.ttl.to_u32(), - 
compact_revision: vol.last_compact_revision() as u32, - modified_at_second: vol.last_modified_ts() as i64, - disk_type: loc.disk_type.to_string(), - disk_id: disk_id as u32, - ..Default::default() - }); + } + + for vid in delete_vids { + let _ = loc.delete_volume(vid); } } @@ -838,6 +861,7 @@ mod tests { use crate::storage::types::{DiskType, Version, VolumeId}; use std::sync::atomic::Ordering; use std::sync::RwLock; + use std::time::{SystemTime, UNIX_EPOCH}; fn test_config() -> HeartbeatConfig { HeartbeatConfig { @@ -929,7 +953,7 @@ mod tests { ) .unwrap(); - let heartbeat = build_heartbeat(&test_config(), &store); + let heartbeat = build_heartbeat(&test_config(), &mut store); assert_eq!(heartbeat.id, "volume-node-a"); assert_eq!(heartbeat.volumes.len(), 1); @@ -964,7 +988,7 @@ mod tests { ) .unwrap(); - let heartbeat = build_heartbeat(&test_config(), &store); + let heartbeat = build_heartbeat(&test_config(), &mut store); assert!(heartbeat.volumes.is_empty()); assert!(heartbeat.has_no_volumes); @@ -1008,7 +1032,7 @@ mod tests { volume.refresh_remote_write_mode(); } - let heartbeat = build_heartbeat(&test_config(), &store); + let heartbeat = build_heartbeat(&test_config(), &mut store); let collection = "heartbeat_metrics_case"; let disk_type = store.locations[0].disk_type.to_string(); @@ -1043,7 +1067,7 @@ mod tests { DISK_SIZE_GAUGE .with_label_values(&[collection, DISK_SIZE_LABEL_NORMAL]) .get(), - 0.0 + crate::storage::super_block::SUPER_BLOCK_SIZE as f64 ); assert_eq!( DISK_SIZE_GAUGE @@ -1131,6 +1155,99 @@ mod tests { assert!(!state.store.read().unwrap().has_ec_volume(VolumeId(31))); } + #[test] + fn test_collect_heartbeat_excludes_expired_volume_until_removal_delay() { + let temp_dir = tempfile::tempdir().unwrap(); + let dir = temp_dir.path().to_str().unwrap(); + + let mut store = Store::new(NeedleMapKind::InMemory); + store + .add_location( + dir, + dir, + 8, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); + 
store.volume_size_limit.store(1, Ordering::Relaxed); + store + .add_volume( + VolumeId(41), + "expired_volume_case", + None, + Some(crate::storage::needle::ttl::TTL::read("20m").unwrap()), + 1024, + DiskType::HardDrive, + Version::current(), + ) + .unwrap(); + let dat_path = { + let (_, volume) = store.find_volume_mut(VolumeId(41)).unwrap(); + volume.set_last_io_error_for_test(None); + volume.set_last_modified_ts_for_test( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + .saturating_sub(21 * 60), + ); + volume.dat_path() + }; + std::fs::OpenOptions::new() + .write(true) + .open(&dat_path) + .unwrap() + .set_len((crate::storage::super_block::SUPER_BLOCK_SIZE + 1) as u64) + .unwrap(); + let volume_size_limit = store.volume_size_limit.load(Ordering::Relaxed); + let (_, volume) = store.find_volume(VolumeId(41)).unwrap(); + assert!(volume.is_expired(volume.dat_file_size().unwrap_or(0), volume_size_limit)); + assert!(!volume.is_expired_long_enough(10)); + + let heartbeat = build_heartbeat(&test_config(), &mut store); + + assert!(heartbeat.volumes.is_empty()); + assert!(store.has_volume(VolumeId(41))); + } + + #[test] + fn test_collect_heartbeat_deletes_io_error_volume() { + let temp_dir = tempfile::tempdir().unwrap(); + let dir = temp_dir.path().to_str().unwrap(); + + let mut store = Store::new(NeedleMapKind::InMemory); + store + .add_location( + dir, + dir, + 8, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); + store + .add_volume( + VolumeId(51), + "io_error_case", + None, + None, + 0, + DiskType::HardDrive, + Version::current(), + ) + .unwrap(); + let (_, volume) = store.find_volume_mut(VolumeId(51)).unwrap(); + volume.set_last_io_error_for_test(Some("input/output error")); + + let heartbeat = build_heartbeat(&test_config(), &mut store); + + assert!(heartbeat.volumes.is_empty()); + assert!(!store.has_volume(VolumeId(51))); + } + #[test] fn 
test_apply_storage_backends_registers_s3_default_aliases() { let state = test_state_with_store(Store::new(NeedleMapKind::InMemory)); diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index a7c8fc090..77572877a 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -1579,6 +1579,39 @@ impl Volume { self.last_modified_ts_seconds } + pub fn is_expired(&self, volume_size: u64, volume_size_limit: u64) -> bool { + if volume_size_limit == 0 { + return false; + } + if volume_size <= SUPER_BLOCK_SIZE as u64 { + return false; + } + let ttl_minutes = self.super_block.ttl.minutes(); + if ttl_minutes == 0 { + return false; + } + let lived_minutes = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + .saturating_sub(self.last_modified_ts_seconds) + / 60; + (ttl_minutes as u64) < lived_minutes + } + + pub fn is_expired_long_enough(&self, max_delay_minutes: u32) -> bool { + let ttl_minutes = self.super_block.ttl.minutes(); + if ttl_minutes == 0 { + return false; + } + let removal_delay = std::cmp::min(ttl_minutes / 10, max_delay_minutes); + ((ttl_minutes + removal_delay) as u64) * 60 + self.last_modified_ts_seconds + < SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + } + /// Read all live needles from the volume (for ReadAllNeedles streaming RPC). 
 pub fn read_all_needles(&self) -> Result<Vec<Needle>, VolumeError> { let _guard = self.data_file_access_control.read_lock(); @@ -2774,6 +2807,18 @@ impl Volume { pub fn last_io_error(&self) -> Option<String> { self.last_io_error.lock().ok()?.clone() } + + #[cfg(test)] + pub(crate) fn set_last_io_error_for_test(&self, err: Option<&str>) { + if let Ok(mut guard) = self.last_io_error.lock() { + *guard = err.map(|value| value.to_string()); + } + } + + #[cfg(test)] + pub(crate) fn set_last_modified_ts_for_test(&mut self, ts_seconds: u64) { + self.last_modified_ts_seconds = ts_seconds; + } } // ============================================================================