Browse Source

Match Go gRPC: fix TailSender error propagation, EcShardsInfo all slots, EcShardRead .ecx check

Three fixes: (1) VolumeTailSender now propagates binary search errors
instead of silently falling back to start. (2) VolumeEcShardsInfo
returns entries for all shard slots including unmounted. (3)
VolumeEcShardRead checks .ecx index for deletions instead of .ecj.
rust-volume-server
Chris Lu 3 days ago
parent
commit
eedc60afec
  1. 79
      seaweed-volume/src/server/grpc_server.rs

79
seaweed-volume/src/server/grpc_server.rs

@ -1640,23 +1640,44 @@ impl VolumeServer for VolumeGrpcService {
match vol.binary_search_by_append_at_ns(last_timestamp_ns) {
Ok((offset, _is_last)) => {
if offset.is_zero() {
sb_size
Ok(sb_size)
} else {
offset.to_actual_offset() as u64
Ok(offset.to_actual_offset() as u64)
}
}
Err(_) => sb_size,
Err(e) => {
tracing::warn!(
"fail to locate by appendAtNs {}: {}",
last_timestamp_ns,
e
);
Err(format!(
"fail to locate by appendAtNs {}: {}",
last_timestamp_ns, e
))
}
}
} else {
sb_size
Ok(sb_size)
};
vol.scan_raw_needles_from(start_offset)
match start_offset {
Ok(off) => Ok(vol.scan_raw_needles_from(off)),
Err(msg) => Err(msg),
}
} else {
break;
}
};
let entries = match scan_result {
let scan_inner = match scan_result {
Ok(r) => r,
Err(msg) => {
let _ = tx.send(Err(Status::internal(msg))).await;
return;
}
};
let entries = match scan_inner {
Ok(e) => e,
Err(_) => break,
};
@ -2427,18 +2448,20 @@ impl VolumeServer for VolumeGrpcService {
))
})?;
// Check if the requested needle is deleted
// Check if the requested needle is deleted (via .ecx index, matching Go)
if req.file_key > 0 {
let needle_id = NeedleId(req.file_key);
let deleted_needles = ec_vol
.read_deleted_needles()
.map_err(|e| Status::internal(e.to_string()))?;
if deleted_needles.contains(&needle_id) {
let results = vec![Ok(volume_server_pb::VolumeEcShardReadResponse {
is_deleted: true,
..Default::default()
})];
return Ok(Response::new(Box::pin(tokio_stream::iter(results))));
if let Some((_offset, size)) = ec_vol
.find_needle_from_ecx(needle_id)
.map_err(|e| Status::internal(e.to_string()))?
{
if size.is_deleted() {
let results = vec![Ok(volume_server_pb::VolumeEcShardReadResponse {
is_deleted: true,
..Default::default()
})];
return Ok(Response::new(Box::pin(tokio_stream::iter(results))));
}
}
}
@ -2635,13 +2658,23 @@ impl VolumeServer for VolumeGrpcService {
let mut shard_infos = Vec::new();
for (i, shard) in ec_vol.shards.iter().enumerate() {
if let Some(s) = shard {
shard_infos.push(volume_server_pb::EcShardInfo {
shard_id: i as u32,
size: s.file_size(),
collection: ec_vol.collection.clone(),
volume_id: req.volume_id,
});
match shard {
Some(s) => {
shard_infos.push(volume_server_pb::EcShardInfo {
shard_id: i as u32,
size: s.file_size(),
collection: ec_vol.collection.clone(),
volume_id: req.volume_id,
});
}
None => {
shard_infos.push(volume_server_pb::EcShardInfo {
shard_id: i as u32,
collection: ec_vol.collection.clone(),
volume_id: req.volume_id,
..Default::default()
});
}
}
}

Loading…
Cancel
Save