diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 941600a65..236ab2025 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -2530,6 +2530,9 @@ impl VolumeServer for VolumeGrpcService { _request: Request, ) -> Result, Status> { *self.state.is_stopping.write().unwrap() = true; + self.state.is_heartbeating.store(false, Ordering::Relaxed); + // Wake heartbeat loop to send deregistration. + self.state.volume_state_notify.notify_one(); Ok(Response::new( volume_server_pb::VolumeServerLeaveResponse {}, )) diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index dd589e768..b84651ad1 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -52,6 +52,11 @@ pub async fn run_heartbeat_with_state( loop { for master_addr in &config.master_addresses { + if is_stopping(&state) { + state.is_heartbeating.store(false, Ordering::Relaxed); + info!("Heartbeat stopping"); + return; + } if shutdown_rx.try_recv().is_ok() { state.is_heartbeating.store(false, Ordering::Relaxed); info!("Heartbeat shutting down"); @@ -172,6 +177,10 @@ fn to_grpc_address(master_addr: &str) -> String { format!("http://{}", master_addr) } +fn is_stopping(state: &VolumeServerState) -> bool { + *state.is_stopping.read().unwrap() +} + /// Perform one heartbeat session with a master server. async fn do_heartbeat( config: &HeartbeatConfig, @@ -208,6 +217,12 @@ async fn do_heartbeat( let mut response_stream = client.send_heartbeat(stream).await?.into_inner(); info!("Heartbeat stream established with {}", grpc_addr); + if is_stopping(state) { + state.is_heartbeating.store(false, Ordering::Relaxed); + send_deregister_heartbeat(config, state, &tx).await; + info!("Heartbeat stopping"); + return Ok(None); + } state.is_heartbeating.store(true, Ordering::Relaxed); let mut volume_tick = tokio::time::interval(pulse); @@ -262,6 +277,12 @@ async fn do_heartbeat( } _ = state.volume_state_notify.notified() => { + if is_stopping(state) { + state.is_heartbeating.store(false, Ordering::Relaxed); + send_deregister_heartbeat(config, state, &tx).await; + info!("Heartbeat stopping"); + return Ok(None); + } let current_hb = collect_heartbeat(config, state); let current_volumes: HashMap = current_hb.volumes.iter().map(|v| (v.id, v.clone())).collect(); @@ -317,27 +338,7 @@ async fn do_heartbeat( _ = shutdown_rx.recv() => { state.is_heartbeating.store(false, Ordering::Relaxed); - let empty = { - let store = state.store.read().unwrap(); - let (location_uuids, disk_tags) = collect_location_metadata(&store); - master_pb::Heartbeat { - id: store.id.clone(), - ip: config.ip.clone(), - port: config.port as u32, - public_url: config.public_url.clone(), - max_file_key: 0, - data_center: config.data_center.clone(), - rack: config.rack.clone(), - has_no_volumes: true, - has_no_ec_shards: true, - grpc_port: config.grpc_port as u32, - location_uuids, - disk_tags, - ..Default::default() - } - }; - let _ = tx.send(empty).await; - tokio::time::sleep(Duration::from_millis(200)).await; + send_deregister_heartbeat(config, state, &tx).await; info!("Sent deregistration heartbeat"); return Ok(None); } @@ -345,6 +346,34 @@ async fn do_heartbeat( } } +async fn send_deregister_heartbeat( + config: &HeartbeatConfig, + state: &Arc, + tx: &tokio::sync::mpsc::Sender, +) { + let empty = { + let store = state.store.read().unwrap(); + let (location_uuids, disk_tags) = collect_location_metadata(&store); + master_pb::Heartbeat { + id: store.id.clone(), + ip: config.ip.clone(), + port: config.port as u32, + public_url: config.public_url.clone(), + max_file_key: 0, + data_center: config.data_center.clone(), + rack: config.rack.clone(), + has_no_volumes: true, + has_no_ec_shards: true, + grpc_port: config.grpc_port as u32, + location_uuids, + disk_tags, + ..Default::default() + } + }; + let _ = tx.send(empty).await; + tokio::time::sleep(Duration::from_millis(200)).await; +} + fn apply_metrics_push_settings( state: &VolumeServerState, address: &str,