Browse Source

feat: add GetMasterConfiguration on startup and fix Ping RPC forwarding

Calls GetMasterConfiguration RPC before heartbeat loop to fetch metrics
config, matching Go's checkWithMaster(). Ping RPC now actually connects
to the target volume/master server instead of returning a dummy response.
rust-volume-server
Chris Lu 16 hours ago
parent
commit
33b149a813
  1. 118
      seaweed-volume/src/server/grpc_server.rs
  2. 51
      seaweed-volume/src/server/heartbeat.rs

118
seaweed-volume/src/server/grpc_server.rs

@ -3187,28 +3187,40 @@ impl VolumeServer for VolumeGrpcService {
let start = now_ns();
// Route ping based on target type
let remote_time_ns = if req.target.is_empty() || req.target_type == "volumeServer" {
// Volume self-ping: return our own time
// Route ping based on target type (matches Go's volume_grpc_admin.go Ping)
let remote_time_ns = if req.target.is_empty() {
// Self-ping: no target specified, just return our own time
now_ns()
} else if req.target_type == "volumeServer" {
// Connect to target volume server and call its Ping RPC
match ping_volume_server_target(&req.target).await {
Ok(t) => t,
Err(e) => {
return Err(Status::internal(format!(
"ping {} {}: {}",
req.target_type, req.target, e
)))
}
}
} else if req.target_type == "master" {
// Ping the master server
match ping_grpc_target(&req.target).await {
// Connect to target master and call its Ping RPC
match ping_master_target(&req.target).await {
Ok(t) => t,
Err(e) => {
return Err(Status::internal(format!(
"ping master {}: {}",
req.target, e
"ping {} {}: {}",
req.target_type, req.target, e
)))
}
}
} else if req.target_type == "filer" {
// Filer ping — we don't have a filer client, just connect and return time
match ping_grpc_target(&req.target).await {
Ok(t) => t,
Err(e) => {
return Err(Status::internal(format!(
"ping filer {}: {}",
req.target, e
"ping {} {}: {}",
req.target_type, req.target, e
)))
}
}
@ -3226,28 +3238,74 @@ impl VolumeServer for VolumeGrpcService {
}
}
/// Ping a remote gRPC target and return its time_ns.
/// Build a gRPC endpoint URL from a SeaweedFS server address.
fn to_grpc_endpoint(target: &str) -> Result<String, String> {
let grpc_host_port = parse_grpc_address(target)?;
Ok(format!("http://{}", grpc_host_port))
}
/// Ping a remote volume server target by actually calling its Ping RPC (matches Go behavior).
async fn ping_volume_server_target(target: &str) -> Result<i64, String> {
let addr = to_grpc_endpoint(target)?;
let channel = tonic::transport::Channel::from_shared(addr.clone())
.map_err(|e| e.to_string())?;
let channel = tokio::time::timeout(
std::time::Duration::from_secs(5),
channel.connect(),
)
.await
.map_err(|_| "connection timeout".to_string())?
.map_err(|e| e.to_string())?;
let mut client =
volume_server_pb::volume_server_client::VolumeServerClient::new(channel);
let resp = client
.ping(volume_server_pb::PingRequest {
target: String::new(),
target_type: String::new(),
})
.await
.map_err(|e| e.to_string())?;
Ok(resp.into_inner().start_time_ns)
}
/// Ping a remote master target by actually calling its Ping RPC (matches Go behavior).
async fn ping_master_target(target: &str) -> Result<i64, String> {
let addr = to_grpc_endpoint(target)?;
let channel = tonic::transport::Channel::from_shared(addr.clone())
.map_err(|e| e.to_string())?;
let channel = tokio::time::timeout(
std::time::Duration::from_secs(5),
channel.connect(),
)
.await
.map_err(|_| "connection timeout".to_string())?
.map_err(|e| e.to_string())?;
let mut client = master_pb::seaweed_client::SeaweedClient::new(channel);
let resp = client
.ping(master_pb::PingRequest {
target: String::new(),
target_type: String::new(),
})
.await
.map_err(|e| e.to_string())?;
Ok(resp.into_inner().start_time_ns)
}
/// Ping a remote gRPC target (generic fallback, e.g. filer).
/// Connects and returns the current time (we don't have the filer proto client).
async fn ping_grpc_target(target: &str) -> Result<i64, String> {
// For now, just verify the target is reachable by attempting a gRPC connection.
// The Go implementation actually calls Ping on the target, but we simplify here.
let addr = if target.starts_with("http") {
target.to_string()
} else {
format!("http://{}", target)
};
match tonic::transport::Channel::from_shared(addr) {
Ok(endpoint) => {
match tokio::time::timeout(std::time::Duration::from_secs(5), endpoint.connect()).await
{
Ok(Ok(_channel)) => Ok(std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as i64),
Ok(Err(e)) => Err(e.to_string()),
Err(_) => Err("connection timeout".to_string()),
}
}
Err(e) => Err(e.to_string()),
let addr = to_grpc_endpoint(target)?;
let channel = tonic::transport::Channel::from_shared(addr)
.map_err(|e| e.to_string())?;
match tokio::time::timeout(std::time::Duration::from_secs(5), channel.connect()).await {
Ok(Ok(_channel)) => Ok(std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as i64),
Ok(Err(e)) => Err(e.to_string()),
Err(_) => Err("connection timeout".to_string()),
}
}

51
seaweed-volume/src/server/heartbeat.rs

@ -46,6 +46,9 @@ pub async fn run_heartbeat_with_state(
config.master_addresses
);
// Call GetMasterConfiguration before starting the heartbeat loop (matches Go behavior).
check_with_master(&config, &state).await;
let pulse = Duration::from_secs(config.pulse_seconds.max(1));
let mut new_leader: Option<String> = None;
let mut duplicate_retry_count: u32 = 0;
@ -177,6 +180,54 @@ fn to_grpc_address(master_addr: &str) -> String {
format!("http://{}", master_addr)
}
/// Call GetMasterConfiguration on seed masters before starting the heartbeat loop.
/// Mirrors Go's `checkWithMaster()` in `volume_grpc_client_to_master.go`.
/// Retries across all seed masters with a 1790ms sleep between rounds (matching Go).
/// Stores metrics address/interval from the response into server state.
async fn check_with_master(config: &HeartbeatConfig, state: &Arc<VolumeServerState>) {
loop {
for master_addr in &config.master_addresses {
let grpc_addr = to_grpc_address(master_addr);
match try_get_master_configuration(&grpc_addr).await {
Ok(resp) => {
let changed = apply_metrics_push_settings(
state,
&resp.metrics_address,
resp.metrics_interval_seconds,
);
if changed {
state.metrics_notify.notify_waiters();
}
info!(
"Got master configuration from {}: metrics_address={}, metrics_interval={}s",
master_addr, resp.metrics_address, resp.metrics_interval_seconds
);
return;
}
Err(e) => {
warn!("checkWithMaster {}: {}", master_addr, e);
}
}
}
tokio::time::sleep(Duration::from_millis(1790)).await;
}
}
async fn try_get_master_configuration(
grpc_addr: &str,
) -> Result<master_pb::GetMasterConfigurationResponse, Box<dyn std::error::Error>> {
let channel = Channel::from_shared(grpc_addr.to_string())?
.connect_timeout(Duration::from_secs(5))
.timeout(Duration::from_secs(10))
.connect()
.await?;
let mut client = SeaweedClient::new(channel);
let resp = client
.get_master_configuration(master_pb::GetMasterConfigurationRequest {})
.await?;
Ok(resp.into_inner())
}
fn is_stopping(state: &VolumeServerState) -> bool {
*state.is_stopping.read().unwrap()
}

Loading…
Cancel
Save