Browse Source

Delete expired EC volumes during heartbeat

rust-volume-server
Chris Lu 3 days ago
parent
commit
7f0b8e7b03
  1. 116
      seaweed-volume/src/server/heartbeat.rs
  2. 38
      seaweed-volume/src/storage/erasure_coding/ec_volume.rs
  3. 68
      seaweed-volume/src/storage/store.rs

116
seaweed-volume/src/server/heartbeat.rs

@ -600,8 +600,14 @@ fn collect_heartbeat(
config: &HeartbeatConfig,
state: &Arc<VolumeServerState>,
) -> master_pb::Heartbeat {
let store = state.store.read().unwrap();
build_heartbeat(config, &store)
let mut store = state.store.write().unwrap();
let (ec_shards, deleted_ec_shards) = store.delete_expired_ec_volumes();
build_heartbeat_with_ec_status(
config,
&store,
deleted_ec_shards,
ec_shards.is_empty(),
)
}
fn collect_location_metadata(store: &Store) -> (Vec<String>, Vec<master_pb::DiskTag>) {
@ -622,7 +628,18 @@ fn collect_location_metadata(store: &Store) -> (Vec<String>, Vec<master_pb::Disk
(location_uuids, disk_tags)
}
/// Test-only convenience wrapper around `build_heartbeat_with_ec_status`.
///
/// Reports no deleted EC shards and derives `has_no_ec_shards` from the
/// currently live shard list, collected with metric updates disabled.
#[cfg(test)]
fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartbeat {
    let live_ec_shards = collect_live_ec_shards(store, false);
    build_heartbeat_with_ec_status(config, store, Vec::new(), live_ec_shards.is_empty())
}
fn build_heartbeat_with_ec_status(
config: &HeartbeatConfig,
store: &Store,
deleted_ec_shards: Vec<master_pb::VolumeEcShardInformationMessage>,
has_no_ec_shards: bool,
) -> master_pb::Heartbeat {
#[derive(Default)]
struct ReadOnlyCounts {
is_read_only: u32,
@ -741,7 +758,9 @@ fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartb
rack: config.rack.clone(),
admin_port: config.port as u32,
volumes,
deleted_ec_shards,
has_no_volumes,
has_no_ec_shards,
max_volume_counts,
grpc_port: config.grpc_port as u32,
location_uuids,
@ -750,45 +769,45 @@ fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartb
}
}
/// Collect EC shard information into a Heartbeat message.
fn collect_ec_heartbeat(config: &HeartbeatConfig, state: &Arc<VolumeServerState>) -> master_pb::Heartbeat {
let store = state.store.read().unwrap();
fn collect_live_ec_shards(
store: &Store,
update_metrics: bool,
) -> Vec<master_pb::VolumeEcShardInformationMessage> {
let mut ec_shards = Vec::new();
let mut ec_sizes: HashMap<String, u64> = HashMap::new();
for (disk_id, loc) in store.locations.iter().enumerate() {
for (vid, ec_vol) in loc.ec_volumes() {
let mut ec_index_bits: u32 = 0;
let mut shard_sizes = Vec::new();
for shard_opt in &ec_vol.shards {
if let Some(shard) = shard_opt {
ec_index_bits |= 1u32 << shard.shard_id;
shard_sizes.push(shard.file_size());
*ec_sizes.entry(ec_vol.collection.clone()).or_insert(0) +=
shard.file_size().max(0) as u64;
for (_, ec_vol) in loc.ec_volumes() {
for message in ec_vol.to_volume_ec_shard_information_messages(disk_id as u32) {
if update_metrics {
let total_size: u64 = message
.shard_sizes
.iter()
.map(|size| (*size).max(0) as u64)
.sum();
*ec_sizes.entry(message.collection.clone()).or_insert(0) += total_size;
}
}
if ec_index_bits > 0 {
ec_shards.push(master_pb::VolumeEcShardInformationMessage {
id: vid.0,
collection: ec_vol.collection.clone(),
ec_index_bits,
shard_sizes,
disk_type: ec_vol.disk_type.to_string(),
expire_at_sec: ec_vol.expire_at_sec,
disk_id: disk_id as u32,
..Default::default()
});
ec_shards.push(message);
}
}
}
for (col, size) in &ec_sizes {
crate::metrics::DISK_SIZE_GAUGE
.with_label_values(&[col, crate::metrics::DISK_SIZE_LABEL_EC])
.set(*size as f64);
if update_metrics {
for (col, size) in &ec_sizes {
crate::metrics::DISK_SIZE_GAUGE
.with_label_values(&[col, crate::metrics::DISK_SIZE_LABEL_EC])
.set(*size as f64);
}
}
ec_shards
}
/// Collect EC shard information into a Heartbeat message.
fn collect_ec_heartbeat(config: &HeartbeatConfig, state: &Arc<VolumeServerState>) -> master_pb::Heartbeat {
let store = state.store.read().unwrap();
let ec_shards = collect_live_ec_shards(&store, true);
let has_no = ec_shards.is_empty();
master_pb::Heartbeat {
ip: config.ip.clone(),
@ -1077,6 +1096,41 @@ mod tests {
);
}
/// A heartbeat collection pass must drop an EC volume whose expiry timestamp
/// is in the past and report its shards as deleted.
#[test]
fn test_collect_heartbeat_deletes_expired_ec_volumes() {
    // One disk location backed by a scratch directory.
    let scratch = tempfile::tempdir().unwrap();
    let root = scratch.path().to_str().unwrap();
    let mut store = Store::new(NeedleMapKind::InMemory);
    store
        .add_location(
            root,
            root,
            8,
            DiskType::HardDrive,
            MinFreeSpace::Percent(1.0),
            Vec::new(),
        )
        .unwrap();

    // Create and mount a single shard file for volume 31.
    let shard_path = format!("{}/expired_heartbeat_ec_31.ec00", root);
    std::fs::write(shard_path, b"expired").unwrap();
    store.locations[0]
        .mount_ec_shards(VolumeId(31), "expired_heartbeat_ec", &[0])
        .unwrap();

    // An expiry of 1 second after the Unix epoch is always in the past.
    let ec_volume = store.find_ec_volume_mut(VolumeId(31)).unwrap();
    ec_volume.expire_at_sec = 1;

    let state = test_state_with_store(store);
    let heartbeat = collect_heartbeat(&test_config(), &state);

    assert!(heartbeat.has_no_ec_shards);
    assert_eq!(heartbeat.deleted_ec_shards.len(), 1);
    assert_eq!(heartbeat.deleted_ec_shards[0].id, 31);
    assert!(!state.store.read().unwrap().has_ec_volume(VolumeId(31)));
}
#[test]
fn test_apply_storage_backends_registers_s3_default_aliases() {
let state = test_state_with_store(Store::new(NeedleMapKind::InMemory));

38
seaweed-volume/src/storage/erasure_coding/ec_volume.rs

@ -6,7 +6,9 @@
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{self, Write};
use std::time::{SystemTime, UNIX_EPOCH};
use crate::pb::master_pb;
use crate::storage::erasure_coding::ec_locate;
use crate::storage::erasure_coding::ec_shard::*;
use crate::storage::needle::needle::{get_actual_size, Needle};
@ -208,6 +210,42 @@ impl EcVolume {
self.shards.iter().filter(|s| s.is_some()).count()
}
/// Reports whether this EC volume has passed its expiry time.
///
/// An `expire_at_sec` of zero never expires. Otherwise the volume is due
/// for destruction once the current Unix time (in whole seconds) is strictly
/// greater than `expire_at_sec`. If the system clock reads earlier than the
/// Unix epoch, the current time is treated as zero, so nothing expires.
pub fn is_time_to_destroy(&self) -> bool {
    if self.expire_at_sec == 0 {
        return false;
    }

    let now_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    now_secs > self.expire_at_sec
}
/// Builds the heartbeat message(s) describing the shards of this EC volume.
///
/// Returns an empty vector when no shard slot is populated; otherwise a
/// single message carrying:
/// - `ec_index_bits`: a bitmask with bit `shard_id` set for each present shard,
/// - `shard_sizes`: each present shard's `file_size()`, in shard-slot order,
/// - the volume id, collection, disk type and expiry copied from `self`,
/// - the caller-supplied `disk_id`.
pub fn to_volume_ec_shard_information_messages(
    &self,
    disk_id: u32,
) -> Vec<master_pb::VolumeEcShardInformationMessage> {
    // Accumulate the presence bitmask and the per-shard sizes in one pass
    // over the populated shard slots.
    let (ec_index_bits, shard_sizes) = self.shards.iter().flatten().fold(
        (0u32, Vec::new()),
        |(bits, mut sizes), shard| {
            let bits = bits | (1u32 << shard.shard_id);
            sizes.push(shard.file_size());
            (bits, sizes)
        },
    );

    if ec_index_bits != 0 {
        vec![master_pb::VolumeEcShardInformationMessage {
            id: self.volume_id.0,
            collection: self.collection.clone(),
            ec_index_bits,
            shard_sizes,
            disk_type: self.disk_type.to_string(),
            expire_at_sec: self.expire_at_sec,
            disk_id,
            ..Default::default()
        }]
    } else {
        Vec::new()
    }
}
// ---- Shard locations (distributed tracking) ----
/// Set the list of server addresses for a given shard ID.

68
seaweed-volume/src/storage/store.rs

@ -8,6 +8,7 @@ use std::io;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::config::MinFreeSpace;
use crate::pb::master_pb;
use crate::storage::disk_location::DiskLocation;
use crate::storage::erasure_coding::ec_shard::EcVolumeShard;
use crate::storage::erasure_coding::ec_volume::EcVolume;
@ -569,6 +570,49 @@ impl Store {
self.locations.iter().any(|loc| loc.has_ec_volume(vid))
}
/// Removes and destroys every EC volume whose expiry time has passed.
///
/// Returns `(live, deleted)`:
/// - `live`: shard information messages for the EC volumes that remain,
/// - `deleted`: shard information messages captured for each volume just
///   before it was removed and destroyed.
///
/// Each message's `disk_id` is the index of the owning location in
/// `self.locations`. For every shard of a removed volume the `"ec_shards"`
/// volume gauge for that volume's collection is decremented once.
pub fn delete_expired_ec_volumes(
    &mut self,
) -> (
    Vec<master_pb::VolumeEcShardInformationMessage>,
    Vec<master_pb::VolumeEcShardInformationMessage>,
) {
    let mut live_messages = Vec::new();
    let mut deleted_messages = Vec::new();

    for (index, location) in self.locations.iter_mut().enumerate() {
        let disk_id = index as u32;

        // Pass 1: report live volumes and note the ids of expired ones.
        // Removal is deferred so the location is not mutated while iterating.
        let mut expired = Vec::new();
        for (vid, volume) in location.ec_volumes() {
            if volume.is_time_to_destroy() {
                expired.push(*vid);
                continue;
            }
            live_messages.extend(volume.to_volume_ec_shard_information_messages(disk_id));
        }

        // Pass 2: snapshot each expired volume's messages, then remove it.
        for vid in expired {
            let snapshot = match location.find_ec_volume(vid) {
                Some(volume) => volume.to_volume_ec_shard_information_messages(disk_id),
                None => Vec::new(),
            };

            match location.remove_ec_volume(vid) {
                Some(mut removed) => {
                    for _ in 0..removed.shard_count() {
                        crate::metrics::VOLUME_GAUGE
                            .with_label_values(&[&removed.collection, "ec_shards"])
                            .dec();
                    }
                    removed.destroy();
                    deleted_messages.extend(snapshot);
                }
                // Removal found nothing: keep reporting the shards as live.
                None => live_messages.extend(snapshot),
            }
        }
    }

    (live_messages, deleted_messages)
}
/// Remove an EC volume from whichever location has it.
pub fn remove_ec_volume(&mut self, vid: VolumeId) -> Option<EcVolume> {
for loc in &mut self.locations {
@ -1043,6 +1087,30 @@ mod tests {
assert!(store.has_volume(VolumeId(3)));
}
/// Deleting expired EC volumes must unregister the volume from the store,
/// report its shards as deleted, and remove the shard file from disk.
#[test]
fn test_delete_expired_ec_volumes_removes_expired_entries() {
    let scratch = TempDir::new().unwrap();
    let dir = scratch.path().to_str().unwrap();
    let mut store = make_test_store(&[dir]);

    // Create and mount a single shard file for volume 9.
    let shard_path = format!("{}/expired_ec_case_9.ec00", dir);
    std::fs::write(shard_path, b"expired").unwrap();
    store.locations[0]
        .mount_ec_shards(VolumeId(9), "expired_ec_case", &[0])
        .unwrap();

    // An expiry of 1 second after the Unix epoch is always in the past.
    let ec_volume = store.find_ec_volume_mut(VolumeId(9)).unwrap();
    ec_volume.expire_at_sec = 1;

    let (ec_shards, deleted) = store.delete_expired_ec_volumes();

    assert!(ec_shards.is_empty());
    assert_eq!(deleted.len(), 1);
    assert_eq!(deleted[0].id, 9);
    assert!(!store.has_ec_volume(VolumeId(9)));
    assert!(!std::path::Path::new(&format!("{}/expired_ec_case_9.ec00", dir)).exists());
}
#[test]
fn test_store_volume_not_found() {
let tmp = TempDir::new().unwrap();

Loading…
Cancel
Save