Browse Source

Match Go heartbeat: send state-only delta on volume state changes

rust-volume-server
Chris Lu 3 days ago
parent
commit
246f91679a
  1. 25
      seaweed-volume/src/server/heartbeat.rs

25
seaweed-volume/src/server/heartbeat.rs

@ -15,6 +15,7 @@ use super::grpc_client::{build_grpc_endpoint, GRPC_MAX_MESSAGE_SIZE};
use super::volume_server::VolumeServerState;
use crate::pb::master_pb;
use crate::pb::master_pb::seaweed_client::SeaweedClient;
use crate::pb::volume_server_pb;
use crate::remote_storage::s3_tier::{S3TierBackend, S3TierConfig};
use crate::storage::store::Store;
use crate::storage::types::NeedleId;
@ -395,7 +396,15 @@ async fn do_heartbeat(
}
}
// Collect current state for state-only or combined delta heartbeats.
// Mirrors Go's StateUpdateChan case which sends state changes immediately.
let current_state = Some(volume_server_pb::VolumeServerState {
maintenance: state.maintenance.load(Ordering::Relaxed),
version: state.state_version.load(Ordering::Relaxed),
});
if !new_vols.is_empty() || !del_vols.is_empty() {
// Volume delta heartbeat with state
let delta_hb = master_pb::Heartbeat {
ip: config.ip.clone(),
port: config.port as u32,
@ -405,12 +414,28 @@ async fn do_heartbeat(
rack: config.rack.clone(),
new_volumes: new_vols,
deleted_volumes: del_vols,
state: current_state,
..Default::default()
};
if tx.send(delta_hb).await.is_err() {
return Ok(None);
}
last_volumes = current_volumes;
} else {
// State-only heartbeat (e.g., MarkReadonly/MarkWritable changed state
// without adding/removing volumes). Mirrors Go's StateUpdateChan case.
let state_hb = master_pb::Heartbeat {
ip: config.ip.clone(),
port: config.port as u32,
grpc_port: config.grpc_port as u32,
data_center: config.data_center.clone(),
rack: config.rack.clone(),
state: current_state,
..Default::default()
};
if tx.send(state_hb).await.is_err() {
return Ok(None);
}
}
}

Loading…
Cancel
Save