From fb028556add95a8346a6627f9188e41e9396c816 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 16 Mar 2026 18:23:51 -0700 Subject: [PATCH] Match Go heartbeat: keep is_heartbeating on error, add EC shard identification --- seaweed-volume/src/server/heartbeat.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index b1aa0a992..53e3b061b 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -100,7 +100,6 @@ pub async fn run_heartbeat_with_state( let err_msg = e.to_string(); // Drop `e` (non-Send) before any .await drop(e); - state.is_heartbeating.store(false, Ordering::Relaxed); warn!("Heartbeat to {} error: {}", grpc_addr, err_msg); if err_msg.contains("duplicate") && err_msg.contains("UUID") { @@ -276,7 +275,7 @@ async fn do_heartbeat( // the server won't send response headers until it receives the first message, // but send_heartbeat().await waits for response headers. tx.send(initial_hb).await?; - tx.send(collect_ec_heartbeat(state)).await?; + tx.send(collect_ec_heartbeat(config, state)).await?; let stream = tokio_stream::wrappers::ReceiverStream::new(rx); let mut response_stream = client.send_heartbeat(stream).await?.into_inner(); @@ -350,7 +349,7 @@ async fn do_heartbeat( } _ = ec_tick.tick() => { - if tx.send(collect_ec_heartbeat(state)).await.is_err() { + if tx.send(collect_ec_heartbeat(config, state)).await.is_err() { return Ok(None); } } @@ -658,7 +657,7 @@ fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartb } /// Collect EC shard information into a Heartbeat message. -fn collect_ec_heartbeat(state: &Arc) -> master_pb::Heartbeat { +fn collect_ec_heartbeat(config: &HeartbeatConfig, state: &Arc) -> master_pb::Heartbeat { let store = state.store.read().unwrap(); let mut ec_shards = Vec::new(); @@ -683,6 +682,11 @@ fn collect_ec_heartbeat(state: &Arc) -> master_pb::Heartbeat let has_no = ec_shards.is_empty(); master_pb::Heartbeat { + ip: config.ip.clone(), + port: config.port as u32, + grpc_port: config.grpc_port as u32, + data_center: config.data_center.clone(), + rack: config.rack.clone(), ec_shards, has_no_ec_shards: has_no, ..Default::default()