From 573a42eb93f192d58959e33b38bb9b213534863f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 18 Mar 2026 00:35:35 -0700 Subject: [PATCH] grpc: notify master after volume lifecycle changes --- seaweed-volume/src/server/grpc_server.rs | 119 +++++++++--------- .../http/replication_lifecycle_test.go | 63 ++++++++++ 2 files changed, 124 insertions(+), 58 deletions(-) create mode 100644 test/volume_server/http/replication_lifecycle_test.go diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index fd484901e..495517cb8 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -509,6 +509,7 @@ impl VolumeServer for VolumeGrpcService { version, ) .map_err(|e| Status::internal(e.to_string()))?; + self.state.volume_state_notify.notify_one(); Ok(Response::new(volume_server_pb::AllocateVolumeResponse {})) } @@ -630,6 +631,7 @@ impl VolumeServer for VolumeGrpcService { store .mount_volume_by_id(vid) .map_err(|e| Status::internal(e.to_string()))?; + self.state.volume_state_notify.notify_one(); Ok(Response::new(volume_server_pb::VolumeMountResponse {})) } @@ -641,7 +643,9 @@ impl VolumeServer for VolumeGrpcService { let vid = VolumeId(request.into_inner().volume_id); let mut store = self.state.store.write().unwrap(); // Go returns nil when volume is not found (idempotent unmount) - store.unmount_volume(vid); + if store.unmount_volume(vid) { + self.state.volume_state_notify.notify_one(); + } Ok(Response::new(volume_server_pb::VolumeUnmountResponse {})) } @@ -664,6 +668,7 @@ impl VolumeServer for VolumeGrpcService { store .delete_volume(vid, req.only_empty) .map_err(|e| Status::internal(e.to_string()))?; + self.state.volume_state_notify.notify_one(); Ok(Response::new(volume_server_pb::VolumeDeleteResponse {})) } @@ -807,6 +812,7 @@ impl VolumeServer for VolumeGrpcService { error: format!("volume configure mount {}: {}", vid, e), })); } + self.state.volume_state_notify.notify_one(); Ok(Response::new(volume_server_pb::VolumeConfigureResponse { error: String::new(), @@ -1018,16 +1024,11 @@ impl VolumeServer for VolumeGrpcService { { Ok(resp) => { if resp.volume_preallocate { - preallocate_size = - resp.volume_size_limit_m_b as i64 * 1024 * 1024; + preallocate_size = resp.volume_size_limit_m_b as i64 * 1024 * 1024; } } Err(e) => { - tracing::warn!( - "get master {} configuration: {}", - state.master_url, - e - ); + tracing::warn!("get master {} configuration: {}", state.master_url, e); } } @@ -1040,10 +1041,7 @@ impl VolumeServer for VolumeGrpcService { )) })?; file.set_len(preallocate_size as u64).map_err(|e| { - Status::internal(format!( - "preallocate volume file {}: {}", - dat_path, e - )) + Status::internal(format!("preallocate volume file {}: {}", dat_path, e)) })?; } } @@ -1136,8 +1134,12 @@ impl VolumeServer for VolumeGrpcService { // Find last_append_at_ns from copied files let last_append_at_ns = if !has_remote_dat { - find_last_append_at_ns(&idx_path, &format!("{}.dat", data_base_name), vol_info.version) - .unwrap_or(vol_info.dat_file_timestamp_seconds * 1_000_000_000) + find_last_append_at_ns( + &idx_path, + &format!("{}.dat", data_base_name), + vol_info.version, + ) + .unwrap_or(vol_info.dat_file_timestamp_seconds * 1_000_000_000) } else { vol_info.dat_file_timestamp_seconds * 1_000_000_000 }; @@ -1404,10 +1406,9 @@ impl VolumeServer for VolumeGrpcService { p }; - target_file = - Some(std::fs::File::create(&path).map_err(|e| { - Status::internal(format!("failed to create file: {}", e)) - })?); + target_file = Some(std::fs::File::create(&path).map_err(|e| { + Status::internal(format!("failed to create file: {}", e)) + })?); file_path = Some(path); } Some(volume_server_pb::receive_file_request::Data::FileContent(content)) => { @@ -1481,9 +1482,12 @@ impl VolumeServer for VolumeGrpcService { .find_volume(vid) .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?; - let blob = vol - .read_needle_blob(offset, size) - .map_err(|e| Status::internal(format!("read needle blob offset {} size {}: {}", offset, size.0, e)))?; + let blob = vol.read_needle_blob(offset, size).map_err(|e| { + Status::internal(format!( + "read needle blob offset {} size {}: {}", + offset, size.0, e + )) + })?; Ok(Response::new(volume_server_pb::ReadNeedleBlobResponse { needle_blob: blob, @@ -1896,9 +1900,7 @@ impl VolumeServer for VolumeGrpcService { // Check existing .vif for EC shard config (matching Go's MaybeLoadVolumeInfo) let (data_shards, parity_shards) = - crate::storage::erasure_coding::ec_volume::read_ec_shard_config( - &dir, collection, vid, - ); + crate::storage::erasure_coding::ec_volume::read_ec_shard_config(&dir, collection, vid); if let Err(e) = crate::storage::erasure_coding::ec_encoder::write_ec_files( &dir, @@ -2051,7 +2053,9 @@ impl VolumeServer for VolumeGrpcService { // Determine data/parity shard config from rebuild dir let (data_shards, parity_shards) = crate::storage::erasure_coding::ec_volume::read_ec_shard_config( - &rebuild_dir, collection, vid, + &rebuild_dir, + collection, + vid, ); let total_shards = data_shards + parity_shards; @@ -2059,7 +2063,10 @@ impl VolumeServer for VolumeGrpcService { let mut missing: Vec = Vec::new(); for shard_id in 0..total_shards as u8 { let shard = crate::storage::erasure_coding::ec_shard::EcVolumeShard::new( - &rebuild_dir, collection, vid, shard_id, + &rebuild_dir, + collection, + vid, + shard_id, ); let mut found = std::path::Path::new(&shard.file_name()).exists(); if !found { @@ -2687,8 +2694,9 @@ impl VolumeServer for VolumeGrpcService { } // Walk .ecx index to compute file counts and total size (matching Go's WalkIndex) - let (file_count, file_deleted_count, volume_size) = - ec_vol.walk_ecx_stats().map_err(|e| Status::internal(e.to_string()))?; + let (file_count, file_deleted_count, volume_size) = ec_vol + .walk_ecx_stats() + .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new( volume_server_pb::VolumeEcShardsInfoResponse { @@ -3158,9 +3166,7 @@ impl VolumeServer for VolumeGrpcService { match local_handle.await { Ok(Ok(())) => {} Ok(Err(e)) => return Err(Status::internal(e)), - Err(e) => { - return Err(Status::internal(format!("local write task failed: {}", e))) - } + Err(e) => return Err(Status::internal(format!("local write task failed: {}", e))), } let e_tag = n.etag(); @@ -3170,9 +3176,7 @@ impl VolumeServer for VolumeGrpcService { match handle.await { Ok(Ok(())) => {} Ok(Err(e)) => return Err(Status::internal(e)), - Err(e) => { - return Err(Status::internal(format!("replication task failed: {}", e))) - } + Err(e) => return Err(Status::internal(format!("replication task failed: {}", e))), } } @@ -3285,9 +3289,9 @@ impl VolumeServer for VolumeGrpcService { let mut details: Vec = Vec::new(); for vid in &vids { - let ecv = store.find_ec_volume(*vid).ok_or_else(|| { - Status::not_found(format!("EC volume id {} not found", vid.0)) - })?; + let ecv = store + .find_ec_volume(*vid) + .ok_or_else(|| Status::not_found(format!("EC volume id {} not found", vid.0)))?; let collection = ecv.collection.clone(); match mode { @@ -3319,7 +3323,9 @@ impl VolumeServer for VolumeGrpcService { total_files += files; let (data_shards, parity_shards) = crate::storage::erasure_coding::ec_volume::read_ec_shard_config( - &dir, &collection, *vid, + &dir, + &collection, + *vid, ); match crate::storage::erasure_coding::ec_encoder::verify_ec_shards( @@ -3373,8 +3379,7 @@ impl VolumeServer for VolumeGrpcService { let mut stripes: Vec> = Vec::new(); for fid_str in &req.from_file_ids { - let file_id = - needle::FileId::parse(fid_str).map_err(|e| Status::internal(e))?; + let file_id = needle::FileId::parse(fid_str).map_err(|e| Status::internal(e))?; let mut n = Needle { id: file_id.key, @@ -3391,7 +3396,10 @@ impl VolumeServer for VolumeGrpcService { // Cookie mismatch: log and return empty stream (matching Go behavior where err is nil) if n.cookie != original_cookie { - tracing::info!("volume query failed to read fid cookie {}: cookie mismatch", fid_str); + tracing::info!( + "volume query failed to read fid cookie {}: cookie mismatch", + fid_str + ); let stream = tokio_stream::iter(stripes); return Ok(Response::new(Box::pin(stream))); } @@ -3544,12 +3552,7 @@ impl VolumeServer for VolumeGrpcService { }, )); } - Err(_) => { - return Err(Status::not_found(format!( - "needle not found {}", - needle_id - ))) - } + Err(_) => return Err(Status::not_found(format!("needle not found {}", needle_id))), } } @@ -3573,10 +3576,7 @@ impl VolumeServer for VolumeGrpcService { )); } Ok(None) => { - return Err(Status::not_found(format!( - "needle not found {}", - needle_id - ))); + return Err(Status::not_found(format!("needle not found {}", needle_id))); } Err(e) => { return Err(Status::internal(format!( @@ -4131,8 +4131,7 @@ mod tests { (format!("http://{}", addr), shutdown_tx) } - fn make_remote_only_service() - -> ( + fn make_remote_only_service() -> ( VolumeGrpcService, TempDir, tokio::sync::oneshot::Sender<()>, @@ -4273,7 +4272,13 @@ mod tests { cli_white_list: vec![], }); - (VolumeGrpcService { state }, tmp, shutdown_tx, dat_bytes, super_block_size) + ( + VolumeGrpcService { state }, + tmp, + shutdown_tx, + dat_bytes, + super_block_size, + ) } fn make_local_service_with_volume( @@ -4417,10 +4422,8 @@ mod tests { .unwrap(); let vif_path = tmp.path().join("ttl_1.vif"); - let vif: crate::storage::volume::VifVolumeInfo = serde_json::from_str( - &std::fs::read_to_string(vif_path).unwrap(), - ) - .unwrap(); + let vif: crate::storage::volume::VifVolumeInfo = + serde_json::from_str(&std::fs::read_to_string(vif_path).unwrap()).unwrap(); assert!(vif.expire_at_sec >= before + ttl.to_seconds()); assert!(vif.expire_at_sec <= before + ttl.to_seconds() + 5); } diff --git a/test/volume_server/http/replication_lifecycle_test.go b/test/volume_server/http/replication_lifecycle_test.go new file mode 100644 index 000000000..c88ffeae6 --- /dev/null +++ b/test/volume_server/http/replication_lifecycle_test.go @@ -0,0 +1,63 @@ +package volume_server_http_test + +import ( + "context" + "net/http" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/test/volume_server/framework" + "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" +) + +func TestReplicatedUploadSucceedsImmediatelyAfterAllocate(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + clusterHarness := framework.StartDualVolumeCluster(t, matrix.P1()) + + conn0, grpc0 := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress(0)) + defer conn0.Close() + conn1, grpc1 := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress(1)) + defer conn1.Close() + + const volumeID = uint32(115) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + req := &volume_server_pb.AllocateVolumeRequest{ + VolumeId: volumeID, + Replication: "001", + Version: uint32(needle.GetCurrentVersion()), + } + if _, err := grpc0.AllocateVolume(ctx, req); err != nil { + t.Fatalf("allocate replicated volume on node0: %v", err) + } + if _, err := grpc1.AllocateVolume(ctx, req); err != nil { + t.Fatalf("allocate replicated volume on node1: %v", err) + } + + client := framework.NewHTTPClient() + fid := framework.NewFileID(volumeID, 881001, 0x0B0C0D0E) + payload := []byte("replicated-upload-after-allocate") + + uploadResp := framework.UploadBytes(t, client, clusterHarness.VolumeAdminURL(0), fid, payload) + _ = framework.ReadAllAndClose(t, uploadResp) + if uploadResp.StatusCode != http.StatusCreated { + t.Fatalf("replicated upload expected 201, got %d", uploadResp.StatusCode) + } + + replicaReadURL := clusterHarness.VolumeAdminURL(1) + "/" + fid + var replicaBody []byte + if !waitForHTTPStatus(t, client, replicaReadURL, http.StatusOK, 10*time.Second, func(resp *http.Response) { + replicaBody = framework.ReadAllAndClose(t, resp) + }) { + t.Fatalf("replica did not become readable within deadline") + } + if string(replicaBody) != string(payload) { + t.Fatalf("replica body mismatch: got %q want %q", string(replicaBody), string(payload)) + } +}