|
|
|
@ -12,6 +12,8 @@ import ( |
|
|
|
"github.com/seaweedfs/seaweedfs/test/volume_server/framework" |
|
|
|
"github.com/seaweedfs/seaweedfs/test/volume_server/matrix" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" |
|
|
|
"google.golang.org/grpc/codes" |
|
|
|
"google.golang.org/grpc/status" |
|
|
|
) |
|
|
|
|
|
|
|
func TestVolumeTailSenderMissingVolume(t *testing.T) { |
|
|
|
@ -204,3 +206,46 @@ func TestVolumeTailSenderLargeNeedleChunking(t *testing.T) { |
|
|
|
t.Fatalf("VolumeTailSender expected a final data chunk marked IsLastChunk=true") |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func TestVolumeTailSenderStreamCancellation(t *testing.T) { |
|
|
|
if testing.Short() { |
|
|
|
t.Skip("skipping integration test in short mode") |
|
|
|
} |
|
|
|
|
|
|
|
clusterHarness := framework.StartSingleVolumeCluster(t, matrix.P1()) |
|
|
|
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) |
|
|
|
defer conn.Close() |
|
|
|
|
|
|
|
const volumeID = uint32(74) |
|
|
|
framework.AllocateVolume(t, grpcClient, volumeID, "") |
|
|
|
|
|
|
|
ctx, cancel := context.WithCancel(context.Background()) |
|
|
|
stream, err := grpcClient.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{ |
|
|
|
VolumeId: volumeID, |
|
|
|
SinceNs: 0, |
|
|
|
IdleTimeoutSeconds: 30, |
|
|
|
}) |
|
|
|
if err != nil { |
|
|
|
cancel() |
|
|
|
t.Fatalf("VolumeTailSender start failed: %v", err) |
|
|
|
} |
|
|
|
|
|
|
|
firstMsg, err := stream.Recv() |
|
|
|
if err != nil { |
|
|
|
cancel() |
|
|
|
t.Fatalf("VolumeTailSender first recv failed: %v", err) |
|
|
|
} |
|
|
|
if !firstMsg.GetIsLastChunk() { |
|
|
|
cancel() |
|
|
|
t.Fatalf("expected heartbeat first message before cancellation") |
|
|
|
} |
|
|
|
|
|
|
|
cancel() |
|
|
|
_, err = stream.Recv() |
|
|
|
if err == nil { |
|
|
|
t.Fatalf("VolumeTailSender recv after cancellation expected error") |
|
|
|
} |
|
|
|
if status.Code(err) != codes.Canceled { |
|
|
|
t.Fatalf("VolumeTailSender cancellation expected grpc code Canceled, got %v (%v)", status.Code(err), err) |
|
|
|
} |
|
|
|
} |