diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index d5e5424a0..b3582d809 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -3187,28 +3187,40 @@ impl VolumeServer for VolumeGrpcService { let start = now_ns(); - // Route ping based on target type - let remote_time_ns = if req.target.is_empty() || req.target_type == "volumeServer" { - // Volume self-ping: return our own time + // Route ping based on target type (matches Go's volume_grpc_admin.go Ping) + let remote_time_ns = if req.target.is_empty() { + // Self-ping: no target specified, just return our own time now_ns() + } else if req.target_type == "volumeServer" { + // Connect to target volume server and call its Ping RPC + match ping_volume_server_target(&req.target).await { + Ok(t) => t, + Err(e) => { + return Err(Status::internal(format!( + "ping {} {}: {}", + req.target_type, req.target, e + ))) + } + } } else if req.target_type == "master" { - // Ping the master server - match ping_grpc_target(&req.target).await { + // Connect to target master and call its Ping RPC + match ping_master_target(&req.target).await { Ok(t) => t, Err(e) => { return Err(Status::internal(format!( - "ping master {}: {}", - req.target, e + "ping {} {}: {}", + req.target_type, req.target, e ))) } } } else if req.target_type == "filer" { + // Filer ping — we don't have a filer client, just connect and return time match ping_grpc_target(&req.target).await { Ok(t) => t, Err(e) => { return Err(Status::internal(format!( - "ping filer {}: {}", - req.target, e + "ping {} {}: {}", + req.target_type, req.target, e ))) } } @@ -3226,28 +3238,74 @@ impl VolumeServer for VolumeGrpcService { } } -/// Ping a remote gRPC target and return its time_ns. +/// Build a gRPC endpoint URL from a SeaweedFS server address. +fn to_grpc_endpoint(target: &str) -> Result { + let grpc_host_port = parse_grpc_address(target)?; + Ok(format!("http://{}", grpc_host_port)) +} + +/// Ping a remote volume server target by actually calling its Ping RPC (matches Go behavior). +async fn ping_volume_server_target(target: &str) -> Result { + let addr = to_grpc_endpoint(target)?; + let channel = tonic::transport::Channel::from_shared(addr.clone()) + .map_err(|e| e.to_string())?; + let channel = tokio::time::timeout( + std::time::Duration::from_secs(5), + channel.connect(), + ) + .await + .map_err(|_| "connection timeout".to_string())? + .map_err(|e| e.to_string())?; + + let mut client = + volume_server_pb::volume_server_client::VolumeServerClient::new(channel); + let resp = client + .ping(volume_server_pb::PingRequest { + target: String::new(), + target_type: String::new(), + }) + .await + .map_err(|e| e.to_string())?; + Ok(resp.into_inner().start_time_ns) +} + +/// Ping a remote master target by actually calling its Ping RPC (matches Go behavior). +async fn ping_master_target(target: &str) -> Result { + let addr = to_grpc_endpoint(target)?; + let channel = tonic::transport::Channel::from_shared(addr.clone()) + .map_err(|e| e.to_string())?; + let channel = tokio::time::timeout( + std::time::Duration::from_secs(5), + channel.connect(), + ) + .await + .map_err(|_| "connection timeout".to_string())? + .map_err(|e| e.to_string())?; + + let mut client = master_pb::seaweed_client::SeaweedClient::new(channel); + let resp = client + .ping(master_pb::PingRequest { + target: String::new(), + target_type: String::new(), + }) + .await + .map_err(|e| e.to_string())?; + Ok(resp.into_inner().start_time_ns) +} + +/// Ping a remote gRPC target (generic fallback, e.g. filer). +/// Connects and returns the current time (we don't have the filer proto client). async fn ping_grpc_target(target: &str) -> Result { - // For now, just verify the target is reachable by attempting a gRPC connection. - // The Go implementation actually calls Ping on the target, but we simplify here. - let addr = if target.starts_with("http") { - target.to_string() - } else { - format!("http://{}", target) - }; - match tonic::transport::Channel::from_shared(addr) { - Ok(endpoint) => { - match tokio::time::timeout(std::time::Duration::from_secs(5), endpoint.connect()).await - { - Ok(Ok(_channel)) => Ok(std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_nanos() as i64), - Ok(Err(e)) => Err(e.to_string()), - Err(_) => Err("connection timeout".to_string()), - } - } - Err(e) => Err(e.to_string()), + let addr = to_grpc_endpoint(target)?; + let channel = tonic::transport::Channel::from_shared(addr) + .map_err(|e| e.to_string())?; + match tokio::time::timeout(std::time::Duration::from_secs(5), channel.connect()).await { + Ok(Ok(_channel)) => Ok(std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as i64), + Ok(Err(e)) => Err(e.to_string()), + Err(_) => Err("connection timeout".to_string()), } } diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index 3fbed6338..f79b51e89 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -46,6 +46,9 @@ pub async fn run_heartbeat_with_state( config.master_addresses ); + // Call GetMasterConfiguration before starting the heartbeat loop (matches Go behavior). + check_with_master(&config, &state).await; + let pulse = Duration::from_secs(config.pulse_seconds.max(1)); let mut new_leader: Option = None; let mut duplicate_retry_count: u32 = 0; @@ -177,6 +180,54 @@ fn to_grpc_address(master_addr: &str) -> String { format!("http://{}", master_addr) } +/// Call GetMasterConfiguration on seed masters before starting the heartbeat loop. +/// Mirrors Go's `checkWithMaster()` in `volume_grpc_client_to_master.go`. +/// Retries across all seed masters with a 1790ms sleep between rounds (matching Go). +/// Stores metrics address/interval from the response into server state. +async fn check_with_master(config: &HeartbeatConfig, state: &Arc) { + loop { + for master_addr in &config.master_addresses { + let grpc_addr = to_grpc_address(master_addr); + match try_get_master_configuration(&grpc_addr).await { + Ok(resp) => { + let changed = apply_metrics_push_settings( + state, + &resp.metrics_address, + resp.metrics_interval_seconds, + ); + if changed { + state.metrics_notify.notify_waiters(); + } + info!( + "Got master configuration from {}: metrics_address={}, metrics_interval={}s", + master_addr, resp.metrics_address, resp.metrics_interval_seconds + ); + return; + } + Err(e) => { + warn!("checkWithMaster {}: {}", master_addr, e); + } + } + } + tokio::time::sleep(Duration::from_millis(1790)).await; + } +} + +async fn try_get_master_configuration( + grpc_addr: &str, +) -> Result> { + let channel = Channel::from_shared(grpc_addr.to_string())? + .connect_timeout(Duration::from_secs(5)) + .timeout(Duration::from_secs(10)) + .connect() + .await?; + let mut client = SeaweedClient::new(channel); + let resp = client + .get_master_configuration(master_pb::GetMasterConfigurationRequest {}) + .await?; + Ok(resp.into_inner()) +} + fn is_stopping(state: &VolumeServerState) -> bool { *state.is_stopping.read().unwrap() }