Browse Source

Delete remote tier data on volume destroy

rust-volume-server
Chris Lu 3 days ago
parent
commit
57b382c105
  1. 45
      seaweed-volume/src/remote_storage/s3_tier.rs
  2. 44
      seaweed-volume/src/server/heartbeat.rs
  3. 25
      seaweed-volume/src/storage/volume.rs

45
seaweed-volume/src/remote_storage/s3_tier.rs

@ -4,7 +4,8 @@
//! matching the Go SeaweedFS S3 backend behavior.
use std::collections::HashMap;
use std::sync::Arc;
use std::future::Future;
use std::sync::{Arc, OnceLock, RwLock};
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
@ -377,6 +378,22 @@ impl S3TierBackend {
.map_err(|e| format!("failed to delete object {}: {}", key, e))?;
Ok(())
}
/// Delete `key` from this backend's bucket, driving the async S3 client
/// to completion so the call is usable from synchronous code.
///
/// Returns `Err` with a human-readable message if the delete request fails.
pub fn delete_file_blocking(&self, key: &str) -> Result<(), String> {
    // Capture owned copies so the async block is 'static.
    let (client, bucket, object_key) = (self.client.clone(), self.bucket.clone(), key.to_owned());
    block_on_tier_future(async move {
        let response = client
            .delete_object()
            .bucket(&bucket)
            .key(&object_key)
            .send()
            .await;
        match response {
            Ok(_) => Ok(()),
            Err(e) => Err(format!("failed to delete object {}: {}", object_key, e)),
        }
    })
}
}
/// Parse a backend name like "s3" or "s3.default" into (backend_type, backend_id).
@ -417,4 +434,30 @@ impl S3TierRegistry {
pub fn names(&self) -> Vec<String> {
self.backends.keys().cloned().collect()
}
/// Remove every registered backend, emptying the registry.
/// Used by tests to reset the process-wide registry between cases.
pub fn clear(&mut self) {
self.backends.clear();
}
}
// Process-wide S3 tier registry, lazily initialized on first access.
// Exists so code without access to server state (e.g. Volume::destroy)
// can still resolve tier backends.
static GLOBAL_S3_TIER_REGISTRY: OnceLock<RwLock<S3TierRegistry>> = OnceLock::new();
/// Returns the global S3 tier registry, creating an empty one on first call.
/// Callers must acquire the `RwLock` (read for lookups, write for mutation).
pub fn global_s3_tier_registry() -> &'static RwLock<S3TierRegistry> {
GLOBAL_S3_TIER_REGISTRY.get_or_init(|| RwLock::new(S3TierRegistry::new()))
}
/// Runs `future` to completion on a dedicated thread with its own
/// single-threaded Tokio runtime, blocking the caller until it finishes.
///
/// NOTE(review): spawning a fresh thread + runtime per call is heavyweight;
/// presumably done so this can be called from inside an existing Tokio
/// runtime without panicking ("cannot start a runtime from within a
/// runtime") — confirm against call sites before changing.
///
/// Errors: returns `Err` if the runtime fails to build, if the future
/// itself returns `Err`, or if the worker thread panics.
fn block_on_tier_future<F, T>(future: F) -> Result<T, String>
where
// 'static + Send bounds are required to move the future onto the thread.
F: Future<Output = Result<T, String>> + Send + 'static,
T: Send + 'static,
{
std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| format!("failed to build tokio runtime: {}", e))?;
runtime.block_on(future)
})
// join() only errs if the thread panicked; surface that as a String error.
.join()
.map_err(|_| "tier runtime thread panicked".to_string())?
}

44
seaweed-volume/src/server/heartbeat.rs

