diff --git a/weed/worker/tasks/balance/detection_test.go b/weed/worker/tasks/balance/detection_test.go index 0c60625bb..670dc5758 100644 --- a/weed/worker/tasks/balance/detection_test.go +++ b/weed/worker/tasks/balance/detection_test.go @@ -1,6 +1,7 @@ package balance import ( + "fmt" "testing" "github.com/seaweedfs/seaweedfs/weed/admin/topology" @@ -9,6 +10,142 @@ import ( "github.com/seaweedfs/seaweedfs/weed/worker/types" ) +// serverSpec describes a server for the topology builder. +type serverSpec struct { + id string // e.g. "node-1" + diskType string // e.g. "ssd", "hdd" + diskID uint32 + dc string + rack string + maxVolumes int64 +} + +// buildTopology constructs an ActiveTopology from server specs and volume metrics. +func buildTopology(servers []serverSpec, metrics []*types.VolumeHealthMetrics) *topology.ActiveTopology { + at := topology.NewActiveTopology(0) + + volumesByServer := make(map[string][]*master_pb.VolumeInformationMessage) + for _, m := range metrics { + volumesByServer[m.Server] = append(volumesByServer[m.Server], &master_pb.VolumeInformationMessage{ + Id: m.VolumeID, + Size: m.Size, + Collection: m.Collection, + Version: 1, + }) + } + + // Group servers by dc → rack for topology construction + type rackKey struct{ dc, rack string } + rackNodes := make(map[rackKey][]*master_pb.DataNodeInfo) + + for _, s := range servers { + maxVol := s.maxVolumes + if maxVol == 0 { + maxVol = 1000 + } + node := &master_pb.DataNodeInfo{ + Id: s.id, + Address: s.id + ":8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + s.diskType: { + Type: s.diskType, + DiskId: s.diskID, + VolumeInfos: volumesByServer[s.id], + VolumeCount: int64(len(volumesByServer[s.id])), + MaxVolumeCount: maxVol, + }, + }, + } + key := rackKey{s.dc, s.rack} + rackNodes[key] = append(rackNodes[key], node) + } + + // Build DC → Rack tree + dcRacks := make(map[string][]*master_pb.RackInfo) + for key, nodes := range rackNodes { + dcRacks[key.dc] = append(dcRacks[key.dc], &master_pb.RackInfo{ + Id: key.rack, + DataNodeInfos: nodes, + }) + } + + var dcInfos []*master_pb.DataCenterInfo + for dcID, racks := range dcRacks { + dcInfos = append(dcInfos, &master_pb.DataCenterInfo{ + Id: dcID, + RackInfos: racks, + }) + } + + at.UpdateTopology(&master_pb.TopologyInfo{DataCenterInfos: dcInfos}) + return at +} + +// makeVolumes generates n VolumeHealthMetrics for a server starting at volumeIDBase. +func makeVolumes(server, diskType, dc, rack, collection string, volumeIDBase uint32, n int) []*types.VolumeHealthMetrics { + out := make([]*types.VolumeHealthMetrics, n) + for i := range out { + out[i] = &types.VolumeHealthMetrics{ + VolumeID: volumeIDBase + uint32(i), + Server: server, + ServerAddress: server + ":8080", + DiskType: diskType, + Collection: collection, + Size: 1024, + DataCenter: dc, + Rack: rack, + } + } + return out +} + +func defaultConf() *Config { + return &Config{ + BaseConfig: base.BaseConfig{ + Enabled: true, + ScanIntervalSeconds: 30, + MaxConcurrent: 1, + }, + MinServerCount: 2, + ImbalanceThreshold: 0.2, + } +} + +// assertNoDuplicateVolumes verifies every task moves a distinct volume. +func assertNoDuplicateVolumes(t *testing.T, tasks []*types.TaskDetectionResult) { + t.Helper() + seen := make(map[uint32]bool) + for i, task := range tasks { + if seen[task.VolumeID] { + t.Errorf("duplicate volume %d in task %d", task.VolumeID, i) + } + seen[task.VolumeID] = true + } +} + +// computeEffectiveCounts returns per-server volume counts after applying all planned moves. 
+func computeEffectiveCounts(metrics []*types.VolumeHealthMetrics, tasks []*types.TaskDetectionResult) map[string]int {
+	counts := make(map[string]int)
+	for _, m := range metrics {
+		counts[m.Server]++
+	}
+	for _, task := range tasks {
+		counts[task.Server]-- // source loses one
+		if task.TypedParams != nil && len(task.TypedParams.Targets) > 0 {
+			// Resolve the target address back to a server ID by matching it
+			// against the metrics, then credit that server with the move.
+			addr := task.TypedParams.Targets[0].Node
+			for _, m := range metrics {
+				if m.ServerAddress == addr {
+					counts[m.Server]++ // destination gains one
+					break
+				}
+			}
+		}
+	}
+	return counts
+}
+
 func createMockTopology(volumes ...*types.VolumeHealthMetrics) *topology.ActiveTopology {
 	at := topology.NewActiveTopology(0)
 
@@ -314,3 +451,303 @@ func TestDetection_RespectsMaxResults(t *testing.T) {
 		t.Errorf("Expected exactly 3 tasks (maxResults=3), got %d", len(tasks))
 	}
 }
+
+// --- Complicated scenario tests ---
+
+// TestDetection_ThreeServers_ConvergesToBalance verifies that with 3 servers
+// (60/30/10 volumes) the algorithm moves volumes from the heaviest server first,
+// then re-evaluates, potentially shifting from the second-heaviest too.
+func TestDetection_ThreeServers_ConvergesToBalance(t *testing.T) {
+	servers := []serverSpec{
+		{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"},
+		{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"},
+		{id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1"},
+	}
+
+	var metrics []*types.VolumeHealthMetrics
+	metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 60)...)
+	metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 30)...)
+	metrics = append(metrics, makeVolumes("node-c", "hdd", "dc1", "rack1", "c1", 200, 10)...)
+
+	at := buildTopology(servers, metrics)
+	clusterInfo := &types.ClusterInfo{ActiveTopology: at}
+
+	tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100)
+	if err != nil {
+		t.Fatalf("Detection failed: %v", err)
+	}
+
+	if len(tasks) < 2 {
+		t.Fatalf("Expected multiple tasks for 60/30/10 imbalance, got %d", len(tasks))
+	}
+
+	assertNoDuplicateVolumes(t, tasks)
+
+	// Verify convergence: effective counts should be within 20% imbalance.
+	effective := computeEffectiveCounts(metrics, tasks)
+	total := 0
+	maxC, minC := 0, len(metrics) // total volume count is a safe upper bound for the minimum
+	for _, c := range effective {
+		total += c
+		if c > maxC {
+			maxC = c
+		}
+		if c < minC {
+			minC = c
+		}
+	}
+	avg := float64(total) / float64(len(effective))
+	imbalance := float64(maxC-minC) / avg
+	if imbalance > 0.2 {
+		t.Errorf("After %d moves, cluster still imbalanced: effective=%v, imbalance=%.1f%%",
+			len(tasks), effective, imbalance*100)
+	}
+
+	// All sources should be from the overloaded nodes, never node-c
+	for i, task := range tasks {
+		if task.TypedParams == nil || len(task.TypedParams.Sources) == 0 {
+			t.Fatalf("Task %d: missing typed params or sources", i)
+		}
+		src := task.TypedParams.Sources[0].Node
+		if src == "node-c:8080" {
+			t.Errorf("Task %d: should not move FROM the underloaded server node-c", i)
+		}
+	}
+}
+
+// TestDetection_SkipsPreExistingPendingTasks verifies that volumes with
+// already-registered pending tasks in ActiveTopology are skipped.
+func TestDetection_SkipsPreExistingPendingTasks(t *testing.T) {
+	servers := []serverSpec{
+		{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"},
+		{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"},
+	}
+
+	// node-a has 20, node-b has 5
+	var metrics []*types.VolumeHealthMetrics
+	metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 20)...)
+ metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 5)...) + + at := buildTopology(servers, metrics) + + // Pre-register pending tasks for the first 15 volumes on node-a. + // This simulates a previous detection run that already planned moves. + for i := 0; i < 15; i++ { + volID := uint32(1 + i) + err := at.AddPendingTask(topology.TaskSpec{ + TaskID: fmt.Sprintf("existing-%d", volID), + TaskType: topology.TaskTypeBalance, + VolumeID: volID, + VolumeSize: 1024, + Sources: []topology.TaskSourceSpec{{ServerID: "node-a", DiskID: 1}}, + Destinations: []topology.TaskDestinationSpec{{ServerID: "node-b", DiskID: 2}}, + }) + if err != nil { + t.Fatalf("AddPendingTask failed: %v", err) + } + } + + clusterInfo := &types.ClusterInfo{ActiveTopology: at} + tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + // None of the results should reference a volume with an existing task (IDs 1-15). + for i, task := range tasks { + if task.VolumeID >= 1 && task.VolumeID <= 15 { + t.Errorf("Task %d: volume %d already has a pending task, should have been skipped", + i, task.VolumeID) + } + } + + assertNoDuplicateVolumes(t, tasks) +} + +// TestDetection_NoDuplicateVolumesAcrossIterations verifies that the loop +// never selects the same volume twice, even under high maxResults. +func TestDetection_NoDuplicateVolumesAcrossIterations(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "ssd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "ssd", diskID: 2, dc: "dc1", rack: "rack1"}, + } + + var metrics []*types.VolumeHealthMetrics + metrics = append(metrics, makeVolumes("node-a", "ssd", "dc1", "rack1", "c1", 1, 50)...) + metrics = append(metrics, makeVolumes("node-b", "ssd", "dc1", "rack1", "c1", 100, 10)...) + + at := buildTopology(servers, metrics) + clusterInfo := &types.ClusterInfo{ActiveTopology: at} + + tasks, err := Detection(metrics, clusterInfo, defaultConf(), 200) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + assertNoDuplicateVolumes(t, tasks) +} + +// TestDetection_ThreeServers_MaxServerShifts verifies that after enough moves +// from the top server, the algorithm detects a new max server and moves from it. +func TestDetection_ThreeServers_MaxServerShifts(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"}, + {id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1"}, + } + + // node-a: 40, node-b: 38, node-c: 10. avg ≈ 29.3 + // Initial imbalance = (40-10)/29.3 ≈ 1.02 → move from node-a. + // After a few moves from node-a, node-b becomes the new max and should be + // picked as the source. + var metrics []*types.VolumeHealthMetrics + metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 40)...) + metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 38)...) + metrics = append(metrics, makeVolumes("node-c", "hdd", "dc1", "rack1", "c1", 200, 10)...) 
+
+	at := buildTopology(servers, metrics)
+	clusterInfo := &types.ClusterInfo{ActiveTopology: at}
+
+	tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100)
+	if err != nil {
+		t.Fatalf("Detection failed: %v", err)
+	}
+
+	if len(tasks) < 3 {
+		t.Fatalf("Expected several tasks for 40/38/10 imbalance, got %d", len(tasks))
+	}
+
+	// Collect source servers
+	sourceServers := make(map[string]int)
+	for _, task := range tasks {
+		sourceServers[task.Server]++
+	}
+
+	// Both node-a and node-b should appear as sources (max server shifts)
+	if sourceServers["node-a"] == 0 {
+		t.Error("Expected node-a to be a source for some moves")
+	}
+	if sourceServers["node-b"] == 0 {
+		t.Error("Expected node-b to be a source after node-a is drained enough")
+	}
+	if sourceServers["node-c"] > 0 {
+		t.Error("node-c (underloaded) should never be a source")
+	}
+
+	assertNoDuplicateVolumes(t, tasks)
+}
+
+// TestDetection_FourServers_DestinationSpreading verifies that with 4 servers
+// (1 heavy, 3 light) the algorithm spreads moves across multiple destinations.
+func TestDetection_FourServers_DestinationSpreading(t *testing.T) {
+	servers := []serverSpec{
+		{id: "node-a", diskType: "ssd", diskID: 1, dc: "dc1", rack: "rack1"},
+		{id: "node-b", diskType: "ssd", diskID: 2, dc: "dc1", rack: "rack2"},
+		{id: "node-c", diskType: "ssd", diskID: 3, dc: "dc1", rack: "rack3"},
+		{id: "node-d", diskType: "ssd", diskID: 4, dc: "dc1", rack: "rack4"},
+	}
+
+	// node-a: 80 volumes; node-b/c/d: 5 each. avg = 23.75
+	var metrics []*types.VolumeHealthMetrics
+	metrics = append(metrics, makeVolumes("node-a", "ssd", "dc1", "rack1", "c1", 1, 80)...)
+	metrics = append(metrics, makeVolumes("node-b", "ssd", "dc1", "rack2", "c1", 100, 5)...)
+	metrics = append(metrics, makeVolumes("node-c", "ssd", "dc1", "rack3", "c1", 200, 5)...)
+	metrics = append(metrics, makeVolumes("node-d", "ssd", "dc1", "rack4", "c1", 300, 5)...)
+
+	at := buildTopology(servers, metrics)
+	clusterInfo := &types.ClusterInfo{ActiveTopology: at}
+
+	tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100)
+	if err != nil {
+		t.Fatalf("Detection failed: %v", err)
+	}
+
+	if len(tasks) < 5 {
+		t.Fatalf("Expected at least 5 tasks for the 80/5/5/5 imbalance, got %d", len(tasks))
+	}
+
+	// Count destination servers
+	destServers := make(map[string]int)
+	for _, task := range tasks {
+		if task.TypedParams != nil && len(task.TypedParams.Targets) > 0 {
+			destServers[task.TypedParams.Targets[0].Node]++
+		}
+	}
+
+	// With 3 eligible destinations (b, c, d) and pending-task-aware scoring,
+	// moves should go to more than just one destination.
+	if len(destServers) < 2 {
+		t.Errorf("Expected moves to spread across destinations, but only got: %v", destServers)
+	}
+
+	assertNoDuplicateVolumes(t, tasks)
+}
+
+// TestDetection_ConvergenceVerification verifies that after all planned moves,
+// the effective volume distribution is within the configured threshold.
+func TestDetection_ConvergenceVerification(t *testing.T) { + tests := []struct { + name string + counts []int // volumes per server + threshold float64 + }{ + {"2-server-big-gap", []int{100, 10}, 0.2}, + {"3-server-staircase", []int{90, 50, 10}, 0.2}, + {"4-server-one-hot", []int{200, 20, 20, 20}, 0.2}, + {"3-server-tight-threshold", []int{30, 20, 10}, 0.1}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var servers []serverSpec + var metrics []*types.VolumeHealthMetrics + volBase := uint32(1) + + for i, count := range tt.counts { + id := fmt.Sprintf("node-%d", i) + servers = append(servers, serverSpec{ + id: id, diskType: "hdd", diskID: uint32(i + 1), + dc: "dc1", rack: "rack1", + }) + metrics = append(metrics, makeVolumes(id, "hdd", "dc1", "rack1", "c1", volBase, count)...) + volBase += uint32(count) + } + + at := buildTopology(servers, metrics) + clusterInfo := &types.ClusterInfo{ActiveTopology: at} + + conf := defaultConf() + conf.ImbalanceThreshold = tt.threshold + + tasks, err := Detection(metrics, clusterInfo, conf, 500) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + if len(tasks) == 0 { + t.Fatal("Expected balance tasks, got 0") + } + + assertNoDuplicateVolumes(t, tasks) + + // Verify convergence + effective := computeEffectiveCounts(metrics, tasks) + total := 0 + maxC, minC := 0, len(metrics) + for _, c := range effective { + total += c + if c > maxC { + maxC = c + } + if c < minC { + minC = c + } + } + avg := float64(total) / float64(len(effective)) + imbalance := float64(maxC-minC) / avg + if imbalance > tt.threshold { + t.Errorf("After %d moves, still imbalanced: effective=%v, imbalance=%.1f%% (threshold=%.1f%%)", + len(tasks), effective, imbalance*100, tt.threshold*100) + } + t.Logf("%s: %d moves, effective=%v, imbalance=%.1f%%", + tt.name, len(tasks), effective, imbalance*100) + }) + } +}
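+
+// TestDetection_AlreadyBalanced_ProducesNoTasks is a minimal sketch of the
+// degenerate case, assuming Detection plans no moves when the measured
+// imbalance is already below ImbalanceThreshold; it reuses the builders above.
+func TestDetection_AlreadyBalanced_ProducesNoTasks(t *testing.T) {
+	servers := []serverSpec{
+		{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"},
+		{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"},
+	}
+
+	// 10 volumes each: imbalance = (10-10)/10 = 0, well under the 20% threshold.
+	var metrics []*types.VolumeHealthMetrics
+	metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 10)...)
+	metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 10)...)
+
+	at := buildTopology(servers, metrics)
+	clusterInfo := &types.ClusterInfo{ActiveTopology: at}
+
+	tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100)
+	if err != nil {
+		t.Fatalf("Detection failed: %v", err)
+	}
+	if len(tasks) != 0 {
+		t.Errorf("Expected no tasks for an already balanced cluster, got %d", len(tasks))
+	}
+}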