From 453310b057db9ddf7614af25786401ec99b93b54 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 25 Feb 2026 22:11:41 -0800 Subject: [PATCH] Add plugin worker integration tests for erasure coding (#8450) * test: add plugin worker integration harness * test: add erasure coding detection integration tests * test: add erasure coding execution integration tests * ci: add plugin worker integration workflow * test: extend fake volume server for vacuum and balance * test: expand erasure coding detection topologies * test: add large erasure coding detection topology * test: add vacuum plugin worker integration tests * test: add volume balance plugin worker integration tests * ci: run plugin worker tests per worker * fixes * erasure coding: stop after placement failures * erasure coding: record hasMore when early stopping * erasure coding: relax large topology expectations --- .github/workflows/plugin-workers.yml | 39 ++ .../erasure_coding/detection_test.go | 285 +++++++++++++++ .../erasure_coding/execution_test.go | 83 +++++ .../erasure_coding/large_topology_test.go | 123 +++++++ test/plugin_workers/fake_master.go | 90 +++++ test/plugin_workers/fake_volume_server.go | 335 ++++++++++++++++++ test/plugin_workers/framework.go | 171 +++++++++ test/plugin_workers/vacuum/detection_test.go | 104 ++++++ test/plugin_workers/vacuum/execution_test.go | 62 ++++ .../volume_balance/detection_test.go | 129 +++++++ .../volume_balance/execution_test.go | 67 ++++ test/plugin_workers/volume_fixtures.go | 49 +++ weed/worker/tasks/erasure_coding/detection.go | 14 + 13 files changed, 1551 insertions(+) create mode 100644 .github/workflows/plugin-workers.yml create mode 100644 test/plugin_workers/erasure_coding/detection_test.go create mode 100644 test/plugin_workers/erasure_coding/execution_test.go create mode 100644 test/plugin_workers/erasure_coding/large_topology_test.go create mode 100644 test/plugin_workers/fake_master.go create mode 100644 test/plugin_workers/fake_volume_server.go create 
mode 100644 test/plugin_workers/framework.go create mode 100644 test/plugin_workers/vacuum/detection_test.go create mode 100644 test/plugin_workers/vacuum/execution_test.go create mode 100644 test/plugin_workers/volume_balance/detection_test.go create mode 100644 test/plugin_workers/volume_balance/execution_test.go create mode 100644 test/plugin_workers/volume_fixtures.go diff --git a/.github/workflows/plugin-workers.yml b/.github/workflows/plugin-workers.yml new file mode 100644 index 000000000..61fa0fe45 --- /dev/null +++ b/.github/workflows/plugin-workers.yml @@ -0,0 +1,39 @@ +name: "Plugin Worker Integration Tests" + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +permissions: + contents: read + +jobs: + plugin-worker: + name: "Plugin Worker: ${{ matrix.worker }}" + runs-on: ubuntu-22.04 + timeout-minutes: 10 + strategy: + fail-fast: false + matrix: + include: + - worker: erasure_coding + path: test/plugin_workers/erasure_coding + - worker: vacuum + path: test/plugin_workers/vacuum + - worker: volume_balance + path: test/plugin_workers/volume_balance + + steps: + - name: Set up Go 1.x + uses: actions/setup-go@v6 + with: + go-version: ^1.26 + id: go + + - name: Check out code into the Go module directory + uses: actions/checkout@v6 + + - name: Run plugin worker tests + run: go test -v ./${{ matrix.path }} diff --git a/test/plugin_workers/erasure_coding/detection_test.go b/test/plugin_workers/erasure_coding/detection_test.go new file mode 100644 index 000000000..3008e85bf --- /dev/null +++ b/test/plugin_workers/erasure_coding/detection_test.go @@ -0,0 +1,285 @@ +package erasure_coding_test + +import ( + "context" + "fmt" + "testing" + "time" + + pluginworkers "github.com/seaweedfs/seaweedfs/test/plugin_workers" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker" 
+ ecstorage "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/proto" +) + +type topologySpec struct { + name string + dataCenters int + racksPerDC int + nodesPerRack int + diskTypes []string + replicas int + collection string +} + +type detectionCase struct { + name string + topology topologySpec + adminCollectionFilter string + expectProposals bool +} + +func TestErasureCodingDetectionAcrossTopologies(t *testing.T) { + cases := []detectionCase{ + { + name: "single-dc-multi-rack", + topology: topologySpec{ + name: "single-dc-multi-rack", + dataCenters: 1, + racksPerDC: 2, + nodesPerRack: 7, + diskTypes: []string{"hdd"}, + replicas: 1, + collection: "ec-test", + }, + expectProposals: true, + }, + { + name: "multi-dc", + topology: topologySpec{ + name: "multi-dc", + dataCenters: 2, + racksPerDC: 1, + nodesPerRack: 7, + diskTypes: []string{"hdd"}, + replicas: 1, + collection: "ec-test", + }, + expectProposals: true, + }, + { + name: "multi-dc-multi-rack", + topology: topologySpec{ + name: "multi-dc-multi-rack", + dataCenters: 2, + racksPerDC: 2, + nodesPerRack: 4, + diskTypes: []string{"hdd"}, + replicas: 1, + collection: "ec-test", + }, + expectProposals: true, + }, + { + name: "mixed-disk-types", + topology: topologySpec{ + name: "mixed-disk-types", + dataCenters: 1, + racksPerDC: 2, + nodesPerRack: 7, + diskTypes: []string{"hdd", "ssd"}, + replicas: 1, + collection: "ec-test", + }, + expectProposals: true, + }, + { + name: "multi-replica-volume", + topology: topologySpec{ + name: "multi-replica-volume", + dataCenters: 1, + racksPerDC: 2, + nodesPerRack: 7, + diskTypes: []string{"hdd"}, + replicas: 3, + collection: "ec-test", + }, + expectProposals: true, + }, + { + name: "collection-filter-match", + topology: topologySpec{ + name: "collection-filter-match", + dataCenters: 1, + racksPerDC: 2, + nodesPerRack: 7, + 
diskTypes: []string{"hdd"}, + replicas: 1, + collection: "filtered", + }, + adminCollectionFilter: "filtered", + expectProposals: true, + }, + { + name: "collection-filter-mismatch", + topology: topologySpec{ + name: "collection-filter-mismatch", + dataCenters: 1, + racksPerDC: 2, + nodesPerRack: 7, + diskTypes: []string{"hdd"}, + replicas: 1, + collection: "filtered", + }, + adminCollectionFilter: "other", + expectProposals: false, + }, + { + name: "insufficient-disks", + topology: topologySpec{ + name: "insufficient-disks", + dataCenters: 1, + racksPerDC: 1, + nodesPerRack: 2, + diskTypes: []string{"hdd"}, + replicas: 1, + collection: "ec-test", + }, + expectProposals: false, + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + volumeID := uint32(7) + response := buildVolumeListResponse(t, tc.topology, volumeID) + master := pluginworkers.NewMasterServer(t, response) + + dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + handler := pluginworker.NewErasureCodingHandler(dialOption, t.TempDir()) + harness := pluginworkers.NewHarness(t, pluginworkers.HarnessConfig{ + WorkerOptions: pluginworker.WorkerOptions{ + GrpcDialOption: dialOption, + }, + Handlers: []pluginworker.JobHandler{handler}, + }) + harness.WaitForJobType("erasure_coding") + + if tc.adminCollectionFilter != "" { + err := harness.Plugin().SaveJobTypeConfig(&plugin_pb.PersistedJobTypeConfig{ + JobType: "erasure_coding", + AdminConfigValues: map[string]*plugin_pb.ConfigValue{ + "collection_filter": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: tc.adminCollectionFilter}, + }, + }, + }) + require.NoError(t, err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + proposals, err := harness.Plugin().RunDetection(ctx, "erasure_coding", &plugin_pb.ClusterContext{ + MasterGrpcAddresses: []string{master.Address()}, + }, 10) + require.NoError(t, err) + + if !tc.expectProposals { + 
require.Empty(t, proposals) + return + } + + require.NotEmpty(t, proposals) + + proposal := proposals[0] + require.Equal(t, "erasure_coding", proposal.JobType) + paramsValue := proposal.Parameters["task_params_pb"] + require.NotNil(t, paramsValue) + + params := &worker_pb.TaskParams{} + require.NoError(t, proto.Unmarshal(paramsValue.GetBytesValue(), params)) + require.NotEmpty(t, params.Sources) + require.Len(t, params.Targets, ecstorage.TotalShardsCount) + }) + } +} + +func buildVolumeListResponse(t *testing.T, spec topologySpec, volumeID uint32) *master_pb.VolumeListResponse { + t.Helper() + + volumeSizeLimitMB := uint64(100) + volumeSize := uint64(90) * 1024 * 1024 + volumeModifiedAt := time.Now().Add(-10 * time.Minute).Unix() + + diskTypes := spec.diskTypes + if len(diskTypes) == 0 { + diskTypes = []string{"hdd"} + } + replicas := spec.replicas + if replicas <= 0 { + replicas = 1 + } + collection := spec.collection + if collection == "" { + collection = "ec-test" + } + + var dataCenters []*master_pb.DataCenterInfo + nodeIndex := 0 + replicasPlaced := 0 + + for dc := 0; dc < spec.dataCenters; dc++ { + var racks []*master_pb.RackInfo + for rack := 0; rack < spec.racksPerDC; rack++ { + var nodes []*master_pb.DataNodeInfo + for n := 0; n < spec.nodesPerRack; n++ { + nodeIndex++ + address := fmt.Sprintf("127.0.0.1:%d", 20000+nodeIndex) + diskType := diskTypes[(nodeIndex-1)%len(diskTypes)] + + diskInfo := &master_pb.DiskInfo{ + DiskId: 0, + MaxVolumeCount: 100, + VolumeCount: 0, + VolumeInfos: []*master_pb.VolumeInformationMessage{}, + } + + if replicasPlaced < replicas { + diskInfo.VolumeCount = 1 + diskInfo.VolumeInfos = append(diskInfo.VolumeInfos, &master_pb.VolumeInformationMessage{ + Id: volumeID, + Collection: collection, + DiskId: 0, + Size: volumeSize, + DeletedByteCount: 0, + ModifiedAtSecond: volumeModifiedAt, + ReplicaPlacement: 1, + ReadOnly: false, + }) + replicasPlaced++ + } + + nodes = append(nodes, &master_pb.DataNodeInfo{ + Id: address, + Address: 
address, + DiskInfos: map[string]*master_pb.DiskInfo{diskType: diskInfo}, + }) + } + + racks = append(racks, &master_pb.RackInfo{ + Id: fmt.Sprintf("rack-%d", rack+1), + DataNodeInfos: nodes, + }) + } + + dataCenters = append(dataCenters, &master_pb.DataCenterInfo{ + Id: fmt.Sprintf("dc-%d", dc+1), + RackInfos: racks, + }) + } + + return &master_pb.VolumeListResponse{ + VolumeSizeLimitMb: volumeSizeLimitMB, + TopologyInfo: &master_pb.TopologyInfo{ + DataCenterInfos: dataCenters, + }, + } +} diff --git a/test/plugin_workers/erasure_coding/execution_test.go b/test/plugin_workers/erasure_coding/execution_test.go new file mode 100644 index 000000000..897950602 --- /dev/null +++ b/test/plugin_workers/erasure_coding/execution_test.go @@ -0,0 +1,83 @@ +package erasure_coding_test + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + pluginworkers "github.com/seaweedfs/seaweedfs/test/plugin_workers" + "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" + pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker" + ecstorage "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func TestErasureCodingExecutionEncodesShards(t *testing.T) { + volumeID := uint32(123) + datSize := 1 * 1024 * 1024 + + dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + handler := pluginworker.NewErasureCodingHandler(dialOption, t.TempDir()) + harness := pluginworkers.NewHarness(t, pluginworkers.HarnessConfig{ + WorkerOptions: pluginworker.WorkerOptions{ + GrpcDialOption: dialOption, + }, + Handlers: []pluginworker.JobHandler{handler}, + }) + harness.WaitForJobType("erasure_coding") + + sourceServer := pluginworkers.NewVolumeServer(t, "") + pluginworkers.WriteTestVolumeFiles(t, sourceServer.BaseDir(), volumeID, datSize) + + targetServers := make([]*pluginworkers.VolumeServer, 0, ecstorage.TotalShardsCount) + 
targetAddresses := make([]string, 0, ecstorage.TotalShardsCount) + for i := 0; i < ecstorage.TotalShardsCount; i++ { + target := pluginworkers.NewVolumeServer(t, "") + targetServers = append(targetServers, target) + targetAddresses = append(targetAddresses, target.Address()) + } + + job := &plugin_pb.JobSpec{ + JobId: fmt.Sprintf("ec-job-%d", volumeID), + JobType: "erasure_coding", + Parameters: map[string]*plugin_pb.ConfigValue{ + "volume_id": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(volumeID)}, + }, + "collection": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "ec-test"}, + }, + "source_server": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceServer.Address()}, + }, + "target_servers": { + Kind: &plugin_pb.ConfigValue_StringList{StringList: &plugin_pb.StringList{Values: targetAddresses}}, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + result, err := harness.Plugin().ExecuteJob(ctx, job, nil, 1) + require.NoError(t, err) + require.NotNil(t, result) + require.True(t, result.Success) + + require.GreaterOrEqual(t, sourceServer.MarkReadonlyCount(), 1) + require.GreaterOrEqual(t, len(sourceServer.DeleteRequests()), 1) + + for shardID := 0; shardID < ecstorage.TotalShardsCount; shardID++ { + targetIndex := shardID % len(targetServers) + target := targetServers[targetIndex] + expected := filepath.Join(target.BaseDir(), fmt.Sprintf("%d.ec%02d", volumeID, shardID)) + info, err := os.Stat(expected) + require.NoErrorf(t, err, "missing shard file %s", expected) + require.Greater(t, info.Size(), int64(0)) + } +} diff --git a/test/plugin_workers/erasure_coding/large_topology_test.go b/test/plugin_workers/erasure_coding/large_topology_test.go new file mode 100644 index 000000000..02334cc5c --- /dev/null +++ b/test/plugin_workers/erasure_coding/large_topology_test.go @@ -0,0 +1,123 @@ +package erasure_coding_test + +import ( + "context" + "fmt" + "testing" + "time" + 
+ pluginworkers "github.com/seaweedfs/seaweedfs/test/plugin_workers" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" + pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func TestErasureCodingDetectionLargeTopology(t *testing.T) { + const ( + rackCount = 100 + serverCount = 1000 + volumesPerNode = 300 + volumeSizeLimit = uint64(100) + ) + + if serverCount%rackCount != 0 { + t.Fatalf("serverCount (%d) must be divisible by rackCount (%d)", serverCount, rackCount) + } + + nodesPerRack := serverCount / rackCount + eligibleSize := uint64(90) * 1024 * 1024 + ineligibleSize := uint64(10) * 1024 * 1024 + modifiedAt := time.Now().Add(-10 * time.Minute).Unix() + + volumeID := uint32(1) + dataCenters := make([]*master_pb.DataCenterInfo, 0, 1) + + racks := make([]*master_pb.RackInfo, 0, rackCount) + for rack := 0; rack < rackCount; rack++ { + nodes := make([]*master_pb.DataNodeInfo, 0, nodesPerRack) + for node := 0; node < nodesPerRack; node++ { + address := fmt.Sprintf("10.0.%d.%d:8080", rack, node+1) + volumes := make([]*master_pb.VolumeInformationMessage, 0, volumesPerNode) + for v := 0; v < volumesPerNode; v++ { + size := ineligibleSize + if volumeID%2 == 0 { + size = eligibleSize + } + volumes = append(volumes, &master_pb.VolumeInformationMessage{ + Id: volumeID, + Collection: "ec-bulk", + DiskId: 0, + Size: size, + DeletedByteCount: 0, + ModifiedAtSecond: modifiedAt, + ReplicaPlacement: 1, + ReadOnly: false, + }) + volumeID++ + } + + diskInfo := &master_pb.DiskInfo{ + DiskId: 0, + MaxVolumeCount: int64(volumesPerNode + 10), + VolumeCount: int64(volumesPerNode), + VolumeInfos: volumes, + } + + nodes = append(nodes, &master_pb.DataNodeInfo{ + Id: address, + Address: address, + DiskInfos: map[string]*master_pb.DiskInfo{"hdd": diskInfo}, + }) + } + + racks = append(racks, 
&master_pb.RackInfo{ + Id: fmt.Sprintf("rack-%d", rack+1), + DataNodeInfos: nodes, + }) + } + + dataCenters = append(dataCenters, &master_pb.DataCenterInfo{ + Id: "dc-1", + RackInfos: racks, + }) + + response := &master_pb.VolumeListResponse{ + VolumeSizeLimitMb: volumeSizeLimit, + TopologyInfo: &master_pb.TopologyInfo{ + DataCenterInfos: dataCenters, + }, + } + + master := pluginworkers.NewMasterServer(t, response) + + dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + handler := pluginworker.NewErasureCodingHandler(dialOption, t.TempDir()) + harness := pluginworkers.NewHarness(t, pluginworkers.HarnessConfig{ + WorkerOptions: pluginworker.WorkerOptions{ + GrpcDialOption: dialOption, + }, + Handlers: []pluginworker.JobHandler{handler}, + }) + harness.WaitForJobType("erasure_coding") + + totalVolumes := serverCount * volumesPerNode + expectedEligible := totalVolumes / 2 + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + start := time.Now() + proposals, err := harness.Plugin().RunDetection(ctx, "erasure_coding", &plugin_pb.ClusterContext{ + MasterGrpcAddresses: []string{master.Address()}, + }, 0) + duration := time.Since(start) + require.NoError(t, err) + require.GreaterOrEqual(t, len(proposals), 10, "should detect at least some proposals") + t.Logf("large topology detection completed in %s (proposals=%d, eligible=%d)", duration, len(proposals), expectedEligible) + if len(proposals) < expectedEligible { + t.Logf("large topology detection stopped early: %d proposals vs %d eligible", len(proposals), expectedEligible) + } +} diff --git a/test/plugin_workers/fake_master.go b/test/plugin_workers/fake_master.go new file mode 100644 index 000000000..e647e8c20 --- /dev/null +++ b/test/plugin_workers/fake_master.go @@ -0,0 +1,90 @@ +package pluginworkers + +import ( + "context" + "net" + "sync" + + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + 
"google.golang.org/grpc"
+	"google.golang.org/protobuf/proto"
+	"testing"
+)
+
+// MasterServer is a stub master gRPC service that answers VolumeList with a
+// configurable topology response.
+type MasterServer struct {
+	master_pb.UnimplementedSeaweedServer
+
+	t *testing.T
+
+	server   *grpc.Server
+	listener net.Listener
+	address  string
+
+	mu       sync.RWMutex // guards response
+	response *master_pb.VolumeListResponse
+}
+
+// NewMasterServer listens on an ephemeral loopback port, serves the given
+// response from VolumeList, and shuts itself down when the test finishes.
+func NewMasterServer(t *testing.T, response *master_pb.VolumeListResponse) *MasterServer {
+	t.Helper()
+
+	lis, listenErr := net.Listen("tcp", "127.0.0.1:0")
+	if listenErr != nil {
+		t.Fatalf("listen master: %v", listenErr)
+	}
+
+	ms := &MasterServer{
+		t:        t,
+		server:   pb.NewGrpcServer(),
+		listener: lis,
+		address:  lis.Addr().String(),
+		response: response,
+	}
+	master_pb.RegisterSeaweedServer(ms.server, ms)
+
+	// Serve in the background; the error is ignored because Serve always
+	// returns non-nil once the server is stopped.
+	go func() {
+		_ = ms.server.Serve(lis)
+	}()
+	t.Cleanup(ms.Shutdown)
+
+	return ms
+}
+
+// Address reports the host:port the stub master is listening on.
+func (m *MasterServer) Address() string {
+	return m.address
+}
+
+// SetVolumeListResponse swaps the response that later VolumeList calls return.
+func (m *MasterServer) SetVolumeListResponse(response *master_pb.VolumeListResponse) {
+	m.mu.Lock()
+	m.response = response
+	m.mu.Unlock()
+}
+
+// VolumeList returns a deep copy of the configured response, or an empty
+// response when none has been configured.
+func (m *MasterServer) VolumeList(ctx context.Context, req *master_pb.VolumeListRequest) (*master_pb.VolumeListResponse, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+
+	if current := m.response; current != nil {
+		// Clone so callers cannot mutate the shared fixture.
+		return proto.Clone(current).(*master_pb.VolumeListResponse), nil
+	}
+	return &master_pb.VolumeListResponse{}, nil
+}
+
+// Shutdown stops the master gRPC server.
+func (m *MasterServer) Shutdown() {
+	if srv := m.server; srv != nil {
+		srv.GracefulStop()
+	}
+	if lis := m.listener; lis != nil {
+		_ = lis.Close() // best effort; the error is deliberately dropped
+	}
+}
diff --git a/test/plugin_workers/fake_volume_server.go b/test/plugin_workers/fake_volume_server.go
new file mode 100644
index 000000000..922953759
--- /dev/null
+++ b/test/plugin_workers/fake_volume_server.go
@@ -0,0 +1,335 @@
+package pluginworkers
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net"
+	"os"
+	"path/filepath"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+	"google.golang.org/grpc"
+)
+
+// VolumeServer is a minimal fake volume server for plugin worker tests. It
+// keeps files under baseDir and records the requests and call counts that the
+// erasure coding, vacuum and balance tests inspect.
+type VolumeServer struct {
+	volume_server_pb.UnimplementedVolumeServerServer
+
+	t *testing.T
+
+	server   *grpc.Server
+	listener net.Listener
+	address  string
+	baseDir  string
+
+	mu                 sync.Mutex        // guards every field below
+	receivedFiles      map[string]uint64 // file path -> bytes written
+	mountRequests      []*volume_server_pb.VolumeEcShardsMountRequest
+	deleteRequests     []*volume_server_pb.VolumeDeleteRequest
+	markReadonlyCalls  int
+	vacuumGarbageRatio float64 // value reported by VacuumVolumeCheck
+	vacuumCheckCalls   int
+	vacuumCompactCalls int
+	vacuumCommitCalls  int
+	vacuumCleanupCalls int
+	volumeCopyCalls    int
+	volumeMountCalls   int
+	tailReceiverCalls  int
+}
+
+// NewVolumeServer starts a test volume server using the provided base directory.
+func NewVolumeServer(t *testing.T, baseDir string) *VolumeServer {
+	t.Helper()
+
+	// An empty baseDir means "give me a fresh per-test directory".
+	dir := baseDir
+	if dir == "" {
+		dir = t.TempDir()
+	}
+	if mkErr := os.MkdirAll(dir, 0755); mkErr != nil {
+		t.Fatalf("create volume base dir: %v", mkErr)
+	}
+
+	lis, listenErr := net.Listen("tcp", "127.0.0.1:0")
+	if listenErr != nil {
+		t.Fatalf("listen volume server: %v", listenErr)
+	}
+
+	vs := &VolumeServer{
+		t:        t,
+		server:   pb.NewGrpcServer(),
+		listener: lis,
+		// NOTE(review): "127.0.0.1:0.<port>" looks like a host:port.grpcPort
+		// address with port 0 and only the gRPC port set — confirm against
+		// the address parser the workers use.
+		address:       fmt.Sprintf("127.0.0.1:0.%d", lis.Addr().(*net.TCPAddr).Port),
+		baseDir:       dir,
+		receivedFiles: make(map[string]uint64),
+	}
+	volume_server_pb.RegisterVolumeServerServer(vs.server, vs)
+
+	// Serve in the background; the server is stopped by the test cleanup.
+	go func() {
+		_ = vs.server.Serve(lis)
+	}()
+	t.Cleanup(vs.Shutdown)
+
+	return vs
+}
+
+// Address reports the address string under which the server advertises itself.
+func (v *VolumeServer) Address() string {
+	return v.address
+}
+
+// BaseDir reports the directory in which the server reads and writes files.
+func (v *VolumeServer) BaseDir() string {
+	return v.baseDir
+}
+
+// ReceivedFiles returns a copy of the received-file map (path to byte count),
+// so callers can inspect it without holding the lock.
+func (v *VolumeServer) ReceivedFiles() map[string]uint64 {
+	v.mu.Lock()
+	defer v.mu.Unlock()
+
+	snapshot := make(map[string]uint64, len(v.receivedFiles))
+	for path, size := range v.receivedFiles {
+		snapshot[path] = size
+	}
+	return snapshot
+}
+
+// SetVacuumGarbageRatio configures the ratio that VacuumVolumeCheck reports.
+func (v *VolumeServer) SetVacuumGarbageRatio(ratio float64) {
+	v.mu.Lock()
+	v.vacuumGarbageRatio = ratio
+	v.mu.Unlock()
+}
+
+// VacuumStats reports how many times each vacuum RPC has been called.
+func (v *VolumeServer) VacuumStats() (check, compact, commit, cleanup int) {
+	v.mu.Lock()
+	defer v.mu.Unlock()
+
+	check, compact = v.vacuumCheckCalls, v.vacuumCompactCalls
+	commit, cleanup = v.vacuumCommitCalls, v.vacuumCleanupCalls
+	return check, compact, commit, cleanup
+}
+
+// BalanceStats returns the balance RPC call counts.
+func (v *VolumeServer) BalanceStats() (copyCalls, mountCalls, tailCalls int) {
+	v.mu.Lock()
+	defer v.mu.Unlock()
+
+	copyCalls = v.volumeCopyCalls
+	mountCalls = v.volumeMountCalls
+	tailCalls = v.tailReceiverCalls
+	return copyCalls, mountCalls, tailCalls
+}
+
+// MountRequests returns a copy of the EC shard mount requests seen so far.
+func (v *VolumeServer) MountRequests() []*volume_server_pb.VolumeEcShardsMountRequest {
+	v.mu.Lock()
+	defer v.mu.Unlock()
+
+	snapshot := make([]*volume_server_pb.VolumeEcShardsMountRequest, 0, len(v.mountRequests))
+	return append(snapshot, v.mountRequests...)
+}
+
+// DeleteRequests returns a copy of the volume delete requests seen so far.
+func (v *VolumeServer) DeleteRequests() []*volume_server_pb.VolumeDeleteRequest {
+	v.mu.Lock()
+	defer v.mu.Unlock()
+
+	snapshot := make([]*volume_server_pb.VolumeDeleteRequest, 0, len(v.deleteRequests))
+	return append(snapshot, v.deleteRequests...)
+}
+
+// MarkReadonlyCount reports how many VolumeMarkReadonly calls were received.
+func (v *VolumeServer) MarkReadonlyCount() int {
+	v.mu.Lock()
+	count := v.markReadonlyCalls
+	v.mu.Unlock()
+	return count
+}
+
+// Shutdown stops the volume server.
+func (v *VolumeServer) Shutdown() {
+	if v.server != nil {
+		v.server.GracefulStop()
+	}
+	if v.listener != nil {
+		_ = v.listener.Close()
+	}
+}
+
+// filePath maps a volume ID and file extension to "<baseDir>/<id><ext>".
+func (v *VolumeServer) filePath(volumeID uint32, ext string) string {
+	return filepath.Join(v.baseDir, fmt.Sprintf("%d%s", volumeID, ext))
+}
+
+// CopyFile streams the file for the requested volume ID and extension in
+// chunks of up to 64 KiB. A failure to open the file is returned as an error
+// unless the request sets IgnoreSourceFileNotFound, in which case the stream
+// ends without sending data.
+func (v *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error {
+	if req == nil {
+		return fmt.Errorf("copy file request is nil")
+	}
+	path := v.filePath(req.VolumeId, req.Ext)
+	file, err := os.Open(path)
+	if err != nil {
+		if req.IgnoreSourceFileNotFound {
+			return nil
+		}
+		return err
+	}
+	defer file.Close()
+
+	buf := make([]byte, 64*1024)
+	for {
+		n, readErr := file.Read(buf)
+		// Send what was read before looking at the error: Read may return
+		// data together with io.EOF.
+		if n > 0 {
+			if err := stream.Send(&volume_server_pb.CopyFileResponse{FileContent: buf[:n]}); err != nil {
+				return err
+			}
+		}
+		if readErr == io.EOF {
+			break
+		}
+		if readErr != nil {
+			return readErr
+		}
+	}
+	return nil
+}
+
+// ReceiveFile writes a client-streamed file under baseDir. The stream must
+// carry an info message (volume ID and extension) before any content. At end
+// of stream the byte count is recorded under the file path and returned; a
+// stream without an info message is answered with an error response.
+func (v *VolumeServer) ReceiveFile(stream volume_server_pb.VolumeServer_ReceiveFileServer) error {
+	var (
+		info         *volume_server_pb.ReceiveFileInfo
+		file         *os.File
+		bytesWritten uint64
+		filePath     string
+	)
+	defer func() {
+		if file != nil {
+			_ = file.Close()
+		}
+	}()
+
+	for {
+		req, err := stream.Recv()
+		if err == io.EOF {
+			if info == nil {
+				return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{Error: "missing file info"})
+			}
+			v.mu.Lock()
+			v.receivedFiles[filePath] = bytesWritten
+			v.mu.Unlock()
+			return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{BytesWritten: bytesWritten})
+		}
+		if err != nil {
+			return err
+		}
+
+		if reqInfo := req.GetInfo(); reqInfo != nil {
+			// If a second info message arrives, close the handle opened for
+			// the previous one so it is not leaked, and restart the byte
+			// count so it describes only the file that is now being written.
+			if file != nil {
+				_ = file.Close()
+				file = nil
+				bytesWritten = 0
+			}
+			info = reqInfo
+			filePath = v.filePath(info.VolumeId, info.Ext)
+			if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil {
+				return err
+			}
+			file, err = os.Create(filePath)
+			if err != nil {
+				return err
+			}
+			continue
+		}
+
+		chunk := req.GetFileContent()
+		if len(chunk) == 0 {
+			continue
+		}
+		if file == nil {
+			return fmt.Errorf("file info not received")
+		}
+		n, writeErr := file.Write(chunk)
+		if writeErr != nil {
+			return writeErr
+		}
+		bytesWritten += uint64(n)
+	}
+}
+
+// VolumeEcShardsMount records the request and returns an empty response.
+func (v *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
+	v.mu.Lock()
+	v.mountRequests = append(v.mountRequests, req)
+	v.mu.Unlock()
+	return &volume_server_pb.VolumeEcShardsMountResponse{}, nil
+}
+
+// VolumeDelete records the request and removes the volume's .dat and .idx
+// files from baseDir; removal errors are ignored.
+func (v *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.VolumeDeleteRequest) (*volume_server_pb.VolumeDeleteResponse, error) {
+	v.mu.Lock()
+	v.deleteRequests = append(v.deleteRequests, req)
+	v.mu.Unlock()
+
+	if req != nil {
+		_ = os.Remove(v.filePath(req.VolumeId, ".dat"))
+		_ = os.Remove(v.filePath(req.VolumeId, ".idx"))
+	}
+
+	return &volume_server_pb.VolumeDeleteResponse{}, nil
+}
+
+// VolumeMarkReadonly counts the call and returns an empty response.
+func (v *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
+	v.mu.Lock()
+	v.markReadonlyCalls++
+	v.mu.Unlock()
+	return &volume_server_pb.VolumeMarkReadonlyResponse{}, nil
+}
+
+// VacuumVolumeCheck counts the call and reports the configured garbage ratio.
+func (v *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_server_pb.VacuumVolumeCheckRequest) (*volume_server_pb.VacuumVolumeCheckResponse, error) {
+	v.mu.Lock()
+	v.vacuumCheckCalls++
+	ratio := v.vacuumGarbageRatio
+	v.mu.Unlock()
+	return &volume_server_pb.VacuumVolumeCheckResponse{GarbageRatio: ratio}, nil
+}
+
+// VacuumVolumeCompact counts the call and sends one fixed progress message.
+func (v *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error {
+	v.mu.Lock()
+	v.vacuumCompactCalls++
+	v.mu.Unlock()
+	return stream.Send(&volume_server_pb.VacuumVolumeCompactResponse{ProcessedBytes: 1024})
+}
+
+// VacuumVolumeCommit counts the call and returns an empty response.
+func (v *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_server_pb.VacuumVolumeCommitRequest) (*volume_server_pb.VacuumVolumeCommitResponse, error) {
+	v.mu.Lock()
+	v.vacuumCommitCalls++
+	v.mu.Unlock()
+	return &volume_server_pb.VacuumVolumeCommitResponse{}, nil
+}
+
+// VacuumVolumeCleanup counts the call and returns an empty response.
+func (v *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_server_pb.VacuumVolumeCleanupRequest) (*volume_server_pb.VacuumVolumeCleanupResponse, error) {
+	v.mu.Lock()
+	v.vacuumCleanupCalls++
+	v.mu.Unlock()
+	return &volume_server_pb.VacuumVolumeCleanupResponse{}, nil
+}
+
+// VolumeCopy counts the call and sends two messages: a fixed progress update
+// followed by one carrying the current time as LastAppendAtNs.
+func (v *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stream volume_server_pb.VolumeServer_VolumeCopyServer) error {
+	v.mu.Lock()
+	v.volumeCopyCalls++
+	v.mu.Unlock()
+
+	if err := stream.Send(&volume_server_pb.VolumeCopyResponse{ProcessedBytes: 1024}); err != nil {
+		return err
+	}
+	return stream.Send(&volume_server_pb.VolumeCopyResponse{LastAppendAtNs: uint64(time.Now().UnixNano())})
+}
+
+// VolumeMount counts the call and returns an empty response.
+func (v *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.VolumeMountRequest) (*volume_server_pb.VolumeMountResponse, error) {
+	v.mu.Lock()
+	v.volumeMountCalls++
+	v.mu.Unlock()
+	return &volume_server_pb.VolumeMountResponse{}, nil
+}
+
+// VolumeTailReceiver counts the call and returns an empty response.
+func (v *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_server_pb.VolumeTailReceiverRequest) (*volume_server_pb.VolumeTailReceiverResponse, error) {
+	v.mu.Lock()
+	v.tailReceiverCalls++
+	v.mu.Unlock()
+	return &volume_server_pb.VolumeTailReceiverResponse{}, nil
+}
diff --git a/test/plugin_workers/framework.go b/test/plugin_workers/framework.go
new file mode 100644
index 000000000..5b966e613
--- /dev/null
+++ b/test/plugin_workers/framework.go
@@ -0,0 +1,171 @@
+package pluginworkers
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/admin/plugin"
+	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
+	pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker"
+	"github.com/stretchr/testify/require"
+
"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+)
+
+// HarnessConfig configures the shared plugin worker test harness.
+type HarnessConfig struct {
+	// PluginOptions configures the admin plugin service. NewHarness sets
+	// DataDir to a per-test temporary directory when it is empty.
+	PluginOptions plugin.Options
+	// WorkerOptions configures the worker. NewHarness fills in the admin
+	// address, dial option, worker ID, version and address when unset.
+	WorkerOptions pluginworker.WorkerOptions
+	// Handlers, when non-empty, replaces WorkerOptions.Handlers.
+	Handlers []pluginworker.JobHandler
+}
+
+// Harness manages an in-process plugin admin server and worker.
+type Harness struct {
+	t *testing.T
+
+	pluginSvc *plugin.Plugin
+
+	adminServer   *grpc.Server
+	adminListener net.Listener
+	adminGrpcAddr string
+
+	worker       *pluginworker.Worker
+	workerCtx    context.Context
+	workerCancel context.CancelFunc
+	workerDone   chan struct{} // closed when the worker's Run returns
+}
+
+// NewHarness starts a plugin admin gRPC server and a worker connected to it,
+// waits until the worker has registered, and returns the running harness.
+// Teardown is registered with t.Cleanup, so callers need not call Shutdown.
+func NewHarness(t *testing.T, cfg HarnessConfig) *Harness {
+	t.Helper()
+
+	pluginOpts := cfg.PluginOptions
+	if pluginOpts.DataDir == "" {
+		pluginOpts.DataDir = t.TempDir()
+	}
+
+	pluginSvc, err := plugin.New(pluginOpts)
+	require.NoError(t, err)
+
+	// Register teardown as soon as the first resource exists. The require
+	// helpers abort the test on failure, so a cleanup registered only at the
+	// end would never run if a later step (listen, worker creation, readiness
+	// wait) failed, leaking the server and worker goroutines. Shutdown skips
+	// every field that is still nil.
+	harness := &Harness{t: t, pluginSvc: pluginSvc}
+	t.Cleanup(harness.Shutdown)
+
+	listener, err := net.Listen("tcp", "127.0.0.1:0")
+	require.NoError(t, err)
+	harness.adminListener = listener
+	harness.adminGrpcAddr = listener.Addr().String()
+
+	adminServer := pb.NewGrpcServer()
+	plugin_pb.RegisterPluginControlServiceServer(adminServer, pluginSvc)
+	harness.adminServer = adminServer
+	go func() {
+		_ = adminServer.Serve(listener)
+	}()
+
+	// NOTE(review): "127.0.0.1:0.<port>" looks like a host:port.grpcPort
+	// address with only the gRPC port set — confirm against the worker's
+	// address parsing.
+	adminPort := listener.Addr().(*net.TCPAddr).Port
+	adminAddr := fmt.Sprintf("127.0.0.1:0.%d", adminPort)
+
+	workerOpts := cfg.WorkerOptions
+	if workerOpts.AdminServer == "" {
+		workerOpts.AdminServer = adminAddr
+	}
+	if workerOpts.GrpcDialOption == nil {
+		workerOpts.GrpcDialOption = grpc.WithTransportCredentials(insecure.NewCredentials())
+	}
+	if workerOpts.WorkerID == "" {
+		workerOpts.WorkerID = "plugin-worker-test"
+	}
+	if workerOpts.WorkerVersion == "" {
+		workerOpts.WorkerVersion = "test"
+	}
+	if workerOpts.WorkerAddress == "" {
+		workerOpts.WorkerAddress = "127.0.0.1"
+	}
+	if len(cfg.Handlers) > 0 {
+		workerOpts.Handlers = cfg.Handlers
+	}
+
+	worker, err := pluginworker.NewWorker(workerOpts)
+	require.NoError(t, err)
+	harness.worker = worker
+
+	workerCtx, workerCancel := context.WithCancel(context.Background())
+	workerDone := make(chan struct{})
+	harness.workerCtx = workerCtx
+	harness.workerCancel = workerCancel
+	harness.workerDone = workerDone
+	go func() {
+		defer close(workerDone)
+		_ = worker.Run(workerCtx)
+	}()
+
+	require.Eventually(t, func() bool {
+		return len(pluginSvc.ListWorkers()) > 0
+	}, 5*time.Second, 50*time.Millisecond)
+
+	return harness
+}
+
+// Plugin exposes the underlying admin plugin service.
+func (h *Harness) Plugin() *plugin.Plugin {
+	return h.pluginSvc
+}
+
+// AdminGrpcAddress returns the gRPC address for the admin server.
+func (h *Harness) AdminGrpcAddress() string {
+	return h.adminGrpcAddr
+}
+
+// WaitForJobType waits until a worker with the given capability is registered.
+// It fails the test if none appears within five seconds.
+func (h *Harness) WaitForJobType(jobType string) {
+	h.t.Helper()
+	require.Eventually(h.t, func() bool {
+		workers := h.pluginSvc.ListWorkers()
+		for _, worker := range workers {
+			if worker == nil || worker.Capabilities == nil {
+				continue
+			}
+			if _, ok := worker.Capabilities[jobType]; ok {
+				return true
+			}
+		}
+		return false
+	}, 5*time.Second, 50*time.Millisecond)
+}
+
+// Shutdown stops the worker and admin server.
+//
+// Every wait is bounded so a stuck worker cannot hang the test: the worker gets
+// a grace period to exit after its context is cancelled, and the gRPC server
+// gets the same period to drain before it is stopped forcibly. Fields that
+// were never set are skipped.
+func (h *Harness) Shutdown() {
+	const grace = 2 * time.Second
+
+	if h.workerCancel != nil {
+		h.workerCancel()
+	}
+
+	if h.workerDone != nil {
+		select {
+		case <-h.workerDone:
+		case <-time.After(grace):
+		}
+	}
+
+	if h.adminServer != nil {
+		// GracefulStop blocks until every in-flight RPC has finished, so a
+		// worker stream that outlived the wait above would block here forever.
+		// Fall back to a hard Stop once the grace period expires.
+		stopped := make(chan struct{})
+		go func() {
+			defer close(stopped)
+			h.adminServer.GracefulStop()
+		}()
+		select {
+		case <-stopped:
+		case <-time.After(grace):
+			h.adminServer.Stop()
+			<-stopped
+		}
+	}
+
+	if h.adminListener != nil {
+		// The server normally closes its listener; the error from a second
+		// close is expected and ignored.
+		_ = h.adminListener.Close()
+	}
+
+	if h.pluginSvc != nil {
+		h.pluginSvc.Shutdown()
+	}
+}
diff --git a/test/plugin_workers/vacuum/detection_test.go b/test/plugin_workers/vacuum/detection_test.go
new file mode 100644
index 000000000..cbcf27502
--- /dev/null
+++ b/test/plugin_workers/vacuum/detection_test.go
@@ -0,0 +1,108 @@
+package vacuum_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	pluginworkers "github.com/seaweedfs/seaweedfs/test/plugin_workers"
+	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+	pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/protobuf/proto"
+)
+
+// TestVacuumDetectionIntegration runs vacuum detection through the harness
+// against a fake master and expects a proposal carrying vacuum task params.
+func TestVacuumDetectionIntegration(t *testing.T) {
+	volumeID := uint32(101)
+	source := pluginworkers.NewVolumeServer(t, "")
+
+	response := buildVacuumVolumeListResponse(t, source.Address(), volumeID, 0.6)
+	master := pluginworkers.NewMasterServer(t, response)
+
+	dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
+	handler := pluginworker.NewVacuumHandler(dialOption, 1)
+	harness := pluginworkers.NewHarness(t, pluginworkers.HarnessConfig{
+		WorkerOptions: pluginworker.WorkerOptions{
+			GrpcDialOption: dialOption,
+		},
+		Handlers: []pluginworker.JobHandler{handler},
+	})
+	harness.WaitForJobType("vacuum")
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	proposals, err := harness.Plugin().RunDetection(ctx, "vacuum", &plugin_pb.ClusterContext{
+		MasterGrpcAddresses: []string{master.Address()},
+	}, 10)
+	require.NoError(t, err)
+	require.NotEmpty(t, proposals)
+
+	proposal := proposals[0]
+	require.Equal(t, "vacuum", proposal.JobType)
+	paramsValue := proposal.Parameters["task_params_pb"]
+	require.NotNil(t, paramsValue)
+
+	params := &worker_pb.TaskParams{}
+	require.NoError(t, proto.Unmarshal(paramsValue.GetBytesValue(), params))
+	require.NotEmpty(t, params.Sources)
+	require.NotNil(t, params.GetVacuumParams())
+}
+
+// buildVacuumVolumeListResponse returns a single-node topology holding one
+// 90 MiB volume whose deleted byte count is garbageRatio of its size.
+func buildVacuumVolumeListResponse(t *testing.T, serverAddress string, volumeID uint32, garbageRatio float64) *master_pb.VolumeListResponse {
+	t.Helper()
+
+	volumeSizeLimitMB := uint64(100)
+	volumeSize := uint64(90) * 1024 * 1024
+	deletedBytes := uint64(float64(volumeSize) * garbageRatio)
+	volumeModifiedAt := time.Now().Add(-48 * time.Hour).Unix()
+
+	diskInfo := &master_pb.DiskInfo{
+		DiskId:         0,
+		MaxVolumeCount: 100,
+		VolumeCount:    1,
+		VolumeInfos: []*master_pb.VolumeInformationMessage{
+			{
+				Id:               volumeID,
+				Collection:       "vac-test",
+				DiskId:           0,
+				Size:             volumeSize,
+				DeletedByteCount: deletedBytes,
+				ModifiedAtSecond: volumeModifiedAt,
+				ReplicaPlacement: 1,
+				ReadOnly:         false,
+			},
+		},
+	}
+
+	node := &master_pb.DataNodeInfo{
+		Id:        serverAddress,
+		Address:   serverAddress,
+		DiskInfos: map[string]*master_pb.DiskInfo{"hdd": diskInfo},
+	}
+
+	return &master_pb.VolumeListResponse{
+		VolumeSizeLimitMb: volumeSizeLimitMB,
+		TopologyInfo: &master_pb.TopologyInfo{
+			DataCenterInfos: []*master_pb.DataCenterInfo{
+				{
+					Id: "dc-1",
+					RackInfos: []*master_pb.RackInfo{
+						{
+							Id:            "rack-1",
+							DataNodeInfos: []*master_pb.DataNodeInfo{node},
+						},
+					},
+				},
+			},
+		},
+	}
+}
diff --git a/test/plugin_workers/vacuum/execution_test.go b/test/plugin_workers/vacuum/execution_test.go
new file mode 100644
index 000000000..7d749e5a4
--- /dev/null
+++ b/test/plugin_workers/vacuum/execution_test.go
@@ -0,0 +1,64 @@
+package vacuum_test
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	pluginworkers "github.com/seaweedfs/seaweedfs/test/plugin_workers"
+	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
+	pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+)
+
+// TestVacuumExecutionIntegration executes a vacuum job against a fake volume
+// server and checks that the check, compact, commit and cleanup calls were made.
+func TestVacuumExecutionIntegration(t *testing.T) {
+	volumeID := uint32(202)
+
+	dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
+	handler := pluginworker.NewVacuumHandler(dialOption, 1)
+	harness := pluginworkers.NewHarness(t, pluginworkers.HarnessConfig{
+		WorkerOptions: pluginworker.WorkerOptions{
+			GrpcDialOption: dialOption,
+		},
+		Handlers: []pluginworker.JobHandler{handler},
+	})
+	harness.WaitForJobType("vacuum")
+
+	source := pluginworkers.NewVolumeServer(t, "")
+	source.SetVacuumGarbageRatio(0.6)
+
+	job := &plugin_pb.JobSpec{
+		JobId:   fmt.Sprintf("vacuum-job-%d", volumeID),
+		JobType: "vacuum",
+		Parameters: map[string]*plugin_pb.ConfigValue{
+			"volume_id": {
+				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(volumeID)},
+			},
+			"server": {
+				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: source.Address()},
+			},
+			"collection": {
+				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "vac-test"},
+			},
+		},
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	result, err := harness.Plugin().ExecuteJob(ctx, job, nil, 1)
+	require.NoError(t, err)
+	require.NotNil(t, result)
+	require.True(t, result.Success)
+
+	checkCalls, compactCalls, commitCalls, cleanupCalls := source.VacuumStats()
+	require.GreaterOrEqual(t, checkCalls, 2)
+	require.GreaterOrEqual(t, compactCalls, 1)
+	require.GreaterOrEqual(t, commitCalls, 1)
+	require.GreaterOrEqual(t, cleanupCalls, 1)
+}
diff --git a/test/plugin_workers/volume_balance/detection_test.go b/test/plugin_workers/volume_balance/detection_test.go
new file mode 100644
index 000000000..069c6c4c9
--- /dev/null
+++ b/test/plugin_workers/volume_balance/detection_test.go
@@ -0,0 +1,133 @@
+package volume_balance_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	pluginworkers "github.com/seaweedfs/seaweedfs/test/plugin_workers"
+	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+	pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/protobuf/proto"
+)
+
+// TestVolumeBalanceDetectionIntegration runs balance detection on an uneven
+// two-node topology and expects exactly one proposal with sources and targets.
+func TestVolumeBalanceDetectionIntegration(t *testing.T) {
+	response := buildBalanceVolumeListResponse(t)
+	master := pluginworkers.NewMasterServer(t, response)
+
+	dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
+	handler := pluginworker.NewVolumeBalanceHandler(dialOption)
+	harness := pluginworkers.NewHarness(t, pluginworkers.HarnessConfig{
+		WorkerOptions: pluginworker.WorkerOptions{
+			GrpcDialOption: dialOption,
+		},
+		Handlers: []pluginworker.JobHandler{handler},
+	})
+	harness.WaitForJobType("volume_balance")
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	proposals, err := harness.Plugin().RunDetection(ctx, "volume_balance", &plugin_pb.ClusterContext{
+		MasterGrpcAddresses: []string{master.Address()},
+	}, 10)
+	require.NoError(t, err)
+	require.Len(t, proposals, 1)
+
+	proposal := proposals[0]
+	require.Equal(t, "volume_balance", proposal.JobType)
+	paramsValue := proposal.Parameters["task_params_pb"]
+	require.NotNil(t, paramsValue)
+
+	params := &worker_pb.TaskParams{}
+	require.NoError(t, proto.Unmarshal(paramsValue.GetBytesValue(), params))
+	require.NotEmpty(t, params.Sources)
+	require.NotEmpty(t, params.Targets)
+}
+
+// buildBalanceVolumeListResponse returns one rack with two nodes: 10.0.0.1:8080
+// holding ten volumes (ids 1000-1009) and 10.0.0.2:8080 holding one (id 2000).
+func buildBalanceVolumeListResponse(t *testing.T) *master_pb.VolumeListResponse {
+	t.Helper()
+
+	volumeSizeLimitMB := uint64(100)
+	volumeModifiedAt := time.Now().Add(-2 * time.Hour).Unix()
+
+	overloadedVolumes := make([]*master_pb.VolumeInformationMessage, 0, 10)
+	for i := 0; i < 10; i++ {
+		volumeID := uint32(1000 + i)
+		overloadedVolumes = append(overloadedVolumes, &master_pb.VolumeInformationMessage{
+			Id:               volumeID,
+			Collection:       "balance",
+			DiskId:           0,
+			Size:             20 * 1024 * 1024,
+			DeletedByteCount: 0,
+			ModifiedAtSecond: volumeModifiedAt,
+			ReplicaPlacement: 1,
+			ReadOnly:         false,
+		})
+	}
+
+	underloadedVolumes := []*master_pb.VolumeInformationMessage{
+		{
+			Id:               2000,
+			Collection:       "balance",
+			DiskId:           0,
+			Size:             20 * 1024 * 1024,
+			DeletedByteCount: 0,
+			ModifiedAtSecond: volumeModifiedAt,
+			ReplicaPlacement: 1,
+			ReadOnly:         false,
+		},
+	}
+
+	overloadedDisk := &master_pb.DiskInfo{
+		DiskId:         0,
+		MaxVolumeCount: 100,
+		VolumeCount:    int64(len(overloadedVolumes)),
+		VolumeInfos:    overloadedVolumes,
+	}
+
+	underloadedDisk := &master_pb.DiskInfo{
+		DiskId:         0,
+		MaxVolumeCount: 100,
+		VolumeCount:    int64(len(underloadedVolumes)),
+		VolumeInfos:    underloadedVolumes,
+	}
+
+	overloadedNode := &master_pb.DataNodeInfo{
+		Id:        "10.0.0.1:8080",
+		Address:   "10.0.0.1:8080",
+		DiskInfos: map[string]*master_pb.DiskInfo{"hdd": overloadedDisk},
+	}
+
+	underloadedNode := &master_pb.DataNodeInfo{
+		Id:        "10.0.0.2:8080",
+		Address:   "10.0.0.2:8080",
+		DiskInfos: map[string]*master_pb.DiskInfo{"hdd": underloadedDisk},
+	}
+
+	rack := &master_pb.RackInfo{
+		Id:            "rack-1",
+		DataNodeInfos: []*master_pb.DataNodeInfo{overloadedNode, underloadedNode},
+	}
+
+	return &master_pb.VolumeListResponse{
+		VolumeSizeLimitMb: volumeSizeLimitMB,
+		TopologyInfo: &master_pb.TopologyInfo{
+			DataCenterInfos: []*master_pb.DataCenterInfo{
+				{
+					Id:        "dc-1",
+					RackInfos: []*master_pb.RackInfo{rack},
+				},
+			},
+		},
+	}
+}
diff --git a/test/plugin_workers/volume_balance/execution_test.go b/test/plugin_workers/volume_balance/execution_test.go
new file mode 100644
index 000000000..0d2091fc6
--- /dev/null
+++ b/test/plugin_workers/volume_balance/execution_test.go
@@ -0,0 +1,69 @@
+package volume_balance_test
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	pluginworkers "github.com/seaweedfs/seaweedfs/test/plugin_workers"
+	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
+	pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+)
+
+// TestVolumeBalanceExecutionIntegration moves a volume between two fake volume
+// servers and checks the calls recorded on the source and on the target.
+func TestVolumeBalanceExecutionIntegration(t *testing.T) {
+	volumeID := uint32(303)
+
+	dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
+	handler := pluginworker.NewVolumeBalanceHandler(dialOption)
+	harness := pluginworkers.NewHarness(t, pluginworkers.HarnessConfig{
+		WorkerOptions: pluginworker.WorkerOptions{
+			GrpcDialOption: dialOption,
+		},
+		Handlers: []pluginworker.JobHandler{handler},
+	})
+	harness.WaitForJobType("volume_balance")
+
+	source := pluginworkers.NewVolumeServer(t, "")
+	target := pluginworkers.NewVolumeServer(t, "")
+
+	job := &plugin_pb.JobSpec{
+		JobId:   fmt.Sprintf("balance-job-%d", volumeID),
+		JobType: "volume_balance",
+		Parameters: map[string]*plugin_pb.ConfigValue{
+			"volume_id": {
+				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(volumeID)},
+			},
+			"collection": {
+				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "balance"},
+			},
+			"source_server": {
+				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: source.Address()},
+			},
+			"target_server": {
+				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: target.Address()},
+			},
+		},
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	result, err := harness.Plugin().ExecuteJob(ctx, job, nil, 1)
+	require.NoError(t, err)
+	require.NotNil(t, result)
+	require.True(t, result.Success)
+
+	require.GreaterOrEqual(t, source.MarkReadonlyCount(), 1)
+	require.GreaterOrEqual(t, len(source.DeleteRequests()), 1)
+
+	copyCalls, mountCalls, tailCalls := target.BalanceStats()
+	require.GreaterOrEqual(t, copyCalls, 1)
+	require.GreaterOrEqual(t, mountCalls, 1)
+	require.GreaterOrEqual(t, tailCalls, 1)
+}
diff --git a/test/plugin_workers/volume_fixtures.go b/test/plugin_workers/volume_fixtures.go
new file mode 100644
index 000000000..64b5cfcfa
--- /dev/null
+++ b/test/plugin_workers/volume_fixtures.go
@@ -0,0 +1,54 @@
+package pluginworkers
+
+import (
+	"fmt"
+	"math/rand"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/seaweedfs/seaweedfs/weed/storage/types"
+)
+
+// WriteTestVolumeFiles creates a minimal .dat/.idx pair for the given volume.
+// The .dat file holds datSize pseudo-random bytes; the .idx file holds a single
+// needle map entry (needle id 1, offset 0, size datSize). It creates baseDir if
+// needed, fails the test on any filesystem error, and returns both paths.
+func WriteTestVolumeFiles(t *testing.T, baseDir string, volumeID uint32, datSize int) (string, string) {
+	t.Helper()
+
+	if err := os.MkdirAll(baseDir, 0755); err != nil {
+		t.Fatalf("create volume dir: %v", err)
+	}
+
+	datPath := filepath.Join(baseDir, volumeFilename(volumeID, ".dat"))
+	idxPath := filepath.Join(baseDir, volumeFilename(volumeID, ".idx"))
+
+	data := make([]byte, datSize)
+	// A fixed seed keeps the payload identical from run to run.
+	rng := rand.New(rand.NewSource(99))
+	_, _ = rng.Read(data)
+	if err := os.WriteFile(datPath, data, 0644); err != nil {
+		t.Fatalf("write dat file: %v", err)
+	}
+
+	entry := make([]byte, types.NeedleMapEntrySize)
+	idEnd := types.NeedleIdSize
+	offsetEnd := idEnd + types.OffsetSize
+	sizeEnd := offsetEnd + types.SizeSize
+
+	types.NeedleIdToBytes(entry[:idEnd], types.NeedleId(1))
+	types.OffsetToBytes(entry[idEnd:offsetEnd], types.ToOffset(0))
+	types.SizeToBytes(entry[offsetEnd:sizeEnd], types.Size(datSize))
+
+	if err := os.WriteFile(idxPath, entry, 0644); err != nil {
+		t.Fatalf("write idx file: %v", err)
+	}
+
+	return datPath, idxPath
+}
+
+// volumeFilename returns the decimal volume ID followed by ext.
+func volumeFilename(volumeID uint32, ext string) string {
+	return fmt.Sprintf("%d%s", volumeID, ext)
+}
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
index 7ea199403..a0e9784fe 100644
--- a/weed/worker/tasks/erasure_coding/detection.go
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -17,6 +17,11 @@ import (
"github.com/seaweedfs/seaweedfs/weed/worker/types" ) +const ( + minProposalsBeforeEarlyStop = 10 + maxConsecutivePlanningFailures = 10 +) + // Detection implements the detection logic for erasure coding tasks. // It respects ctx cancellation and can stop early once maxResults is reached. func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig, maxResults int) ([]*types.TaskDetectionResult, bool, error) { @@ -42,6 +47,7 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste skippedCollectionFilter := 0 skippedQuietTime := 0 skippedFullness := 0 + consecutivePlanningFailures := 0 var planner *ecPlacementPlanner @@ -150,8 +156,16 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste multiPlan, err := planECDestinations(planner, metric, ecConfig) if err != nil { glog.Warningf("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err) + consecutivePlanningFailures++ + if len(results) >= minProposalsBeforeEarlyStop && consecutivePlanningFailures >= maxConsecutivePlanningFailures { + glog.Warningf("EC Detection: stopping early after %d consecutive placement failures with %d proposals already planned", consecutivePlanningFailures, len(results)) + hasMore = true + stoppedEarly = true + break + } continue // Skip this volume if destination planning fails } + consecutivePlanningFailures = 0 glog.Infof("EC Detection: Successfully planned %d destinations for volume %d", len(multiPlan.Plans), metric.VolumeID) // Calculate expected shard size for EC operation