From e78bd10793b1b52d4732db166164d3d49a2632fa Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 16 Mar 2026 21:51:06 -0700 Subject: [PATCH] Emit EC heartbeat deltas on shard changes --- seaweed-volume/src/server/grpc_server.rs | 6 + seaweed-volume/src/server/heartbeat.rs | 138 ++++++++++++++++++++++- 2 files changed, 141 insertions(+), 3 deletions(-) diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index e8c66a521..532e2ba00 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -2311,6 +2311,8 @@ impl VolumeServer for VolumeGrpcService { let vid = VolumeId(req.volume_id); let mut store = self.state.store.write().unwrap(); store.delete_ec_shards(vid, &req.collection, &req.shard_ids); + drop(store); + self.state.volume_state_notify.notify_one(); Ok(Response::new( volume_server_pb::VolumeEcShardsDeleteResponse {}, )) @@ -2333,6 +2335,8 @@ impl VolumeServer for VolumeGrpcService { Status::internal(format!("mount {}.{}: {}", req.volume_id, shard_id, e)) })?; } + drop(store); + self.state.volume_state_notify.notify_one(); Ok(Response::new( volume_server_pb::VolumeEcShardsMountResponse {}, @@ -2354,6 +2358,8 @@ impl VolumeServer for VolumeGrpcService { Status::internal(format!("unmount {}.{}: {}", req.volume_id, shard_id, e)) })?; } + drop(store); + self.state.volume_state_notify.notify_one(); Ok(Response::new( volume_server_pb::VolumeEcShardsUnmountResponse {}, )) diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index 6930e5aa5..a5886b8ac 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -279,6 +279,68 @@ fn apply_master_volume_options(store: &Store, hb_resp: &master_pb::HeartbeatResp volume_opts_changed && store.maybe_adjust_volume_max() } +type EcShardDeltaKey = (u32, String, u32, u32); + +fn collect_ec_shard_delta_messages( + store: &Store, +) -> HashMap<EcShardDeltaKey, master_pb::VolumeEcShardInformationMessage> { + let mut messages = 
HashMap::new(); + + for (disk_id, loc) in store.locations.iter().enumerate() { + for (_, ec_vol) in loc.ec_volumes() { + for shard in ec_vol.shards.iter().flatten() { + messages.insert( + ( + ec_vol.volume_id.0, + ec_vol.collection.clone(), + disk_id as u32, + shard.shard_id as u32, + ), + master_pb::VolumeEcShardInformationMessage { + id: ec_vol.volume_id.0, + collection: ec_vol.collection.clone(), + ec_index_bits: 1u32 << shard.shard_id, + shard_sizes: vec![shard.file_size()], + disk_type: ec_vol.disk_type.to_string(), + expire_at_sec: ec_vol.expire_at_sec, + disk_id: disk_id as u32, + ..Default::default() + }, + ); + } + } + } + + messages +} + +fn diff_ec_shard_delta_messages( + previous: &HashMap<EcShardDeltaKey, master_pb::VolumeEcShardInformationMessage>, + current: &HashMap<EcShardDeltaKey, master_pb::VolumeEcShardInformationMessage>, +) -> ( + Vec<master_pb::VolumeEcShardInformationMessage>, + Vec<master_pb::VolumeEcShardInformationMessage>, +) { + let mut new_ec_shards = Vec::new(); + let mut deleted_ec_shards = Vec::new(); + + for (key, message) in current { + if previous.get(key) != Some(message) { + new_ec_shards.push(message.clone()); + } + } + + for (key, message) in previous { + if !current.contains_key(key) { + let mut deleted = message.clone(); + deleted.shard_sizes = vec![0]; + deleted_ec_shards.push(deleted); + } + } + + (new_ec_shards, deleted_ec_shards) +} + /// Perform one heartbeat session with a master server. 
async fn do_heartbeat( config: &HeartbeatConfig, @@ -309,6 +371,10 @@ async fn do_heartbeat( .iter() .map(|v| (v.id, v.clone())) .collect(); + let mut last_ec_shards = { + let store = state.store.read().unwrap(); + collect_ec_shard_delta_messages(&store) + }; // Send initial heartbeats BEFORE calling send_heartbeat to avoid deadlock: // the server won't send response headers until it receives the first message, @@ -346,6 +412,10 @@ async fn do_heartbeat( let adjusted_hb = collect_heartbeat(config, state); last_volumes = adjusted_hb.volumes.iter().map(|v| (v.id, v.clone())).collect(); + last_ec_shards = { + let store = state.store.read().unwrap(); + collect_ec_shard_delta_messages(&store) + }; if tx.send(adjusted_hb).await.is_err() { return Ok(None); } @@ -389,13 +459,22 @@ async fn do_heartbeat( } let current_hb = collect_heartbeat(config, state); last_volumes = current_hb.volumes.iter().map(|v| (v.id, v.clone())).collect(); + last_ec_shards = { + let store = state.store.read().unwrap(); + collect_ec_shard_delta_messages(&store) + }; if tx.send(current_hb).await.is_err() { return Ok(None); } } _ = ec_tick.tick() => { - if tx.send(collect_ec_heartbeat(config, state)).await.is_err() { + let current_ec_hb = collect_ec_heartbeat(config, state); + last_ec_shards = { + let store = state.store.read().unwrap(); + collect_ec_shard_delta_messages(&store) + }; + if tx.send(current_ec_hb).await.is_err() { return Ok(None); } } @@ -409,6 +488,10 @@ async fn do_heartbeat( } let current_hb = collect_heartbeat(config, state); let current_volumes: HashMap = current_hb.volumes.iter().map(|v| (v.id, v.clone())).collect(); + let current_ec_shards = { + let store = state.store.read().unwrap(); + collect_ec_shard_delta_messages(&store) + }; let mut new_vols = Vec::new(); let mut del_vols = Vec::new(); @@ -441,6 +524,9 @@ async fn do_heartbeat( } } + let (new_ec_shards, deleted_ec_shards) = + diff_ec_shard_delta_messages(&last_ec_shards, &current_ec_shards); + // Collect current state for 
state-only or combined delta heartbeats. // Mirrors Go's StateUpdateChan case which sends state changes immediately. let current_state = Some(volume_server_pb::VolumeServerState { @@ -448,8 +534,11 @@ async fn do_heartbeat( version: state.state_version.load(Ordering::Relaxed), }); - if !new_vols.is_empty() || !del_vols.is_empty() { - // Volume delta heartbeat with state + if !new_vols.is_empty() + || !del_vols.is_empty() + || !new_ec_shards.is_empty() + || !deleted_ec_shards.is_empty() + { let delta_hb = master_pb::Heartbeat { ip: config.ip.clone(), port: config.port as u32, @@ -459,6 +548,8 @@ async fn do_heartbeat( rack: config.rack.clone(), new_volumes: new_vols, deleted_volumes: del_vols, + new_ec_shards, + deleted_ec_shards, state: current_state, ..Default::default() }; @@ -466,6 +557,7 @@ async fn do_heartbeat( return Ok(None); } last_volumes = current_volumes; + last_ec_shards = current_ec_shards; } else { // State-only heartbeat (e.g., MarkReadonly/MarkWritable changed state // without adding/removing volumes). Mirrors Go's StateUpdateChan case. 
@@ -1435,4 +1527,44 @@ mod tests { assert_eq!(store.volume_size_limit.load(Ordering::Relaxed), 2048); assert!(!changed); } + + #[test] + fn test_diff_ec_shard_delta_messages_reports_mounts_and_unmounts() { + let temp_dir = tempfile::tempdir().unwrap(); + let dir = temp_dir.path().to_str().unwrap(); + + let mut store = Store::new(NeedleMapKind::InMemory); + store + .add_location( + dir, + dir, + 8, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); + + let previous = collect_ec_shard_delta_messages(&store); + + std::fs::write(format!("{}/ec_delta_case_81.ec00", dir), b"delta").unwrap(); + store.locations[0] + .mount_ec_shards(VolumeId(81), "ec_delta_case", &[0]) + .unwrap(); + let current = collect_ec_shard_delta_messages(&store); + let (new_ec_shards, deleted_ec_shards) = + diff_ec_shard_delta_messages(&previous, &current); + + assert_eq!(new_ec_shards.len(), 1); + assert!(deleted_ec_shards.is_empty()); + assert_eq!(new_ec_shards[0].ec_index_bits, 1); + assert_eq!(new_ec_shards[0].shard_sizes, vec![5]); + + let (new_after_delete, deleted_after_delete) = + diff_ec_shard_delta_messages(&current, &HashMap::new()); + assert!(new_after_delete.is_empty()); + assert_eq!(deleted_after_delete.len(), 1); + assert_eq!(deleted_after_delete[0].ec_index_bits, 1); + assert_eq!(deleted_after_delete[0].shard_sizes, vec![0]); + } }