diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index 53e3b061b..d698c3292 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -15,6 +15,7 @@ use super::grpc_client::{build_grpc_endpoint, GRPC_MAX_MESSAGE_SIZE}; use super::volume_server::VolumeServerState; use crate::pb::master_pb; use crate::pb::master_pb::seaweed_client::SeaweedClient; +use crate::pb::volume_server_pb; use crate::remote_storage::s3_tier::{S3TierBackend, S3TierConfig}; use crate::storage::store::Store; use crate::storage::types::NeedleId; @@ -395,7 +396,15 @@ async fn do_heartbeat( } } + // Collect current state for state-only or combined delta heartbeats. + // Mirrors Go's StateUpdateChan case which sends state changes immediately. + let current_state = Some(volume_server_pb::VolumeServerState { + maintenance: state.maintenance.load(Ordering::Relaxed), + version: state.state_version.load(Ordering::Relaxed), + }); + if !new_vols.is_empty() || !del_vols.is_empty() { + // Volume delta heartbeat with state let delta_hb = master_pb::Heartbeat { ip: config.ip.clone(), port: config.port as u32, @@ -405,12 +414,28 @@ async fn do_heartbeat( rack: config.rack.clone(), new_volumes: new_vols, deleted_volumes: del_vols, + state: current_state, ..Default::default() }; if tx.send(delta_hb).await.is_err() { return Ok(None); } last_volumes = current_volumes; + } else { + // State-only heartbeat (e.g., MarkReadonly/MarkWritable changed state + // without adding/removing volumes). Mirrors Go's StateUpdateChan case. + let state_hb = master_pb::Heartbeat { + ip: config.ip.clone(), + port: config.port as u32, + grpc_port: config.grpc_port as u32, + data_center: config.data_center.clone(), + rack: config.rack.clone(), + state: current_state, + ..Default::default() + }; + if tx.send(state_hb).await.is_err() { + return Ok(None); + } } }