
Allow configuring the maximum number of concurrent tasks for EC parallelization. (#6376)

Follow-up to b0210df0.
Lisandro Pin, 4 days ago (committed by GitHub)
commit ba0707af64
Changed files:
1. weed/shell/command_ec_balance.go (4 changed lines)
2. weed/shell/command_ec_common.go (69 changed lines)
3. weed/shell/command_ec_encode.go (4 changed lines)

weed/shell/command_ec_balance.go (4 changed lines)

@@ -35,7 +35,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W
 	collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
 	dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
 	shardReplicaPlacement := balanceCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty")
-	parallelize := balanceCommand.Bool("parallelize", true, "parallelize operations whenever possible")
+	maxParallelization := balanceCommand.Int("maxParallelization", 10, "run up to X tasks in parallel, whenever possible")
 	applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan")
 	if err = balanceCommand.Parse(args); err != nil {
 		return nil
@@ -62,5 +62,5 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W
 		return err
 	}

-	return EcBalance(commandEnv, collections, *dc, rp, *parallelize, *applyBalancing)
+	return EcBalance(commandEnv, collections, *dc, rp, *maxParallelization, *applyBalancing)
 }
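In effect, the on/off switch becomes a tunable limit. Assuming the command keeps its usual weed shell name, an invocation such as `ec.balance -maxParallelization 20 -force` (the value here is illustrative) would run up to 20 balancing tasks concurrently, and omitting the flag falls back to the new default of 10. The old `-parallelize` boolean could only choose between fully sequential execution and unbounded parallelism.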

weed/shell/command_ec_common.go (69 changed lines)

@@ -114,47 +114,50 @@ var (
 )

 type ErrorWaitGroup struct {
-	parallelize bool
+	maxConcurrency int
 	wg             *sync.WaitGroup
+	wgSem          chan bool
 	errors         []error
 	errorsMu       sync.Mutex
 }

 type ErrorWaitGroupTask func() error

-func (ewg *ErrorWaitGroup) Init() {
-	if ewg.wg != nil {
-		return
+func NewErrorWaitGroup(maxConcurrency int) *ErrorWaitGroup {
+	if maxConcurrency <= 0 {
+		// No concurrency = one task at the time
+		maxConcurrency = 1
+	}
+	return &ErrorWaitGroup{
+		maxConcurrency: maxConcurrency,
+		wg:             &sync.WaitGroup{},
+		wgSem:          make(chan bool, maxConcurrency),
 	}
-	ewg.wg = &sync.WaitGroup{}
-	ewg.errors = nil
 }

 func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) {
-	if ewg.wg == nil || !ewg.parallelize {
+	if ewg.maxConcurrency <= 1 {
 		// Keep run order deterministic when parallelization is off
 		ewg.errors = append(ewg.errors, f())
 		return
 	}

 	ewg.wg.Add(1)
 	go func() {
+		ewg.wgSem <- true
 		err := f()

 		ewg.errorsMu.Lock()
 		ewg.errors = append(ewg.errors, err)
 		ewg.errorsMu.Unlock()

+		<-ewg.wgSem
 		ewg.wg.Done()
 	}()
 }

 func (ewg *ErrorWaitGroup) Wait() error {
-	if ewg.wg != nil {
 	ewg.wg.Wait()
-	}
-	err := errors.Join(ewg.errors...)
-	ewg.wg = nil
-	ewg.errors = nil
-	return err
+	return errors.Join(ewg.errors...)
 }

 func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
@@ -601,8 +604,11 @@ type ecBalancer struct {
 	ecNodes            []*EcNode
 	replicaPlacement   *super_block.ReplicaPlacement
 	applyBalancing     bool
+	maxParallelization int
+}

-	ewg ErrorWaitGroup
+func (ecb *ecBalancer) errorWaitGroup() *ErrorWaitGroup {
+	return NewErrorWaitGroup(ecb.maxParallelization)
 }

 func (ecb *ecBalancer) racks() map[RackId]*EcRack {
@@ -641,13 +647,13 @@ func (ecb *ecBalancer) balanceEcVolumes(collection string) error {
 func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error {
 	vidLocations := ecb.collectVolumeIdToEcNodes(collection)

-	ecb.ewg.Init()
+	ewg := ecb.errorWaitGroup()
 	for vid, locations := range vidLocations {
-		ecb.ewg.Add(func() error {
+		ewg.Add(func() error {
 			return ecb.doDeduplicateEcShards(collection, vid, locations)
 		})
 	}
-	return ecb.ewg.Wait()
+	return ewg.Wait()
 }

 func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error {
@@ -688,13 +694,13 @@ func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
 	vidLocations := ecb.collectVolumeIdToEcNodes(collection)

 	// spread the ec shards evenly
-	ecb.ewg.Init()
+	ewg := ecb.errorWaitGroup()
 	for vid, locations := range vidLocations {
-		ecb.ewg.Add(func() error {
+		ewg.Add(func() error {
 			return ecb.doBalanceEcShardsAcrossRacks(collection, vid, locations)
 		})
 	}
-	return ecb.ewg.Wait()
+	return ewg.Wait()
 }

 func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
@@ -796,7 +802,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
 	racks := ecb.racks()

 	// spread the ec shards evenly
-	ecb.ewg.Init()
+	ewg := ecb.errorWaitGroup()
 	for vid, locations := range vidLocations {
 		// see the volume's shards are in how many racks, and how many in each rack
@@ -815,12 +821,12 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
 			}
 			sourceEcNodes := rackEcNodesWithVid[rackId]
 			averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
-			ecb.ewg.Add(func() error {
+			ewg.Add(func() error {
 				return ecb.doBalanceEcShardsWithinOneRack(averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes)
 			})
 		}
 	}
-	return ecb.ewg.Wait()
+	return ewg.Wait()
 }

 func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
@@ -851,13 +857,13 @@ func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int
 func (ecb *ecBalancer) balanceEcRacks() error {
 	// balance one rack for all ec shards
-	ecb.ewg.Init()
+	ewg := ecb.errorWaitGroup()
 	for _, ecRack := range ecb.racks() {
-		ecb.ewg.Add(func() error {
+		ewg.Add(func() error {
 			return ecb.doBalanceEcRack(ecRack)
 		})
 	}
-	return ecb.ewg.Wait()
+	return ewg.Wait()
 }

 func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
@@ -1052,7 +1058,7 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo
 	return vidLocations
 }

-func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, parallelize bool, applyBalancing bool) (err error) {
+func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, maxParallelization int, applyBalancing bool) (err error) {
 	if len(collections) == 0 {
 		return fmt.Errorf("no collections to balance")
 	}
@@ -1071,10 +1077,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
 		ecNodes:          allEcNodes,
 		replicaPlacement: ecReplicaPlacement,
 		applyBalancing:   applyBalancing,
-		ewg: ErrorWaitGroup{
-			parallelize: parallelize,
-		},
+		maxParallelization: maxParallelization,
 	}

 	for _, c := range collections {
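For readers skimming the hunks above, here is a minimal, self-contained sketch of the concurrency pattern the reworked ErrorWaitGroup relies on: a buffered channel acts as a semaphore that caps in-flight goroutines at maxConcurrency, a sync.WaitGroup tracks completion, and errors.Join (Go 1.20+) folds the collected errors into one. The names boundedGroup and newBoundedGroup, and the main function, are illustrative and not part of the commit.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// boundedGroup mirrors the reworked ErrorWaitGroup: a buffered channel caps the
// number of tasks running at once, a WaitGroup tracks completion, and errors are
// collected so the caller receives them joined into a single error.
type boundedGroup struct {
	maxConcurrency int
	wg             sync.WaitGroup
	sem            chan bool
	mu             sync.Mutex
	errs           []error
}

func newBoundedGroup(maxConcurrency int) *boundedGroup {
	if maxConcurrency <= 0 {
		maxConcurrency = 1 // no concurrency = one task at a time
	}
	return &boundedGroup{
		maxConcurrency: maxConcurrency,
		sem:            make(chan bool, maxConcurrency),
	}
}

func (g *boundedGroup) Add(f func() error) {
	if g.maxConcurrency <= 1 {
		// Run inline so ordering stays deterministic when parallelization is off.
		g.errs = append(g.errs, f())
		return
	}
	g.wg.Add(1)
	go func() {
		g.sem <- true // blocks once maxConcurrency tasks are already running
		err := f()
		g.mu.Lock()
		g.errs = append(g.errs, err)
		g.mu.Unlock()
		<-g.sem
		g.wg.Done()
	}()
}

func (g *boundedGroup) Wait() error {
	g.wg.Wait()
	return errors.Join(g.errs...)
}

func main() {
	g := newBoundedGroup(3) // comparable to running with -maxParallelization 3
	for i := 0; i < 10; i++ {
		i := i
		g.Add(func() error {
			time.Sleep(100 * time.Millisecond) // stand-in for a shard move/copy
			if i == 7 {
				return fmt.Errorf("task %d failed", i)
			}
			return nil
		})
	}
	fmt.Println("combined error:", g.Wait())
}

Running the sketch never lets more than three tasks execute at once and still reports the single failure at the end, which is the behavior the new flag controls at the shell level.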

weed/shell/command_ec_encode.go (4 changed lines)

@@ -65,7 +65,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
 	collection := encodeCommand.String("collection", "", "the collection name")
 	fullPercentage := encodeCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
 	quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes without no writes for this period")
-	parallelize := encodeCommand.Bool("parallelize", true, "parallelize operations whenever possible")
+	maxParallelization := encodeCommand.Int("maxParallelization", 10, "run up to X tasks in parallel, whenever possible")
 	forceChanges := encodeCommand.Bool("force", false, "force the encoding even if the cluster has less than recommended 4 nodes")
 	shardReplicaPlacement := encodeCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty")
 	applyBalancing := encodeCommand.Bool("rebalance", false, "re-balance EC shards after creation")
@@ -124,7 +124,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
 		}
 	}

 	// ...then re-balance ec shards.
-	if err := EcBalance(commandEnv, collections, "", rp, *parallelize, *applyBalancing); err != nil {
+	if err := EcBalance(commandEnv, collections, "", rp, *maxParallelization, *applyBalancing); err != nil {
 		return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", collections, err)
 	}
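The encode path gets the same treatment: the parsed value is handed straight to EcBalance for the post-encode rebalance, so a hypothetical run like `ec.encode -collection pics -maxParallelization 5 -rebalance` (collection name and value are illustrative) would cap that rebalance at five concurrent tasks.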
