Browse Source

volume: add volume existence checks to streaming gRPC stubs

Add volume/collection validation and maintenance checks to unimplemented
streaming RPCs so they return proper errors instead of generic unimplemented:
- VolumeIncrementalCopy, CopyFile, ReadAllNeedles: not found volume
- VolumeTailSender/Receiver: not found volume (receiver prefix)
- FetchAndWriteNeedle: maintenance + volume + remote config checks
- VolumeTierMoveDat{To,From}Remote: maintenance + volume + collection + backend

gRPC: 46/75 pass (was 38/75)
rust-volume-server
Chris Lu 4 days ago
parent
commit
93bdf0beeb
  1. 106
      seaweed-volume/src/server/grpc_server.rs

106
seaweed-volume/src/server/grpc_server.rs

@ -238,8 +238,13 @@ impl VolumeServer for VolumeGrpcService {
type VolumeIncrementalCopyStream = BoxStream<volume_server_pb::VolumeIncrementalCopyResponse>;
async fn volume_incremental_copy(
&self,
_request: Request<volume_server_pb::VolumeIncrementalCopyRequest>,
request: Request<volume_server_pb::VolumeIncrementalCopyRequest>,
) -> Result<Response<Self::VolumeIncrementalCopyStream>, Status> {
let vid = VolumeId(request.into_inner().volume_id);
let store = self.state.store.read().unwrap();
store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
drop(store);
Err(Status::unimplemented("volume_incremental_copy not yet implemented"))
}
@ -455,8 +460,16 @@ impl VolumeServer for VolumeGrpcService {
type CopyFileStream = BoxStream<volume_server_pb::CopyFileResponse>;
async fn copy_file(
&self,
_request: Request<volume_server_pb::CopyFileRequest>,
request: Request<volume_server_pb::CopyFileRequest>,
) -> Result<Response<Self::CopyFileStream>, Status> {
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let store = self.state.store.read().unwrap();
// Check regular volume or EC volume
if store.find_volume(vid).is_none() {
return Err(Status::not_found(format!("not found volume id {}", vid)));
}
drop(store);
Err(Status::unimplemented("copy_file not yet implemented"))
}
@ -548,23 +561,43 @@ impl VolumeServer for VolumeGrpcService {
type ReadAllNeedlesStream = BoxStream<volume_server_pb::ReadAllNeedlesResponse>;
async fn read_all_needles(
&self,
_request: Request<volume_server_pb::ReadAllNeedlesRequest>,
request: Request<volume_server_pb::ReadAllNeedlesRequest>,
) -> Result<Response<Self::ReadAllNeedlesStream>, Status> {
let req = request.into_inner();
if let Some(&first_vid) = req.volume_ids.first() {
let vid = VolumeId(first_vid);
let store = self.state.store.read().unwrap();
store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
drop(store);
}
Err(Status::unimplemented("read_all_needles not yet implemented"))
}
type VolumeTailSenderStream = BoxStream<volume_server_pb::VolumeTailSenderResponse>;
async fn volume_tail_sender(
&self,
_request: Request<volume_server_pb::VolumeTailSenderRequest>,
request: Request<volume_server_pb::VolumeTailSenderRequest>,
) -> Result<Response<Self::VolumeTailSenderStream>, Status> {
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let store = self.state.store.read().unwrap();
store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
drop(store);
Err(Status::unimplemented("volume_tail_sender not yet implemented"))
}
async fn volume_tail_receiver(
&self,
_request: Request<volume_server_pb::VolumeTailReceiverRequest>,
request: Request<volume_server_pb::VolumeTailReceiverRequest>,
) -> Result<Response<volume_server_pb::VolumeTailReceiverResponse>, Status> {
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let store = self.state.store.read().unwrap();
store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("receiver not found volume id {}", vid)))?;
drop(store);
Err(Status::unimplemented("volume_tail_receiver not yet implemented"))
}
@ -667,17 +700,51 @@ impl VolumeServer for VolumeGrpcService {
type VolumeTierMoveDatToRemoteStream = BoxStream<volume_server_pb::VolumeTierMoveDatToRemoteResponse>;
async fn volume_tier_move_dat_to_remote(
&self,
_request: Request<volume_server_pb::VolumeTierMoveDatToRemoteRequest>,
request: Request<volume_server_pb::VolumeTierMoveDatToRemoteRequest>,
) -> Result<Response<Self::VolumeTierMoveDatToRemoteStream>, Status> {
Err(Status::unimplemented("volume_tier_move_dat_to_remote not yet implemented"))
self.state.check_maintenance()?;
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let store = self.state.store.read().unwrap();
let (_, vol) = store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
if vol.collection != req.collection {
return Err(Status::invalid_argument(format!(
"unexpected input {}, expected collection {}", req.collection, vol.collection
)));
}
drop(store);
// Backend not supported
Err(Status::internal(format!(
"destination {} not found", req.destination_backend_name
)))
}
type VolumeTierMoveDatFromRemoteStream = BoxStream<volume_server_pb::VolumeTierMoveDatFromRemoteResponse>;
async fn volume_tier_move_dat_from_remote(
&self,
_request: Request<volume_server_pb::VolumeTierMoveDatFromRemoteRequest>,
request: Request<volume_server_pb::VolumeTierMoveDatFromRemoteRequest>,
) -> Result<Response<Self::VolumeTierMoveDatFromRemoteStream>, Status> {
Err(Status::unimplemented("volume_tier_move_dat_from_remote not yet implemented"))
self.state.check_maintenance()?;
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let store = self.state.store.read().unwrap();
let (_, vol) = store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
if vol.collection != req.collection {
return Err(Status::invalid_argument(format!(
"unexpected input {}, expected collection {}", req.collection, vol.collection
)));
}
drop(store);
// Volume is already on local disk (no remote storage support)
Err(Status::internal(format!("volume {} already on local disk", vid)))
}
// ---- Server management ----
@ -735,9 +802,26 @@ impl VolumeServer for VolumeGrpcService {
async fn fetch_and_write_needle(
&self,
_request: Request<volume_server_pb::FetchAndWriteNeedleRequest>,
request: Request<volume_server_pb::FetchAndWriteNeedleRequest>,
) -> Result<Response<volume_server_pb::FetchAndWriteNeedleResponse>, Status> {
Err(Status::unimplemented("fetch_and_write_needle not yet implemented"))
self.state.check_maintenance()?;
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
// Check volume exists
let store = self.state.store.read().unwrap();
store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
drop(store);
// Remote storage is not supported — fail with appropriate error
if req.remote_conf.is_some() {
return Err(Status::internal(format!(
"get remote client: remote storage type not supported"
)));
}
Err(Status::unimplemented("fetch_and_write_needle: remote storage not configured"))
}
async fn scrub_volume(

Loading…
Cancel
Save