Browse Source

feat(server): send delta updates in master heartbeats

rust-volume-server
Chris Lu 2 days ago
parent
commit
fbd2b6df88
  1. 60
      seaweed-volume/src/server/heartbeat.rs

60
seaweed-volume/src/server/heartbeat.rs

@ -107,10 +107,14 @@ async fn do_heartbeat(
let (tx, rx) = tokio::sync::mpsc::channel::<master_pb::Heartbeat>(32);
// Keep track of what we sent, to generate delta updates
let initial_hb = collect_heartbeat(config, state);
let mut last_volumes: HashMap<u32, master_pb::VolumeInformationMessage> = initial_hb.volumes.iter().map(|v| (v.id, v.clone())).collect();
// Send initial heartbeats BEFORE calling send_heartbeat to avoid deadlock:
// the server won't send response headers until it receives the first message,
// but send_heartbeat().await waits for response headers.
tx.send(collect_heartbeat(config, state)).await?;
tx.send(initial_hb).await?;
tx.send(collect_ec_heartbeat(state)).await?;
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
@ -157,7 +161,9 @@ async fn do_heartbeat(
}
_ = volume_tick.tick() => {
if tx.send(collect_heartbeat(config, state)).await.is_err() {
let current_hb = collect_heartbeat(config, state);
last_volumes = current_hb.volumes.iter().map(|v| (v.id, v.clone())).collect();
if tx.send(current_hb).await.is_err() {
return Ok(None);
}
}
@ -169,8 +175,54 @@ async fn do_heartbeat(
}
_ = state.volume_state_notify.notified() => {
if tx.send(collect_heartbeat(config, state)).await.is_err() {
return Ok(None);
let current_hb = collect_heartbeat(config, state);
let current_volumes: HashMap<u32, _> = current_hb.volumes.iter().map(|v| (v.id, v.clone())).collect();
let mut new_vols = Vec::new();
let mut del_vols = Vec::new();
for (id, vol) in &current_volumes {
if !last_volumes.contains_key(id) {
new_vols.push(master_pb::VolumeShortInformationMessage {
id: *id,
collection: vol.collection.clone(),
version: vol.version,
replica_placement: vol.replica_placement,
ttl: vol.ttl,
disk_type: vol.disk_type.clone(),
disk_id: vol.disk_id,
});
}
}
for (id, vol) in &last_volumes {
if !current_volumes.contains_key(id) {
del_vols.push(master_pb::VolumeShortInformationMessage {
id: *id,
collection: vol.collection.clone(),
version: vol.version,
replica_placement: vol.replica_placement,
ttl: vol.ttl,
disk_type: vol.disk_type.clone(),
disk_id: vol.disk_id,
});
}
}
if !new_vols.is_empty() || !del_vols.is_empty() {
let delta_hb = master_pb::Heartbeat {
ip: config.ip.clone(),
port: config.port as u32,
data_center: config.data_center.clone(),
rack: config.rack.clone(),
new_volumes: new_vols,
deleted_volumes: del_vols,
..Default::default()
};
if tx.send(delta_hb).await.is_err() {
return Ok(None);
}
last_volumes = current_volumes;
}
}

Loading…
Cancel
Save