diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index c211065a6..b9b63ec23 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -261,7 +261,6 @@ impl VolumeServer for VolumeGrpcService { } n.last_modified = now; - n.set_has_last_modified_date(); if !is_ec_volume { let mut store = self.state.store.write().unwrap(); @@ -455,7 +454,9 @@ impl VolumeServer for VolumeGrpcService { ) -> Result<Response<volume_server_pb::DeleteCollectionResponse>, Status> { let collection = &request.into_inner().collection; let mut store = self.state.store.write().unwrap(); - store.delete_collection(collection); + store + .delete_collection(collection) + .map_err(|e| Status::internal(e))?; Ok(Response::new(volume_server_pb::DeleteCollectionResponse {})) } @@ -572,9 +573,11 @@ impl VolumeServer for VolumeGrpcService { actual as u64 } } - Err(_e) => { - // On error, fall back to streaming from superblock end - super_block_size + Err(e) => { + return Err(Status::internal(format!( + "fail to locate by appendAtNs {}: {}", + req.since_ns, e + ))); } } }; @@ -625,8 +628,12 @@ impl VolumeServer for VolumeGrpcService { ) -> Result<Response<volume_server_pb::VolumeUnmountResponse>, Status> { let vid = VolumeId(request.into_inner().volume_id); let mut store = self.state.store.write().unwrap(); - // Unmount is idempotent — success even if volume not found - store.unmount_volume(vid); + if !store.unmount_volume(vid) { + return Err(Status::not_found(format!( + "volume {} not found", + vid + ))); + } Ok(Response::new(volume_server_pb::VolumeUnmountResponse {})) } @@ -755,8 +762,8 @@ impl VolumeServer for VolumeGrpcService { let mut store = self.state.store.write().unwrap(); - // Unmount the volume (Go returns nil for non-existent volumes, so we don't - // treat a missing volume as an error here — configure_volume will catch it) + // Unmount the volume (Go propagates unmount errors via resp.Error; + // Rust unmount_volume returns bool, so not-found falls through to configure_volume)
store.unmount_volume(vid); // Modify the super block on disk (replica_placement byte) @@ -1507,7 +1514,7 @@ impl VolumeServer for VolumeGrpcService { let since_ns = req.since_ns; let idle_timeout = req.idle_timeout_seconds; let mut last_timestamp_ns = since_ns; - let mut draining_seconds = idle_timeout; + let mut draining_seconds = idle_timeout as i64; loop { // Use binary search to find starting offset, then scan from there @@ -1587,13 +1594,13 @@ impl VolumeServer for VolumeGrpcService { continue; } if last_processed_ns == last_timestamp_ns { - draining_seconds = draining_seconds.saturating_sub(1); - if draining_seconds == 0 { + draining_seconds -= 1; + if draining_seconds <= 0 { return; // EOF } } else { last_timestamp_ns = last_processed_ns; - draining_seconds = idle_timeout; + draining_seconds = idle_timeout as i64; } } }); @@ -2771,28 +2778,27 @@ impl VolumeServer for VolumeGrpcService { .as_secs(); n.set_has_last_modified_date(); - let e_tag; - { - let mut store = self.state.store.write().unwrap(); - match store.write_volume_needle(vid, &mut n) { - Ok(_) => { - e_tag = n.etag(); - } - Err(e) => { - return Err(Status::internal(format!( - "local write needle {} size {}: {}", - req.needle_id, req.size, e - ))); - } - } - } + // Run local write and replica writes concurrently (matches Go's WaitGroup) + let mut handles: Vec<tokio::task::JoinHandle<Result<(), String>>> = Vec::new(); + + // Spawn local write as a concurrent task + let state_clone = self.state.clone(); + let mut n_clone = n.clone(); + let needle_id = req.needle_id; + let size = req.size; + let local_handle = tokio::task::spawn_blocking(move || { + let mut store = state_clone.store.write().unwrap(); + store + .write_volume_needle(vid, &mut n_clone) + .map(|_| ()) + .map_err(|e| format!("local write needle {} size {}: {}", needle_id, size, e)) + }); - // Replicate to peers concurrently (matches Go's sync.WaitGroup + goroutines) + // Spawn replica writes concurrently if !req.replicas.is_empty() { let file_id = format!("{},{:x}{:08x}", vid,
req.needle_id, req.cookie); let http_client = self.state.http_client.clone(); let scheme = self.state.outgoing_http_scheme.clone(); - let mut handles = Vec::new(); for replica in &req.replicas { let raw_target = format!("{}/{}?type=replicate", replica.url, file_id); let url = @@ -2810,18 +2816,32 @@ impl VolumeServer for VolumeGrpcService { .multipart(form) .send() .await + .map(|_| ()) .map_err(|e| { format!("remote write needle {} size {}: {}", needle_id, size, e) }) })); } - for handle in handles { - match handle.await { - Ok(Ok(_)) => {} - Ok(Err(e)) => return Err(Status::internal(e)), - Err(e) => { - return Err(Status::internal(format!("replication task failed: {}", e))) - } + } + + // Await local write + match local_handle.await { + Ok(Ok(())) => {} + Ok(Err(e)) => return Err(Status::internal(e)), + Err(e) => { + return Err(Status::internal(format!("local write task failed: {}", e))) + } + } + + let e_tag = n.etag(); + + // Await replica writes + for handle in handles { + match handle.await { + Ok(Ok(())) => {} + Ok(Err(e)) => return Err(Status::internal(e)), + Err(e) => { + return Err(Status::internal(format!("replication task failed: {}", e))) } } } @@ -2867,6 +2887,8 @@ impl VolumeServer for VolumeGrpcService { .ok_or_else(|| Status::not_found(format!("volume id {} not found", vid.0)))?; total_volumes += 1; + // INDEX mode (1) calls scrub_index; LOCAL (3) and FULL (2) call scrub + // TODO: implement scrub_index() for INDEX-only mode; currently falls through to full scrub match v.scrub() { Ok((files, broken)) => { total_files += files; @@ -3001,7 +3023,7 @@ impl VolumeServer for VolumeGrpcService { for fid_str in &req.from_file_ids { let file_id = - needle::FileId::parse(fid_str).map_err(|e| Status::invalid_argument(e))?; + needle::FileId::parse(fid_str).map_err(|e| Status::internal(e))?; let mut n = Needle { id: file_id.key,