|
|
@ -130,23 +130,61 @@ func (erb *ecRebuilder) isLocked() bool { |
|
|
return erb.commandEnv.isLocked() |
|
|
return erb.commandEnv.isLocked() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// ecNodeWithMoreFreeSlots returns the EC node with the highest free slot count among all nodes visible to the rebuilder, together with that free slot count.
|
|
|
|
|
|
func (erb *ecRebuilder) ecNodeWithMoreFreeSlots() (*EcNode, int) { |
|
|
|
|
|
|
|
|
// countLocalShards returns the number of shards already present locally on the node for the given volume.
|
|
|
|
|
|
func (erb *ecRebuilder) countLocalShards(node *EcNode, collection string, volumeId needle.VolumeId) int { |
|
|
|
|
|
for _, diskInfo := range node.info.DiskInfos { |
|
|
|
|
|
for _, ecShardInfo := range diskInfo.EcShardInfos { |
|
|
|
|
|
if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId { |
|
|
|
|
|
shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) |
|
|
|
|
|
return len(shardBits.ShardIds()) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
return 0 |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// selectAndReserveRebuilder atomically selects a rebuilder node with sufficient free slots
|
|
|
|
|
|
// and reserves slots only for the non-local shards that need to be copied/generated.
|
|
|
|
|
|
func (erb *ecRebuilder) selectAndReserveRebuilder(collection string, volumeId needle.VolumeId) (*EcNode, int, error) { |
|
|
erb.ecNodesMu.Lock() |
|
|
erb.ecNodesMu.Lock() |
|
|
defer erb.ecNodesMu.Unlock() |
|
|
defer erb.ecNodesMu.Unlock() |
|
|
|
|
|
|
|
|
if len(erb.ecNodes) == 0 { |
|
|
if len(erb.ecNodes) == 0 { |
|
|
return nil, 0 |
|
|
|
|
|
|
|
|
return nil, 0, fmt.Errorf("no ec nodes available") |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
res := erb.ecNodes[0] |
|
|
|
|
|
for i := 1; i < len(erb.ecNodes); i++ { |
|
|
|
|
|
if erb.ecNodes[i].freeEcSlot > res.freeEcSlot { |
|
|
|
|
|
res = erb.ecNodes[i] |
|
|
|
|
|
|
|
|
// Find the node with the most free slots, considering local shards
|
|
|
|
|
|
var bestNode *EcNode |
|
|
|
|
|
var bestSlotsNeeded int |
|
|
|
|
|
for _, node := range erb.ecNodes { |
|
|
|
|
|
localShards := erb.countLocalShards(node, collection, volumeId) |
|
|
|
|
|
slotsNeeded := erasure_coding.TotalShardsCount - localShards |
|
|
|
|
|
|
|
|
|
|
|
if node.freeEcSlot >= slotsNeeded { |
|
|
|
|
|
if bestNode == nil || node.freeEcSlot > bestNode.freeEcSlot { |
|
|
|
|
|
bestNode = node |
|
|
|
|
|
bestSlotsNeeded = slotsNeeded |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
return res, res.freeEcSlot |
|
|
|
|
|
|
|
|
if bestNode == nil { |
|
|
|
|
|
return nil, 0, fmt.Errorf("no node has sufficient free slots") |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Reserve slots only for non-local shards
|
|
|
|
|
|
bestNode.freeEcSlot -= bestSlotsNeeded |
|
|
|
|
|
|
|
|
|
|
|
return bestNode, bestSlotsNeeded, nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// releaseRebuilder releases the reserved slots back to the rebuilder node.
|
|
|
|
|
|
func (erb *ecRebuilder) releaseRebuilder(node *EcNode, slotsToRelease int) { |
|
|
|
|
|
erb.ecNodesMu.Lock() |
|
|
|
|
|
defer erb.ecNodesMu.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
// Release slots by incrementing the free slot count
|
|
|
|
|
|
node.freeEcSlot += slotsToRelease |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (erb *ecRebuilder) rebuildEcVolumes(collection string) { |
|
|
func (erb *ecRebuilder) rebuildEcVolumes(collection string) { |
|
|
@ -158,27 +196,33 @@ func (erb *ecRebuilder) rebuildEcVolumes(collection string) { |
|
|
ecShardMap.registerEcNode(ecNode, collection) |
|
|
ecShardMap.registerEcNode(ecNode, collection) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
rebuilder, freeSlots := erb.ecNodeWithMoreFreeSlots() |
|
|
|
|
|
if freeSlots < erasure_coding.TotalShardsCount { |
|
|
|
|
|
erb.ewg.Add(func() error { |
|
|
|
|
|
return fmt.Errorf("disk space is not enough") |
|
|
|
|
|
}) |
|
|
|
|
|
return |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
for vid, locations := range ecShardMap { |
|
|
for vid, locations := range ecShardMap { |
|
|
shardCount := locations.shardCount() |
|
|
shardCount := locations.shardCount() |
|
|
if shardCount == erasure_coding.TotalShardsCount { |
|
|
if shardCount == erasure_coding.TotalShardsCount { |
|
|
continue |
|
|
continue |
|
|
} |
|
|
} |
|
|
if shardCount < erasure_coding.DataShardsCount { |
|
|
if shardCount < erasure_coding.DataShardsCount { |
|
|
|
|
|
// Capture variables for closure
|
|
|
|
|
|
vid := vid |
|
|
|
|
|
shardCount := shardCount |
|
|
erb.ewg.Add(func() error { |
|
|
erb.ewg.Add(func() error { |
|
|
return fmt.Errorf("ec volume %d is unrepairable with %d shards\n", vid, shardCount) |
|
|
|
|
|
|
|
|
return fmt.Errorf("ec volume %d is unrepairable with %d shards", vid, shardCount) |
|
|
}) |
|
|
}) |
|
|
return |
|
|
|
|
|
|
|
|
continue |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Capture variables for closure
|
|
|
|
|
|
vid := vid |
|
|
|
|
|
locations := locations |
|
|
|
|
|
|
|
|
erb.ewg.Add(func() error { |
|
|
erb.ewg.Add(func() error { |
|
|
|
|
|
// Select rebuilder and reserve slots atomically per volume
|
|
|
|
|
|
rebuilder, slotsReserved, err := erb.selectAndReserveRebuilder(collection, vid) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("failed to select rebuilder for volume %d: %v", vid, err) |
|
|
|
|
|
} |
|
|
|
|
|
defer erb.releaseRebuilder(rebuilder, slotsReserved) |
|
|
|
|
|
|
|
|
return erb.rebuildOneEcVolume(collection, vid, locations, rebuilder) |
|
|
return erb.rebuildOneEcVolume(collection, vid, locations, rebuilder) |
|
|
}) |
|
|
}) |
|
|
} |
|
|
} |
|
|
|