Browse Source

Rework `shell.EcBalance()`'s waitgroup code into a standalone type. (#6373)

Rework `shell.EcBalance()`'s waitgroup with errors code into a standalone type.

We'll re-use this for other EC jobs - for example, volume creation. Also fixes
potential concurrency issues when collecting error results.
pull/6374/head
Lisandro Pin 5 days ago
committed by GitHub
parent
commit
9fbc4ea417
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 115
      weed/shell/command_ec_common.go

115
weed/shell/command_ec_common.go

@ -113,6 +113,50 @@ var (
getDefaultReplicaPlacement = _getDefaultReplicaPlacement
)
type ErrorWaitGroup struct {
parallelize bool
wg *sync.WaitGroup
errors []error
errorsMu sync.Mutex
}
type ErrorWaitGroupTask func() error
func (ewg *ErrorWaitGroup) Init() {
if ewg.wg != nil {
return
}
ewg.wg = &sync.WaitGroup{}
ewg.errors = nil
}
func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) {
if ewg.wg == nil || !ewg.parallelize {
ewg.errors = append(ewg.errors, f())
return
}
ewg.wg.Add(1)
go func() {
err := f()
ewg.errorsMu.Lock()
ewg.errors = append(ewg.errors, err)
ewg.errorsMu.Unlock()
ewg.wg.Done()
}()
}
func (ewg *ErrorWaitGroup) Wait() error {
if ewg.wg != nil {
ewg.wg.Wait()
}
err := errors.Join(ewg.errors...)
ewg.wg = nil
ewg.errors = nil
return err
}
func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
var resp *master_pb.GetMasterConfigurationResponse
var err error
@ -557,48 +601,8 @@ type ecBalancer struct {
ecNodes []*EcNode
replicaPlacement *super_block.ReplicaPlacement
applyBalancing bool
parallelize bool
wg *sync.WaitGroup
wgErrors []error
}
type ecBalancerTask func() error
func (ecb *ecBalancer) wgInit() {
if ecb.wg != nil {
return
}
ecb.wg = &sync.WaitGroup{}
ecb.wgErrors = nil
}
func (ecb *ecBalancer) wgAdd(f ecBalancerTask) {
if ecb.wg == nil || !ecb.parallelize {
if err := f(); err != nil {
ecb.wgErrors = append(ecb.wgErrors, err)
}
return
}
ecb.wg.Add(1)
go func() {
if err := f(); err != nil {
ecb.wgErrors = append(ecb.wgErrors, err)
}
ecb.wg.Done()
}()
}
func (ecb *ecBalancer) wgWait() error {
if ecb.wg != nil {
ecb.wg.Wait()
}
err := errors.Join(ecb.wgErrors...)
ecb.wg = nil
ecb.wgErrors = nil
return err
ewg ErrorWaitGroup
}
func (ecb *ecBalancer) racks() map[RackId]*EcRack {
@ -637,13 +641,13 @@ func (ecb *ecBalancer) balanceEcVolumes(collection string) error {
func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error {
vidLocations := ecb.collectVolumeIdToEcNodes(collection)
ecb.wgInit()
ecb.ewg.Init()
for vid, locations := range vidLocations {
ecb.wgAdd(func() error {
ecb.ewg.Add(func() error {
return ecb.doDeduplicateEcShards(collection, vid, locations)
})
}
return ecb.wgWait()
return ecb.ewg.Wait()
}
func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error {
@ -684,13 +688,13 @@ func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
vidLocations := ecb.collectVolumeIdToEcNodes(collection)
// spread the ec shards evenly
ecb.wgInit()
ecb.ewg.Init()
for vid, locations := range vidLocations {
ecb.wgAdd(func() error {
ecb.ewg.Add(func() error {
return ecb.doBalanceEcShardsAcrossRacks(collection, vid, locations)
})
}
return ecb.wgWait()
return ecb.ewg.Wait()
}
func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
@ -792,7 +796,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
racks := ecb.racks()
// spread the ec shards evenly
ecb.wgInit()
ecb.ewg.Init()
for vid, locations := range vidLocations {
// see the volume's shards are in how many racks, and how many in each rack
@ -811,12 +815,12 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
}
sourceEcNodes := rackEcNodesWithVid[rackId]
averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
ecb.wgAdd(func() error {
ecb.ewg.Add(func() error {
return ecb.doBalanceEcShardsWithinOneRack(averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes)
})
}
}
return ecb.wgWait()
return ecb.ewg.Wait()
}
func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
@ -847,13 +851,13 @@ func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int
func (ecb *ecBalancer) balanceEcRacks() error {
// balance one rack for all ec shards
ecb.wgInit()
ecb.ewg.Init()
for _, ecRack := range ecb.racks() {
ecb.wgAdd(func() error {
ecb.ewg.Add(func() error {
return ecb.doBalanceEcRack(ecRack)
})
}
return ecb.wgWait()
return ecb.ewg.Wait()
}
func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
@ -1067,7 +1071,10 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
ecNodes: allEcNodes,
replicaPlacement: ecReplicaPlacement,
applyBalancing: applyBalancing,
parallelize: parallelize,
ewg: ErrorWaitGroup{
parallelize: parallelize,
},
}
for _, c := range collections {

Loading…
Cancel
Save