diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index f4c111736..20837d8f0 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -107,10 +107,14 @@ async fn do_heartbeat( let (tx, rx) = tokio::sync::mpsc::channel::(32); + // Keep track of what we sent, to generate delta updates + let initial_hb = collect_heartbeat(config, state); + let mut last_volumes: HashMap = initial_hb.volumes.iter().map(|v| (v.id, v.clone())).collect(); + // Send initial heartbeats BEFORE calling send_heartbeat to avoid deadlock: // the server won't send response headers until it receives the first message, // but send_heartbeat().await waits for response headers. - tx.send(collect_heartbeat(config, state)).await?; + tx.send(initial_hb).await?; tx.send(collect_ec_heartbeat(state)).await?; let stream = tokio_stream::wrappers::ReceiverStream::new(rx); @@ -157,7 +161,9 @@ async fn do_heartbeat( } _ = volume_tick.tick() => { - if tx.send(collect_heartbeat(config, state)).await.is_err() { + let current_hb = collect_heartbeat(config, state); + last_volumes = current_hb.volumes.iter().map(|v| (v.id, v.clone())).collect(); + if tx.send(current_hb).await.is_err() { return Ok(None); } } @@ -169,8 +175,54 @@ async fn do_heartbeat( } _ = state.volume_state_notify.notified() => { - if tx.send(collect_heartbeat(config, state)).await.is_err() { - return Ok(None); + let current_hb = collect_heartbeat(config, state); + let current_volumes: HashMap = current_hb.volumes.iter().map(|v| (v.id, v.clone())).collect(); + + let mut new_vols = Vec::new(); + let mut del_vols = Vec::new(); + + for (id, vol) in ¤t_volumes { + if !last_volumes.contains_key(id) { + new_vols.push(master_pb::VolumeShortInformationMessage { + id: *id, + collection: vol.collection.clone(), + version: vol.version, + replica_placement: vol.replica_placement, + ttl: vol.ttl, + disk_type: vol.disk_type.clone(), + disk_id: vol.disk_id, + }); + } + } + + for (id, vol) in &last_volumes { + if !current_volumes.contains_key(id) { + del_vols.push(master_pb::VolumeShortInformationMessage { + id: *id, + collection: vol.collection.clone(), + version: vol.version, + replica_placement: vol.replica_placement, + ttl: vol.ttl, + disk_type: vol.disk_type.clone(), + disk_id: vol.disk_id, + }); + } + } + + if !new_vols.is_empty() || !del_vols.is_empty() { + let delta_hb = master_pb::Heartbeat { + ip: config.ip.clone(), + port: config.port as u32, + data_center: config.data_center.clone(), + rack: config.rack.clone(), + new_volumes: new_vols, + deleted_volumes: del_vols, + ..Default::default() + }; + if tx.send(delta_hb).await.is_err() { + return Ok(None); + } + last_volumes = current_volumes; } }