|
|
|
@ -100,7 +100,6 @@ pub async fn run_heartbeat_with_state( |
|
|
|
let err_msg = e.to_string();
|
|
|
|
// Drop `e` (non-Send) before any .await
|
|
|
|
drop(e);
|
|
|
|
state.is_heartbeating.store(false, Ordering::Relaxed);
|
|
|
|
warn!("Heartbeat to {} error: {}", grpc_addr, err_msg);
|
|
|
|
|
|
|
|
if err_msg.contains("duplicate") && err_msg.contains("UUID") {
|
|
|
|
@ -276,7 +275,7 @@ async fn do_heartbeat( |
|
|
|
// the server won't send response headers until it receives the first message,
|
|
|
|
// but send_heartbeat().await waits for response headers.
|
|
|
|
tx.send(initial_hb).await?;
|
|
|
|
tx.send(collect_ec_heartbeat(state)).await?;
|
|
|
|
tx.send(collect_ec_heartbeat(config, state)).await?;
|
|
|
|
|
|
|
|
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
|
|
|
|
let mut response_stream = client.send_heartbeat(stream).await?.into_inner();
|
|
|
|
@ -350,7 +349,7 @@ async fn do_heartbeat( |
|
|
|
}
|
|
|
|
|
|
|
|
_ = ec_tick.tick() => {
|
|
|
|
if tx.send(collect_ec_heartbeat(state)).await.is_err() {
|
|
|
|
if tx.send(collect_ec_heartbeat(config, state)).await.is_err() {
|
|
|
|
return Ok(None);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
@ -658,7 +657,7 @@ fn build_heartbeat(config: &HeartbeatConfig, store: &Store) -> master_pb::Heartb |
|
|
|
}
|
|
|
|
|
|
|
|
/// Collect EC shard information into a Heartbeat message.
|
|
|
|
fn collect_ec_heartbeat(state: &Arc<VolumeServerState>) -> master_pb::Heartbeat {
|
|
|
|
fn collect_ec_heartbeat(config: &HeartbeatConfig, state: &Arc<VolumeServerState>) -> master_pb::Heartbeat {
|
|
|
|
let store = state.store.read().unwrap();
|
|
|
|
|
|
|
|
let mut ec_shards = Vec::new();
|
|
|
|
@ -683,6 +682,11 @@ fn collect_ec_heartbeat(state: &Arc<VolumeServerState>) -> master_pb::Heartbeat |
|
|
|
|
|
|
|
let has_no = ec_shards.is_empty();
|
|
|
|
master_pb::Heartbeat {
|
|
|
|
ip: config.ip.clone(),
|
|
|
|
port: config.port as u32,
|
|
|
|
grpc_port: config.grpc_port as u32,
|
|
|
|
data_center: config.data_center.clone(),
|
|
|
|
rack: config.rack.clone(),
|
|
|
|
ec_shards,
|
|
|
|
has_no_ec_shards: has_no,
|
|
|
|
..Default::default()
|
|
|
|
|