diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 3f4ad8760..489c6ee57 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -1202,12 +1202,6 @@ impl VolumeServer for VolumeGrpcService { } } - // StopOffset 0 means nothing to read - if req.stop_offset == 0 { - let stream = tokio_stream::iter(Vec::new()); - return Ok(Response::new(Box::pin(stream))); - } - // Open file and read content let file = match std::fs::File::open(&file_name) { Ok(f) => f, diff --git a/test/volume_server/grpc/copy_receive_variants_test.go b/test/volume_server/grpc/copy_receive_variants_test.go index d9d70c28a..3a82822b2 100644 --- a/test/volume_server/grpc/copy_receive_variants_test.go +++ b/test/volume_server/grpc/copy_receive_variants_test.go @@ -4,6 +4,7 @@ import ( "context" "io" "math" + "net/http" "strings" "testing" "time" @@ -132,6 +133,64 @@ func TestCopyFileIgnoreNotFoundAndStopOffsetZeroPaths(t *testing.T) { } } +func TestCopyFileStopOffsetZeroExistingFileSendsMetadata(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + clusterHarness := framework.StartVolumeCluster(t, matrix.P1()) + conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn.Close() + + const volumeID = uint32(93) + const needleID = uint64(770101) + const cookie = uint32(0x1234ABCD) + framework.AllocateVolume(t, grpcClient, volumeID, "") + + client := framework.NewHTTPClient() + uploadResp := framework.UploadBytes( + t, + client, + clusterHarness.VolumeAdminURL(), + framework.NewFileID(volumeID, needleID, cookie), + []byte("copy-file-stop-zero"), + ) + _ = framework.ReadAllAndClose(t, uploadResp) + if uploadResp.StatusCode != http.StatusCreated { + t.Fatalf("upload expected 201, got %d", uploadResp.StatusCode) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + stream, err := grpcClient.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ + VolumeId: volumeID, + Ext: ".dat", + CompactionRevision: math.MaxUint32, + StopOffset: 0, + IgnoreSourceFileNotFound: false, + }) + if err != nil { + t.Fatalf("CopyFile stop_offset=0 existing file start failed: %v", err) + } + + msg, err := stream.Recv() + if err != nil { + t.Fatalf("CopyFile stop_offset=0 existing file recv failed: %v", err) + } + if len(msg.GetFileContent()) != 0 { + t.Fatalf("CopyFile stop_offset=0 existing file should not send content, got %d bytes", len(msg.GetFileContent())) + } + if msg.GetModifiedTsNs() == 0 { + t.Fatalf("CopyFile stop_offset=0 existing file expected non-zero ModifiedTsNs") + } + + _, err = stream.Recv() + if err != io.EOF { + t.Fatalf("CopyFile stop_offset=0 existing file expected EOF after metadata frame, got: %v", err) + } +} + func TestCopyFileCompactionRevisionMismatch(t *testing.T) { if testing.Short() { t.Skip("skipping integration test in short mode")