Browse Source

Match Go volume heartbeat pruning

rust-volume-server
Chris Lu 3 days ago
parent
commit
139b06a01d
  1. 173
      seaweed-volume/src/server/heartbeat.rs
  2. 45
      seaweed-volume/src/storage/volume.rs

173
seaweed-volume/src/server/heartbeat.rs

@@ -604,7 +604,7 @@ fn collect_heartbeat(
let (ec_shards, deleted_ec_shards) = store.delete_expired_ec_volumes();
build_heartbeat_with_ec_status(
config,
&store,
&mut store,
deleted_ec_shards,
ec_shards.is_empty(),
)
@@ -629,17 +629,19 @@ fn collect_location_metadata(store: &Store) -> (Vec<String>, Vec<master_pb::Disk
}
#[cfg(test)]
fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartbeat {
fn build_heartbeat(config: &HeartbeatConfig, store: &mut Store) -> master_pb::Heartbeat {
let has_no_ec_shards = collect_live_ec_shards(store, false).is_empty();
build_heartbeat_with_ec_status(config, store, Vec::new(), has_no_ec_shards)
}
fn build_heartbeat_with_ec_status(
config: &HeartbeatConfig,
store: &Store,
store: &mut Store,
deleted_ec_shards: Vec<master_pb::VolumeEcShardInformationMessage>,
has_no_ec_shards: bool,
) -> master_pb::Heartbeat {
const MAX_TTL_VOLUME_REMOVAL_DELAY: u32 = 10;
#[derive(Default)]
struct ReadOnlyCounts {
is_read_only: u32,
@@ -656,7 +658,9 @@ fn build_heartbeat_with_ec_status(
let mut disk_sizes: HashMap<String, (u64, u64)> = HashMap::new(); // (normal, deleted)
let mut ro_counts: HashMap<String, ReadOnlyCounts> = HashMap::new();
for (disk_id, loc) in store.locations.iter().enumerate() {
let volume_size_limit = store.volume_size_limit.load(Ordering::Relaxed);
for (disk_id, loc) in store.locations.iter_mut().enumerate() {
let disk_type_str = loc.disk_type.to_string();
let mut effective_max_count = loc.max_volume_count.load(Ordering::Relaxed);
if loc.is_disk_space_low.load(Ordering::Relaxed) {
@@ -673,19 +677,51 @@ fn build_heartbeat_with_ec_status(
}
*max_volume_counts.entry(disk_type_str).or_insert(0) += effective_max_count as u32;
let mut delete_vids = Vec::new();
for (_, vol) in loc.iter_volumes() {
let cur_max = vol.max_file_key();
if cur_max > max_file_key {
max_file_key = cur_max;
}
let volume_size = vol.dat_file_size().unwrap_or(0);
let mut should_delete_volume = false;
if vol.last_io_error().is_some() {
delete_vids.push(vol.id);
should_delete_volume = true;
} else if !vol.is_expired(volume_size, volume_size_limit) {
volumes.push(master_pb::VolumeInformationMessage {
id: vol.id.0,
size: volume_size,
collection: vol.collection.clone(),
file_count: vol.file_count() as u64,
delete_count: vol.deleted_count() as u64,
deleted_byte_count: vol.deleted_size(),
read_only: vol.is_read_only(),
replica_placement: vol.super_block.replica_placement.to_byte() as u32,
version: vol.super_block.version.0 as u32,
ttl: vol.super_block.ttl.to_u32(),
compact_revision: vol.last_compact_revision() as u32,
modified_at_second: vol.last_modified_ts() as i64,
disk_type: loc.disk_type.to_string(),
disk_id: disk_id as u32,
..Default::default()
});
} else if vol.is_expired_long_enough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
delete_vids.push(vol.id);
should_delete_volume = true;
}
// Track disk size by collection
let entry = disk_sizes.entry(vol.collection.clone()).or_insert((0, 0));
entry.0 += vol.content_size();
entry.1 += vol.deleted_size();
if !should_delete_volume {
entry.0 += volume_size;
entry.1 += vol.deleted_size();
}
let read_only = ro_counts.entry(vol.collection.clone()).or_default();
if vol.is_read_only() {
if !should_delete_volume && vol.is_read_only() {
read_only.is_read_only += 1;
if vol.is_no_write_or_delete() {
read_only.no_write_or_delete += 1;
@@ -698,23 +734,10 @@ fn build_heartbeat_with_ec_status(
}
}
volumes.push(master_pb::VolumeInformationMessage {
id: vol.id.0,
size: vol.content_size(),
collection: vol.collection.clone(),
file_count: vol.file_count() as u64,
delete_count: vol.deleted_count() as u64,
deleted_byte_count: vol.deleted_size(),
read_only: vol.is_read_only(),
replica_placement: vol.super_block.replica_placement.to_byte() as u32,
version: vol.super_block.version.0 as u32,
ttl: vol.super_block.ttl.to_u32(),
compact_revision: vol.last_compact_revision() as u32,
modified_at_second: vol.last_modified_ts() as i64,
disk_type: loc.disk_type.to_string(),
disk_id: disk_id as u32,
..Default::default()
});
}
for vid in delete_vids {
let _ = loc.delete_volume(vid);
}
}
@@ -838,6 +861,7 @@ mod tests {
use crate::storage::types::{DiskType, Version, VolumeId};
use std::sync::atomic::Ordering;
use std::sync::RwLock;
use std::time::{SystemTime, UNIX_EPOCH};
fn test_config() -> HeartbeatConfig {
HeartbeatConfig {
@@ -929,7 +953,7 @@
)
.unwrap();
let heartbeat = build_heartbeat(&test_config(), &store);
let heartbeat = build_heartbeat(&test_config(), &mut store);
assert_eq!(heartbeat.id, "volume-node-a");
assert_eq!(heartbeat.volumes.len(), 1);
@@ -964,7 +988,7 @@
)
.unwrap();
let heartbeat = build_heartbeat(&test_config(), &store);
let heartbeat = build_heartbeat(&test_config(), &mut store);
assert!(heartbeat.volumes.is_empty());
assert!(heartbeat.has_no_volumes);
@@ -1008,7 +1032,7 @@
volume.refresh_remote_write_mode();
}
let heartbeat = build_heartbeat(&test_config(), &store);
let heartbeat = build_heartbeat(&test_config(), &mut store);
let collection = "heartbeat_metrics_case";
let disk_type = store.locations[0].disk_type.to_string();
@@ -1043,7 +1067,7 @@
DISK_SIZE_GAUGE
.with_label_values(&[collection, DISK_SIZE_LABEL_NORMAL])
.get(),
0.0
crate::storage::super_block::SUPER_BLOCK_SIZE as f64
);
assert_eq!(
DISK_SIZE_GAUGE
@@ -1131,6 +1155,99 @@
assert!(!state.store.read().unwrap().has_ec_volume(VolumeId(31)));
}
#[test]
fn test_collect_heartbeat_excludes_expired_volume_until_removal_delay() {
    // A TTL volume that has expired, but NOT for long enough to pass the
    // removal delay (ttl/10 capped at MAX_TTL_VOLUME_REMOVAL_DELAY), must be
    // omitted from the heartbeat while still being kept in the store.
    // Mirrors the Go heartbeat pruning behavior.
    let temp_dir = tempfile::tempdir().unwrap();
    let dir = temp_dir.path().to_str().unwrap();
    let mut store = Store::new(NeedleMapKind::InMemory);
    store
        .add_location(
            dir,
            dir,
            8,
            DiskType::HardDrive,
            MinFreeSpace::Percent(1.0),
            Vec::new(),
        )
        .unwrap();
    // is_expired() short-circuits to false when the size limit is 0,
    // so set a non-zero limit to activate the expiry path.
    store.volume_size_limit.store(1, Ordering::Relaxed);
    store
        .add_volume(
            VolumeId(41),
            "expired_volume_case",
            None,
            // 20-minute TTL; the volume is backdated 21 minutes below,
            // i.e. expired by 1 minute but inside the 2-minute (20m/10)
            // removal delay window.
            Some(crate::storage::needle::ttl::TTL::read("20m").unwrap()),
            1024,
            DiskType::HardDrive,
            Version::current(),
        )
        .unwrap();
    let dat_path = {
        let (_, volume) = store.find_volume_mut(VolumeId(41)).unwrap();
        // Ensure no stale I/O error triggers the deletion path instead.
        volume.set_last_io_error_for_test(None);
        // Backdate last-modified just past the TTL (21 of 20 minutes).
        volume.set_last_modified_ts_for_test(
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs()
                .saturating_sub(21 * 60),
        );
        volume.dat_path()
    };
    // Grow the .dat file past the super block: is_expired() also returns
    // false for volumes that hold nothing beyond the super block.
    std::fs::OpenOptions::new()
        .write(true)
        .open(&dat_path)
        .unwrap()
        .set_len((crate::storage::super_block::SUPER_BLOCK_SIZE + 1) as u64)
        .unwrap();
    let volume_size_limit = store.volume_size_limit.load(Ordering::Relaxed);
    let (_, volume) = store.find_volume(VolumeId(41)).unwrap();
    // Sanity-check the fixture: expired, but not past the removal delay.
    assert!(volume.is_expired(volume.dat_file_size().unwrap_or(0), volume_size_limit));
    assert!(!volume.is_expired_long_enough(10));
    let heartbeat = build_heartbeat(&test_config(), &mut store);
    // Excluded from the heartbeat report...
    assert!(heartbeat.volumes.is_empty());
    // ...but not yet deleted from the store.
    assert!(store.has_volume(VolumeId(41)));
}
#[test]
fn test_collect_heartbeat_deletes_io_error_volume() {
    // A volume with a recorded I/O error must be dropped from the
    // heartbeat report AND removed from the store immediately (no
    // removal-delay grace period, unlike TTL expiry).
    let temp_dir = tempfile::tempdir().unwrap();
    let dir = temp_dir.path().to_str().unwrap();
    let mut store = Store::new(NeedleMapKind::InMemory);
    store
        .add_location(
            dir,
            dir,
            8,
            DiskType::HardDrive,
            MinFreeSpace::Percent(1.0),
            Vec::new(),
        )
        .unwrap();
    store
        .add_volume(
            VolumeId(51),
            "io_error_case",
            None,
            None,
            0,
            DiskType::HardDrive,
            Version::current(),
        )
        .unwrap();
    // Simulate a disk fault on the freshly added volume.
    let (_, volume) = store.find_volume_mut(VolumeId(51)).unwrap();
    volume.set_last_io_error_for_test(Some("input/output error"));
    let heartbeat = build_heartbeat(&test_config(), &mut store);
    // Not reported to the master...
    assert!(heartbeat.volumes.is_empty());
    // ...and physically deleted from the store.
    assert!(!store.has_volume(VolumeId(51)));
}
#[test]
fn test_apply_storage_backends_registers_s3_default_aliases() {
let state = test_state_with_store(Store::new(NeedleMapKind::InMemory));

45
seaweed-volume/src/storage/volume.rs

@@ -1579,6 +1579,39 @@ impl Volume {
self.last_modified_ts_seconds
}
/// Whether this TTL volume has outlived its time-to-live.
///
/// A volume only counts as expired when size limits are being enforced
/// (`volume_size_limit != 0`), it holds more than just its super block,
/// and a non-zero TTL has elapsed since the last modification.
pub fn is_expired(&self, volume_size: u64, volume_size_limit: u64) -> bool {
    // No size limit configured, or nothing stored beyond the super
    // block: never treat the volume as expired.
    if volume_size_limit == 0 || volume_size <= SUPER_BLOCK_SIZE as u64 {
        return false;
    }
    let ttl = self.super_block.ttl.minutes();
    if ttl == 0 {
        // A TTL of zero means "never expires".
        return false;
    }
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    let minutes_alive = now.saturating_sub(self.last_modified_ts_seconds) / 60;
    (ttl as u64) < minutes_alive
}
/// Whether an expired TTL volume has stayed expired past its grace
/// period and may now be removed.
///
/// The grace period is one tenth of the TTL, capped at
/// `max_delay_minutes`.
pub fn is_expired_long_enough(&self, max_delay_minutes: u32) -> bool {
    let ttl = self.super_block.ttl.minutes();
    if ttl == 0 {
        // Volumes without a TTL are never pruned here.
        return false;
    }
    // Grace period: ttl/10 minutes, never more than the caller's cap.
    let grace = (ttl / 10).min(max_delay_minutes);
    // Removal deadline, in seconds since the Unix epoch.
    let deadline = ((ttl + grace) as u64) * 60 + self.last_modified_ts_seconds;
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    deadline < now
}
/// Read all live needles from the volume (for ReadAllNeedles streaming RPC).
pub fn read_all_needles(&self) -> Result<Vec<Needle>, VolumeError> {
let _guard = self.data_file_access_control.read_lock();
@@ -2774,6 +2807,18 @@ impl Volume {
/// Returns the most recently recorded I/O error for this volume, if
/// any. A poisoned mutex is treated the same as "no error recorded".
pub fn last_io_error(&self) -> Option<String> {
    match self.last_io_error.lock() {
        Ok(guard) => guard.clone(),
        Err(_) => None,
    }
}
/// Test-only hook: overwrite (or clear, with `None`) the recorded last
/// I/O error. Silently does nothing if the mutex is poisoned.
#[cfg(test)]
pub(crate) fn set_last_io_error_for_test(&self, err: Option<&str>) {
    match self.last_io_error.lock() {
        Ok(mut slot) => *slot = err.map(String::from),
        Err(_) => {}
    }
}
/// Test-only hook: force the volume's last-modified timestamp (seconds
/// since the Unix epoch) so TTL-expiry paths can be exercised without
/// waiting for real time to pass.
#[cfg(test)]
pub(crate) fn set_last_modified_ts_for_test(&mut self, ts_seconds: u64) {
self.last_modified_ts_seconds = ts_seconds;
}
}
// ============================================================================

Loading…
Cancel
Save