From eedc60afecb6bd9959a8a6dff35479a0dd10714d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 17 Mar 2026 15:49:58 -0700 Subject: [PATCH] Match Go gRPC: fix TailSender error propagation, EcShardsInfo all slots, EcShardRead .ecx check Three fixes: (1) VolumeTailSender now propagates binary search errors instead of silently falling back to start. (2) VolumeEcShardsInfo returns entries for all shard slots including unmounted. (3) VolumeEcShardRead checks .ecx index for deletions instead of .ecj. --- seaweed-volume/src/server/grpc_server.rs | 79 +++++++++++++++++------- 1 file changed, 56 insertions(+), 23 deletions(-) diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 560d94380..09409d9ac 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -1640,23 +1640,44 @@ impl VolumeServer for VolumeGrpcService { match vol.binary_search_by_append_at_ns(last_timestamp_ns) { Ok((offset, _is_last)) => { if offset.is_zero() { - sb_size + Ok(sb_size) } else { - offset.to_actual_offset() as u64 + Ok(offset.to_actual_offset() as u64) } } - Err(_) => sb_size, + Err(e) => { + tracing::warn!( + "fail to locate by appendAtNs {}: {}", + last_timestamp_ns, + e + ); + Err(format!( + "fail to locate by appendAtNs {}: {}", + last_timestamp_ns, e + )) + } } } else { - sb_size + Ok(sb_size) }; - vol.scan_raw_needles_from(start_offset) + match start_offset { + Ok(off) => Ok(vol.scan_raw_needles_from(off)), + Err(msg) => Err(msg), + } } else { break; } }; - let entries = match scan_result { + let scan_inner = match scan_result { + Ok(r) => r, + Err(msg) => { + let _ = tx.send(Err(Status::internal(msg))).await; + return; + } + }; + + let entries = match scan_inner { Ok(e) => e, Err(_) => break, }; @@ -2427,18 +2448,20 @@ impl VolumeServer for VolumeGrpcService { )) })?; - // Check if the requested needle is deleted + // Check if the requested needle is deleted (via .ecx index, matching Go) if req.file_key > 0 { let needle_id = NeedleId(req.file_key); - let deleted_needles = ec_vol - .read_deleted_needles() - .map_err(|e| Status::internal(e.to_string()))?; - if deleted_needles.contains(&needle_id) { - let results = vec![Ok(volume_server_pb::VolumeEcShardReadResponse { - is_deleted: true, - ..Default::default() - })]; - return Ok(Response::new(Box::pin(tokio_stream::iter(results)))); + if let Some((_offset, size)) = ec_vol + .find_needle_from_ecx(needle_id) + .map_err(|e| Status::internal(e.to_string()))? + { + if size.is_deleted() { + let results = vec![Ok(volume_server_pb::VolumeEcShardReadResponse { + is_deleted: true, + ..Default::default() + })]; + return Ok(Response::new(Box::pin(tokio_stream::iter(results)))); + } } } @@ -2635,13 +2658,23 @@ impl VolumeServer for VolumeGrpcService { let mut shard_infos = Vec::new(); for (i, shard) in ec_vol.shards.iter().enumerate() { - if let Some(s) = shard { - shard_infos.push(volume_server_pb::EcShardInfo { - shard_id: i as u32, - size: s.file_size(), - collection: ec_vol.collection.clone(), - volume_id: req.volume_id, - }); + match shard { + Some(s) => { + shard_infos.push(volume_server_pb::EcShardInfo { + shard_id: i as u32, + size: s.file_size(), + collection: ec_vol.collection.clone(), + volume_id: req.volume_id, + }); + } + None => { + shard_infos.push(volume_server_pb::EcShardInfo { + shard_id: i as u32, + collection: ec_vol.collection.clone(), + volume_id: req.volume_id, + ..Default::default() + }); + } } }