From f256002d0ba1f31741c76d31f37a6d8d72acac41 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 27 Mar 2026 11:14:10 -0700 Subject: [PATCH] fix ec.balance failing to rebalance when all nodes share all volumes (#8796) * fix ec.balance failing to rebalance when all nodes share all volumes (#8793) Two bugs in doBalanceEcRack prevented rebalancing: 1. Sorting by freeEcSlot instead of actual shard count caused incorrect empty/full node selection when nodes have different total capacities. 2. The volume-level check skipped any volume already present on the target node. When every node has a shard of every volume (common with many EC volumes across N nodes with N shards each), no moves were possible. Fix: sort by actual shard count, and use a two-pass approach - first prefer moving shards of volumes not on the target (best diversity), then fall back to moving specific shard IDs not yet on the target. * add test simulating real cluster topology from issue #8793 Uses the actual node addresses and mixed max capacities (80 vs 33) from the reporter's 14-node cluster to verify ec.balance correctly rebalances with heterogeneous node sizes. * fix pass comments to match 0-indexed loop variable --- weed/shell/command_ec_common.go | 72 ++++++++++------- weed/shell/command_ec_test.go | 137 ++++++++++++++++++++++++++++++++ 2 files changed, 181 insertions(+), 28 deletions(-) diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 5af5b1c17..ba84fc7f7 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -1301,49 +1301,65 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error { hasMove := true for hasMove { hasMove = false + // Sort by shard count (ascending) so emptyNode has fewest shards, fullNode has most. + // Using freeEcSlot would be incorrect when nodes have different total capacities. slices.SortFunc(rackEcNodes, func(a, b *EcNode) int { - return b.freeEcSlot - a.freeEcSlot + return ecNodeIdToShardCount[a.info.Id] - ecNodeIdToShardCount[b.info.Id] }) emptyNode, fullNode := rackEcNodes[0], rackEcNodes[len(rackEcNodes)-1] emptyNodeShardCount, fullNodeShardCount := ecNodeIdToShardCount[emptyNode.info.Id], ecNodeIdToShardCount[fullNode.info.Id] if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount { - emptyNodeIds := make(map[uint32]bool) + // Build a map of volume ID -> shard bits on the empty node + emptyNodeVolumeShards := make(map[uint32]erasure_coding.ShardBits) if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(ecb.diskType)]; found { for _, shards := range emptyDiskInfo.EcShardInfos { - emptyNodeIds[shards.Id] = true + emptyNodeVolumeShards[shards.Id] = erasure_coding.ShardBits(shards.EcIndexBits) } } + if fullDiskInfo, found := fullNode.info.DiskInfos[string(ecb.diskType)]; found { - for _, shards := range fullDiskInfo.EcShardInfos { - if _, found := emptyNodeIds[shards.Id]; found { - continue - } - si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(shards) - for _, shardId := range si.Ids() { - vid := needle.VolumeId(shards.Id) - // For balancing, strictly require matching disk type - // For balancing, strictly require matching disk type and apply anti-affinity - dataShardCount := ecb.getDataShardCount() - destDiskId := pickBestDiskOnNode(emptyNode, vid, ecb.diskType, true, shardId, dataShardCount) - - if destDiskId > 0 { - fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId) - } else { - fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id) + // Pass 0: prefer moving shards of volumes not on the empty node (best diversity) + // Pass 1: move shards of shared volumes where the specific shard ID differs + for pass := 0; pass < 2 && !hasMove; pass++ { + for _, shards := range fullDiskInfo.EcShardInfos { + emptyBits, volumeOnEmpty := emptyNodeVolumeShards[shards.Id] + if pass == 0 && volumeOnEmpty { + continue // pass 0: skip volumes already on the empty node } - - err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing, ecb.diskType) - if err != nil { - return err + if pass == 1 && !volumeOnEmpty { + continue // pass 1: only consider volumes already on the empty node } + si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(shards) + for _, shardId := range si.Ids() { + if pass == 1 && emptyBits.Has(shardId) { + continue // Skip shard IDs already on the empty node + } + vid := needle.VolumeId(shards.Id) + // For balancing, strictly require matching disk type and apply anti-affinity + dataShardCount := ecb.getDataShardCount() + destDiskId := pickBestDiskOnNode(emptyNode, vid, ecb.diskType, true, shardId, dataShardCount) + + if destDiskId > 0 { + fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId) + } else { + fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id) + } - ecNodeIdToShardCount[emptyNode.info.Id]++ - ecNodeIdToShardCount[fullNode.info.Id]-- - hasMove = true - break + err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing, ecb.diskType) + if err != nil { + return err + } + + ecNodeIdToShardCount[emptyNode.info.Id]++ + ecNodeIdToShardCount[fullNode.info.Id]-- + hasMove = true + break + } + if hasMove { + break + } } - break } } } diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go index 1bf3924ff..4a422b7ea 100644 --- a/weed/shell/command_ec_test.go +++ b/weed/shell/command_ec_test.go @@ -285,3 +285,140 @@ func TestCommandEcBalanceMultipleVolumesEvenDistribution(t *testing.T) { t.Logf("Volume %d - Data: %v, Parity: %v", vid, dataPerRack, parityPerRack) } } + +// TestCommandEcBalanceAllNodesShareAllVolumes reproduces the scenario from issue #8793: +// When every node has a shard of every volume, ec.balance was unable to move any shards +// because it skipped volumes that already existed on the target node at the volume level. +func TestCommandEcBalanceAllNodesShareAllVolumes(t *testing.T) { + // 4 nodes, all in same rack, 2 volumes with 14 shards each. + // Distribute shards so every node has shards of both volumes, but unevenly: + // dn1: vol1 shards 0-4, vol2 shards 0-4 => 10 shards + // dn2: vol1 shards 5-9, vol2 shards 5-9 => 10 shards + // dn3: vol1 shards 10-12, vol2 shards 10-12 => 6 shards + // dn4: vol1 shard 13, vol2 shard 13 => 2 shards + // Total: 28 shards, average = 7 per node + ecb := &ecBalancer{ + ecNodes: []*EcNode{ + newEcNode("dc1", "rack1", "dn1", 100). + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4}). + addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4}), + newEcNode("dc1", "rack1", "dn2", 100). + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{5, 6, 7, 8, 9}). + addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{5, 6, 7, 8, 9}), + newEcNode("dc1", "rack1", "dn3", 100). + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{10, 11, 12}). + addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{10, 11, 12}), + newEcNode("dc1", "rack1", "dn4", 100). + addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{13}). + addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{13}), + }, + applyBalancing: false, + diskType: types.HardDriveType, + } + + ecb.balanceEcVolumes("c1") + ecb.balanceEcRacks() + + // Count total shards per node after balancing + for _, node := range ecb.ecNodes { + count := 0 + if diskInfo, found := node.info.DiskInfos[string(types.HardDriveType)]; found { + for _, ecsi := range diskInfo.EcShardInfos { + count += erasure_coding.GetShardCount(ecsi) + } + } + // Average is 7, so all nodes should be at 7 (ceil(28/4) = 7) + if count > 7 { + t.Errorf("node %s has %d shards after balancing, expected at most 7", node.info.Id, count) + } + t.Logf("node %s: %d shards", node.info.Id, count) + } +} + +// TestCommandEcBalanceIssue8793Topology simulates the real cluster from issue #8793: +// 14 nodes (9 with max=80, 5 with max=33), all in one rack, with mixed capacities. +// Each EC volume has 1 shard per node. Nodes have uneven totals (some have extra volumes). +func TestCommandEcBalanceIssue8793Topology(t *testing.T) { + // Simulate 22 EC volumes across 14 nodes (each volume has 14 shards, 1 per node). + // Give nodes 0-3 an extra volume each (vol 23-26, all 14 shards) to create imbalance. + // Before balancing: nodes 0-3 have 22+14=36 shards each, nodes 4-13 have 22 shards each. + // Total = 4*36 + 10*22 = 144+220 = 364. Average = ceil(364/14) = 26. + + type nodeSpec struct { + id string + maxSlot int + } + nodes := []nodeSpec{ + {"192.168.0.12:8332", 80}, {"192.168.0.12:8333", 80}, {"192.168.0.12:8334", 80}, + {"192.168.0.12:8335", 80}, {"192.168.0.12:8336", 80}, {"192.168.0.12:8337", 80}, + {"192.168.0.12:8338", 80}, {"192.168.0.12:8339", 80}, {"192.168.0.12:8340", 80}, + {"192.168.0.12:8341", 33}, {"192.168.0.12:8342", 33}, {"192.168.0.12:8343", 33}, + {"192.168.0.25:8350", 33}, {"192.168.0.25:8351", 33}, + } + + ecNodes := make([]*EcNode, len(nodes)) + for i, ns := range nodes { + ecNodes[i] = newEcNode("home", "center", ns.id, ns.maxSlot) + } + + // 22 shared volumes: each node gets exactly 1 shard (shard i for node i) + for vid := uint32(1); vid <= 22; vid++ { + for i := range ecNodes { + ecNodes[i].addEcVolumeAndShardsForTest(vid, "cldata", []erasure_coding.ShardId{erasure_coding.ShardId(i)}) + } + } + + // 4 extra volumes only on first 4 nodes (all 14 shards each) to create imbalance + for extra := uint32(0); extra < 4; extra++ { + vid := 23 + extra + nodeIdx := int(extra) + allShards := make([]erasure_coding.ShardId, 14) + for s := 0; s < 14; s++ { + allShards[s] = erasure_coding.ShardId(s) + } + ecNodes[nodeIdx].addEcVolumeAndShardsForTest(vid, "cldata", allShards) + } + + ecb := &ecBalancer{ + ecNodes: ecNodes, + applyBalancing: false, + diskType: types.HardDriveType, + } + + // Log initial state + for _, node := range ecb.ecNodes { + count := 0 + if diskInfo, found := node.info.DiskInfos[string(types.HardDriveType)]; found { + for _, ecsi := range diskInfo.EcShardInfos { + count += erasure_coding.GetShardCount(ecsi) + } + } + t.Logf("BEFORE node %s (max %d): %d shards", node.info.Id, node.freeEcSlot+count, count) + } + + ecb.balanceEcVolumes("cldata") + ecb.balanceEcRacks() + + // Verify: no node should exceed the average + totalShards := 0 + shardCounts := make(map[string]int) + for _, node := range ecb.ecNodes { + count := 0 + if diskInfo, found := node.info.DiskInfos[string(types.HardDriveType)]; found { + for _, ecsi := range diskInfo.EcShardInfos { + count += erasure_coding.GetShardCount(ecsi) + } + } + shardCounts[node.info.Id] = count + totalShards += count + } + avg := ceilDivide(totalShards, len(ecNodes)) + + for _, node := range ecb.ecNodes { + count := shardCounts[node.info.Id] + t.Logf("AFTER node %s: %d shards (avg %d)", node.info.Id, count, avg) + if count > avg { + t.Errorf("node %s has %d shards, expected at most %d (avg)", node.info.Id, count, avg) + } + } +}