Browse Source

Match Go CopyFile zero-stop metadata

rust-volume-server
Chris Lu 3 days ago
parent
commit
5f37482a04
  1. 6
      seaweed-volume/src/server/grpc_server.rs
  2. 59
      test/volume_server/grpc/copy_receive_variants_test.go

6
seaweed-volume/src/server/grpc_server.rs

@ -1202,12 +1202,6 @@ impl VolumeServer for VolumeGrpcService {
}
}
// StopOffset 0 means nothing to read
if req.stop_offset == 0 {
let stream = tokio_stream::iter(Vec::new());
return Ok(Response::new(Box::pin(stream)));
}
// Open file and read content
let file = match std::fs::File::open(&file_name) {
Ok(f) => f,

59
test/volume_server/grpc/copy_receive_variants_test.go

@ -4,6 +4,7 @@ import (
"context"
"io"
"math"
"net/http"
"strings"
"testing"
"time"
@ -132,6 +133,64 @@ func TestCopyFileIgnoreNotFoundAndStopOffsetZeroPaths(t *testing.T) {
}
}
func TestCopyFileStopOffsetZeroExistingFileSendsMetadata(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
clusterHarness := framework.StartVolumeCluster(t, matrix.P1())
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
defer conn.Close()
const volumeID = uint32(93)
const needleID = uint64(770101)
const cookie = uint32(0x1234ABCD)
framework.AllocateVolume(t, grpcClient, volumeID, "")
client := framework.NewHTTPClient()
uploadResp := framework.UploadBytes(
t,
client,
clusterHarness.VolumeAdminURL(),
framework.NewFileID(volumeID, needleID, cookie),
[]byte("copy-file-stop-zero"),
)
_ = framework.ReadAllAndClose(t, uploadResp)
if uploadResp.StatusCode != http.StatusCreated {
t.Fatalf("upload expected 201, got %d", uploadResp.StatusCode)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := grpcClient.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
VolumeId: volumeID,
Ext: ".dat",
CompactionRevision: math.MaxUint32,
StopOffset: 0,
IgnoreSourceFileNotFound: false,
})
if err != nil {
t.Fatalf("CopyFile stop_offset=0 existing file start failed: %v", err)
}
msg, err := stream.Recv()
if err != nil {
t.Fatalf("CopyFile stop_offset=0 existing file recv failed: %v", err)
}
if len(msg.GetFileContent()) != 0 {
t.Fatalf("CopyFile stop_offset=0 existing file should not send content, got %d bytes", len(msg.GetFileContent()))
}
if msg.GetModifiedTsNs() == 0 {
t.Fatalf("CopyFile stop_offset=0 existing file expected non-zero ModifiedTsNs")
}
_, err = stream.Recv()
if err != io.EOF {
t.Fatalf("CopyFile stop_offset=0 existing file expected EOF after metadata frame, got: %v", err)
}
}
func TestCopyFileCompactionRevisionMismatch(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")

Loading…
Cancel
Save