From 57b382c1057d299c8baa59f8662cc174424fab71 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 16 Mar 2026 16:02:19 -0700 Subject: [PATCH] Delete remote tier data on volume destroy --- seaweed-volume/src/remote_storage/s3_tier.rs | 45 +++++++++++++++++++- seaweed-volume/src/server/heartbeat.rs | 44 ++++++++++++++++--- seaweed-volume/src/storage/volume.rs | 25 +++++++++++ 3 files changed, 106 insertions(+), 8 deletions(-) diff --git a/seaweed-volume/src/remote_storage/s3_tier.rs b/seaweed-volume/src/remote_storage/s3_tier.rs index 97444826c..f1b7a69f4 100644 --- a/seaweed-volume/src/remote_storage/s3_tier.rs +++ b/seaweed-volume/src/remote_storage/s3_tier.rs @@ -4,7 +4,8 @@ //! matching the Go SeaweedFS S3 backend behavior. use std::collections::HashMap; -use std::sync::Arc; +use std::future::Future; +use std::sync::{Arc, OnceLock, RwLock}; use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region}; use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart}; @@ -377,6 +378,22 @@ impl S3TierBackend { .map_err(|e| format!("failed to delete object {}: {}", key, e))?; Ok(()) } + + pub fn delete_file_blocking(&self, key: &str) -> Result<(), String> { + let client = self.client.clone(); + let bucket = self.bucket.clone(); + let key = key.to_string(); + block_on_tier_future(async move { + client + .delete_object() + .bucket(&bucket) + .key(&key) + .send() + .await + .map_err(|e| format!("failed to delete object {}: {}", key, e))?; + Ok(()) + }) + } } /// Parse a backend name like "s3" or "s3.default" into (backend_type, backend_id). 
@@ -417,4 +434,30 @@ impl S3TierRegistry { pub fn names(&self) -> Vec<String> { self.backends.keys().cloned().collect() } + + pub fn clear(&mut self) { + self.backends.clear(); + } +} + +static GLOBAL_S3_TIER_REGISTRY: OnceLock<RwLock<S3TierRegistry>> = OnceLock::new(); + +pub fn global_s3_tier_registry() -> &'static RwLock<S3TierRegistry> { + GLOBAL_S3_TIER_REGISTRY.get_or_init(|| RwLock::new(S3TierRegistry::new())) +} + +fn block_on_tier_future<F, T>(future: F) -> Result<T, String> +where + F: Future<Output = Result<T, String>> + Send + 'static, + T: Send + 'static, +{ + std::thread::spawn(move || { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|e| format!("failed to build tokio runtime: {}", e))?; + runtime.block_on(future) + }) + .join() + .map_err(|_| "tier runtime thread panicked".to_string())? } diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index f243de27c..b1aa0a992 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -478,6 +478,9 @@ fn apply_storage_backends( } let mut registry = state.s3_tier_registry.write().unwrap(); + let mut global_registry = crate::remote_storage::s3_tier::global_s3_tier_registry() + .write() + .unwrap(); for backend in storage_backends { if backend.r#type != "s3" { continue; } @@ -505,13 +508,23 @@ fn apply_storage_backends( } else { backend.id.as_str() }; - let qualified_name = format!("{}.{}", backend.r#type, backend_id); - if registry.get(&qualified_name).is_none() { - registry.register(qualified_name, S3TierBackend::new(&config)); - } - if backend_id == "default" && registry.get(&backend.r#type).is_none() { - registry.register(backend.r#type.clone(), S3TierBackend::new(&config)); - } + register_s3_backend(&mut registry, backend, backend_id, &config); + register_s3_backend(&mut global_registry, backend, backend_id, &config); + } +} + +fn register_s3_backend( + registry: &mut crate::remote_storage::s3_tier::S3TierRegistry, + backend: &master_pb::StorageBackend, + backend_id: 
&str, + config: &S3TierConfig, +) { + let qualified_name = format!("{}.{}", backend.r#type, backend_id); + if registry.get(&qualified_name).is_none() { + registry.register(qualified_name, S3TierBackend::new(config)); + } + if backend_id == "default" && registry.get(&backend.r#type).is_none() { + registry.register(backend.r#type.clone(), S3TierBackend::new(config)); } } @@ -821,6 +834,10 @@ mod tests { #[test] fn test_apply_storage_backends_registers_s3_default_aliases() { let state = test_state_with_store(Store::new(NeedleMapKind::InMemory)); + crate::remote_storage::s3_tier::global_s3_tier_registry() + .write() + .unwrap() + .clear(); apply_storage_backends( &state, @@ -842,11 +859,20 @@ mod tests { let registry = state.s3_tier_registry.read().unwrap(); assert!(registry.get("s3.default").is_some()); assert!(registry.get("s3").is_some()); + let global_registry = crate::remote_storage::s3_tier::global_s3_tier_registry() + .read() + .unwrap(); + assert!(global_registry.get("s3.default").is_some()); + assert!(global_registry.get("s3").is_some()); } #[test] fn test_apply_storage_backends_ignores_unsupported_types() { let state = test_state_with_store(Store::new(NeedleMapKind::InMemory)); + crate::remote_storage::s3_tier::global_s3_tier_registry() + .write() + .unwrap() + .clear(); apply_storage_backends( &state, @@ -859,6 +885,10 @@ mod tests { let registry = state.s3_tier_registry.read().unwrap(); assert!(registry.names().is_empty()); + let global_registry = crate::remote_storage::s3_tier::global_s3_tier_registry() + .read() + .unwrap(); + assert!(global_registry.names().is_empty()); } #[test] diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 9a6fed451..0c4935539 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -2143,6 +2143,31 @@ impl Volume { format!("volume {} is compacting", self.id), ))); } + + let (storage_name, storage_key) = self.remote_storage_name_key(); + if 
self.has_remote_file && !storage_name.is_empty() && !storage_key.is_empty() { + let backend = crate::remote_storage::s3_tier::global_s3_tier_registry() + .read() + .unwrap() + .get(&storage_name); + if let Some(backend) = backend { + if let Err(e) = backend.delete_file_blocking(&storage_key) { + warn!( + volume_id = self.id.0, + storage_name, + storage_key, + error = %e, + "failed to delete remote tier file during destroy" + ); + } + } else { + warn!( + volume_id = self.id.0, + storage_name, storage_key, "remote tier backend not found during destroy" + ); + } + } + self.close(); remove_volume_files(&self.data_file_name()); remove_volume_files(&self.index_file_name());