Browse Source

Emit EC heartbeat deltas on shard changes

rust-volume-server
Chris Lu 3 days ago
parent
commit
e78bd10793
  1. 6
      seaweed-volume/src/server/grpc_server.rs
  2. 138
      seaweed-volume/src/server/heartbeat.rs

6
seaweed-volume/src/server/grpc_server.rs

@ -2311,6 +2311,8 @@ impl VolumeServer for VolumeGrpcService {
let vid = VolumeId(req.volume_id);
let mut store = self.state.store.write().unwrap();
store.delete_ec_shards(vid, &req.collection, &req.shard_ids);
drop(store);
self.state.volume_state_notify.notify_one();
Ok(Response::new(
volume_server_pb::VolumeEcShardsDeleteResponse {},
))
@ -2333,6 +2335,8 @@ impl VolumeServer for VolumeGrpcService {
Status::internal(format!("mount {}.{}: {}", req.volume_id, shard_id, e))
})?;
}
drop(store);
self.state.volume_state_notify.notify_one();
Ok(Response::new(
volume_server_pb::VolumeEcShardsMountResponse {},
@ -2354,6 +2358,8 @@ impl VolumeServer for VolumeGrpcService {
Status::internal(format!("unmount {}.{}: {}", req.volume_id, shard_id, e))
})?;
}
drop(store);
self.state.volume_state_notify.notify_one();
Ok(Response::new(
volume_server_pb::VolumeEcShardsUnmountResponse {},
))

138
seaweed-volume/src/server/heartbeat.rs

