diff --git a/test/plugin_workers/fake_volume_server.go b/test/plugin_workers/fake_volume_server.go index 5ad35edff..6978b7c76 100644 --- a/test/plugin_workers/fake_volume_server.go +++ b/test/plugin_workers/fake_volume_server.go @@ -11,9 +11,11 @@ import ( "testing" "time" + "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) // VolumeServer provides a minimal volume server for erasure coding tests. @@ -196,12 +198,25 @@ func (v *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream vo defer file.Close() buf := make([]byte, 64*1024) + remaining := int64(req.GetStopOffset()) for { - n, readErr := file.Read(buf) + if remaining == 0 { + break + } + + readBuf := buf + if remaining > 0 && remaining < int64(len(buf)) { + readBuf = buf[:remaining] + } + + n, readErr := file.Read(readBuf) if n > 0 { - if err := stream.Send(&volume_server_pb.CopyFileResponse{FileContent: buf[:n]}); err != nil { + if err := stream.Send(&volume_server_pb.CopyFileResponse{FileContent: readBuf[:n]}); err != nil { return err } + if remaining > 0 { + remaining -= int64(n) + } } if readErr == io.EOF { break @@ -307,10 +322,21 @@ func (v *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_ser v.mu.Lock() v.readFileStatusCalls++ v.mu.Unlock() + + datInfo, err := os.Stat(v.filePath(req.VolumeId, ".dat")) + if err != nil { + return nil, err + } + + idxInfo, err := os.Stat(v.filePath(req.VolumeId, ".idx")) + if err != nil { + return nil, err + } + return &volume_server_pb.ReadVolumeFileStatusResponse{ VolumeId: req.VolumeId, - DatFileSize: 1024, - IdxFileSize: 16, + DatFileSize: uint64(datInfo.Size()), + IdxFileSize: uint64(idxInfo.Size()), FileCount: 1, }, nil } @@ -349,7 +375,27 @@ func (v *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, strea v.volumeCopyCalls++ v.mu.Unlock() - if err := stream.Send(&volume_server_pb.VolumeCopyResponse{ProcessedBytes: 1024}); err != nil { + dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + var statusResp *volume_server_pb.ReadVolumeFileStatusResponse + if err := operation.WithVolumeServerClient(false, pb.ServerAddress(req.SourceDataNode), dialOption, + func(client volume_server_pb.VolumeServerClient) error { + var readErr error + statusResp, readErr = client.ReadVolumeFileStatus(stream.Context(), &volume_server_pb.ReadVolumeFileStatusRequest{ + VolumeId: req.VolumeId, + }) + return readErr + }); err != nil { + return err + } + + if err := v.copyRemoteFile(stream.Context(), req.SourceDataNode, req.VolumeId, ".dat", statusResp.DatFileSize, dialOption); err != nil { + return err + } + if err := v.copyRemoteFile(stream.Context(), req.SourceDataNode, req.VolumeId, ".idx", statusResp.IdxFileSize, dialOption); err != nil { + return err + } + + if err := stream.Send(&volume_server_pb.VolumeCopyResponse{ProcessedBytes: int64(statusResp.DatFileSize + statusResp.IdxFileSize)}); err != nil { return err } return stream.Send(&volume_server_pb.VolumeCopyResponse{LastAppendAtNs: uint64(time.Now().UnixNano())}) @@ -368,3 +414,44 @@ func (v *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serve v.mu.Unlock() return &volume_server_pb.VolumeTailReceiverResponse{}, nil } + +func (v *VolumeServer) copyRemoteFile(ctx context.Context, sourceDataNode string, volumeID uint32, ext string, fileSize uint64, dialOption grpc.DialOption) error { + path := v.filePath(volumeID, ext) + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + return err + } + + file, err := os.Create(path) + if err != nil { + return err + } + defer file.Close() + + return operation.WithVolumeServerClient(true, pb.ServerAddress(sourceDataNode), dialOption, + func(client volume_server_pb.VolumeServerClient) error { + stream, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ + VolumeId: volumeID, + Ext: ext, + StopOffset: fileSize, + }) + if err != nil { + return err + } + + for { + resp, recvErr := stream.Recv() + if recvErr == io.EOF { + return nil + } + if recvErr != nil { + return recvErr + } + if len(resp.FileContent) == 0 { + continue + } + if _, err := file.Write(resp.FileContent); err != nil { + return err + } + } + }) +} diff --git a/test/plugin_workers/volume_balance/execution_test.go b/test/plugin_workers/volume_balance/execution_test.go index 6e711227d..da5c41286 100644 --- a/test/plugin_workers/volume_balance/execution_test.go +++ b/test/plugin_workers/volume_balance/execution_test.go @@ -16,6 +16,8 @@ import ( "google.golang.org/protobuf/proto" ) +const testVolumeDatSize = 1 * 1024 * 1024 + func TestVolumeBalanceExecutionIntegration(t *testing.T) { volumeID := uint32(303) @@ -31,6 +33,7 @@ func TestVolumeBalanceExecutionIntegration(t *testing.T) { source := pluginworkers.NewVolumeServer(t, "") target := pluginworkers.NewVolumeServer(t, "") + pluginworkers.WriteTestVolumeFiles(t, source.BaseDir(), volumeID, testVolumeDatSize) job := &plugin_pb.JobSpec{ JobId: fmt.Sprintf("balance-job-%d", volumeID), @@ -84,6 +87,9 @@ func TestVolumeBalanceBatchExecutionIntegration(t *testing.T) { // Build a batch job with 3 volume moves from source → target. volumeIDs := []uint32{401, 402, 403} + for _, vid := range volumeIDs { + pluginworkers.WriteTestVolumeFiles(t, source.BaseDir(), vid, testVolumeDatSize) + } moves := make([]*worker_pb.BalanceMoveSpec, len(volumeIDs)) for i, vid := range volumeIDs { moves[i] = &worker_pb.BalanceMoveSpec{ @@ -139,10 +145,11 @@ func TestVolumeBalanceBatchExecutionIntegration(t *testing.T) { require.True(t, deletedVols[vid], "volume %d should have been deleted from source", vid) } - // Pre-delete verification should have called ReadVolumeFileStatus on both - // source and target for each volume. - require.Equal(t, len(volumeIDs), source.ReadFileStatusCount(), - "each move should read source volume status before delete") + // Each move reads source status once before copy and once inside the + // target's fake VolumeCopy implementation, then reads target status once + // before deleting the source. + require.Equal(t, len(volumeIDs)*2, source.ReadFileStatusCount(), + "each move should read source volume status before copy and during target copy") require.Equal(t, len(volumeIDs), target.ReadFileStatusCount(), "each move should read target volume status before delete") diff --git a/weed/worker/tasks/erasure_coding/ec_task.go b/weed/worker/tasks/erasure_coding/ec_task.go index e2e130f1b..bf0c84731 100644 --- a/weed/worker/tasks/erasure_coding/ec_task.go +++ b/weed/worker/tasks/erasure_coding/ec_task.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "math" "os" "path/filepath" "strings" @@ -146,14 +145,14 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP // Step 1: Mark volume readonly t.ReportProgressWithStage(10.0, "Marking volume readonly") t.GetLogger().Info("Marking volume readonly") - if err := t.markVolumeReadonly(); err != nil { + if err := t.markVolumeReadonly(ctx); err != nil { return fmt.Errorf("failed to mark volume readonly: %v", err) } // Step 2: Copy volume files to worker t.ReportProgressWithStage(25.0, "Copying volume files to worker") t.GetLogger().Info("Copying volume files to worker") - localFiles, err := t.copyVolumeFilesToWorker(taskWorkDir) + localFiles, err := t.copyVolumeFilesToWorker(ctx, taskWorkDir) if err != nil { return fmt.Errorf("failed to copy volume files: %v", err) } @@ -183,7 +182,7 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP // Step 6: Delete original volume t.ReportProgressWithStage(90.0, "Deleting original volume") t.GetLogger().Info("Deleting original volume") - if err := t.deleteOriginalVolume(); err != nil { + if err := t.deleteOriginalVolume(ctx); err != nil { return fmt.Errorf("failed to delete original volume: %v", err) } @@ -250,10 +249,10 @@ func (t *ErasureCodingTask) GetProgress() float64 { // Helper methods for actual EC operations // markVolumeReadonly marks the volume as readonly on the source server -func (t *ErasureCodingTask) markVolumeReadonly() error { +func (t *ErasureCodingTask) markVolumeReadonly(ctx context.Context) error { return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - _, err := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ + _, err := client.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{ VolumeId: t.volumeID, }) return err @@ -261,18 +260,26 @@ func (t *ErasureCodingTask) markVolumeReadonly() error { } // copyVolumeFilesToWorker copies .dat and .idx files from source server to local worker -func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string]string, error) { +func (t *ErasureCodingTask) copyVolumeFilesToWorker(ctx context.Context, workDir string) (map[string]string, error) { localFiles := make(map[string]string) + fileStatus, err := t.readSourceVolumeFileStatus(ctx) + if err != nil { + return nil, fmt.Errorf("failed to read source volume file status: %v", err) + } + t.GetLogger().WithFields(map[string]interface{}{ - "volume_id": t.volumeID, - "source": t.server, - "working_dir": workDir, + "volume_id": t.volumeID, + "source": t.server, + "working_dir": workDir, + "compaction_revision": fileStatus.GetCompactionRevision(), + "dat_file_size_bytes": fileStatus.GetDatFileSize(), + "idx_file_size_bytes": fileStatus.GetIdxFileSize(), }).Info("Starting volume file copy from source server") // Copy .dat file datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID)) - if err := t.copyFileFromSource(".dat", datFile); err != nil { + if err := t.copyFileFromSource(ctx, ".dat", datFile, fileStatus.GetCompactionRevision(), fileStatus.GetDatFileSize()); err != nil { return nil, fmt.Errorf("failed to copy .dat file: %v", err) } localFiles["dat"] = datFile @@ -289,7 +296,7 @@ func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string] // Copy .idx file idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID)) - if err := t.copyFileFromSource(".idx", idxFile); err != nil { + if err := t.copyFileFromSource(ctx, ".idx", idxFile, fileStatus.GetCompactionRevision(), fileStatus.GetIdxFileSize()); err != nil { return nil, fmt.Errorf("failed to copy .idx file: %v", err) } localFiles["idx"] = idxFile @@ -307,15 +314,38 @@ func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string] return localFiles, nil } +func (t *ErasureCodingTask) readSourceVolumeFileStatus(ctx context.Context) (*volume_server_pb.ReadVolumeFileStatusResponse, error) { + var statusResp *volume_server_pb.ReadVolumeFileStatusResponse + err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, + func(client volume_server_pb.VolumeServerClient) error { + var readErr error + statusResp, readErr = client.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{ + VolumeId: t.volumeID, + }) + return readErr + }) + if err != nil { + return nil, err + } + if statusResp.GetDatFileSize() == 0 { + return nil, fmt.Errorf("volume %d on %s reports zero dat file size", t.volumeID, t.server) + } + if statusResp.GetIdxFileSize() == 0 { + return nil, fmt.Errorf("volume %d on %s reports zero idx file size with non-empty dat", t.volumeID, t.server) + } + return statusResp, nil +} + // copyFileFromSource copies a file from source server to local path using gRPC streaming -func (t *ErasureCodingTask) copyFileFromSource(ext, localPath string) error { +func (t *ErasureCodingTask) copyFileFromSource(ctx context.Context, ext, localPath string, compactionRevision uint32, stopOffset uint64) error { return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - stream, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ - VolumeId: t.volumeID, - Collection: t.collection, - Ext: ext, - StopOffset: uint64(math.MaxInt64), + stream, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ + VolumeId: t.volumeID, + Collection: t.collection, + Ext: ext, + CompactionRevision: compactionRevision, + StopOffset: stopOffset, }) if err != nil { return fmt.Errorf("failed to initiate file copy: %v", err) @@ -348,6 +378,9 @@ func (t *ErasureCodingTask) copyFileFromSource(ext, localPath string) error { } } + if totalBytes != int64(stopOffset) { + return fmt.Errorf("short copy of %s: got %d bytes, expected %d", ext, totalBytes, stopOffset) + } glog.V(1).Infof("Successfully copied %s (%d bytes) from %s to %s", ext, totalBytes, t.server, localPath) return nil }) @@ -468,7 +501,7 @@ func (t *ErasureCodingTask) mountEcShards() error { } // deleteOriginalVolume deletes the original volume and all its replicas from all servers -func (t *ErasureCodingTask) deleteOriginalVolume() error { +func (t *ErasureCodingTask) deleteOriginalVolume(ctx context.Context) error { // Get replicas from task parameters (set during detection) replicas := t.getReplicas() @@ -497,7 +530,7 @@ func (t *ErasureCodingTask) deleteOriginalVolume() error { err := operation.WithVolumeServerClient(false, pb.ServerAddress(replicaServer), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - _, err := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ + _, err := client.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{ VolumeId: t.volumeID, OnlyEmpty: false, // Force delete since we've created EC shards }) diff --git a/weed/worker/tasks/erasure_coding/ec_task_test.go b/weed/worker/tasks/erasure_coding/ec_task_test.go new file mode 100644 index 000000000..a6a4ab5d6 --- /dev/null +++ b/weed/worker/tasks/erasure_coding/ec_task_test.go @@ -0,0 +1,108 @@ +package erasure_coding + +import ( + "context" + "io" + "net/http" + "os" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/test/volume_server/framework" + "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func TestCopyVolumeFilesToWorkerUsesCurrentCompactionRevision(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + clusterHarness := framework.StartVolumeCluster(t, matrix.P1()) + conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn.Close() + + const volumeID = uint32(951) + framework.AllocateVolume(t, grpcClient, volumeID, "") + + httpClient := framework.NewHTTPClient() + + liveFID := framework.NewFileID(volumeID, 1001, 0x1111AAAA) + liveUploadResp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), liveFID, []byte("live-payload-for-ec-copy")) + _ = framework.ReadAllAndClose(t, liveUploadResp) + require.Equal(t, http.StatusCreated, liveUploadResp.StatusCode) + + deletedFID := framework.NewFileID(volumeID, 1002, 0x2222BBBB) + deletedUploadResp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), deletedFID, []byte("deleted-payload-for-vacuum")) + _ = framework.ReadAllAndClose(t, deletedUploadResp) + require.Equal(t, http.StatusCreated, deletedUploadResp.StatusCode) + + deleteReq, err := http.NewRequest(http.MethodDelete, clusterHarness.VolumeAdminURL()+"/"+deletedFID, nil) + require.NoError(t, err) + deleteResp := framework.DoRequest(t, httpClient, deleteReq) + _ = framework.ReadAllAndClose(t, deleteResp) + require.Equal(t, http.StatusAccepted, deleteResp.StatusCode) + + compactVolumeOnce(t, grpcClient, volumeID) + + task := NewErasureCodingTask( + "copy-after-compaction", + clusterHarness.VolumeServerAddress(), + volumeID, + "", + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + require.NoError(t, task.markVolumeReadonly(ctx)) + + fileStatus, err := task.readSourceVolumeFileStatus(ctx) + require.NoError(t, err) + require.Greater(t, fileStatus.GetCompactionRevision(), uint32(0)) + + localFiles, err := task.copyVolumeFilesToWorker(ctx, t.TempDir()) + require.NoError(t, err) + + datInfo, err := os.Stat(localFiles["dat"]) + require.NoError(t, err) + require.Equal(t, int64(fileStatus.GetDatFileSize()), datInfo.Size()) + + idxInfo, err := os.Stat(localFiles["idx"]) + require.NoError(t, err) + require.Equal(t, int64(fileStatus.GetIdxFileSize()), idxInfo.Size()) +} + +func compactVolumeOnce(t *testing.T, grpcClient volume_server_pb.VolumeServerClient, volumeID uint32) { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + compactStream, err := grpcClient.VacuumVolumeCompact(ctx, &volume_server_pb.VacuumVolumeCompactRequest{ + VolumeId: volumeID, + }) + require.NoError(t, err) + + for { + _, err = compactStream.Recv() + if err == io.EOF { + break + } + require.NoError(t, err) + } + + _, err = grpcClient.VacuumVolumeCommit(ctx, &volume_server_pb.VacuumVolumeCommitRequest{ + VolumeId: volumeID, + }) + require.NoError(t, err) + + _, err = grpcClient.VacuumVolumeCleanup(ctx, &volume_server_pb.VacuumVolumeCleanupRequest{ + VolumeId: volumeID, + }) + require.NoError(t, err) +}