Browse Source

Load tier backends from master config

rust-volume-server
Chris Lu 3 days ago
parent
commit
24106ea8da
  1. 102
      seaweed-volume/src/server/heartbeat.rs

102
seaweed-volume/src/server/heartbeat.rs

@ -15,6 +15,7 @@ use super::grpc_client::{build_grpc_endpoint, GRPC_MAX_MESSAGE_SIZE};
use super::volume_server::VolumeServerState;
use crate::pb::master_pb;
use crate::pb::master_pb::seaweed_client::SeaweedClient;
use crate::remote_storage::s3_tier::{S3TierBackend, S3TierConfig};
use crate::storage::store::Store;
use crate::storage::types::NeedleId;
@ -198,6 +199,7 @@ async fn check_with_master(config: &HeartbeatConfig, state: &Arc<VolumeServerSta
if changed {
state.metrics_notify.notify_waiters();
}
apply_storage_backends(state, &resp.storage_backends);
info!(
"Got master configuration from {}: metrics_address={}, metrics_interval={}s",
master_addr, resp.metrics_address, resp.metrics_interval_seconds
@ -466,6 +468,63 @@ fn apply_metrics_push_settings(
true
}
fn apply_storage_backends(
state: &VolumeServerState,
storage_backends: &[master_pb::StorageBackend],
) {
if storage_backends.is_empty() {
return;
}
let mut registry = state.s3_tier_registry.write().unwrap();
for backend in storage_backends {
if backend.r#type != "s3" {
continue;
}
let properties = &backend.properties;
let config = S3TierConfig {
access_key: properties
.get("aws_access_key_id")
.cloned()
.unwrap_or_default(),
secret_key: properties
.get("aws_secret_access_key")
.cloned()
.unwrap_or_default(),
region: properties.get("region").cloned().unwrap_or_default(),
bucket: properties.get("bucket").cloned().unwrap_or_default(),
endpoint: properties.get("endpoint").cloned().unwrap_or_default(),
storage_class: properties.get("storage_class").cloned().unwrap_or_default(),
force_path_style: parse_bool_property(properties.get("force_path_style")),
};
let backend_id = if backend.id.is_empty() {
"default"
} else {
backend.id.as_str()
};
let qualified_name = format!("{}.{}", backend.r#type, backend_id);
if registry.get(&qualified_name).is_none() {
registry.register(qualified_name, S3TierBackend::new(&config));
}
if backend_id == "default" && registry.get(&backend.r#type).is_none() {
registry.register(backend.r#type.clone(), S3TierBackend::new(&config));
}
}
}
fn parse_bool_property(value: Option<&String>) -> bool {
value
.map(|v| {
matches!(
v.trim().to_ascii_lowercase().as_str(),
"1" | "t" | "true" | "y" | "yes" | "on"
)
})
.unwrap_or(true)
}
/// Collect volume information into a Heartbeat message.
fn collect_heartbeat(
config: &HeartbeatConfig,
@ -758,6 +817,49 @@ mod tests {
assert!(heartbeat.has_no_volumes);
}
#[test]
fn test_apply_storage_backends_registers_s3_default_aliases() {
let state = test_state_with_store(Store::new(NeedleMapKind::InMemory));
apply_storage_backends(
&state,
&[master_pb::StorageBackend {
r#type: "s3".to_string(),
id: "default".to_string(),
properties: std::collections::HashMap::from([
("aws_access_key_id".to_string(), "access".to_string()),
("aws_secret_access_key".to_string(), "secret".to_string()),
("bucket".to_string(), "bucket-a".to_string()),
("region".to_string(), "us-west-2".to_string()),
("endpoint".to_string(), "http://127.0.0.1:8333".to_string()),
("storage_class".to_string(), "STANDARD".to_string()),
("force_path_style".to_string(), "false".to_string()),
]),
}],
);
let registry = state.s3_tier_registry.read().unwrap();
assert!(registry.get("s3.default").is_some());
assert!(registry.get("s3").is_some());
}
#[test]
fn test_apply_storage_backends_ignores_unsupported_types() {
let state = test_state_with_store(Store::new(NeedleMapKind::InMemory));
apply_storage_backends(
&state,
&[master_pb::StorageBackend {
r#type: "rclone".to_string(),
id: "default".to_string(),
properties: std::collections::HashMap::new(),
}],
);
let registry = state.s3_tier_registry.read().unwrap();
assert!(registry.names().is_empty());
}
#[test]
fn test_apply_metrics_push_settings_updates_runtime_state() {
let store = Store::new(NeedleMapKind::InMemory);

Loading…
Cancel
Save