diff --git a/test/volume_server/grpc/tail_test.go b/test/volume_server/grpc/tail_test.go index 09657edb5..c5113a35f 100644 --- a/test/volume_server/grpc/tail_test.go +++ b/test/volume_server/grpc/tail_test.go @@ -12,6 +12,8 @@ import ( "github.com/seaweedfs/seaweedfs/test/volume_server/framework" "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func TestVolumeTailSenderMissingVolume(t *testing.T) { @@ -204,3 +206,46 @@ func TestVolumeTailSenderLargeNeedleChunking(t *testing.T) { t.Fatalf("VolumeTailSender expected a final data chunk marked IsLastChunk=true") } } + +func TestVolumeTailSenderStreamCancellation(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1()) + conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn.Close() + + const volumeID = uint32(74) + framework.AllocateVolume(t, grpcClient, volumeID, "") + + ctx, cancel := context.WithCancel(context.Background()) + stream, err := grpcClient.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{ + VolumeId: volumeID, + SinceNs: 0, + IdleTimeoutSeconds: 30, + }) + if err != nil { + cancel() + t.Fatalf("VolumeTailSender start failed: %v", err) + } + + firstMsg, err := stream.Recv() + if err != nil { + cancel() + t.Fatalf("VolumeTailSender first recv failed: %v", err) + } + if !firstMsg.GetIsLastChunk() { + cancel() + t.Fatalf("expected heartbeat first message before cancellation") + } + + cancel() + _, err = stream.Recv() + if err == nil { + t.Fatalf("VolumeTailSender recv after cancellation expected error") + } + if status.Code(err) != codes.Canceled { + t.Fatalf("VolumeTailSender cancellation expected grpc code Canceled, got %v (%v)", status.Code(err), err) + } +}