From e04cad789166880bf02b4bee6db9cf3596c787c5 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 12 Nov 2025 14:47:16 -0800 Subject: [PATCH] node.freeEcSlot >= slotsNeeded --- weed/shell/command_ec_rebuild.go | 80 +++++++++++++++++++++------ weed/shell/command_ec_rebuild_test.go | 72 +----------------------- 2 files changed, 64 insertions(+), 88 deletions(-) diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index ad31bc676..4d2e2a37b 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -130,23 +130,61 @@ func (erb *ecRebuilder) isLocked() bool { return erb.commandEnv.isLocked() } -// ecNodeWithMoreFreeSlots returns the EC node with higher free slot count, from all nodes visible to the rebuilder, and its free slot count. -func (erb *ecRebuilder) ecNodeWithMoreFreeSlots() (*EcNode, int) { +// countLocalShards returns the number of shards already present locally on the node for the given volume. +func (erb *ecRebuilder) countLocalShards(node *EcNode, collection string, volumeId needle.VolumeId) int { + for _, diskInfo := range node.info.DiskInfos { + for _, ecShardInfo := range diskInfo.EcShardInfos { + if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId { + shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) + return len(shardBits.ShardIds()) + } + } + } + return 0 +} + +// selectAndReserveRebuilder atomically selects a rebuilder node with sufficient free slots +// and reserves slots only for the non-local shards that need to be copied/generated. +func (erb *ecRebuilder) selectAndReserveRebuilder(collection string, volumeId needle.VolumeId) (*EcNode, int, error) { erb.ecNodesMu.Lock() defer erb.ecNodesMu.Unlock() if len(erb.ecNodes) == 0 { - return nil, 0 + return nil, 0, fmt.Errorf("no ec nodes available") } - res := erb.ecNodes[0] - for i := 1; i < len(erb.ecNodes); i++ { - if erb.ecNodes[i].freeEcSlot > res.freeEcSlot { - res = erb.ecNodes[i] + // Find the node with the most free slots, considering local shards + var bestNode *EcNode + var bestSlotsNeeded int + for _, node := range erb.ecNodes { + localShards := erb.countLocalShards(node, collection, volumeId) + slotsNeeded := erasure_coding.TotalShardsCount - localShards + + if node.freeEcSlot >= slotsNeeded { + if bestNode == nil || node.freeEcSlot > bestNode.freeEcSlot { + bestNode = node + bestSlotsNeeded = slotsNeeded + } } } - return res, res.freeEcSlot + if bestNode == nil { + return nil, 0, fmt.Errorf("no node has sufficient free slots") + } + + // Reserve slots only for non-local shards + bestNode.freeEcSlot -= bestSlotsNeeded + + return bestNode, bestSlotsNeeded, nil +} + +// releaseRebuilder releases the reserved slots back to the rebuilder node. +func (erb *ecRebuilder) releaseRebuilder(node *EcNode, slotsToRelease int) { + erb.ecNodesMu.Lock() + defer erb.ecNodesMu.Unlock() + + // Release slots by incrementing the free slot count + node.freeEcSlot += slotsToRelease } func (erb *ecRebuilder) rebuildEcVolumes(collection string) { @@ -158,27 +196,33 @@ func (erb *ecRebuilder) rebuildEcVolumes(collection string) { ecShardMap.registerEcNode(ecNode, collection) } - rebuilder, freeSlots := erb.ecNodeWithMoreFreeSlots() - if freeSlots < erasure_coding.TotalShardsCount { - erb.ewg.Add(func() error { - return fmt.Errorf("disk space is not enough") - }) - return - } - for vid, locations := range ecShardMap { shardCount := locations.shardCount() if shardCount == erasure_coding.TotalShardsCount { continue } if shardCount < erasure_coding.DataShardsCount { + // Capture variables for closure + vid := vid + shardCount := shardCount erb.ewg.Add(func() error { - return fmt.Errorf("ec volume %d is unrepairable with %d shards\n", vid, shardCount) + return fmt.Errorf("ec volume %d is unrepairable with %d shards", vid, shardCount) }) - return + continue } + // Capture variables for closure + vid := vid + locations := locations + erb.ewg.Add(func() error { + // Select rebuilder and reserve slots atomically per volume + rebuilder, slotsReserved, err := erb.selectAndReserveRebuilder(collection, vid) + if err != nil { + return fmt.Errorf("failed to select rebuilder for volume %d: %v", vid, err) + } + defer erb.releaseRebuilder(rebuilder, slotsReserved) + return erb.rebuildOneEcVolume(collection, vid, locations, rebuilder) }) } diff --git a/weed/shell/command_ec_rebuild_test.go b/weed/shell/command_ec_rebuild_test.go index 7543b008c..e7e86bdb1 100644 --- a/weed/shell/command_ec_rebuild_test.go +++ b/weed/shell/command_ec_rebuild_test.go @@ -79,74 +79,6 @@ func TestEcShardMapShardCount(t *testing.T) { } } -// TestEcRebuilderEcNodeWithMoreFreeSlots tests the free slot selection -func TestEcRebuilderEcNodeWithMoreFreeSlots(t *testing.T) { - testCases := []struct { - name string - nodes []*EcNode - expectedNode string - }{ - { - name: "single node", - nodes: []*EcNode{ - newEcNode("dc1", "rack1", "node1", 100), - }, - expectedNode: "node1", - }, - { - name: "multiple nodes - select highest", - nodes: []*EcNode{ - newEcNode("dc1", "rack1", "node1", 50), - newEcNode("dc1", "rack1", "node2", 150), - newEcNode("dc1", "rack1", "node3", 100), - }, - expectedNode: "node2", - }, - { - name: "multiple nodes - same slots", - nodes: []*EcNode{ - newEcNode("dc1", "rack1", "node1", 100), - newEcNode("dc1", "rack1", "node2", 100), - }, - expectedNode: "node1", // Should return first one - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - erb := &ecRebuilder{ - ecNodes: tc.nodes, - } - - node, freeEcSlots := erb.ecNodeWithMoreFreeSlots() - if node == nil { - t.Fatal("Expected a node, got nil") - } - if node.info.Id != tc.expectedNode { - t.Errorf("Expected node %s, got %s", tc.expectedNode, node.info.Id) - } - if node.freeEcSlot != freeEcSlots { - t.Errorf("Expected node with %d free EC slots, got %d", freeEcSlots, node.freeEcSlot) - } - }) - } -} - -// TestEcRebuilderEcNodeWithMoreFreeSlotsEmpty tests empty node list -func TestEcRebuilderEcNodeWithMoreFreeSlotsEmpty(t *testing.T) { - erb := &ecRebuilder{ - ecNodes: []*EcNode{}, - } - - node, freeEcSlots := erb.ecNodeWithMoreFreeSlots() - if node != nil { - t.Errorf("Expected nil for empty node list, got %v", node) - } - if freeEcSlots != 0 { - t.Errorf("Expected no free EC slots, got %d", freeEcSlots) - } -} - // TestRebuildEcVolumesInsufficientShards tests error handling for unrepairable volumes func TestRebuildEcVolumesInsufficientShards(t *testing.T) { var logBuffer bytes.Buffer @@ -231,8 +163,8 @@ func TestRebuildEcVolumesInsufficientSpace(t *testing.T) { if err == nil { t.Fatal("Expected error for insufficient disk space, got nil") } - if !strings.Contains(err.Error(), "disk space is not enough") { - t.Errorf("Expected 'disk space' in error message, got: %s", err.Error()) + if !strings.Contains(err.Error(), "no node has sufficient free slots") { + t.Errorf("Expected 'no node has sufficient free slots' in error message, got: %s", err.Error()) } }