Browse Source

Match Go gRPC: BatchDelete no flag, IncrementalCopy error, FetchAndWrite concurrent, VolumeUnmount/DeleteCollection errors, tail draining, query error code

rust-volume-server
Chris Lu 4 days ago
parent
commit
1eae9f771e
  1. 98
      seaweed-volume/src/server/grpc_server.rs

98
seaweed-volume/src/server/grpc_server.rs

@ -261,7 +261,6 @@ impl VolumeServer for VolumeGrpcService {
} }
n.last_modified = now; n.last_modified = now;
n.set_has_last_modified_date();
if !is_ec_volume { if !is_ec_volume {
let mut store = self.state.store.write().unwrap(); let mut store = self.state.store.write().unwrap();
@ -455,7 +454,9 @@ impl VolumeServer for VolumeGrpcService {
) -> Result<Response<volume_server_pb::DeleteCollectionResponse>, Status> { ) -> Result<Response<volume_server_pb::DeleteCollectionResponse>, Status> {
let collection = &request.into_inner().collection; let collection = &request.into_inner().collection;
let mut store = self.state.store.write().unwrap(); let mut store = self.state.store.write().unwrap();
store.delete_collection(collection);
store
.delete_collection(collection)
.map_err(|e| Status::internal(e))?;
Ok(Response::new(volume_server_pb::DeleteCollectionResponse {})) Ok(Response::new(volume_server_pb::DeleteCollectionResponse {}))
} }
@ -572,9 +573,11 @@ impl VolumeServer for VolumeGrpcService {
actual as u64 actual as u64
} }
} }
Err(_e) => {
// On error, fall back to streaming from superblock end
super_block_size
Err(e) => {
return Err(Status::internal(format!(
"fail to locate by appendAtNs {}: {}",
req.since_ns, e
)));
} }
} }
}; };
@ -625,8 +628,12 @@ impl VolumeServer for VolumeGrpcService {
) -> Result<Response<volume_server_pb::VolumeUnmountResponse>, Status> { ) -> Result<Response<volume_server_pb::VolumeUnmountResponse>, Status> {
let vid = VolumeId(request.into_inner().volume_id); let vid = VolumeId(request.into_inner().volume_id);
let mut store = self.state.store.write().unwrap(); let mut store = self.state.store.write().unwrap();
// Unmount is idempotent — success even if volume not found
store.unmount_volume(vid);
if !store.unmount_volume(vid) {
return Err(Status::not_found(format!(
"volume {} not found",
vid
)));
}
Ok(Response::new(volume_server_pb::VolumeUnmountResponse {})) Ok(Response::new(volume_server_pb::VolumeUnmountResponse {}))
} }
@ -755,8 +762,8 @@ impl VolumeServer for VolumeGrpcService {
let mut store = self.state.store.write().unwrap(); let mut store = self.state.store.write().unwrap();
// Unmount the volume (Go returns nil for non-existent volumes, so we don't
// treat a missing volume as an error here — configure_volume will catch it)
// Unmount the volume (Go propagates unmount errors via resp.Error;
// Rust unmount_volume returns bool, so not-found falls through to configure_volume)
store.unmount_volume(vid); store.unmount_volume(vid);
// Modify the super block on disk (replica_placement byte) // Modify the super block on disk (replica_placement byte)
@ -1507,7 +1514,7 @@ impl VolumeServer for VolumeGrpcService {
let since_ns = req.since_ns; let since_ns = req.since_ns;
let idle_timeout = req.idle_timeout_seconds; let idle_timeout = req.idle_timeout_seconds;
let mut last_timestamp_ns = since_ns; let mut last_timestamp_ns = since_ns;
let mut draining_seconds = idle_timeout;
let mut draining_seconds = idle_timeout as i64;
loop { loop {
// Use binary search to find starting offset, then scan from there // Use binary search to find starting offset, then scan from there
@ -1587,13 +1594,13 @@ impl VolumeServer for VolumeGrpcService {
continue; continue;
} }
if last_processed_ns == last_timestamp_ns { if last_processed_ns == last_timestamp_ns {
draining_seconds = draining_seconds.saturating_sub(1);
if draining_seconds == 0 {
draining_seconds -= 1;
if draining_seconds <= 0 {
return; // EOF return; // EOF
} }
} else { } else {
last_timestamp_ns = last_processed_ns; last_timestamp_ns = last_processed_ns;
draining_seconds = idle_timeout;
draining_seconds = idle_timeout as i64;
} }
} }
}); });
@ -2771,28 +2778,27 @@ impl VolumeServer for VolumeGrpcService {
.as_secs(); .as_secs();
n.set_has_last_modified_date(); n.set_has_last_modified_date();
let e_tag;
{
let mut store = self.state.store.write().unwrap();
match store.write_volume_needle(vid, &mut n) {
Ok(_) => {
e_tag = n.etag();
}
Err(e) => {
return Err(Status::internal(format!(
"local write needle {} size {}: {}",
req.needle_id, req.size, e
)));
}
}
}
// Run local write and replica writes concurrently (matches Go's WaitGroup)
let mut handles: Vec<tokio::task::JoinHandle<Result<(), String>>> = Vec::new();
// Spawn local write as a concurrent task
let state_clone = self.state.clone();
let mut n_clone = n.clone();
let needle_id = req.needle_id;
let size = req.size;
let local_handle = tokio::task::spawn_blocking(move || {
let mut store = state_clone.store.write().unwrap();
store
.write_volume_needle(vid, &mut n_clone)
.map(|_| ())
.map_err(|e| format!("local write needle {} size {}: {}", needle_id, size, e))
});
// Replicate to peers concurrently (matches Go's sync.WaitGroup + goroutines)
// Spawn replica writes concurrently
if !req.replicas.is_empty() { if !req.replicas.is_empty() {
let file_id = format!("{},{:x}{:08x}", vid, req.needle_id, req.cookie); let file_id = format!("{},{:x}{:08x}", vid, req.needle_id, req.cookie);
let http_client = self.state.http_client.clone(); let http_client = self.state.http_client.clone();
let scheme = self.state.outgoing_http_scheme.clone(); let scheme = self.state.outgoing_http_scheme.clone();
let mut handles = Vec::new();
for replica in &req.replicas { for replica in &req.replicas {
let raw_target = format!("{}/{}?type=replicate", replica.url, file_id); let raw_target = format!("{}/{}?type=replicate", replica.url, file_id);
let url = let url =
@ -2810,18 +2816,32 @@ impl VolumeServer for VolumeGrpcService {
.multipart(form) .multipart(form)
.send() .send()
.await .await
.map(|_| ())
.map_err(|e| { .map_err(|e| {
format!("remote write needle {} size {}: {}", needle_id, size, e) format!("remote write needle {} size {}: {}", needle_id, size, e)
}) })
})); }));
} }
for handle in handles {
match handle.await {
Ok(Ok(_)) => {}
Ok(Err(e)) => return Err(Status::internal(e)),
Err(e) => {
return Err(Status::internal(format!("replication task failed: {}", e)))
}
}
// Await local write
match local_handle.await {
Ok(Ok(())) => {}
Ok(Err(e)) => return Err(Status::internal(e)),
Err(e) => {
return Err(Status::internal(format!("local write task failed: {}", e)))
}
}
let e_tag = n.etag();
// Await replica writes
for handle in handles {
match handle.await {
Ok(Ok(())) => {}
Ok(Err(e)) => return Err(Status::internal(e)),
Err(e) => {
return Err(Status::internal(format!("replication task failed: {}", e)))
} }
} }
} }
@ -2867,6 +2887,8 @@ impl VolumeServer for VolumeGrpcService {
.ok_or_else(|| Status::not_found(format!("volume id {} not found", vid.0)))?; .ok_or_else(|| Status::not_found(format!("volume id {} not found", vid.0)))?;
total_volumes += 1; total_volumes += 1;
// INDEX mode (1) calls scrub_index; LOCAL (3) and FULL (2) call scrub
// TODO: implement scrub_index() for INDEX-only mode; currently falls through to full scrub
match v.scrub() { match v.scrub() {
Ok((files, broken)) => { Ok((files, broken)) => {
total_files += files; total_files += files;
@ -3001,7 +3023,7 @@ impl VolumeServer for VolumeGrpcService {
for fid_str in &req.from_file_ids { for fid_str in &req.from_file_ids {
let file_id = let file_id =
needle::FileId::parse(fid_str).map_err(|e| Status::invalid_argument(e))?;
needle::FileId::parse(fid_str).map_err(|e| Status::internal(e))?;
let mut n = Needle { let mut n = Needle {
id: file_id.key, id: file_id.key,

Loading…
Cancel
Save