From d0748300161bfec8adffad12ee1d2abc0a16cdc9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 29 Mar 2026 18:47:15 -0700 Subject: [PATCH] fix(worker): pass compaction revision and file sizes in EC volume copy (#8835) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(worker): pass compaction revision and file sizes in EC volume copy The worker EC task was sending CopyFile requests without the current compaction revision (defaulting to 0) and with StopOffset set to math.MaxInt64. After a vacuum compaction this caused the volume server to reject the copy or return stale data. Read the volume file status first and forward the compaction revision and actual file sizes so the copy is consistent with the compacted volume. * propagate erasure coding task context * fix(worker): validate volume file status and detect short copies Reject zero dat file size from ReadVolumeFileStatus — a zero-sized snapshot would produce 0-byte copies and broken EC shards. After streaming, verify totalBytes matches the expected stopOffset and return an error on short copies instead of logging success. * fix(worker): reject zero idx file size in volume status validation A non-empty dat with zero idx indicates an empty or corrupt volume. Without this guard, copyFileFromSource gets stopOffset=0, produces a 0-byte .idx, passes the short-copy check, and generateEcShardsLocally runs against a volume with no index. * fix fake plugin volume file status * fix plugin volume balance test fixtures --- test/plugin_workers/fake_volume_server.go | 97 +++++++++++++++- .../volume_balance/execution_test.go | 15 ++- weed/worker/tasks/erasure_coding/ec_task.go | 73 ++++++++---- .../tasks/erasure_coding/ec_task_test.go | 108 ++++++++++++++++++ 4 files changed, 264 insertions(+), 29 deletions(-) create mode 100644 weed/worker/tasks/erasure_coding/ec_task_test.go diff --git a/test/plugin_workers/fake_volume_server.go b/test/plugin_workers/fake_volume_server.go index 5ad35edff..6978b7c76 100644 --- a/test/plugin_workers/fake_volume_server.go +++ b/test/plugin_workers/fake_volume_server.go @@ -11,9 +11,11 @@ import ( "testing" "time" + "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) // VolumeServer provides a minimal volume server for erasure coding tests. @@ -196,12 +198,25 @@ func (v *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream vo defer file.Close() buf := make([]byte, 64*1024) + remaining := int64(req.GetStopOffset()) for { - n, readErr := file.Read(buf) + if remaining == 0 { + break + } + + readBuf := buf + if remaining > 0 && remaining < int64(len(buf)) { + readBuf = buf[:remaining] + } + + n, readErr := file.Read(readBuf) if n > 0 { - if err := stream.Send(&volume_server_pb.CopyFileResponse{FileContent: buf[:n]}); err != nil { + if err := stream.Send(&volume_server_pb.CopyFileResponse{FileContent: readBuf[:n]}); err != nil { return err } + if remaining > 0 { + remaining -= int64(n) + } } if readErr == io.EOF { break @@ -307,10 +322,21 @@ func (v *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_ser v.mu.Lock() v.readFileStatusCalls++ v.mu.Unlock() + + datInfo, err := os.Stat(v.filePath(req.VolumeId, ".dat")) + if err != nil { + return nil, err + } + + idxInfo, err := os.Stat(v.filePath(req.VolumeId, ".idx")) + if err != nil { + return nil, err + } + return &volume_server_pb.ReadVolumeFileStatusResponse{ VolumeId: req.VolumeId, - DatFileSize: 1024, - IdxFileSize: 16, + DatFileSize: uint64(datInfo.Size()), + IdxFileSize: uint64(idxInfo.Size()), FileCount: 1, }, nil } @@ -349,7 +375,27 @@ func (v *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, strea v.volumeCopyCalls++ v.mu.Unlock() - if err := stream.Send(&volume_server_pb.VolumeCopyResponse{ProcessedBytes: 1024}); err != nil { + dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + var statusResp *volume_server_pb.ReadVolumeFileStatusResponse + if err := operation.WithVolumeServerClient(false, pb.ServerAddress(req.SourceDataNode), dialOption, + func(client volume_server_pb.VolumeServerClient) error { + var readErr error + statusResp, readErr = client.ReadVolumeFileStatus(stream.Context(), &volume_server_pb.ReadVolumeFileStatusRequest{ + VolumeId: req.VolumeId, + }) + return readErr + }); err != nil { + return err + } + + if err := v.copyRemoteFile(stream.Context(), req.SourceDataNode, req.VolumeId, ".dat", statusResp.DatFileSize, dialOption); err != nil { + return err + } + if err := v.copyRemoteFile(stream.Context(), req.SourceDataNode, req.VolumeId, ".idx", statusResp.IdxFileSize, dialOption); err != nil { + return err + } + + if err := stream.Send(&volume_server_pb.VolumeCopyResponse{ProcessedBytes: int64(statusResp.DatFileSize + statusResp.IdxFileSize)}); err != nil { return err } return stream.Send(&volume_server_pb.VolumeCopyResponse{LastAppendAtNs: uint64(time.Now().UnixNano())}) @@ -368,3 +414,44 @@ func (v *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serve v.mu.Unlock() return &volume_server_pb.VolumeTailReceiverResponse{}, nil } + +func (v *VolumeServer) copyRemoteFile(ctx context.Context, sourceDataNode string, volumeID uint32, ext string, fileSize uint64, dialOption grpc.DialOption) error { + path := v.filePath(volumeID, ext) + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + return err + } + + file, err := os.Create(path) + if err != nil { + return err + } + defer file.Close() + + return operation.WithVolumeServerClient(true, pb.ServerAddress(sourceDataNode), dialOption, + func(client volume_server_pb.VolumeServerClient) error { + stream, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ + VolumeId: volumeID, + Ext: ext, + StopOffset: fileSize, + }) + if err != nil { + return err + } + + for { + resp, recvErr := stream.Recv() + if recvErr == io.EOF { + return nil + } + if recvErr != nil { + return recvErr + } + if len(resp.FileContent) == 0 { + continue + } + if _, err := file.Write(resp.FileContent); err != nil { + return err + } + } + }) +} diff --git a/test/plugin_workers/volume_balance/execution_test.go b/test/plugin_workers/volume_balance/execution_test.go index 6e711227d..da5c41286 100644 --- a/test/plugin_workers/volume_balance/execution_test.go +++ b/test/plugin_workers/volume_balance/execution_test.go @@ -16,6 +16,8 @@ import ( "google.golang.org/protobuf/proto" ) +const testVolumeDatSize = 1 * 1024 * 1024 + func TestVolumeBalanceExecutionIntegration(t *testing.T) { volumeID := uint32(303) @@ -31,6 +33,7 @@ func TestVolumeBalanceExecutionIntegration(t *testing.T) { source := pluginworkers.NewVolumeServer(t, "") target := pluginworkers.NewVolumeServer(t, "") + pluginworkers.WriteTestVolumeFiles(t, source.BaseDir(), volumeID, testVolumeDatSize) job := &plugin_pb.JobSpec{ JobId: fmt.Sprintf("balance-job-%d", volumeID), @@ -84,6 +87,9 @@ func TestVolumeBalanceBatchExecutionIntegration(t *testing.T) { // Build a batch job with 3 volume moves from source → target. volumeIDs := []uint32{401, 402, 403} + for _, vid := range volumeIDs { + pluginworkers.WriteTestVolumeFiles(t, source.BaseDir(), vid, testVolumeDatSize) + } moves := make([]*worker_pb.BalanceMoveSpec, len(volumeIDs)) for i, vid := range volumeIDs { moves[i] = &worker_pb.BalanceMoveSpec{ @@ -139,10 +145,11 @@ func TestVolumeBalanceBatchExecutionIntegration(t *testing.T) { require.True(t, deletedVols[vid], "volume %d should have been deleted from source", vid) } - // Pre-delete verification should have called ReadVolumeFileStatus on both - // source and target for each volume. - require.Equal(t, len(volumeIDs), source.ReadFileStatusCount(), - "each move should read source volume status before delete") + // Each move reads source status once before copy and once inside the + // target's fake VolumeCopy implementation, then reads target status once + // before deleting the source. + require.Equal(t, len(volumeIDs)*2, source.ReadFileStatusCount(), + "each move should read source volume status before copy and during target copy") require.Equal(t, len(volumeIDs), target.ReadFileStatusCount(), "each move should read target volume status before delete") diff --git a/weed/worker/tasks/erasure_coding/ec_task.go b/weed/worker/tasks/erasure_coding/ec_task.go index e2e130f1b..bf0c84731 100644 --- a/weed/worker/tasks/erasure_coding/ec_task.go +++ b/weed/worker/tasks/erasure_coding/ec_task.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "math" "os" "path/filepath" "strings" @@ -146,14 +145,14 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP // Step 1: Mark volume readonly t.ReportProgressWithStage(10.0, "Marking volume readonly") t.GetLogger().Info("Marking volume readonly") - if err := t.markVolumeReadonly(); err != nil { + if err := t.markVolumeReadonly(ctx); err != nil { return fmt.Errorf("failed to mark volume readonly: %v", err) } // Step 2: Copy volume files to worker t.ReportProgressWithStage(25.0, "Copying volume files to worker") t.GetLogger().Info("Copying volume files to worker") - localFiles, err := t.copyVolumeFilesToWorker(taskWorkDir) + localFiles, err := t.copyVolumeFilesToWorker(ctx, taskWorkDir) if err != nil { return fmt.Errorf("failed to copy volume files: %v", err) } @@ -183,7 +182,7 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP // Step 6: Delete original volume t.ReportProgressWithStage(90.0, "Deleting original volume") t.GetLogger().Info("Deleting original volume") - if err := t.deleteOriginalVolume(); err != nil { + if err := t.deleteOriginalVolume(ctx); err != nil { return fmt.Errorf("failed to delete original volume: %v", err) } @@ -250,10 +249,10 @@ func (t *ErasureCodingTask) GetProgress() float64 { // Helper methods for actual EC operations // markVolumeReadonly marks the volume as readonly on the source server -func (t *ErasureCodingTask) markVolumeReadonly() error { +func (t *ErasureCodingTask) markVolumeReadonly(ctx context.Context) error { return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - _, err := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ + _, err := client.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{ VolumeId: t.volumeID, }) return err @@ -261,18 +260,26 @@ func (t *ErasureCodingTask) markVolumeReadonly() error { } // copyVolumeFilesToWorker copies .dat and .idx files from source server to local worker -func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string]string, error) { +func (t *ErasureCodingTask) copyVolumeFilesToWorker(ctx context.Context, workDir string) (map[string]string, error) { localFiles := make(map[string]string) + fileStatus, err := t.readSourceVolumeFileStatus(ctx) + if err != nil { + return nil, fmt.Errorf("failed to read source volume file status: %v", err) + } + t.GetLogger().WithFields(map[string]interface{}{ - "volume_id": t.volumeID, - "source": t.server, - "working_dir": workDir, + "volume_id": t.volumeID, + "source": t.server, + "working_dir": workDir, + "compaction_revision": fileStatus.GetCompactionRevision(), + "dat_file_size_bytes": fileStatus.GetDatFileSize(), + "idx_file_size_bytes": fileStatus.GetIdxFileSize(), }).Info("Starting volume file copy from source server") // Copy .dat file datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID)) - if err := t.copyFileFromSource(".dat", datFile); err != nil { + if err := t.copyFileFromSource(ctx, ".dat", datFile, fileStatus.GetCompactionRevision(), fileStatus.GetDatFileSize()); err != nil { return nil, fmt.Errorf("failed to copy .dat file: %v", err) } localFiles["dat"] = datFile @@ -289,7 +296,7 @@ func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string] // Copy .idx file idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID)) - if err := t.copyFileFromSource(".idx", idxFile); err != nil { + if err := t.copyFileFromSource(ctx, ".idx", idxFile, fileStatus.GetCompactionRevision(), fileStatus.GetIdxFileSize()); err != nil { return nil, fmt.Errorf("failed to copy .idx file: %v", err) } localFiles["idx"] = idxFile @@ -307,15 +314,38 @@ func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string] return localFiles, nil } +func (t *ErasureCodingTask) readSourceVolumeFileStatus(ctx context.Context) (*volume_server_pb.ReadVolumeFileStatusResponse, error) { + var statusResp *volume_server_pb.ReadVolumeFileStatusResponse + err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, + func(client volume_server_pb.VolumeServerClient) error { + var readErr error + statusResp, readErr = client.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{ + VolumeId: t.volumeID, + }) + return readErr + }) + if err != nil { + return nil, err + } + if statusResp.GetDatFileSize() == 0 { + return nil, fmt.Errorf("volume %d on %s reports zero dat file size", t.volumeID, t.server) + } + if statusResp.GetIdxFileSize() == 0 { + return nil, fmt.Errorf("volume %d on %s reports zero idx file size with non-empty dat", t.volumeID, t.server) + } + return statusResp, nil +} + // copyFileFromSource copies a file from source server to local path using gRPC streaming -func (t *ErasureCodingTask) copyFileFromSource(ext, localPath string) error { +func (t *ErasureCodingTask) copyFileFromSource(ctx context.Context, ext, localPath string, compactionRevision uint32, stopOffset uint64) error { return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - stream, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ - VolumeId: t.volumeID, - Collection: t.collection, - Ext: ext, - StopOffset: uint64(math.MaxInt64), + stream, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ + VolumeId: t.volumeID, + Collection: t.collection, + Ext: ext, + CompactionRevision: compactionRevision, + StopOffset: stopOffset, }) if err != nil { return fmt.Errorf("failed to initiate file copy: %v", err) @@ -348,6 +378,9 @@ func (t *ErasureCodingTask) copyFileFromSource(ext, localPath string) error { } } + if totalBytes != int64(stopOffset) { + return fmt.Errorf("short copy of %s: got %d bytes, expected %d", ext, totalBytes, stopOffset) + } glog.V(1).Infof("Successfully copied %s (%d bytes) from %s to %s", ext, totalBytes, t.server, localPath) return nil }) @@ -468,7 +501,7 @@ func (t *ErasureCodingTask) mountEcShards() error { } // deleteOriginalVolume deletes the original volume and all its replicas from all servers -func (t *ErasureCodingTask) deleteOriginalVolume() error { +func (t *ErasureCodingTask) deleteOriginalVolume(ctx context.Context) error { // Get replicas from task parameters (set during detection) replicas := t.getReplicas() @@ -497,7 +530,7 @@ func (t *ErasureCodingTask) deleteOriginalVolume() error { err := operation.WithVolumeServerClient(false, pb.ServerAddress(replicaServer), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - _, err := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ + _, err := client.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{ VolumeId: t.volumeID, OnlyEmpty: false, // Force delete since we've created EC shards }) diff --git a/weed/worker/tasks/erasure_coding/ec_task_test.go b/weed/worker/tasks/erasure_coding/ec_task_test.go new file mode 100644 index 000000000..a6a4ab5d6 --- /dev/null +++ b/weed/worker/tasks/erasure_coding/ec_task_test.go @@ -0,0 +1,108 @@ +package erasure_coding + +import ( + "context" + "io" + "net/http" + "os" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/test/volume_server/framework" + "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func TestCopyVolumeFilesToWorkerUsesCurrentCompactionRevision(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + clusterHarness := framework.StartVolumeCluster(t, matrix.P1()) + conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn.Close() + + const volumeID = uint32(951) + framework.AllocateVolume(t, grpcClient, volumeID, "") + + httpClient := framework.NewHTTPClient() + + liveFID := framework.NewFileID(volumeID, 1001, 0x1111AAAA) + liveUploadResp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), liveFID, []byte("live-payload-for-ec-copy")) + _ = framework.ReadAllAndClose(t, liveUploadResp) + require.Equal(t, http.StatusCreated, liveUploadResp.StatusCode) + + deletedFID := framework.NewFileID(volumeID, 1002, 0x2222BBBB) + deletedUploadResp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), deletedFID, []byte("deleted-payload-for-vacuum")) + _ = framework.ReadAllAndClose(t, deletedUploadResp) + require.Equal(t, http.StatusCreated, deletedUploadResp.StatusCode) + + deleteReq, err := http.NewRequest(http.MethodDelete, clusterHarness.VolumeAdminURL()+"/"+deletedFID, nil) + require.NoError(t, err) + deleteResp := framework.DoRequest(t, httpClient, deleteReq) + _ = framework.ReadAllAndClose(t, deleteResp) + require.Equal(t, http.StatusAccepted, deleteResp.StatusCode) + + compactVolumeOnce(t, grpcClient, volumeID) + + task := NewErasureCodingTask( + "copy-after-compaction", + clusterHarness.VolumeServerAddress(), + volumeID, + "", + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + require.NoError(t, task.markVolumeReadonly(ctx)) + + fileStatus, err := task.readSourceVolumeFileStatus(ctx) + require.NoError(t, err) + require.Greater(t, fileStatus.GetCompactionRevision(), uint32(0)) + + localFiles, err := task.copyVolumeFilesToWorker(ctx, t.TempDir()) + require.NoError(t, err) + + datInfo, err := os.Stat(localFiles["dat"]) + require.NoError(t, err) + require.Equal(t, int64(fileStatus.GetDatFileSize()), datInfo.Size()) + + idxInfo, err := os.Stat(localFiles["idx"]) + require.NoError(t, err) + require.Equal(t, int64(fileStatus.GetIdxFileSize()), idxInfo.Size()) +} + +func compactVolumeOnce(t *testing.T, grpcClient volume_server_pb.VolumeServerClient, volumeID uint32) { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + compactStream, err := grpcClient.VacuumVolumeCompact(ctx, &volume_server_pb.VacuumVolumeCompactRequest{ + VolumeId: volumeID, + }) + require.NoError(t, err) + + for { + _, err = compactStream.Recv() + if err == io.EOF { + break + } + require.NoError(t, err) + } + + _, err = grpcClient.VacuumVolumeCommit(ctx, &volume_server_pb.VacuumVolumeCommitRequest{ + VolumeId: volumeID, + }) + require.NoError(t, err) + + _, err = grpcClient.VacuumVolumeCleanup(ctx, &volume_server_pb.VacuumVolumeCleanupRequest{ + VolumeId: volumeID, + }) + require.NoError(t, err) +}