Browse Source

test: add comprehensive balance detection tests for complex scenarios

Cover multi-server convergence, max-server shifting, destination
spreading, pre-existing pending task skipping, no-duplicate-volume
invariant, and parameterized convergence verification across different
cluster shapes and thresholds.
pull/8559/head
Chris Lu 2 days ago
parent
commit
8f4ed2fb3f
  1. 437
      weed/worker/tasks/balance/detection_test.go

437
weed/worker/tasks/balance/detection_test.go

@@ -1,6 +1,7 @@
package balance
import (
"fmt"
"testing"
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
@@ -9,6 +10,142 @@ import (
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// serverSpec describes a server for the topology builder.
type serverSpec struct {
	id         string // server/node identifier, e.g. "node-1"; also used to derive the ":8080" address
	diskType   string // disk tier name, e.g. "ssd", "hdd"
	diskID     uint32 // numeric identifier of the server's single disk
	dc         string // data center the server belongs to
	rack       string // rack within the data center
	maxVolumes int64  // disk capacity in volumes; 0 means "use the builder default" (1000)
}
// buildTopology assembles an ActiveTopology for tests from server specs plus
// the volumes described by the health metrics. Each server gets one disk that
// holds exactly the volumes whose metrics name that server; servers are then
// nested under their rack and data center before the tree is pushed into the
// topology via UpdateTopology.
func buildTopology(servers []serverSpec, metrics []*types.VolumeHealthMetrics) *topology.ActiveTopology {
	active := topology.NewActiveTopology(0)

	// Index volumes by owning server so each disk can list its contents.
	perServer := make(map[string][]*master_pb.VolumeInformationMessage)
	for _, metric := range metrics {
		perServer[metric.Server] = append(perServer[metric.Server], &master_pb.VolumeInformationMessage{
			Id:         metric.VolumeID,
			Size:       metric.Size,
			Collection: metric.Collection,
			Version:    1,
		})
	}

	// Bucket data nodes under their (dc, rack) pair.
	type rackKey struct{ dc, rack string }
	byRack := make(map[rackKey][]*master_pb.DataNodeInfo)
	for _, spec := range servers {
		capacity := spec.maxVolumes
		if capacity == 0 {
			capacity = 1000 // generous default so tests rarely hit capacity limits
		}
		key := rackKey{spec.dc, spec.rack}
		byRack[key] = append(byRack[key], &master_pb.DataNodeInfo{
			Id:      spec.id,
			Address: spec.id + ":8080",
			DiskInfos: map[string]*master_pb.DiskInfo{
				spec.diskType: {
					Type:           spec.diskType,
					DiskId:         spec.diskID,
					VolumeInfos:    perServer[spec.id],
					VolumeCount:    int64(len(perServer[spec.id])),
					MaxVolumeCount: capacity,
				},
			},
		})
	}

	// Fold racks into their data centers and hand the tree to the topology.
	byDC := make(map[string][]*master_pb.RackInfo)
	for key, nodes := range byRack {
		byDC[key.dc] = append(byDC[key.dc], &master_pb.RackInfo{
			Id:            key.rack,
			DataNodeInfos: nodes,
		})
	}
	var dcs []*master_pb.DataCenterInfo
	for dc, racks := range byDC {
		dcs = append(dcs, &master_pb.DataCenterInfo{
			Id:        dc,
			RackInfos: racks,
		})
	}
	active.UpdateTopology(&master_pb.TopologyInfo{DataCenterInfos: dcs})
	return active
}
// makeVolumes fabricates n health metrics for consecutive volume IDs starting
// at volumeIDBase, all hosted on the given server/disk, placed in the given
// dc/rack/collection, and each reporting a fixed size of 1024 bytes.
func makeVolumes(server, diskType, dc, rack, collection string, volumeIDBase uint32, n int) []*types.VolumeHealthMetrics {
	result := make([]*types.VolumeHealthMetrics, 0, n)
	for offset := 0; offset < n; offset++ {
		result = append(result, &types.VolumeHealthMetrics{
			VolumeID:      volumeIDBase + uint32(offset),
			Server:        server,
			ServerAddress: server + ":8080",
			DiskType:      diskType,
			Collection:    collection,
			Size:          1024,
			DataCenter:    dc,
			Rack:          rack,
		})
	}
	return result
}
// defaultConf returns the balance configuration shared by these tests:
// enabled, 30s scan interval, one concurrent task, a two-server minimum,
// and a 20% imbalance threshold.
func defaultConf() *Config {
	conf := &Config{
		MinServerCount:     2,
		ImbalanceThreshold: 0.2,
	}
	conf.BaseConfig = base.BaseConfig{
		Enabled:             true,
		ScanIntervalSeconds: 30,
		MaxConcurrent:       1,
	}
	return conf
}
// assertNoDuplicateVolumes fails the test if any two planned tasks reference
// the same volume ID.
func assertNoDuplicateVolumes(t *testing.T, tasks []*types.TaskDetectionResult) {
	t.Helper()
	seen := make(map[uint32]struct{}, len(tasks))
	for idx, task := range tasks {
		if _, dup := seen[task.VolumeID]; dup {
			t.Errorf("duplicate volume %d in task %d", task.VolumeID, idx)
		}
		seen[task.VolumeID] = struct{}{}
	}
}
// computeEffectiveCounts simulates applying every planned move to the current
// distribution and returns the resulting per-server volume counts. Move
// targets are given as server addresses, so they are resolved back to server
// IDs via the metrics; a target address with no matching metric is ignored.
func computeEffectiveCounts(metrics []*types.VolumeHealthMetrics, tasks []*types.TaskDetectionResult) map[string]int {
	counts := make(map[string]int)
	addrToServer := make(map[string]string, len(metrics))
	for _, metric := range metrics {
		counts[metric.Server]++
		addrToServer[metric.ServerAddress] = metric.Server
	}
	for _, task := range tasks {
		counts[task.Server]-- // the source gives up one volume
		if task.TypedParams == nil || len(task.TypedParams.Targets) == 0 {
			continue
		}
		if server, ok := addrToServer[task.TypedParams.Targets[0].Node]; ok {
			counts[server]++ // the destination gains it
		}
	}
	return counts
}
func createMockTopology(volumes ...*types.VolumeHealthMetrics) *topology.ActiveTopology {
at := topology.NewActiveTopology(0)
@@ -314,3 +451,303 @@ func TestDetection_RespectsMaxResults(t *testing.T) {
t.Errorf("Expected exactly 3 tasks (maxResults=3), got %d", len(tasks))
}
}
// --- Complicated scenario tests ---
// TestDetection_ThreeServers_ConvergesToBalance verifies that with 3 servers
// (60/30/10 volumes) the algorithm moves volumes from the heaviest server first,
// then re-evaluates, potentially shifting from the second-heaviest too.
func TestDetection_ThreeServers_ConvergesToBalance(t *testing.T) {
	servers := []serverSpec{
		{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"},
		{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"},
		{id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1"},
	}
	var metrics []*types.VolumeHealthMetrics
	metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 60)...)
	metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 30)...)
	metrics = append(metrics, makeVolumes("node-c", "hdd", "dc1", "rack1", "c1", 200, 10)...)
	at := buildTopology(servers, metrics)
	clusterInfo := &types.ClusterInfo{ActiveTopology: at}
	tasks, err := Detection(metrics, clusterInfo, defaultConf(), 100)
	if err != nil {
		t.Fatalf("Detection failed: %v", err)
	}
	if len(tasks) < 2 {
		t.Fatalf("Expected multiple tasks for 60/30/10 imbalance, got %d", len(tasks))
	}
	assertNoDuplicateVolumes(t, tasks)
	// Verify convergence: effective counts should be within 20% imbalance.
	effective := computeEffectiveCounts(metrics, tasks)
	total := 0
	// Seed minC with the total volume count so any real per-server count is smaller.
	maxC, minC := 0, len(metrics)
	for _, c := range effective {
		total += c
		if c > maxC {
			maxC = c
		}
		if c < minC {
			minC = c
		}
	}
	avg := float64(total) / float64(len(effective))
	imbalance := float64(maxC-minC) / avg
	if imbalance > 0.2 {
		t.Errorf("After %d moves, cluster still imbalanced: effective=%v, imbalance=%.1f%%",
			len(tasks), effective, imbalance*100)
	}
	// All sources should be from the overloaded nodes, never node-c.
	for i, task := range tasks {
		// Guard before indexing Sources so a malformed task reports a clean
		// failure instead of a nil-dereference panic (matches the guard style
		// used by the other tests in this file).
		if task.TypedParams == nil || len(task.TypedParams.Sources) == 0 {
			t.Errorf("Task %d: missing typed params or sources", i)
			continue
		}
		if task.TypedParams.Sources[0].Node == "node-c:8080" {
			t.Errorf("Task %d: should not move FROM the underloaded server node-c", i)
		}
	}
}
// TestDetection_SkipsPreExistingPendingTasks verifies that volumes with
// already-registered pending tasks in ActiveTopology are skipped.
func TestDetection_SkipsPreExistingPendingTasks(t *testing.T) {
	specs := []serverSpec{
		{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"},
		{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"},
	}
	// Imbalanced pair: node-a holds 20 volumes, node-b only 5.
	metrics := makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 20)
	metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 5)...)
	at := buildTopology(specs, metrics)

	// Pre-register pending tasks for volumes 1..15 on node-a, simulating a
	// previous detection run that already planned those moves.
	const lastReserved = 15
	for volID := uint32(1); volID <= lastReserved; volID++ {
		if err := at.AddPendingTask(topology.TaskSpec{
			TaskID:       fmt.Sprintf("existing-%d", volID),
			TaskType:     topology.TaskTypeBalance,
			VolumeID:     volID,
			VolumeSize:   1024,
			Sources:      []topology.TaskSourceSpec{{ServerID: "node-a", DiskID: 1}},
			Destinations: []topology.TaskDestinationSpec{{ServerID: "node-b", DiskID: 2}},
		}); err != nil {
			t.Fatalf("AddPendingTask failed: %v", err)
		}
	}

	tasks, err := Detection(metrics, &types.ClusterInfo{ActiveTopology: at}, defaultConf(), 100)
	if err != nil {
		t.Fatalf("Detection failed: %v", err)
	}
	// No result may reference a volume that already has a pending task (IDs 1-15).
	for i, task := range tasks {
		if task.VolumeID >= 1 && task.VolumeID <= lastReserved {
			t.Errorf("Task %d: volume %d already has a pending task, should have been skipped",
				i, task.VolumeID)
		}
	}
	assertNoDuplicateVolumes(t, tasks)
}
// TestDetection_NoDuplicateVolumesAcrossIterations verifies that the loop
// never selects the same volume twice, even under high maxResults.
func TestDetection_NoDuplicateVolumesAcrossIterations(t *testing.T) {
	specs := []serverSpec{
		{id: "node-a", diskType: "ssd", diskID: 1, dc: "dc1", rack: "rack1"},
		{id: "node-b", diskType: "ssd", diskID: 2, dc: "dc1", rack: "rack1"},
	}
	// Heavy/light pair: 50 volumes on node-a vs 10 on node-b.
	metrics := makeVolumes("node-a", "ssd", "dc1", "rack1", "c1", 1, 50)
	metrics = append(metrics, makeVolumes("node-b", "ssd", "dc1", "rack1", "c1", 100, 10)...)

	cluster := &types.ClusterInfo{ActiveTopology: buildTopology(specs, metrics)}
	tasks, err := Detection(metrics, cluster, defaultConf(), 200)
	if err != nil {
		t.Fatalf("Detection failed: %v", err)
	}
	assertNoDuplicateVolumes(t, tasks)
}
// TestDetection_ThreeServers_MaxServerShifts verifies that after enough moves
// from the top server, the algorithm detects a new max server and moves from it.
func TestDetection_ThreeServers_MaxServerShifts(t *testing.T) {
	specs := []serverSpec{
		{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"},
		{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"},
		{id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1"},
	}
	// node-a: 40, node-b: 38, node-c: 10; avg ≈ 29.3.
	// Initial imbalance = (40-10)/29.3 ≈ 1.02, so node-a drains first; once it
	// drops below node-b, node-b should become the new source.
	metrics := makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 40)
	metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 38)...)
	metrics = append(metrics, makeVolumes("node-c", "hdd", "dc1", "rack1", "c1", 200, 10)...)

	cluster := &types.ClusterInfo{ActiveTopology: buildTopology(specs, metrics)}
	tasks, err := Detection(metrics, cluster, defaultConf(), 100)
	if err != nil {
		t.Fatalf("Detection failed: %v", err)
	}
	if len(tasks) < 3 {
		t.Fatalf("Expected several tasks for 40/38/10 imbalance, got %d", len(tasks))
	}

	// Tally how often each server appears as a move source.
	bySource := make(map[string]int)
	for _, task := range tasks {
		bySource[task.Server]++
	}
	// Both overloaded nodes must contribute moves (the max server shifts),
	// while the underloaded node never does.
	if bySource["node-a"] == 0 {
		t.Error("Expected node-a to be a source for some moves")
	}
	if bySource["node-b"] == 0 {
		t.Error("Expected node-b to be a source after node-a is drained enough")
	}
	if bySource["node-c"] > 0 {
		t.Error("node-c (underloaded) should never be a source")
	}
	assertNoDuplicateVolumes(t, tasks)
}
// TestDetection_FourServers_DestinationSpreading verifies that with 4 servers
// (1 heavy, 3 light) the algorithm spreads moves across multiple destinations.
func TestDetection_FourServers_DestinationSpreading(t *testing.T) {
	specs := []serverSpec{
		{id: "node-a", diskType: "ssd", diskID: 1, dc: "dc1", rack: "rack1"},
		{id: "node-b", diskType: "ssd", diskID: 2, dc: "dc1", rack: "rack2"},
		{id: "node-c", diskType: "ssd", diskID: 3, dc: "dc1", rack: "rack3"},
		{id: "node-d", diskType: "ssd", diskID: 4, dc: "dc1", rack: "rack4"},
	}
	// One hot server (80 volumes) vs three cold ones (5 each); avg = 23.75.
	metrics := makeVolumes("node-a", "ssd", "dc1", "rack1", "c1", 1, 80)
	metrics = append(metrics, makeVolumes("node-b", "ssd", "dc1", "rack2", "c1", 100, 5)...)
	metrics = append(metrics, makeVolumes("node-c", "ssd", "dc1", "rack3", "c1", 200, 5)...)
	metrics = append(metrics, makeVolumes("node-d", "ssd", "dc1", "rack4", "c1", 300, 5)...)

	cluster := &types.ClusterInfo{ActiveTopology: buildTopology(specs, metrics)}
	tasks, err := Detection(metrics, cluster, defaultConf(), 100)
	if err != nil {
		t.Fatalf("Detection failed: %v", err)
	}
	if len(tasks) < 5 {
		t.Fatalf("Expected many tasks, got %d", len(tasks))
	}

	// Tally how many moves land on each destination server.
	targets := make(map[string]int)
	for _, task := range tasks {
		if task.TypedParams == nil || len(task.TypedParams.Targets) == 0 {
			continue
		}
		targets[task.TypedParams.Targets[0].Node]++
	}
	// With 3 eligible destinations (b, c, d) and pending-task-aware scoring,
	// moves should go to more than just one destination.
	if len(targets) < 2 {
		t.Errorf("Expected moves to spread across destinations, but only got: %v", targets)
	}
	assertNoDuplicateVolumes(t, tasks)
}
// TestDetection_ConvergenceVerification verifies that after all planned moves,
// the effective volume distribution is within the configured threshold.
func TestDetection_ConvergenceVerification(t *testing.T) {
	cases := []struct {
		name      string
		counts    []int // volumes per server
		threshold float64
	}{
		{"2-server-big-gap", []int{100, 10}, 0.2},
		{"3-server-staircase", []int{90, 50, 10}, 0.2},
		{"4-server-one-hot", []int{200, 20, 20, 20}, 0.2},
		{"3-server-tight-threshold", []int{30, 20, 10}, 0.1},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Build one server per entry in counts, with contiguous volume IDs.
			var specs []serverSpec
			var metrics []*types.VolumeHealthMetrics
			nextVolumeID := uint32(1)
			for idx, count := range tc.counts {
				serverID := fmt.Sprintf("node-%d", idx)
				specs = append(specs, serverSpec{
					id: serverID, diskType: "hdd", diskID: uint32(idx + 1),
					dc: "dc1", rack: "rack1",
				})
				metrics = append(metrics, makeVolumes(serverID, "hdd", "dc1", "rack1", "c1", nextVolumeID, count)...)
				nextVolumeID += uint32(count)
			}
			cluster := &types.ClusterInfo{ActiveTopology: buildTopology(specs, metrics)}
			conf := defaultConf()
			conf.ImbalanceThreshold = tc.threshold

			tasks, err := Detection(metrics, cluster, conf, 500)
			if err != nil {
				t.Fatalf("Detection failed: %v", err)
			}
			if len(tasks) == 0 {
				t.Fatal("Expected balance tasks, got 0")
			}
			assertNoDuplicateVolumes(t, tasks)

			// Replay the planned moves and confirm the resulting spread sits
			// inside the configured threshold.
			effective := computeEffectiveCounts(metrics, tasks)
			sum, largest, smallest := 0, 0, len(metrics)
			for _, c := range effective {
				sum += c
				if c > largest {
					largest = c
				}
				if c < smallest {
					smallest = c
				}
			}
			mean := float64(sum) / float64(len(effective))
			imbalance := float64(largest-smallest) / mean
			if imbalance > tc.threshold {
				t.Errorf("After %d moves, still imbalanced: effective=%v, imbalance=%.1f%% (threshold=%.1f%%)",
					len(tasks), effective, imbalance*100, tc.threshold*100)
			}
			t.Logf("%s: %d moves, effective=%v, imbalance=%.1f%%",
				tc.name, len(tasks), effective, imbalance*100)
		})
	}
}
Loading…
Cancel
Save