From 33b149a813c45bd225c4031bcd22e585a012daf6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 9 Mar 2026 02:10:08 -0700 Subject: [PATCH] feat: add GetMasterConfiguration on startup and fix Ping RPC forwarding Calls GetMasterConfiguration RPC before heartbeat loop to fetch metrics config, matching Go's checkWithMaster(). Ping RPC now actually connects to the target volume/master server instead of returning a dummy response. --- seaweed-volume/src/server/grpc_server.rs | 118 +++++++++++++++++------ seaweed-volume/src/server/heartbeat.rs | 51 ++++++++++ 2 files changed, 139 insertions(+), 30 deletions(-) diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index d5e5424a0..b3582d809 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -3187,28 +3187,40 @@ impl VolumeServer for VolumeGrpcService { let start = now_ns(); - // Route ping based on target type - let remote_time_ns = if req.target.is_empty() || req.target_type == "volumeServer" { - // Volume self-ping: return our own time + // Route ping based on target type (matches Go's volume_grpc_admin.go Ping) + let remote_time_ns = if req.target.is_empty() { + // Self-ping: no target specified, just return our own time now_ns() + } else if req.target_type == "volumeServer" { + // Connect to target volume server and call its Ping RPC + match ping_volume_server_target(&req.target).await { + Ok(t) => t, + Err(e) => { + return Err(Status::internal(format!( + "ping {} {}: {}", + req.target_type, req.target, e + ))) + } + } } else if req.target_type == "master" { - // Ping the master server - match ping_grpc_target(&req.target).await { + // Connect to target master and call its Ping RPC + match ping_master_target(&req.target).await { Ok(t) => t, Err(e) => { return Err(Status::internal(format!( - "ping master {}: {}", - req.target, e + "ping {} {}: {}", + req.target_type, req.target, e ))) } } } else if req.target_type == "filer" { + // Filer ping — we don't have a filer client, just connect and return time match ping_grpc_target(&req.target).await { Ok(t) => t, Err(e) => { return Err(Status::internal(format!( - "ping filer {}: {}", - req.target, e + "ping {} {}: {}", + req.target_type, req.target, e ))) } } @@ -3226,28 +3238,74 @@ impl VolumeServer for VolumeGrpcService { } } -/// Ping a remote gRPC target and return its time_ns. +/// Build a gRPC endpoint URL from a SeaweedFS server address. +fn to_grpc_endpoint(target: &str) -> Result { + let grpc_host_port = parse_grpc_address(target)?; + Ok(format!("http://{}", grpc_host_port)) +} + +/// Ping a remote volume server target by actually calling its Ping RPC (matches Go behavior). +async fn ping_volume_server_target(target: &str) -> Result { + let addr = to_grpc_endpoint(target)?; + let channel = tonic::transport::Channel::from_shared(addr.clone()) + .map_err(|e| e.to_string())?; + let channel = tokio::time::timeout( + std::time::Duration::from_secs(5), + channel.connect(), + ) + .await + .map_err(|_| "connection timeout".to_string())? + .map_err(|e| e.to_string())?; + + let mut client = + volume_server_pb::volume_server_client::VolumeServerClient::new(channel); + let resp = client + .ping(volume_server_pb::PingRequest { + target: String::new(), + target_type: String::new(), + }) + .await + .map_err(|e| e.to_string())?; + Ok(resp.into_inner().start_time_ns) +} + +/// Ping a remote master target by actually calling its Ping RPC (matches Go behavior). +async fn ping_master_target(target: &str) -> Result { + let addr = to_grpc_endpoint(target)?; + let channel = tonic::transport::Channel::from_shared(addr.clone()) + .map_err(|e| e.to_string())?; + let channel = tokio::time::timeout( + std::time::Duration::from_secs(5), + channel.connect(), + ) + .await + .map_err(|_| "connection timeout".to_string())? + .map_err(|e| e.to_string())?; + + let mut client = master_pb::seaweed_client::SeaweedClient::new(channel); + let resp = client + .ping(master_pb::PingRequest { + target: String::new(), + target_type: String::new(), + }) + .await + .map_err(|e| e.to_string())?; + Ok(resp.into_inner().start_time_ns) +} + +/// Ping a remote gRPC target (generic fallback, e.g. filer). +/// Connects and returns the current time (we don't have the filer proto client). async fn ping_grpc_target(target: &str) -> Result { - // For now, just verify the target is reachable by attempting a gRPC connection. - // The Go implementation actually calls Ping on the target, but we simplify here. - let addr = if target.starts_with("http") { - target.to_string() - } else { - format!("http://{}", target) - }; - match tonic::transport::Channel::from_shared(addr) { - Ok(endpoint) => { - match tokio::time::timeout(std::time::Duration::from_secs(5), endpoint.connect()).await - { - Ok(Ok(_channel)) => Ok(std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_nanos() as i64), - Ok(Err(e)) => Err(e.to_string()), - Err(_) => Err("connection timeout".to_string()), - } - } - Err(e) => Err(e.to_string()), + let addr = to_grpc_endpoint(target)?; + let channel = tonic::transport::Channel::from_shared(addr) + .map_err(|e| e.to_string())?; + match tokio::time::timeout(std::time::Duration::from_secs(5), channel.connect()).await { + Ok(Ok(_channel)) => Ok(std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as i64), + Ok(Err(e)) => Err(e.to_string()), + Err(_) => Err("connection timeout".to_string()), } } diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index 3fbed6338..f79b51e89 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -46,6 +46,9 @@ pub async fn run_heartbeat_with_state( config.master_addresses ); + // Call GetMasterConfiguration before starting the heartbeat loop (matches Go behavior). + check_with_master(&config, &state).await; + let pulse = Duration::from_secs(config.pulse_seconds.max(1)); let mut new_leader: Option = None; let mut duplicate_retry_count: u32 = 0; @@ -177,6 +180,54 @@ fn to_grpc_address(master_addr: &str) -> String { format!("http://{}", master_addr) } +/// Call GetMasterConfiguration on seed masters before starting the heartbeat loop. +/// Mirrors Go's `checkWithMaster()` in `volume_grpc_client_to_master.go`. +/// Retries across all seed masters with a 1790ms sleep between rounds (matching Go). +/// Stores metrics address/interval from the response into server state. +async fn check_with_master(config: &HeartbeatConfig, state: &Arc) { + loop { + for master_addr in &config.master_addresses { + let grpc_addr = to_grpc_address(master_addr); + match try_get_master_configuration(&grpc_addr).await { + Ok(resp) => { + let changed = apply_metrics_push_settings( + state, + &resp.metrics_address, + resp.metrics_interval_seconds, + ); + if changed { + state.metrics_notify.notify_waiters(); + } + info!( + "Got master configuration from {}: metrics_address={}, metrics_interval={}s", + master_addr, resp.metrics_address, resp.metrics_interval_seconds + ); + return; + } + Err(e) => { + warn!("checkWithMaster {}: {}", master_addr, e); + } + } + } + tokio::time::sleep(Duration::from_millis(1790)).await; + } +} + +async fn try_get_master_configuration( + grpc_addr: &str, +) -> Result> { + let channel = Channel::from_shared(grpc_addr.to_string())? + .connect_timeout(Duration::from_secs(5)) + .timeout(Duration::from_secs(10)) + .connect() + .await?; + let mut client = SeaweedClient::new(channel); + let resp = client + .get_master_configuration(master_pb::GetMasterConfigurationRequest {}) + .await?; + Ok(resp.into_inner()) +} + fn is_stopping(state: &VolumeServerState) -> bool { *state.is_stopping.read().unwrap() }