You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
514 lines
14 KiB
514 lines
14 KiB
package ec_balance
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
|
)
|
|
|
|
func TestShardBitCount(t *testing.T) {
|
|
tests := []struct {
|
|
bits uint32
|
|
expected int
|
|
}{
|
|
{0, 0},
|
|
{1, 1},
|
|
{0b111, 3},
|
|
{0x3FFF, 14}, // all 14 shards
|
|
{0b10101010, 4},
|
|
}
|
|
for _, tt := range tests {
|
|
got := shardBitCount(tt.bits)
|
|
if got != tt.expected {
|
|
t.Errorf("shardBitCount(%b) = %d, want %d", tt.bits, got, tt.expected)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestCeilDivide(t *testing.T) {
|
|
tests := []struct {
|
|
a, b int
|
|
expected int
|
|
}{
|
|
{14, 3, 5},
|
|
{14, 7, 2},
|
|
{10, 3, 4},
|
|
{0, 5, 0},
|
|
{5, 0, 0},
|
|
}
|
|
for _, tt := range tests {
|
|
got := ceilDivide(tt.a, tt.b)
|
|
if got != tt.expected {
|
|
t.Errorf("ceilDivide(%d, %d) = %d, want %d", tt.a, tt.b, got, tt.expected)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestDetectDuplicateShards(t *testing.T) {
|
|
nodes := map[string]*ecNodeInfo{
|
|
"node1": {
|
|
nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 5,
|
|
ecShards: map[uint32]*ecVolumeInfo{
|
|
100: {collection: "col1", shardBits: 0b11}, // shard 0, 1
|
|
},
|
|
},
|
|
"node2": {
|
|
nodeID: "node2", address: "node2:8080", rack: "dc1:rack2", freeSlots: 10,
|
|
ecShards: map[uint32]*ecVolumeInfo{
|
|
100: {collection: "col1", shardBits: 0b01}, // shard 0 (duplicate)
|
|
},
|
|
},
|
|
}
|
|
|
|
moves := detectDuplicateShards(100, "col1", nodes, "")
|
|
|
|
if len(moves) != 1 {
|
|
t.Fatalf("expected 1 dedup move, got %d", len(moves))
|
|
}
|
|
|
|
move := moves[0]
|
|
if move.phase != "dedup" {
|
|
t.Errorf("expected phase 'dedup', got %q", move.phase)
|
|
}
|
|
if move.shardID != 0 {
|
|
t.Errorf("expected shard 0 to be deduplicated, got %d", move.shardID)
|
|
}
|
|
// node1 has fewer free slots, so the duplicate on node1 should be removed (keeper is node2)
|
|
if move.source.nodeID != "node1" {
|
|
t.Errorf("expected source node1 (fewer free slots), got %s", move.source.nodeID)
|
|
}
|
|
// Dedup moves set target=source so isDedupPhase recognizes unmount+delete only
|
|
if move.target.nodeID != "node1" {
|
|
t.Errorf("expected target node1 (same as source for dedup), got %s", move.target.nodeID)
|
|
}
|
|
}
|
|
|
|
func TestDetectCrossRackImbalance(t *testing.T) {
|
|
// 14 shards all on rack1, 2 racks available — large imbalance
|
|
nodes := map[string]*ecNodeInfo{
|
|
"node1": {
|
|
nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 0,
|
|
ecShards: map[uint32]*ecVolumeInfo{
|
|
100: {collection: "col1", shardBits: 0x3FFF}, // all 14 shards
|
|
},
|
|
},
|
|
"node2": {
|
|
nodeID: "node2", address: "node2:8080", rack: "dc1:rack2", freeSlots: 20,
|
|
ecShards: map[uint32]*ecVolumeInfo{},
|
|
},
|
|
}
|
|
racks := map[string]*ecRackInfo{
|
|
"dc1:rack1": {
|
|
nodes: map[string]*ecNodeInfo{"node1": nodes["node1"]},
|
|
freeSlots: 0,
|
|
},
|
|
"dc1:rack2": {
|
|
nodes: map[string]*ecNodeInfo{"node2": nodes["node2"]},
|
|
freeSlots: 20,
|
|
},
|
|
}
|
|
|
|
// Use very low threshold so this triggers
|
|
moves := detectCrossRackImbalance(100, "col1", nodes, racks, "", 0.01)
|
|
|
|
// With 14 shards across 2 racks, max per rack = 7
|
|
// rack1 has 14 -> excess = 7, should move 7 to rack2
|
|
if len(moves) != 7 {
|
|
t.Fatalf("expected 7 cross-rack moves, got %d", len(moves))
|
|
}
|
|
for _, move := range moves {
|
|
if move.phase != "cross_rack" {
|
|
t.Errorf("expected phase 'cross_rack', got %q", move.phase)
|
|
}
|
|
if move.source.rack != "dc1:rack1" {
|
|
t.Errorf("expected source dc1:rack1, got %s", move.source.rack)
|
|
}
|
|
if move.target.rack != "dc1:rack2" {
|
|
t.Errorf("expected target dc1:rack2, got %s", move.target.rack)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestDetectCrossRackImbalanceBelowThreshold(t *testing.T) {
|
|
// Slight imbalance: rack1 has 8, rack2 has 6 — imbalance = 2/7 ≈ 0.29
|
|
nodes := map[string]*ecNodeInfo{
|
|
"node1": {
|
|
nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 10,
|
|
ecShards: map[uint32]*ecVolumeInfo{
|
|
100: {collection: "col1", shardBits: 0xFF}, // 8 shards
|
|
},
|
|
},
|
|
"node2": {
|
|
nodeID: "node2", address: "node2:8080", rack: "dc1:rack2", freeSlots: 10,
|
|
ecShards: map[uint32]*ecVolumeInfo{
|
|
100: {collection: "col1", shardBits: 0x3F00}, // 6 shards
|
|
},
|
|
},
|
|
}
|
|
racks := map[string]*ecRackInfo{
|
|
"dc1:rack1": {
|
|
nodes: map[string]*ecNodeInfo{"node1": nodes["node1"]},
|
|
freeSlots: 10,
|
|
},
|
|
"dc1:rack2": {
|
|
nodes: map[string]*ecNodeInfo{"node2": nodes["node2"]},
|
|
freeSlots: 10,
|
|
},
|
|
}
|
|
|
|
// High threshold should skip this
|
|
moves := detectCrossRackImbalance(100, "col1", nodes, racks, "", 0.5)
|
|
if len(moves) != 0 {
|
|
t.Fatalf("expected 0 moves below threshold, got %d", len(moves))
|
|
}
|
|
}
|
|
|
|
func TestDetectWithinRackImbalance(t *testing.T) {
|
|
// rack1 has 2 nodes: node1 has 10 shards, node2 has 0 shards
|
|
nodes := map[string]*ecNodeInfo{
|
|
"node1": {
|
|
nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 5,
|
|
ecShards: map[uint32]*ecVolumeInfo{
|
|
100: {collection: "col1", shardBits: 0b1111111111}, // shards 0-9
|
|
},
|
|
},
|
|
"node2": {
|
|
nodeID: "node2", address: "node2:8080", rack: "dc1:rack1", freeSlots: 20,
|
|
ecShards: map[uint32]*ecVolumeInfo{},
|
|
},
|
|
}
|
|
racks := map[string]*ecRackInfo{
|
|
"dc1:rack1": {
|
|
nodes: map[string]*ecNodeInfo{"node1": nodes["node1"], "node2": nodes["node2"]},
|
|
freeSlots: 25,
|
|
},
|
|
}
|
|
|
|
moves := detectWithinRackImbalance(100, "col1", nodes, racks, "", 0.01)
|
|
|
|
// 10 shards on 2 nodes, max per node = 5
|
|
// node1 has 10 -> excess = 5, should move 5 to node2
|
|
if len(moves) != 5 {
|
|
t.Fatalf("expected 5 within-rack moves, got %d", len(moves))
|
|
}
|
|
for _, move := range moves {
|
|
if move.phase != "within_rack" {
|
|
t.Errorf("expected phase 'within_rack', got %q", move.phase)
|
|
}
|
|
if move.source.nodeID != "node1" {
|
|
t.Errorf("expected source node1, got %s", move.source.nodeID)
|
|
}
|
|
if move.target.nodeID != "node2" {
|
|
t.Errorf("expected target node2, got %s", move.target.nodeID)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestDetectGlobalImbalance(t *testing.T) {
|
|
// node1 has 20 total shards, node2 has 2 total shards (same rack)
|
|
nodes := map[string]*ecNodeInfo{
|
|
"node1": {
|
|
nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 5,
|
|
ecShards: map[uint32]*ecVolumeInfo{
|
|
100: {collection: "col1", shardBits: 0x3FFF}, // 14 shards
|
|
200: {collection: "col1", shardBits: 0b111111}, // 6 shards
|
|
},
|
|
},
|
|
"node2": {
|
|
nodeID: "node2", address: "node2:8080", rack: "dc1:rack1", freeSlots: 30,
|
|
ecShards: map[uint32]*ecVolumeInfo{
|
|
300: {collection: "col1", shardBits: 0b11}, // 2 shards
|
|
},
|
|
},
|
|
}
|
|
racks := map[string]*ecRackInfo{
|
|
"dc1:rack1": {
|
|
nodes: map[string]*ecNodeInfo{"node1": nodes["node1"], "node2": nodes["node2"]},
|
|
freeSlots: 35,
|
|
},
|
|
}
|
|
|
|
config := NewDefaultConfig()
|
|
config.ImbalanceThreshold = 0.01 // low threshold to ensure moves happen
|
|
moves := detectGlobalImbalance(nodes, racks, config, nil)
|
|
|
|
// Total = 22 shards, avg = 11. node1 has 20, node2 has 2.
|
|
// Should move shards until balanced (max 10 iterations)
|
|
if len(moves) == 0 {
|
|
t.Fatal("expected global balance moves, got 0")
|
|
}
|
|
for _, move := range moves {
|
|
if move.phase != "global" {
|
|
t.Errorf("expected phase 'global', got %q", move.phase)
|
|
}
|
|
if move.source.nodeID != "node1" {
|
|
t.Errorf("expected moves from node1, got %s", move.source.nodeID)
|
|
}
|
|
if move.target.nodeID != "node2" {
|
|
t.Errorf("expected moves to node2, got %s", move.target.nodeID)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestDetectGlobalImbalanceSkipsFullNodes(t *testing.T) {
|
|
// node2 has 0 free slots — should not be chosen as destination
|
|
nodes := map[string]*ecNodeInfo{
|
|
"node1": {
|
|
nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 10,
|
|
ecShards: map[uint32]*ecVolumeInfo{
|
|
100: {collection: "col1", shardBits: 0x3FFF}, // 14 shards
|
|
},
|
|
},
|
|
"node2": {
|
|
nodeID: "node2", address: "node2:8080", rack: "dc1:rack1", freeSlots: 0,
|
|
ecShards: map[uint32]*ecVolumeInfo{
|
|
200: {collection: "col1", shardBits: 0b11}, // 2 shards
|
|
},
|
|
},
|
|
}
|
|
racks := map[string]*ecRackInfo{
|
|
"dc1:rack1": {
|
|
nodes: map[string]*ecNodeInfo{"node1": nodes["node1"], "node2": nodes["node2"]},
|
|
freeSlots: 10,
|
|
},
|
|
}
|
|
|
|
config := NewDefaultConfig()
|
|
config.ImbalanceThreshold = 0.01
|
|
moves := detectGlobalImbalance(nodes, racks, config, nil)
|
|
|
|
// node2 has no free slots so no moves should be proposed
|
|
if len(moves) != 0 {
|
|
t.Fatalf("expected 0 moves (node2 full), got %d", len(moves))
|
|
}
|
|
}
|
|
|
|
func TestBuildECTopology(t *testing.T) {
|
|
topoInfo := &master_pb.TopologyInfo{
|
|
DataCenterInfos: []*master_pb.DataCenterInfo{
|
|
{
|
|
Id: "dc1",
|
|
RackInfos: []*master_pb.RackInfo{
|
|
{
|
|
Id: "rack1",
|
|
DataNodeInfos: []*master_pb.DataNodeInfo{
|
|
{
|
|
Id: "server1:8080",
|
|
DiskInfos: map[string]*master_pb.DiskInfo{
|
|
"": {
|
|
MaxVolumeCount: 100,
|
|
VolumeCount: 50,
|
|
EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{
|
|
{
|
|
Id: 1,
|
|
Collection: "test",
|
|
EcIndexBits: 0x3FFF, // all 14 shards
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
config := NewDefaultConfig()
|
|
nodes, racks := buildECTopology(topoInfo, config)
|
|
|
|
if len(nodes) != 1 {
|
|
t.Fatalf("expected 1 node, got %d", len(nodes))
|
|
}
|
|
if len(racks) != 1 {
|
|
t.Fatalf("expected 1 rack, got %d", len(racks))
|
|
}
|
|
|
|
node := nodes["server1:8080"]
|
|
if node == nil {
|
|
t.Fatal("expected node server1:8080")
|
|
}
|
|
if node.dc != "dc1" {
|
|
t.Errorf("expected dc=dc1, got %s", node.dc)
|
|
}
|
|
// Rack key should be dc:rack composite
|
|
if node.rack != "dc1:rack1" {
|
|
t.Errorf("expected rack=dc1:rack1, got %s", node.rack)
|
|
}
|
|
|
|
ecInfo, ok := node.ecShards[1]
|
|
if !ok {
|
|
t.Fatal("expected EC shard info for volume 1")
|
|
}
|
|
if ecInfo.collection != "test" {
|
|
t.Errorf("expected collection=test, got %s", ecInfo.collection)
|
|
}
|
|
if shardBitCount(ecInfo.shardBits) != 14 {
|
|
t.Errorf("expected 14 shards, got %d", shardBitCount(ecInfo.shardBits))
|
|
}
|
|
}
|
|
|
|
func TestBuildECTopologyCrossDCRackNames(t *testing.T) {
|
|
// Two DCs with identically-named racks should produce distinct rack keys
|
|
topoInfo := &master_pb.TopologyInfo{
|
|
DataCenterInfos: []*master_pb.DataCenterInfo{
|
|
{
|
|
Id: "dc1",
|
|
RackInfos: []*master_pb.RackInfo{{
|
|
Id: "rack1",
|
|
DataNodeInfos: []*master_pb.DataNodeInfo{{
|
|
Id: "node-dc1:8080",
|
|
DiskInfos: map[string]*master_pb.DiskInfo{
|
|
"": {MaxVolumeCount: 10, VolumeCount: 0},
|
|
},
|
|
}},
|
|
}},
|
|
},
|
|
{
|
|
Id: "dc2",
|
|
RackInfos: []*master_pb.RackInfo{{
|
|
Id: "rack1",
|
|
DataNodeInfos: []*master_pb.DataNodeInfo{{
|
|
Id: "node-dc2:8080",
|
|
DiskInfos: map[string]*master_pb.DiskInfo{
|
|
"": {MaxVolumeCount: 10, VolumeCount: 0},
|
|
},
|
|
}},
|
|
}},
|
|
},
|
|
},
|
|
}
|
|
|
|
config := NewDefaultConfig()
|
|
_, racks := buildECTopology(topoInfo, config)
|
|
|
|
if len(racks) != 2 {
|
|
t.Fatalf("expected 2 distinct racks, got %d", len(racks))
|
|
}
|
|
if _, ok := racks["dc1:rack1"]; !ok {
|
|
t.Error("expected dc1:rack1 rack key")
|
|
}
|
|
if _, ok := racks["dc2:rack1"]; !ok {
|
|
t.Error("expected dc2:rack1 rack key")
|
|
}
|
|
}
|
|
|
|
func TestCollectECCollections(t *testing.T) {
|
|
nodes := map[string]*ecNodeInfo{
|
|
"node1": {
|
|
ecShards: map[uint32]*ecVolumeInfo{
|
|
100: {collection: "col1"},
|
|
200: {collection: "col2"},
|
|
},
|
|
},
|
|
"node2": {
|
|
ecShards: map[uint32]*ecVolumeInfo{
|
|
100: {collection: "col1"},
|
|
300: {collection: "col2"},
|
|
},
|
|
},
|
|
}
|
|
|
|
config := NewDefaultConfig()
|
|
collections := collectECCollections(nodes, config)
|
|
|
|
if len(collections) != 2 {
|
|
t.Fatalf("expected 2 collections, got %d", len(collections))
|
|
}
|
|
if len(collections["col1"]) != 1 {
|
|
t.Errorf("expected 1 volume in col1, got %d", len(collections["col1"]))
|
|
}
|
|
if len(collections["col2"]) != 2 {
|
|
t.Errorf("expected 2 volumes in col2, got %d", len(collections["col2"]))
|
|
}
|
|
}
|
|
|
|
func TestCollectECCollectionsWithFilter(t *testing.T) {
|
|
nodes := map[string]*ecNodeInfo{
|
|
"node1": {
|
|
ecShards: map[uint32]*ecVolumeInfo{
|
|
100: {collection: "col1"},
|
|
200: {collection: "col2"},
|
|
},
|
|
},
|
|
}
|
|
|
|
config := NewDefaultConfig()
|
|
config.CollectionFilter = "col1"
|
|
collections := collectECCollections(nodes, config)
|
|
|
|
if len(collections) != 1 {
|
|
t.Fatalf("expected 1 collection, got %d", len(collections))
|
|
}
|
|
if _, ok := collections["col1"]; !ok {
|
|
t.Error("expected col1 to be present")
|
|
}
|
|
}
|
|
|
|
func TestDetectionDisabled(t *testing.T) {
|
|
config := NewDefaultConfig()
|
|
config.Enabled = false
|
|
|
|
results, hasMore, err := Detection(context.Background(), nil, nil, config, 0)
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
if hasMore {
|
|
t.Error("expected hasMore=false")
|
|
}
|
|
if len(results) != 0 {
|
|
t.Errorf("expected 0 results, got %d", len(results))
|
|
}
|
|
}
|
|
|
|
func TestDetectionNilTopology(t *testing.T) {
|
|
config := NewDefaultConfig()
|
|
clusterInfo := &types.ClusterInfo{ActiveTopology: nil}
|
|
|
|
_, _, err := Detection(context.Background(), nil, clusterInfo, config, 0)
|
|
if err == nil {
|
|
t.Fatal("expected error for nil topology")
|
|
}
|
|
}
|
|
|
|
func TestMovePhasePriority(t *testing.T) {
|
|
if movePhasePriority("dedup") != types.TaskPriorityHigh {
|
|
t.Error("dedup should be high priority")
|
|
}
|
|
if movePhasePriority("cross_rack") != types.TaskPriorityMedium {
|
|
t.Error("cross_rack should be medium priority")
|
|
}
|
|
if movePhasePriority("within_rack") != types.TaskPriorityLow {
|
|
t.Error("within_rack should be low priority")
|
|
}
|
|
if movePhasePriority("global") != types.TaskPriorityLow {
|
|
t.Error("global should be low priority")
|
|
}
|
|
}
|
|
|
|
func TestExceedsImbalanceThreshold(t *testing.T) {
|
|
// 14 vs 0 across 2 groups: imbalance = 14/7 = 2.0 > any reasonable threshold
|
|
counts := map[string]int{"a": 14, "b": 0}
|
|
if !exceedsImbalanceThreshold(counts, 14, 2, 0.2) {
|
|
t.Error("expected imbalance to exceed 0.2 threshold")
|
|
}
|
|
|
|
// Only one group has shards but numGroups=2: min is 0 from absent group
|
|
counts2 := map[string]int{"a": 14}
|
|
if !exceedsImbalanceThreshold(counts2, 14, 2, 0.2) {
|
|
t.Error("expected imbalance with absent group to exceed 0.2 threshold")
|
|
}
|
|
|
|
// 7 vs 7: perfectly balanced
|
|
counts3 := map[string]int{"a": 7, "b": 7}
|
|
if exceedsImbalanceThreshold(counts3, 14, 2, 0.01) {
|
|
t.Error("expected balanced distribution to not exceed threshold")
|
|
}
|
|
}
|
|
|
|
// helper to avoid unused import
|
|
var _ = erasure_coding.DataShardsCount
|