You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

513 lines
18 KiB

package balance
import (
"strings"
"testing"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
const (
// fullnessThreshold is the FullnessRatio cut-off used by filterByState:
// metrics strictly below it count as ACTIVE, metrics at or above it as FULL.
fullnessThreshold = 1.01
// stateActive selects volumes whose FullnessRatio is below fullnessThreshold.
stateActive = "ACTIVE"
// stateFull selects volumes whose FullnessRatio is at or above fullnessThreshold.
stateFull = "FULL"
)
// filterByState narrows metrics to the volumes matching state, for use in tests.
//
// stateActive keeps entries whose FullnessRatio is below fullnessThreshold and
// stateFull keeps entries at or above it. Any other state value returns the
// input slice unchanged. Nil entries are dropped whenever a filter is applied.
func filterByState(metrics []*types.VolumeHealthMetrics, state string) []*types.VolumeHealthMetrics {
	var wantFull bool
	switch state {
	case stateActive:
		wantFull = false
	case stateFull:
		wantFull = true
	default:
		// Unknown state: no filtering at all.
		return metrics
	}

	var kept []*types.VolumeHealthMetrics
	for _, vm := range metrics {
		switch {
		case vm == nil:
			// Skip holes in the input.
		case !wantFull && vm.FullnessRatio < fullnessThreshold,
			wantFull && vm.FullnessRatio >= fullnessThreshold:
			kept = append(kept, vm)
		}
	}
	return kept
}
// Integration tests that exercise multiple features together:
// DC/rack/node filters, volume state filtering, replica placement validation,
// and collection scoping all interacting within a single detection run.
// volumeOption is a functional option that mutates a single generated metric
// in place. makeVolumesWith applies such options to every metric it creates.
type volumeOption func(m *types.VolumeHealthMetrics)
// withFullness returns an option that overwrites a metric's FullnessRatio
// with ratio.
func withFullness(ratio float64) volumeOption {
	return func(vm *types.VolumeHealthMetrics) {
		vm.FullnessRatio = ratio
	}
}
// withReplicas returns an option that overwrites a metric's ExpectedReplicas
// with rp.
// NOTE(review): callers in this file pass values such as 100 and 10 and refer
// to them as "rp=100"/"rp=010", so rp is presumably a replica placement code
// rather than a replica count — confirm against the detection code.
func withReplicas(rp int) volumeOption {
	return func(vm *types.VolumeHealthMetrics) {
		vm.ExpectedReplicas = rp
	}
}
// makeVolumesWith generates metrics through makeVolumes and then applies every
// option in opts, in the order given, to each generated metric. The (mutated)
// slice returned by makeVolumes is handed back to the caller.
func makeVolumesWith(server, diskType, dc, rack, collection string, volumeIDBase uint32, n int, opts ...volumeOption) []*types.VolumeHealthMetrics {
	generated := makeVolumes(server, diskType, dc, rack, collection, volumeIDBase, n)
	if len(opts) == 0 {
		return generated
	}
	for i := range generated {
		for j := range opts {
			opts[j](generated[i])
		}
	}
	return generated
}
// buildReplicaMap groups metrics by VolumeID into replica locations. Every
// metric contributes exactly one location built from its DataCenter, Rack and
// Server fields, appended in input order.
func buildReplicaMap(metrics []*types.VolumeHealthMetrics) map[uint32][]types.ReplicaLocation {
	locations := map[uint32][]types.ReplicaLocation{}
	for i := range metrics {
		vm := metrics[i]
		vid := vm.VolumeID
		loc := types.ReplicaLocation{
			DataCenter: vm.DataCenter,
			Rack:       vm.Rack,
			NodeID:     vm.Server,
		}
		locations[vid] = append(locations[vid], loc)
	}
	return locations
}
// TestIntegration_DCFilterWithVolumeState tests that DC filtering and volume
// state filtering compose correctly: only ACTIVE volumes in the specified DC
// participate in balancing.
//
// Fixture: node-a (dc1) holds 40 ACTIVE and 20 FULL volumes, node-b (dc1)
// holds 10 ACTIVE volumes, and node-c (dc2) holds 50 ACTIVE volumes. After
// both filters only the 40/10 split across node-a and node-b is handed to
// Detection, while the topology is still built from the unfiltered metrics.
func TestIntegration_DCFilterWithVolumeState(t *testing.T) {
	servers := []serverSpec{
		{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"},
		{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"},
		{id: "node-c", diskType: "hdd", diskID: 3, dc: "dc2", rack: "rack1"},
	}

	var allMetrics []*types.VolumeHealthMetrics
	// dc1: node-a has 40 active volumes, node-b has 10 active volumes
	allMetrics = append(allMetrics, makeVolumesWith("node-a", "hdd", "dc1", "rack1", "c1", 1, 40, withFullness(0.5))...)
	allMetrics = append(allMetrics, makeVolumesWith("node-b", "hdd", "dc1", "rack1", "c1", 100, 10, withFullness(0.5))...)
	// dc1: node-a also has 20 FULL volumes that should be excluded
	allMetrics = append(allMetrics, makeVolumesWith("node-a", "hdd", "dc1", "rack1", "c1", 200, 20, withFullness(1.5))...)
	// dc2: node-c has 50 active volumes (should be excluded by DC filter)
	allMetrics = append(allMetrics, makeVolumesWith("node-c", "hdd", "dc2", "rack1", "c1", 300, 50, withFullness(0.5))...)

	// Apply volume state filter (ACTIVE only) first
	activeMetrics := filterByState(allMetrics, stateActive)

	// Then apply DC filter
	dcMetrics := make([]*types.VolumeHealthMetrics, 0)
	for _, m := range activeMetrics {
		if m.DataCenter == "dc1" {
			dcMetrics = append(dcMetrics, m)
		}
	}

	at := buildTopology(servers, allMetrics)
	clusterInfo := &types.ClusterInfo{ActiveTopology: at}

	conf := defaultConf()
	conf.DataCenterFilter = "dc1"

	tasks, _, err := Detection(dcMetrics, clusterInfo, conf, 100)
	if err != nil {
		t.Fatalf("Detection failed: %v", err)
	}
	if len(tasks) == 0 {
		t.Fatal("Expected balance tasks for 40/10 active-only imbalance in dc1, got 0")
	}

	// node-c lives in dc2 and must appear neither as source nor as target.
	for _, task := range tasks {
		if task.Server == "node-c" {
			t.Error("node-c (dc2) should not be a source")
		}
		if task.TypedParams != nil {
			for _, tgt := range task.TypedParams.Targets {
				if strings.Contains(tgt.Node, "node-c") {
					t.Error("node-c (dc2) should not be a target")
				}
			}
		}
	}

	// Verify the effective counts cover only the 50 active dc1 volumes
	// (40+10), not the 20 full volumes. Only the total is asserted here, so no
	// min/max tracking is needed.
	effective := computeEffectiveCounts(servers[:2], dcMetrics, tasks)
	total := 0
	for _, c := range effective {
		total += c
	}
	if total != 50 {
		t.Errorf("Expected 50 total active volumes in dc1, got %d", total)
	}

	t.Logf("DC+state filter: %d tasks, effective=%v", len(tasks), effective)
}
// TestIntegration_NodeFilterWithCollections checks node filtering when the
// participating servers hold volumes from more than one collection. Only
// node-a and node-b are passed to Detection; node-c exists in the topology but
// must never show up as a source or a target.
func TestIntegration_NodeFilterWithCollections(t *testing.T) {
	servers := []serverSpec{
		{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"},
		{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"},
		{id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1"},
	}

	// Volume layout per server/collection.
	layout := []struct {
		server     string
		collection string
		idBase     uint32
		count      int
	}{
		// node-a: 30 "photos" + 20 "videos" = 50 total
		{"node-a", "photos", 1, 30},
		{"node-a", "videos", 100, 20},
		// node-b: 5 "photos" + 5 "videos" = 10 total
		{"node-b", "photos", 200, 5},
		{"node-b", "videos", 300, 5},
		// node-c: 40 volumes, expected to be excluded by the node filter
		{"node-c", "photos", 400, 40},
	}
	var allMetrics []*types.VolumeHealthMetrics
	for _, entry := range layout {
		allMetrics = append(allMetrics, makeVolumes(entry.server, "hdd", "dc1", "rack1", entry.collection, entry.idBase, entry.count)...)
	}

	// Keep only the metrics that live on the filtered nodes.
	filteredMetrics := make([]*types.VolumeHealthMetrics, 0)
	for _, vm := range allMetrics {
		switch vm.Server {
		case "node-a", "node-b":
			filteredMetrics = append(filteredMetrics, vm)
		}
	}

	conf := defaultConf()
	conf.NodeFilter = "node-a,node-b"
	clusterInfo := &types.ClusterInfo{ActiveTopology: buildTopology(servers, allMetrics)}

	tasks, _, err := Detection(filteredMetrics, clusterInfo, conf, 100)
	if err != nil {
		t.Fatalf("Detection failed: %v", err)
	}
	if len(tasks) == 0 {
		t.Fatal("Expected tasks for 50/10 imbalance within node-a,node-b")
	}

	// Every move has to stay between node-a and node-b.
	for _, task := range tasks {
		switch task.Server {
		case "node-a", "node-b":
			// allowed source
		default:
			t.Errorf("Source %s should be node-a or node-b", task.Server)
		}
		if task.TypedParams == nil {
			continue
		}
		for _, tgt := range task.TypedParams.Targets {
			allowed := strings.Contains(tgt.Node, "node-a") || strings.Contains(tgt.Node, "node-b")
			if !allowed {
				t.Errorf("Target %s should be node-a or node-b", tgt.Node)
			}
		}
	}

	assertNoDuplicateVolumes(t, tasks)
	t.Logf("Node filter with mixed collections: %d tasks", len(tasks))
}
// TestIntegration_ReplicaPlacementWithDCFilter combines replica placement
// validation with a dc1-only DataCenterFilter. It expects the balancer to
// still produce moves inside dc1 while never selecting the dc2 node.
func TestIntegration_ReplicaPlacementWithDCFilter(t *testing.T) {
	// Scenario: two data centers, volumes created with withReplicas(100)
	// (described by this test as rp=100: one replica in a different DC).
	// The DC filter restricts balancing to dc1. Each node-a volume has one
	// copy in dc1 and one on node-c in dc2; moves inside dc1 must not break
	// that cross-DC placement.
	servers := []serverSpec{
		{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"},
		{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"},
		{id: "node-c", diskType: "hdd", diskID: 3, dc: "dc2", rack: "rack1"},
	}

	rp := withReplicas(100)

	// dc1 holds the imbalance: 30 volumes on node-a versus 5 on node-b.
	var allMetrics []*types.VolumeHealthMetrics
	allMetrics = append(allMetrics, makeVolumesWith("node-a", "hdd", "dc1", "rack1", "c1", 1, 30, rp)...)
	allMetrics = append(allMetrics, makeVolumesWith("node-b", "hdd", "dc1", "rack1", "c1", 100, 5, rp)...)

	// The dc2 copies never enter the metrics passed to Detection; they are
	// only recorded in the replica map.
	dc2Replicas := makeVolumesWith("node-c", "hdd", "dc2", "rack1", "c1", 1, 30, rp)

	// Replica map: dc1 locations first, then the node-c location per volume.
	replicaMap := buildReplicaMap(allMetrics)
	for _, replica := range dc2Replicas {
		vid := replica.VolumeID
		loc := types.ReplicaLocation{
			DataCenter: replica.DataCenter,
			Rack:       replica.Rack,
			NodeID:     replica.Server,
		}
		replicaMap[vid] = append(replicaMap[vid], loc)
	}

	// Restrict the detection input to dc1.
	dc1Metrics := make([]*types.VolumeHealthMetrics, 0)
	for _, vm := range allMetrics {
		if vm.DataCenter != "dc1" {
			continue
		}
		dc1Metrics = append(dc1Metrics, vm)
	}

	conf := defaultConf()
	conf.DataCenterFilter = "dc1"
	clusterInfo := &types.ClusterInfo{
		ActiveTopology:   buildTopology(servers, allMetrics),
		VolumeReplicaMap: replicaMap,
	}

	tasks, _, err := Detection(dc1Metrics, clusterInfo, conf, 100)
	if err != nil {
		t.Fatalf("Detection failed: %v", err)
	}

	// A node-a -> node-b move keeps the node-c replica in dc2 untouched, so
	// the placement stays valid and moves are expected.
	if len(tasks) == 0 {
		t.Fatal("Expected tasks: 30/5 imbalance with valid cross-DC replicas")
	}

	for _, task := range tasks {
		// Sources must be dc1 nodes.
		if src := task.Server; src != "node-a" && src != "node-b" {
			t.Errorf("Source %s should be in dc1", src)
		}
		// Targets must not leave dc1.
		if task.TypedParams == nil {
			continue
		}
		for _, tgt := range task.TypedParams.Targets {
			if strings.Contains(tgt.Node, "node-c") {
				t.Errorf("Target should not be node-c (dc2) with dc1 filter")
			}
		}
	}

	assertNoDuplicateVolumes(t, tasks)
	t.Logf("Replica placement + DC filter: %d tasks", len(tasks))
}
// TestIntegration_RackFilterWithReplicaPlacement exercises rack filtering
// together with replica placement validation. Volumes are created with
// withReplicas(10) (described by this test as rp=010: replicas on different
// racks). The rack filter limits balancing to rack1, and the cross-rack
// replica is provided by node-c in rack2, which sits outside the filter. Moves
// inside rack1 are therefore expected, and node-c must never be involved.
func TestIntegration_RackFilterWithReplicaPlacement(t *testing.T) {
	servers := []serverSpec{
		{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"},
		{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"},
		{id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack2"},
	}

	rp := withReplicas(10)

	// rack1 carries the imbalance: 30 volumes on node-a, 5 on node-b.
	var allMetrics []*types.VolumeHealthMetrics
	allMetrics = append(allMetrics, makeVolumesWith("node-a", "hdd", "dc1", "rack1", "c1", 1, 30, rp)...)
	allMetrics = append(allMetrics, makeVolumesWith("node-b", "hdd", "dc1", "rack1", "c1", 100, 5, rp)...)

	// rack2 copies on node-c are recorded in the replica map only.
	rack2Replicas := makeVolumesWith("node-c", "hdd", "dc1", "rack2", "c1", 1, 30, rp)

	replicaMap := buildReplicaMap(allMetrics)
	for _, replica := range rack2Replicas {
		vid := replica.VolumeID
		loc := types.ReplicaLocation{
			DataCenter: replica.DataCenter,
			Rack:       replica.Rack,
			NodeID:     replica.Server,
		}
		replicaMap[vid] = append(replicaMap[vid], loc)
	}

	// Restrict the detection input to rack1.
	rack1Metrics := make([]*types.VolumeHealthMetrics, 0)
	for _, vm := range allMetrics {
		if vm.Rack != "rack1" {
			continue
		}
		rack1Metrics = append(rack1Metrics, vm)
	}

	conf := defaultConf()
	conf.RackFilter = "rack1"
	clusterInfo := &types.ClusterInfo{
		ActiveTopology:   buildTopology(servers, allMetrics),
		VolumeReplicaMap: replicaMap,
	}

	tasks, _, err := Detection(rack1Metrics, clusterInfo, conf, 100)
	if err != nil {
		t.Fatalf("Detection failed: %v", err)
	}

	// node-a -> node-b stays inside rack1 and leaves the rack2 replica on
	// node-c in place, so moves are expected.
	if len(tasks) == 0 {
		t.Fatal("Expected tasks for 30/5 imbalance within rack1")
	}

	for _, task := range tasks {
		if task.Server == "node-c" {
			t.Error("node-c (rack2) should not be a source with rack1 filter")
		}
		if task.TypedParams == nil {
			continue
		}
		for _, tgt := range task.TypedParams.Targets {
			if strings.Contains(tgt.Node, "node-c") {
				t.Error("node-c (rack2) should not be a target with rack1 filter")
			}
		}
	}

	assertNoDuplicateVolumes(t, tasks)
	t.Logf("Rack filter + replica placement: %d tasks", len(tasks))
}
// TestIntegration_AllFactors runs one detection pass with every filtering
// dimension active at once: DC filter, rack filter, volume state filter,
// replica placement data, and a topology that also contains out-of-scope
// nodes. It then checks that all moves stay in dc1/rack1 and that the
// resulting distribution is within the configured imbalance threshold.
func TestIntegration_AllFactors(t *testing.T) {
	servers := []serverSpec{
		{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"},
		{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"},
		{id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack2"}, // outside the rack filter
		{id: "node-d", diskType: "hdd", diskID: 4, dc: "dc2", rack: "rack1"}, // outside the DC filter
	}

	var allMetrics []*types.VolumeHealthMetrics
	// node-a: 25 active "photos" plus 10 full "photos"; the full ones are
	// dropped by the state filter.
	allMetrics = append(allMetrics, makeVolumesWith("node-a", "hdd", "dc1", "rack1", "photos", 1, 25, withFullness(0.5), withReplicas(100))...)
	allMetrics = append(allMetrics, makeVolumesWith("node-a", "hdd", "dc1", "rack1", "photos", 200, 10, withFullness(1.5), withReplicas(100))...)
	// node-b: 5 active "photos".
	allMetrics = append(allMetrics, makeVolumesWith("node-b", "hdd", "dc1", "rack1", "photos", 100, 5, withFullness(0.5), withReplicas(100))...)
	// node-c: 20 volumes in dc1/rack2, dropped by the rack filter.
	allMetrics = append(allMetrics, makeVolumesWith("node-c", "hdd", "dc1", "rack2", "photos", 300, 20, withFullness(0.5))...)

	// node-d: 25 volumes in dc2, generated with the same ID base and count as
	// node-a's active volumes. They are outside the DC filter and serve only
	// as cross-DC replicas in the replica map.
	dc2Replicas := makeVolumesWith("node-d", "hdd", "dc2", "rack1", "photos", 1, 25, withFullness(0.5), withReplicas(100))

	replicaMap := buildReplicaMap(allMetrics)
	for _, replica := range dc2Replicas {
		vid := replica.VolumeID
		loc := types.ReplicaLocation{
			DataCenter: replica.DataCenter,
			Rack:       replica.Rack,
			NodeID:     replica.Server,
		}
		replicaMap[vid] = append(replicaMap[vid], loc)
	}

	// Stack the filters: ACTIVE state first, then dc1 + rack1.
	var finalMetrics []*types.VolumeHealthMetrics
	for _, vm := range filterByState(allMetrics, stateActive) {
		if vm.DataCenter != "dc1" || vm.Rack != "rack1" {
			continue
		}
		finalMetrics = append(finalMetrics, vm)
	}

	conf := defaultConf()
	conf.DataCenterFilter = "dc1"
	conf.RackFilter = "rack1"
	clusterInfo := &types.ClusterInfo{
		ActiveTopology:   buildTopology(servers, allMetrics),
		VolumeReplicaMap: replicaMap,
	}

	tasks, _, err := Detection(finalMetrics, clusterInfo, conf, 100)
	if err != nil {
		t.Fatalf("Detection failed: %v", err)
	}
	if len(tasks) == 0 {
		t.Fatal("Expected tasks for 25/5 active imbalance in dc1/rack1")
	}

	// Every move has to stay inside the dc1/rack1 scope.
	for idx, task := range tasks {
		switch task.Server {
		case "node-a", "node-b":
			// allowed source
		default:
			t.Errorf("Task %d: source %s should be node-a or node-b", idx, task.Server)
		}
		if task.TypedParams == nil {
			continue
		}
		for _, tgt := range task.TypedParams.Targets {
			name := tgt.Node
			if strings.Contains(name, "node-a") || strings.Contains(name, "node-b") {
				continue
			}
			t.Errorf("Task %d: target %s should be node-a or node-b", idx, name)
		}
	}

	assertNoDuplicateVolumes(t, tasks)

	// Convergence check over the two in-scope servers.
	effective := computeEffectiveCounts(servers[:2], finalMetrics, tasks)
	var total, highest int
	lowest := len(finalMetrics)
	for _, count := range effective {
		total += count
		if count > highest {
			highest = count
		}
		if count < lowest {
			lowest = count
		}
	}

	// The filtered set is the 30 active dc1/rack1 volumes (25+5).
	if total != 30 {
		t.Errorf("Expected 30 total filtered volumes, got %d", total)
	}

	// Imbalance is the max-min spread relative to the mean per-server count.
	avg := float64(total) / float64(len(effective))
	imbalance := float64(highest-lowest) / avg
	if imbalance > conf.ImbalanceThreshold {
		t.Errorf("Still imbalanced: effective=%v, imbalance=%.1f%% (threshold=%.1f%%)",
			effective, imbalance*100, conf.ImbalanceThreshold*100)
	}

	t.Logf("All factors combined: %d tasks, effective=%v", len(tasks), effective)
}
// TestIntegration_FullVolumesOnlyBalancing checks that when the metrics are
// narrowed to the FULL state, Detection only schedules moves for full volumes
// and leaves the active ones alone.
func TestIntegration_FullVolumesOnlyBalancing(t *testing.T) {
	servers := []serverSpec{
		{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"},
		{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"},
	}

	// Per-server layout; fullness 0.5 is ACTIVE, 1.5 is FULL.
	layout := []struct {
		server   string
		idBase   uint32
		count    int
		fullness float64
	}{
		// node-a: 10 active + 30 full
		{"node-a", 1, 10, 0.5},
		{"node-a", 100, 30, 1.5},
		// node-b: 10 active + 5 full
		{"node-b", 200, 10, 0.5},
		{"node-b", 300, 5, 1.5},
	}
	var allMetrics []*types.VolumeHealthMetrics
	for _, entry := range layout {
		allMetrics = append(allMetrics, makeVolumesWith(entry.server, "hdd", "dc1", "rack1", "c1", entry.idBase, entry.count, withFullness(entry.fullness))...)
	}

	// Only FULL volumes are handed to Detection; the topology still sees all.
	fullMetrics := filterByState(allMetrics, stateFull)
	clusterInfo := &types.ClusterInfo{ActiveTopology: buildTopology(servers, allMetrics)}

	tasks, _, err := Detection(fullMetrics, clusterInfo, defaultConf(), 100)
	if err != nil {
		t.Fatalf("Detection failed: %v", err)
	}
	if len(tasks) == 0 {
		t.Fatal("Expected tasks for 30/5 full volume imbalance")
	}

	// No task may touch the ID ranges this test treats as active volumes:
	// 1-10 (node-a) and 200-209 (node-b).
	for _, task := range tasks {
		vid := task.VolumeID
		switch {
		case vid >= 1 && vid <= 10, vid >= 200 && vid <= 209:
			t.Errorf("Task moved active volume %d, should only move full volumes", vid)
		}
	}

	assertNoDuplicateVolumes(t, tasks)
	t.Logf("Full-only balancing: %d tasks", len(tasks))
}