@ -478,6 +478,9 @@ fn apply_storage_backends(
}
let mut registry = state.s3_tier_registry.write().unwrap();
let mut global_registry = crate::remote_storage::s3_tier::global_s3_tier_registry()
.write()
.unwrap();
for backend in storage_backends {
if backend.r#type != "s3" {
continue;
@ -505,13 +508,23 @@ fn apply_storage_backends(
} else {
backend.id.as_str()
};
let qualified_name = format!("{}.{}", backend.r#type, backend_id);
if registry.get(&qualified_name).is_none() {
registry.register(qualified_name, S3TierBackend::new(&config));
}
if backend_id == "default" && registry.get(&backend.r#type).is_none() {
registry.register(backend.r#type.clone(), S3TierBackend::new(&config));
}
register_s3_backend(&mut registry, backend, backend_id, &config);
register_s3_backend(&mut global_registry, backend, backend_id, &config);
}
}
/// Registers `backend` in `registry` under its qualified name
/// ("type.id"), and additionally under the bare type name (e.g. "s3")
/// when the id is "default". Existing entries are never overwritten.
fn register_s3_backend(
    registry: &mut crate::remote_storage::s3_tier::S3TierRegistry,
    backend: &master_pb::StorageBackend,
    backend_id: &str,
    config: &S3TierConfig,
) {
    // Primary entry: fully qualified "type.id" key.
    let qualified = format!("{}.{}", backend.r#type, backend_id);
    if registry.get(&qualified).is_none() {
        registry.register(qualified, S3TierBackend::new(config));
    }
    // Alias entry: the default backend is also reachable by bare type name.
    let needs_alias = backend_id == "default" && registry.get(&backend.r#type).is_none();
    if needs_alias {
        registry.register(backend.r#type.clone(), S3TierBackend::new(config));
    }
}
@ -821,6 +834,10 @@ mod tests {
#[test]
fn test_apply_storage_backends_registers_s3_default_aliases() {
let state = test_state_with_store(Store::new(NeedleMapKind::InMemory));
crate::remote_storage::s3_tier::global_s3_tier_registry()
.write()
.unwrap()
.clear();
apply_storage_backends(
&state,
@ -842,11 +859,20 @@ mod tests {
let registry = state.s3_tier_registry.read().unwrap();
assert!(registry.get("s3.default").is_some());
assert!(registry.get("s3").is_some());
let global_registry = crate::remote_storage::s3_tier::global_s3_tier_registry()
.read()
.unwrap();
assert!(global_registry.get("s3.default").is_some());
assert!(global_registry.get("s3").is_some());
}
#[test]
fn test_apply_storage_backends_ignores_unsupported_types() {
let state = test_state_with_store(Store::new(NeedleMapKind::InMemory));
crate::remote_storage::s3_tier::global_s3_tier_registry()
.write()
.unwrap()
.clear();
apply_storage_backends(
&state,
@ -859,6 +885,10 @@ mod tests {
let registry = state.s3_tier_registry.read().unwrap();
assert!(registry.names().is_empty());
let global_registry = crate::remote_storage::s3_tier::global_s3_tier_registry()
.read()
.unwrap();
assert!(global_registry.names().is_empty());
}
#[test]

25
seaweed-volume/src/storage/volume.rs

@ -2143,6 +2143,31 @@ impl Volume {
format!("volume {} is compacting", self.id),
)));
}
let (storage_name, storage_key) = self.remote_storage_name_key();
if self.has_remote_file && !storage_name.is_empty() && !storage_key.is_empty() {
let backend = crate::remote_storage::s3_tier::global_s3_tier_registry()
.read()
.unwrap()
.get(&storage_name);
if let Some(backend) = backend {
if let Err(e) = backend.delete_file_blocking(&storage_key) {
warn!(
volume_id = self.id.0,
storage_name,
storage_key,
error = %e,
"failed to delete remote tier file during destroy"
);
}
} else {
warn!(
volume_id = self.id.0,
storage_name, storage_key, "remote tier backend not found during destroy"
);
}
}
self.close();
remove_volume_files(&self.data_file_name());
remove_volume_files(&self.index_file_name());

Loading…
Cancel
Save