|
|
|
@ -3,6 +3,7 @@ package volume_server_grpc_test |
|
|
|
import ( |
|
|
|
"context" |
|
|
|
"io" |
|
|
|
"net/http" |
|
|
|
"strings" |
|
|
|
"testing" |
|
|
|
"time" |
|
|
|
@ -230,6 +231,71 @@ func TestReadAllNeedlesExistingThenMissingVolumeAbortsStream(t *testing.T) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func TestReadAllNeedlesStreamsAcrossMultipleVolumes(t *testing.T) { |
|
|
|
if testing.Short() { |
|
|
|
t.Skip("skipping integration test in short mode") |
|
|
|
} |
|
|
|
|
|
|
|
clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1()) |
|
|
|
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) |
|
|
|
defer conn.Close() |
|
|
|
|
|
|
|
const volumeIDA = uint32(86) |
|
|
|
const volumeIDB = uint32(87) |
|
|
|
const needleIDA = uint64(445561) |
|
|
|
const needleIDB = uint64(445562) |
|
|
|
framework.AllocateVolume(t, grpcClient, volumeIDA, "") |
|
|
|
framework.AllocateVolume(t, grpcClient, volumeIDB, "") |
|
|
|
|
|
|
|
client := framework.NewHTTPClient() |
|
|
|
fidA := framework.NewFileID(volumeIDA, needleIDA, 0xAA11CC22) |
|
|
|
fidB := framework.NewFileID(volumeIDB, needleIDB, 0xBB22DD33) |
|
|
|
payloadA := "read-all-multi-volume-a" |
|
|
|
payloadB := "read-all-multi-volume-b" |
|
|
|
|
|
|
|
uploadA := framework.UploadBytes(t, client, clusterHarness.VolumeAdminURL(), fidA, []byte(payloadA)) |
|
|
|
_ = framework.ReadAllAndClose(t, uploadA) |
|
|
|
if uploadA.StatusCode != http.StatusCreated { |
|
|
|
t.Fatalf("upload A expected 201, got %d", uploadA.StatusCode) |
|
|
|
} |
|
|
|
uploadB := framework.UploadBytes(t, client, clusterHarness.VolumeAdminURL(), fidB, []byte(payloadB)) |
|
|
|
_ = framework.ReadAllAndClose(t, uploadB) |
|
|
|
if uploadB.StatusCode != http.StatusCreated { |
|
|
|
t.Fatalf("upload B expected 201, got %d", uploadB.StatusCode) |
|
|
|
} |
|
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
|
|
|
defer cancel() |
|
|
|
|
|
|
|
stream, err := grpcClient.ReadAllNeedles(ctx, &volume_server_pb.ReadAllNeedlesRequest{ |
|
|
|
VolumeIds: []uint32{volumeIDA, volumeIDB}, |
|
|
|
}) |
|
|
|
if err != nil { |
|
|
|
t.Fatalf("ReadAllNeedles start failed: %v", err) |
|
|
|
} |
|
|
|
|
|
|
|
seen := map[uint64]string{} |
|
|
|
for { |
|
|
|
msg, recvErr := stream.Recv() |
|
|
|
if recvErr == io.EOF { |
|
|
|
break |
|
|
|
} |
|
|
|
if recvErr != nil { |
|
|
|
t.Fatalf("ReadAllNeedles recv failed: %v", recvErr) |
|
|
|
} |
|
|
|
if msg.GetNeedleId() == needleIDA || msg.GetNeedleId() == needleIDB { |
|
|
|
seen[msg.GetNeedleId()] = string(msg.GetNeedleBlob()) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if got := seen[needleIDA]; got != payloadA { |
|
|
|
t.Fatalf("ReadAllNeedles missing/mismatched payload for volume A needle: got %q want %q", got, payloadA) |
|
|
|
} |
|
|
|
if got := seen[needleIDB]; got != payloadB { |
|
|
|
t.Fatalf("ReadAllNeedles missing/mismatched payload for volume B needle: got %q want %q", got, payloadB) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func copyFileBytes(t testing.TB, grpcClient volume_server_pb.VolumeServerClient, req *volume_server_pb.CopyFileRequest) []byte { |
|
|
|
t.Helper() |
|
|
|
|
|
|
|
|