You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
206 lines
6.6 KiB
206 lines
6.6 KiB
package volume_server_grpc_test
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/test/volume_server/framework"
|
|
"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
|
)
|
|
|
|
func TestVolumeTailSenderMissingVolume(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test in short mode")
|
|
}
|
|
|
|
clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1())
|
|
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
|
|
defer conn.Close()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
stream, err := grpcClient.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{VolumeId: 77777, SinceNs: 0, IdleTimeoutSeconds: 1})
|
|
if err == nil {
|
|
_, err = stream.Recv()
|
|
}
|
|
if err == nil || !strings.Contains(err.Error(), "not found volume") {
|
|
t.Fatalf("VolumeTailSender missing-volume error mismatch: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestVolumeTailSenderHeartbeatThenEOF(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test in short mode")
|
|
}
|
|
|
|
clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1())
|
|
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
|
|
defer conn.Close()
|
|
|
|
const volumeID = uint32(71)
|
|
framework.AllocateVolume(t, grpcClient, volumeID, "")
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
stream, err := grpcClient.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{
|
|
VolumeId: volumeID,
|
|
SinceNs: 0,
|
|
IdleTimeoutSeconds: 1,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("VolumeTailSender start failed: %v", err)
|
|
}
|
|
|
|
msg, err := stream.Recv()
|
|
if err != nil {
|
|
t.Fatalf("VolumeTailSender first recv failed: %v", err)
|
|
}
|
|
if !msg.GetIsLastChunk() {
|
|
t.Fatalf("expected first tail message to be heartbeat IsLastChunk=true")
|
|
}
|
|
|
|
_, err = stream.Recv()
|
|
if err != io.EOF {
|
|
t.Fatalf("expected EOF after idle timeout drain, got: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestVolumeTailReceiverMissingVolume(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test in short mode")
|
|
}
|
|
|
|
clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1())
|
|
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
|
|
defer conn.Close()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
_, err := grpcClient.VolumeTailReceiver(ctx, &volume_server_pb.VolumeTailReceiverRequest{VolumeId: 88888, SourceVolumeServer: clusterHarness.VolumeServerAddress(), SinceNs: 0, IdleTimeoutSeconds: 1})
|
|
if err == nil || !strings.Contains(err.Error(), "receiver not found volume") {
|
|
t.Fatalf("VolumeTailReceiver missing-volume error mismatch: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestVolumeTailReceiverReplicatesSourceUpdates(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test in short mode")
|
|
}
|
|
|
|
clusterHarness := framework.StartDualVolumeCluster(t, matrix.P1())
|
|
sourceConn, sourceClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress(0))
|
|
defer sourceConn.Close()
|
|
destConn, destClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress(1))
|
|
defer destConn.Close()
|
|
|
|
const volumeID = uint32(72)
|
|
framework.AllocateVolume(t, sourceClient, volumeID, "")
|
|
framework.AllocateVolume(t, destClient, volumeID, "")
|
|
|
|
httpClient := framework.NewHTTPClient()
|
|
fid := framework.NewFileID(volumeID, 880003, 0x3456789A)
|
|
payload := []byte("tail-receiver-replicates-source-updates")
|
|
|
|
sourceUploadResp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(0), fid, payload)
|
|
_ = framework.ReadAllAndClose(t, sourceUploadResp)
|
|
if sourceUploadResp.StatusCode != http.StatusCreated {
|
|
t.Fatalf("source upload expected 201, got %d", sourceUploadResp.StatusCode)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
_, err := destClient.VolumeTailReceiver(ctx, &volume_server_pb.VolumeTailReceiverRequest{
|
|
VolumeId: volumeID,
|
|
SourceVolumeServer: clusterHarness.VolumeAdminAddress(0) + "." + strings.Split(clusterHarness.VolumeGRPCAddress(0), ":")[1],
|
|
SinceNs: 0,
|
|
IdleTimeoutSeconds: 1,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("VolumeTailReceiver success path failed: %v", err)
|
|
}
|
|
|
|
destReadResp := framework.ReadBytes(t, httpClient, clusterHarness.VolumeAdminURL(1), fid)
|
|
destReadBody := framework.ReadAllAndClose(t, destReadResp)
|
|
if destReadResp.StatusCode != http.StatusOK {
|
|
t.Fatalf("destination read after tail receive expected 200, got %d", destReadResp.StatusCode)
|
|
}
|
|
if string(destReadBody) != string(payload) {
|
|
t.Fatalf("destination tail-received payload mismatch: got %q want %q", string(destReadBody), string(payload))
|
|
}
|
|
}
|
|
|
|
func TestVolumeTailSenderLargeNeedleChunking(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test in short mode")
|
|
}
|
|
|
|
clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1())
|
|
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
|
|
defer conn.Close()
|
|
|
|
const volumeID = uint32(73)
|
|
framework.AllocateVolume(t, grpcClient, volumeID, "")
|
|
|
|
httpClient := framework.NewHTTPClient()
|
|
fid := framework.NewFileID(volumeID, 880004, 0x456789AB)
|
|
largePayload := bytes.Repeat([]byte("L"), 2*1024*1024+128*1024)
|
|
uploadResp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), fid, largePayload)
|
|
_ = framework.ReadAllAndClose(t, uploadResp)
|
|
if uploadResp.StatusCode != http.StatusCreated {
|
|
t.Fatalf("large upload expected 201, got %d", uploadResp.StatusCode)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
stream, err := grpcClient.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{
|
|
VolumeId: volumeID,
|
|
SinceNs: 0,
|
|
IdleTimeoutSeconds: 1,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("VolumeTailSender start failed: %v", err)
|
|
}
|
|
|
|
dataChunkCount := 0
|
|
sawNonLastDataChunk := false
|
|
sawLastDataChunk := false
|
|
for {
|
|
msg, recvErr := stream.Recv()
|
|
if recvErr == io.EOF {
|
|
break
|
|
}
|
|
if recvErr != nil {
|
|
t.Fatalf("VolumeTailSender recv failed: %v", recvErr)
|
|
}
|
|
if len(msg.GetNeedleBody()) == 0 {
|
|
continue
|
|
}
|
|
dataChunkCount++
|
|
if msg.GetIsLastChunk() {
|
|
sawLastDataChunk = true
|
|
} else {
|
|
sawNonLastDataChunk = true
|
|
}
|
|
}
|
|
|
|
if dataChunkCount < 2 {
|
|
t.Fatalf("VolumeTailSender expected multiple chunks for large needle, got %d", dataChunkCount)
|
|
}
|
|
if !sawNonLastDataChunk {
|
|
t.Fatalf("VolumeTailSender expected at least one non-last data chunk")
|
|
}
|
|
if !sawLastDataChunk {
|
|
t.Fatalf("VolumeTailSender expected a final data chunk marked IsLastChunk=true")
|
|
}
|
|
}
|