diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 31782b279..08bbd2a22 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -35,7 +35,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection") dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter") shardReplicaPlacement := balanceCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty") - parallelize := balanceCommand.Bool("parallelize", true, "parallelize operations whenever possible") + maxParallelization := balanceCommand.Int("maxParallelization", 10, "run up to X tasks in parallel, whenever possible") applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan") if err = balanceCommand.Parse(args); err != nil { return nil @@ -62,5 +62,5 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W return err } - return EcBalance(commandEnv, collections, *dc, rp, *parallelize, *applyBalancing) + return EcBalance(commandEnv, collections, *dc, rp, *maxParallelization, *applyBalancing) } diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index a09e2ad62..e4629905e 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -114,47 +114,50 @@ var ( ) type ErrorWaitGroup struct { - parallelize bool - wg *sync.WaitGroup - errors []error - errorsMu sync.Mutex + maxConcurrency int + wg *sync.WaitGroup + wgSem chan bool + errors []error + errorsMu sync.Mutex } type ErrorWaitGroupTask func() error -func (ewg *ErrorWaitGroup) Init() { - if ewg.wg != nil { - return +func NewErrorWaitGroup(maxConcurrency int) *ErrorWaitGroup { + if maxConcurrency <= 0 { + // No concurrency = one task at the time + maxConcurrency = 1 + } + return &ErrorWaitGroup{ + maxConcurrency: maxConcurrency, + wg: &sync.WaitGroup{}, + wgSem: make(chan bool, maxConcurrency), } - ewg.wg = &sync.WaitGroup{} - ewg.errors = nil } func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) { - if ewg.wg == nil || !ewg.parallelize { + if ewg.maxConcurrency <= 1 { + // Keep run order deterministic when parallelization is off ewg.errors = append(ewg.errors, f()) return } ewg.wg.Add(1) go func() { + ewg.wgSem <- true + err := f() ewg.errorsMu.Lock() ewg.errors = append(ewg.errors, err) ewg.errorsMu.Unlock() + + <-ewg.wgSem ewg.wg.Done() }() } func (ewg *ErrorWaitGroup) Wait() error { - if ewg.wg != nil { - ewg.wg.Wait() - } - - err := errors.Join(ewg.errors...) - ewg.wg = nil - ewg.errors = nil - - return err + ewg.wg.Wait() + return errors.Join(ewg.errors...) } func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) { @@ -597,12 +600,15 @@ func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string] } type ecBalancer struct { - commandEnv *CommandEnv - ecNodes []*EcNode - replicaPlacement *super_block.ReplicaPlacement - applyBalancing bool + commandEnv *CommandEnv + ecNodes []*EcNode + replicaPlacement *super_block.ReplicaPlacement + applyBalancing bool + maxParallelization int +} - ewg ErrorWaitGroup +func (ecb *ecBalancer) errorWaitGroup() *ErrorWaitGroup { + return NewErrorWaitGroup(ecb.maxParallelization) } func (ecb *ecBalancer) racks() map[RackId]*EcRack { @@ -641,13 +647,13 @@ func (ecb *ecBalancer) balanceEcVolumes(collection string) error { func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error { vidLocations := ecb.collectVolumeIdToEcNodes(collection) - ecb.ewg.Init() + ewg := ecb.errorWaitGroup() for vid, locations := range vidLocations { - ecb.ewg.Add(func() error { + ewg.Add(func() error { return ecb.doDeduplicateEcShards(collection, vid, locations) }) } - return ecb.ewg.Wait() + return ewg.Wait() } func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error { @@ -688,13 +694,13 @@ func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error { vidLocations := ecb.collectVolumeIdToEcNodes(collection) // spread the ec shards evenly - ecb.ewg.Init() + ewg := ecb.errorWaitGroup() for vid, locations := range vidLocations { - ecb.ewg.Add(func() error { + ewg.Add(func() error { return ecb.doBalanceEcShardsAcrossRacks(collection, vid, locations) }) } - return ecb.ewg.Wait() + return ewg.Wait() } func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int { @@ -796,7 +802,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error { racks := ecb.racks() // spread the ec shards evenly - ecb.ewg.Init() + ewg := ecb.errorWaitGroup() for vid, locations := range vidLocations { // see the volume's shards are in how many racks, and how many in each rack @@ -815,12 +821,12 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error { } sourceEcNodes := rackEcNodesWithVid[rackId] averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes)) - ecb.ewg.Add(func() error { + ewg.Add(func() error { return ecb.doBalanceEcShardsWithinOneRack(averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes) }) } } - return ecb.ewg.Wait() + return ewg.Wait() } func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error { @@ -851,13 +857,13 @@ func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int func (ecb *ecBalancer) balanceEcRacks() error { // balance one rack for all ec shards - ecb.ewg.Init() + ewg := ecb.errorWaitGroup() for _, ecRack := range ecb.racks() { - ecb.ewg.Add(func() error { + ewg.Add(func() error { return ecb.doBalanceEcRack(ecRack) }) } - return ecb.ewg.Wait() + return ewg.Wait() } func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error { @@ -1052,7 +1058,7 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo return vidLocations } -func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, parallelize bool, applyBalancing bool) (err error) { +func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, maxParallelization int, applyBalancing bool) (err error) { if len(collections) == 0 { return fmt.Errorf("no collections to balance") } @@ -1067,14 +1073,11 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic } ecb := &ecBalancer{ - commandEnv: commandEnv, - ecNodes: allEcNodes, - replicaPlacement: ecReplicaPlacement, - applyBalancing: applyBalancing, - - ewg: ErrorWaitGroup{ - parallelize: parallelize, - }, + commandEnv: commandEnv, + ecNodes: allEcNodes, + replicaPlacement: ecReplicaPlacement, + applyBalancing: applyBalancing, + maxParallelization: maxParallelization, } for _, c := range collections { diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 2b35c5c79..2c1625ea7 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -65,7 +65,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr collection := encodeCommand.String("collection", "", "the collection name") fullPercentage := encodeCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size") quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes without no writes for this period") - parallelize := encodeCommand.Bool("parallelize", true, "parallelize operations whenever possible") + maxParallelization := encodeCommand.Int("maxParallelization", 10, "run up to X tasks in parallel, whenever possible") forceChanges := encodeCommand.Bool("force", false, "force the encoding even if the cluster has less than recommended 4 nodes") shardReplicaPlacement := encodeCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty") applyBalancing := encodeCommand.Bool("rebalance", false, "re-balance EC shards after creation") @@ -124,7 +124,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr } } // ...then re-balance ec shards. - if err := EcBalance(commandEnv, collections, "", rp, *parallelize, *applyBalancing); err != nil { + if err := EcBalance(commandEnv, collections, "", rp, *maxParallelization, *applyBalancing); err != nil { return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", collections, err) }