@ -279,6 +279,68 @@ fn apply_master_volume_options(store: &Store, hb_resp: &master_pb::HeartbeatResp
volume_opts_changed && store.maybe_adjust_volume_max()
}
type EcShardDeltaKey = (u32, String, u32, u32);
fn collect_ec_shard_delta_messages(
store: &Store,
) -> HashMap<EcShardDeltaKey, master_pb::VolumeEcShardInformationMessage> {
let mut messages = HashMap::new();
for (disk_id, loc) in store.locations.iter().enumerate() {
for (_, ec_vol) in loc.ec_volumes() {
for shard in ec_vol.shards.iter().flatten() {
messages.insert(
(
ec_vol.volume_id.0,
ec_vol.collection.clone(),
disk_id as u32,
shard.shard_id as u32,
),
master_pb::VolumeEcShardInformationMessage {
id: ec_vol.volume_id.0,
collection: ec_vol.collection.clone(),
ec_index_bits: 1u32 << shard.shard_id,
shard_sizes: vec![shard.file_size()],
disk_type: ec_vol.disk_type.to_string(),
expire_at_sec: ec_vol.expire_at_sec,
disk_id: disk_id as u32,
..Default::default()
},
);
}
}
}
messages
}
fn diff_ec_shard_delta_messages(
previous: &HashMap<EcShardDeltaKey, master_pb::VolumeEcShardInformationMessage>,
current: &HashMap<EcShardDeltaKey, master_pb::VolumeEcShardInformationMessage>,
) -> (
Vec<master_pb::VolumeEcShardInformationMessage>,
Vec<master_pb::VolumeEcShardInformationMessage>,
) {
let mut new_ec_shards = Vec::new();
let mut deleted_ec_shards = Vec::new();
for (key, message) in current {
if previous.get(key) != Some(message) {
new_ec_shards.push(message.clone());
}
}
for (key, message) in previous {
if !current.contains_key(key) {
let mut deleted = message.clone();
deleted.shard_sizes = vec![0];
deleted_ec_shards.push(deleted);
}
}
(new_ec_shards, deleted_ec_shards)
}
/// Perform one heartbeat session with a master server.
async fn do_heartbeat(
config: &HeartbeatConfig,
@ -309,6 +371,10 @@ async fn do_heartbeat(
.iter()
.map(|v| (v.id, v.clone()))
.collect();
let mut last_ec_shards = {
let store = state.store.read().unwrap();
collect_ec_shard_delta_messages(&store)
};
// Send initial heartbeats BEFORE calling send_heartbeat to avoid deadlock:
// the server won't send response headers until it receives the first message,
@ -346,6 +412,10 @@ async fn do_heartbeat(
let adjusted_hb = collect_heartbeat(config, state);
last_volumes =
adjusted_hb.volumes.iter().map(|v| (v.id, v.clone())).collect();
last_ec_shards = {
let store = state.store.read().unwrap();
collect_ec_shard_delta_messages(&store)
};
if tx.send(adjusted_hb).await.is_err() {
return Ok(None);
}
@ -389,13 +459,22 @@ async fn do_heartbeat(
}
let current_hb = collect_heartbeat(config, state);
last_volumes = current_hb.volumes.iter().map(|v| (v.id, v.clone())).collect();
last_ec_shards = {
let store = state.store.read().unwrap();
collect_ec_shard_delta_messages(&store)
};
if tx.send(current_hb).await.is_err() {
return Ok(None);
}
}
_ = ec_tick.tick() => {
if tx.send(collect_ec_heartbeat(config, state)).await.is_err() {
let current_ec_hb = collect_ec_heartbeat(config, state);
last_ec_shards = {
let store = state.store.read().unwrap();
collect_ec_shard_delta_messages(&store)
};
if tx.send(current_ec_hb).await.is_err() {
return Ok(None);
}
}
@ -409,6 +488,10 @@ async fn do_heartbeat(
}
let current_hb = collect_heartbeat(config, state);
let current_volumes: HashMap<u32, _> = current_hb.volumes.iter().map(|v| (v.id, v.clone())).collect();
let current_ec_shards = {
let store = state.store.read().unwrap();
collect_ec_shard_delta_messages(&store)
};
let mut new_vols = Vec::new();
let mut del_vols = Vec::new();
@ -441,6 +524,9 @@ async fn do_heartbeat(
}
}
let (new_ec_shards, deleted_ec_shards) =
diff_ec_shard_delta_messages(&last_ec_shards, &current_ec_shards);
// Collect current state for state-only or combined delta heartbeats.
// Mirrors Go's StateUpdateChan case which sends state changes immediately.
let current_state = Some(volume_server_pb::VolumeServerState {
@ -448,8 +534,11 @@ async fn do_heartbeat(
version: state.state_version.load(Ordering::Relaxed),
});
if !new_vols.is_empty() || !del_vols.is_empty() {
// Volume delta heartbeat with state
if !new_vols.is_empty()
|| !del_vols.is_empty()
|| !new_ec_shards.is_empty()
|| !deleted_ec_shards.is_empty()
{
let delta_hb = master_pb::Heartbeat {
ip: config.ip.clone(),
port: config.port as u32,
@ -459,6 +548,8 @@ async fn do_heartbeat(
rack: config.rack.clone(),
new_volumes: new_vols,
deleted_volumes: del_vols,
new_ec_shards,
deleted_ec_shards,
state: current_state,
..Default::default()
};
@ -466,6 +557,7 @@ async fn do_heartbeat(
return Ok(None);
}
last_volumes = current_volumes;
last_ec_shards = current_ec_shards;
} else {
// State-only heartbeat (e.g., MarkReadonly/MarkWritable changed state
// without adding/removing volumes). Mirrors Go's StateUpdateChan case.
@ -1435,4 +1527,44 @@ mod tests {
assert_eq!(store.volume_size_limit.load(Ordering::Relaxed), 2048);
assert!(!changed);
}
#[test]
fn test_diff_ec_shard_delta_messages_reports_mounts_and_unmounts() {
let temp_dir = tempfile::tempdir().unwrap();
let dir = temp_dir.path().to_str().unwrap();
let mut store = Store::new(NeedleMapKind::InMemory);
store
.add_location(
dir,
dir,
8,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
Vec::new(),
)
.unwrap();
let previous = collect_ec_shard_delta_messages(&store);
std::fs::write(format!("{}/ec_delta_case_81.ec00", dir), b"delta").unwrap();
store.locations[0]
.mount_ec_shards(VolumeId(81), "ec_delta_case", &[0])
.unwrap();
let current = collect_ec_shard_delta_messages(&store);
let (new_ec_shards, deleted_ec_shards) =
diff_ec_shard_delta_messages(&previous, &current);
assert_eq!(new_ec_shards.len(), 1);
assert!(deleted_ec_shards.is_empty());
assert_eq!(new_ec_shards[0].ec_index_bits, 1);
assert_eq!(new_ec_shards[0].shard_sizes, vec![5]);
let (new_after_delete, deleted_after_delete) =
diff_ec_shard_delta_messages(&current, &HashMap::new());
assert!(new_after_delete.is_empty());
assert_eq!(deleted_after_delete.len(), 1);
assert_eq!(deleted_after_delete[0].ec_index_bits, 1);
assert_eq!(deleted_after_delete[0].shard_sizes, vec![0]);
}
}
Loading…
Cancel
Save