diff --git a/test/volume_server/grpc/data_stream_success_test.go b/test/volume_server/grpc/data_stream_success_test.go index 90f2a8248..2cfdee9d6 100644 --- a/test/volume_server/grpc/data_stream_success_test.go +++ b/test/volume_server/grpc/data_stream_success_test.go @@ -3,6 +3,7 @@ package volume_server_grpc_test import ( "context" "io" + "net/http" "strings" "testing" "time" @@ -230,6 +231,71 @@ func TestReadAllNeedlesExistingThenMissingVolumeAbortsStream(t *testing.T) { } } +func TestReadAllNeedlesStreamsAcrossMultipleVolumes(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1()) + conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn.Close() + + const volumeIDA = uint32(86) + const volumeIDB = uint32(87) + const needleIDA = uint64(445561) + const needleIDB = uint64(445562) + framework.AllocateVolume(t, grpcClient, volumeIDA, "") + framework.AllocateVolume(t, grpcClient, volumeIDB, "") + + client := framework.NewHTTPClient() + fidA := framework.NewFileID(volumeIDA, needleIDA, 0xAA11CC22) + fidB := framework.NewFileID(volumeIDB, needleIDB, 0xBB22DD33) + payloadA := "read-all-multi-volume-a" + payloadB := "read-all-multi-volume-b" + + uploadA := framework.UploadBytes(t, client, clusterHarness.VolumeAdminURL(), fidA, []byte(payloadA)) + _ = framework.ReadAllAndClose(t, uploadA) + if uploadA.StatusCode != http.StatusCreated { + t.Fatalf("upload A expected 201, got %d", uploadA.StatusCode) + } + uploadB := framework.UploadBytes(t, client, clusterHarness.VolumeAdminURL(), fidB, []byte(payloadB)) + _ = framework.ReadAllAndClose(t, uploadB) + if uploadB.StatusCode != http.StatusCreated { + t.Fatalf("upload B expected 201, got %d", uploadB.StatusCode) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + stream, err := grpcClient.ReadAllNeedles(ctx, &volume_server_pb.ReadAllNeedlesRequest{ + VolumeIds: []uint32{volumeIDA, volumeIDB}, + }) + if err != nil { + t.Fatalf("ReadAllNeedles start failed: %v", err) + } + + seen := map[uint64]string{} + for { + msg, recvErr := stream.Recv() + if recvErr == io.EOF { + break + } + if recvErr != nil { + t.Fatalf("ReadAllNeedles recv failed: %v", recvErr) + } + if msg.GetNeedleId() == needleIDA || msg.GetNeedleId() == needleIDB { + seen[msg.GetNeedleId()] = string(msg.GetNeedleBlob()) + } + } + + if got := seen[needleIDA]; got != payloadA { + t.Fatalf("ReadAllNeedles missing/mismatched payload for volume A needle: got %q want %q", got, payloadA) + } + if got := seen[needleIDB]; got != payloadB { + t.Fatalf("ReadAllNeedles missing/mismatched payload for volume B needle: got %q want %q", got, payloadB) + } +} + func copyFileBytes(t testing.TB, grpcClient volume_server_pb.VolumeServerClient, req *volume_server_pb.CopyFileRequest) []byte { t.Helper()