Browse Source

fix: stop heartbeat on leave

rust-volume-server
Chris Lu 4 days ago
parent
commit
8102505c9e
  1. 3
      seaweed-volume/src/server/grpc_server.rs
  2. 71
      seaweed-volume/src/server/heartbeat.rs

3
seaweed-volume/src/server/grpc_server.rs

@ -2530,6 +2530,9 @@ impl VolumeServer for VolumeGrpcService {
_request: Request<volume_server_pb::VolumeServerLeaveRequest>,
) -> Result<Response<volume_server_pb::VolumeServerLeaveResponse>, Status> {
*self.state.is_stopping.write().unwrap() = true;
self.state.is_heartbeating.store(false, Ordering::Relaxed);
// Wake heartbeat loop to send deregistration.
self.state.volume_state_notify.notify_one();
Ok(Response::new(
volume_server_pb::VolumeServerLeaveResponse {},
))

71
seaweed-volume/src/server/heartbeat.rs

@ -52,6 +52,11 @@ pub async fn run_heartbeat_with_state(
loop {
for master_addr in &config.master_addresses {
if is_stopping(&state) {
state.is_heartbeating.store(false, Ordering::Relaxed);
info!("Heartbeat stopping");
return;
}
if shutdown_rx.try_recv().is_ok() {
state.is_heartbeating.store(false, Ordering::Relaxed);
info!("Heartbeat shutting down");
@ -172,6 +177,10 @@ fn to_grpc_address(master_addr: &str) -> String {
format!("http://{}", master_addr)
}
fn is_stopping(state: &VolumeServerState) -> bool {
*state.is_stopping.read().unwrap()
}
/// Perform one heartbeat session with a master server.
async fn do_heartbeat(
config: &HeartbeatConfig,
@ -208,6 +217,12 @@ async fn do_heartbeat(
let mut response_stream = client.send_heartbeat(stream).await?.into_inner();
info!("Heartbeat stream established with {}", grpc_addr);
if is_stopping(state) {
state.is_heartbeating.store(false, Ordering::Relaxed);
send_deregister_heartbeat(config, state, &tx).await;
info!("Heartbeat stopping");
return Ok(None);
}
state.is_heartbeating.store(true, Ordering::Relaxed);
let mut volume_tick = tokio::time::interval(pulse);
@ -262,6 +277,12 @@ async fn do_heartbeat(
}
_ = state.volume_state_notify.notified() => {
if is_stopping(state) {
state.is_heartbeating.store(false, Ordering::Relaxed);
send_deregister_heartbeat(config, state, &tx).await;
info!("Heartbeat stopping");
return Ok(None);
}
let current_hb = collect_heartbeat(config, state);
let current_volumes: HashMap<u32, _> = current_hb.volumes.iter().map(|v| (v.id, v.clone())).collect();
@ -317,27 +338,7 @@ async fn do_heartbeat(
_ = shutdown_rx.recv() => {
state.is_heartbeating.store(false, Ordering::Relaxed);
let empty = {
let store = state.store.read().unwrap();
let (location_uuids, disk_tags) = collect_location_metadata(&store);
master_pb::Heartbeat {
id: store.id.clone(),
ip: config.ip.clone(),
port: config.port as u32,
public_url: config.public_url.clone(),
max_file_key: 0,
data_center: config.data_center.clone(),
rack: config.rack.clone(),
has_no_volumes: true,
has_no_ec_shards: true,
grpc_port: config.grpc_port as u32,
location_uuids,
disk_tags,
..Default::default()
}
};
let _ = tx.send(empty).await;
tokio::time::sleep(Duration::from_millis(200)).await;
send_deregister_heartbeat(config, state, &tx).await;
info!("Sent deregistration heartbeat");
return Ok(None);
}
@ -345,6 +346,34 @@ async fn do_heartbeat(
}
}
async fn send_deregister_heartbeat(
config: &HeartbeatConfig,
state: &Arc<VolumeServerState>,
tx: &tokio::sync::mpsc::Sender<master_pb::Heartbeat>,
) {
let empty = {
let store = state.store.read().unwrap();
let (location_uuids, disk_tags) = collect_location_metadata(&store);
master_pb::Heartbeat {
id: store.id.clone(),
ip: config.ip.clone(),
port: config.port as u32,
public_url: config.public_url.clone(),
max_file_key: 0,
data_center: config.data_center.clone(),
rack: config.rack.clone(),
has_no_volumes: true,
has_no_ec_shards: true,
grpc_port: config.grpc_port as u32,
location_uuids,
disk_tags,
..Default::default()
}
};
let _ = tx.send(empty).await;
tokio::time::sleep(Duration::from_millis(200)).await;
}
fn apply_metrics_push_settings(
state: &VolumeServerState,
address: &str,

Loading…
Cancel
Save