diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 560d94380..09409d9ac 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -1640,23 +1640,44 @@ impl VolumeServer for VolumeGrpcService { match vol.binary_search_by_append_at_ns(last_timestamp_ns) { Ok((offset, _is_last)) => { if offset.is_zero() { - sb_size + Ok(sb_size) } else { - offset.to_actual_offset() as u64 + Ok(offset.to_actual_offset() as u64) } } - Err(_) => sb_size, + Err(e) => { + tracing::warn!( + "fail to locate by appendAtNs {}: {}", + last_timestamp_ns, + e + ); + Err(format!( + "fail to locate by appendAtNs {}: {}", + last_timestamp_ns, e + )) + } } } else { - sb_size + Ok(sb_size) }; - vol.scan_raw_needles_from(start_offset) + match start_offset { + Ok(off) => Ok(vol.scan_raw_needles_from(off)), + Err(msg) => Err(msg), + } } else { break; } }; - let entries = match scan_result { + let scan_inner = match scan_result { + Ok(r) => r, + Err(msg) => { + let _ = tx.send(Err(Status::internal(msg))).await; + return; + } + }; + + let entries = match scan_inner { Ok(e) => e, Err(_) => break, }; @@ -2427,18 +2448,20 @@ impl VolumeServer for VolumeGrpcService { )) })?; - // Check if the requested needle is deleted + // Check if the requested needle is deleted (via .ecx index, matching Go) if req.file_key > 0 { let needle_id = NeedleId(req.file_key); - let deleted_needles = ec_vol - .read_deleted_needles() - .map_err(|e| Status::internal(e.to_string()))?; - if deleted_needles.contains(&needle_id) { - let results = vec![Ok(volume_server_pb::VolumeEcShardReadResponse { - is_deleted: true, - ..Default::default() - })]; - return Ok(Response::new(Box::pin(tokio_stream::iter(results)))); + if let Some((_offset, size)) = ec_vol + .find_needle_from_ecx(needle_id) + .map_err(|e| Status::internal(e.to_string()))? + { + if size.is_deleted() { + let results = vec![Ok(volume_server_pb::VolumeEcShardReadResponse { + is_deleted: true, + ..Default::default() + })]; + return Ok(Response::new(Box::pin(tokio_stream::iter(results)))); + } } } @@ -2635,13 +2658,23 @@ impl VolumeServer for VolumeGrpcService { let mut shard_infos = Vec::new(); for (i, shard) in ec_vol.shards.iter().enumerate() { - if let Some(s) = shard { - shard_infos.push(volume_server_pb::EcShardInfo { - shard_id: i as u32, - size: s.file_size(), - collection: ec_vol.collection.clone(), - volume_id: req.volume_id, - }); + match shard { + Some(s) => { + shard_infos.push(volume_server_pb::EcShardInfo { + shard_id: i as u32, + size: s.file_size(), + collection: ec_vol.collection.clone(), + volume_id: req.volume_id, + }); + } + None => { + shard_infos.push(volume_server_pb::EcShardInfo { + shard_id: i as u32, + collection: ec_vol.collection.clone(), + volume_id: req.volume_id, + ..Default::default() + }); + } } }