You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
273 lines
8.5 KiB
273 lines
8.5 KiB
package volume_server_grpc_test
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/test/volume_server/framework"
|
|
"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
)
|
|
|
|
func TestReadWriteNeedleBlobAndMetaRoundTrip(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test in short mode")
|
|
}
|
|
|
|
clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1())
|
|
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
|
|
defer conn.Close()
|
|
|
|
const volumeID = uint32(83)
|
|
const sourceNeedleID = uint64(333333)
|
|
const sourceCookie = uint32(0xABCD0102)
|
|
const clonedNeedleID = uint64(333334)
|
|
|
|
framework.AllocateVolume(t, grpcClient, volumeID, "")
|
|
|
|
client := framework.NewHTTPClient()
|
|
payload := []byte("blob-roundtrip-content")
|
|
fid := framework.NewFileID(volumeID, sourceNeedleID, sourceCookie)
|
|
uploadResp := framework.UploadBytes(t, client, clusterHarness.VolumeAdminURL(), fid, payload)
|
|
_ = framework.ReadAllAndClose(t, uploadResp)
|
|
if uploadResp.StatusCode != 201 {
|
|
t.Fatalf("upload expected 201, got %d", uploadResp.StatusCode)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
fileStatus, err := grpcClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{VolumeId: volumeID})
|
|
if err != nil {
|
|
t.Fatalf("ReadVolumeFileStatus failed: %v", err)
|
|
}
|
|
if fileStatus.GetIdxFileSize() == 0 {
|
|
t.Fatalf("expected non-zero idx file size after upload")
|
|
}
|
|
|
|
idxBytes := copyFileBytes(t, grpcClient, &volume_server_pb.CopyFileRequest{
|
|
VolumeId: volumeID,
|
|
Ext: ".idx",
|
|
CompactionRevision: fileStatus.GetCompactionRevision(),
|
|
StopOffset: fileStatus.GetIdxFileSize(),
|
|
})
|
|
offset, size := findNeedleOffsetAndSize(t, idxBytes, sourceNeedleID)
|
|
|
|
blobResp, err := grpcClient.ReadNeedleBlob(ctx, &volume_server_pb.ReadNeedleBlobRequest{
|
|
VolumeId: volumeID,
|
|
Offset: offset,
|
|
Size: size,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("ReadNeedleBlob failed: %v", err)
|
|
}
|
|
if len(blobResp.GetNeedleBlob()) == 0 {
|
|
t.Fatalf("ReadNeedleBlob returned empty blob")
|
|
}
|
|
|
|
metaResp, err := grpcClient.ReadNeedleMeta(ctx, &volume_server_pb.ReadNeedleMetaRequest{
|
|
VolumeId: volumeID,
|
|
NeedleId: sourceNeedleID,
|
|
Offset: offset,
|
|
Size: size,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("ReadNeedleMeta failed: %v", err)
|
|
}
|
|
if metaResp.GetCookie() != sourceCookie {
|
|
t.Fatalf("ReadNeedleMeta cookie mismatch: got %d want %d", metaResp.GetCookie(), sourceCookie)
|
|
}
|
|
|
|
_, err = grpcClient.WriteNeedleBlob(ctx, &volume_server_pb.WriteNeedleBlobRequest{
|
|
VolumeId: volumeID,
|
|
NeedleId: clonedNeedleID,
|
|
Size: size,
|
|
NeedleBlob: blobResp.GetNeedleBlob(),
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("WriteNeedleBlob failed: %v", err)
|
|
}
|
|
|
|
clonedStatus, err := grpcClient.VolumeNeedleStatus(ctx, &volume_server_pb.VolumeNeedleStatusRequest{
|
|
VolumeId: volumeID,
|
|
NeedleId: clonedNeedleID,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("VolumeNeedleStatus for cloned needle failed: %v", err)
|
|
}
|
|
if clonedStatus.GetNeedleId() != sourceNeedleID {
|
|
t.Fatalf("cloned needle status id mismatch: got %d want %d", clonedStatus.GetNeedleId(), sourceNeedleID)
|
|
}
|
|
if clonedStatus.GetCookie() != sourceCookie {
|
|
t.Fatalf("cloned needle cookie mismatch: got %d want %d", clonedStatus.GetCookie(), sourceCookie)
|
|
}
|
|
|
|
clonedReadResp := framework.ReadBytes(t, client, clusterHarness.VolumeAdminURL(), framework.NewFileID(volumeID, clonedNeedleID, sourceCookie))
|
|
clonedReadBody := framework.ReadAllAndClose(t, clonedReadResp)
|
|
if clonedReadResp.StatusCode != 200 {
|
|
t.Fatalf("cloned needle GET expected 200, got %d", clonedReadResp.StatusCode)
|
|
}
|
|
if string(clonedReadBody) != string(payload) {
|
|
t.Fatalf("cloned needle body mismatch: got %q want %q", string(clonedReadBody), string(payload))
|
|
}
|
|
}
|
|
|
|
func TestReadAllNeedlesStreamsUploadedRecords(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test in short mode")
|
|
}
|
|
|
|
clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1())
|
|
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
|
|
defer conn.Close()
|
|
|
|
const volumeID = uint32(84)
|
|
framework.AllocateVolume(t, grpcClient, volumeID, "")
|
|
|
|
client := framework.NewHTTPClient()
|
|
expected := map[uint64]string{
|
|
444441: "read-all-needle-one",
|
|
444442: "read-all-needle-two",
|
|
}
|
|
for key, body := range expected {
|
|
resp := framework.UploadBytes(t, client, clusterHarness.VolumeAdminURL(), framework.NewFileID(volumeID, key, 0xA0B0C0D0), []byte(body))
|
|
_ = framework.ReadAllAndClose(t, resp)
|
|
if resp.StatusCode != 201 {
|
|
t.Fatalf("upload for key %d expected 201, got %d", key, resp.StatusCode)
|
|
}
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
stream, err := grpcClient.ReadAllNeedles(ctx, &volume_server_pb.ReadAllNeedlesRequest{VolumeIds: []uint32{volumeID}})
|
|
if err != nil {
|
|
t.Fatalf("ReadAllNeedles start failed: %v", err)
|
|
}
|
|
|
|
seen := map[uint64]string{}
|
|
for {
|
|
msg, recvErr := stream.Recv()
|
|
if recvErr == io.EOF {
|
|
break
|
|
}
|
|
if recvErr != nil {
|
|
t.Fatalf("ReadAllNeedles recv failed: %v", recvErr)
|
|
}
|
|
if _, wanted := expected[msg.GetNeedleId()]; wanted {
|
|
seen[msg.GetNeedleId()] = string(msg.GetNeedleBlob())
|
|
}
|
|
}
|
|
|
|
for key, body := range expected {
|
|
got, found := seen[key]
|
|
if !found {
|
|
t.Fatalf("ReadAllNeedles missing key %d in stream", key)
|
|
}
|
|
if got != body {
|
|
t.Fatalf("ReadAllNeedles body mismatch for key %d: got %q want %q", key, got, body)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestReadAllNeedlesExistingThenMissingVolumeAbortsStream(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test in short mode")
|
|
}
|
|
|
|
clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1())
|
|
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
|
|
defer conn.Close()
|
|
|
|
const existingVolumeID = uint32(85)
|
|
const missingVolumeID = uint32(98585)
|
|
const needleID = uint64(445551)
|
|
framework.AllocateVolume(t, grpcClient, existingVolumeID, "")
|
|
|
|
client := framework.NewHTTPClient()
|
|
fid := framework.NewFileID(existingVolumeID, needleID, 0xAA11BB22)
|
|
payload := "read-all-existing-then-missing"
|
|
uploadResp := framework.UploadBytes(t, client, clusterHarness.VolumeAdminURL(), fid, []byte(payload))
|
|
_ = framework.ReadAllAndClose(t, uploadResp)
|
|
if uploadResp.StatusCode != 201 {
|
|
t.Fatalf("upload expected 201, got %d", uploadResp.StatusCode)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
stream, err := grpcClient.ReadAllNeedles(ctx, &volume_server_pb.ReadAllNeedlesRequest{
|
|
VolumeIds: []uint32{existingVolumeID, missingVolumeID},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("ReadAllNeedles start failed: %v", err)
|
|
}
|
|
|
|
seenUploadedNeedle := false
|
|
for {
|
|
msg, recvErr := stream.Recv()
|
|
if recvErr == io.EOF {
|
|
t.Fatalf("ReadAllNeedles expected stream error for missing volume, got EOF")
|
|
}
|
|
if recvErr != nil {
|
|
if !strings.Contains(recvErr.Error(), "not found volume id") {
|
|
t.Fatalf("ReadAllNeedles missing-volume error mismatch: %v", recvErr)
|
|
}
|
|
break
|
|
}
|
|
if msg.GetNeedleId() == needleID && string(msg.GetNeedleBlob()) == payload {
|
|
seenUploadedNeedle = true
|
|
}
|
|
}
|
|
|
|
if !seenUploadedNeedle {
|
|
t.Fatalf("ReadAllNeedles should stream entries from existing volume before missing-volume abort")
|
|
}
|
|
}
|
|
|
|
func copyFileBytes(t testing.TB, grpcClient volume_server_pb.VolumeServerClient, req *volume_server_pb.CopyFileRequest) []byte {
|
|
t.Helper()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
stream, err := grpcClient.CopyFile(ctx, req)
|
|
if err != nil {
|
|
t.Fatalf("CopyFile start failed: %v", err)
|
|
}
|
|
|
|
var out []byte
|
|
for {
|
|
msg, recvErr := stream.Recv()
|
|
if recvErr == io.EOF {
|
|
return out
|
|
}
|
|
if recvErr != nil {
|
|
t.Fatalf("CopyFile recv failed: %v", recvErr)
|
|
}
|
|
out = append(out, msg.GetFileContent()...)
|
|
}
|
|
}
|
|
|
|
func findNeedleOffsetAndSize(t testing.TB, idxBytes []byte, needleID uint64) (offset int64, size int32) {
|
|
t.Helper()
|
|
|
|
for i := 0; i+types.NeedleMapEntrySize <= len(idxBytes); i += types.NeedleMapEntrySize {
|
|
key, entryOffset, entrySize := idx.IdxFileEntry(idxBytes[i : i+types.NeedleMapEntrySize])
|
|
if uint64(key) != needleID {
|
|
continue
|
|
}
|
|
if entryOffset.IsZero() || entrySize <= 0 {
|
|
continue
|
|
}
|
|
return entryOffset.ToActualOffset(), int32(entrySize)
|
|
}
|
|
|
|
t.Fatalf("needle id %d not found in idx entries", needleID)
|
|
return 0, 0
|
|
}
|