From ba1d82db90235fb7ee589bce94cd575e2a16e26e Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Mon, 12 May 2025 23:38:55 +0200 Subject: [PATCH] Move `shell.ErrorWaitGroup` into a common file, to cleanly reuse across `weed shell` commands. (#6780) Move `shell.ErrorWaitGroup` into a dedicated common file, to cleanly reuse across `weed shell` commands. --- weed/shell/command_ec_common.go | 56 ---------------------------- weed/shell/common.go | 66 +++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 56 deletions(-) create mode 100644 weed/shell/common.go diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 209d8a733..a6f27232e 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -7,7 +7,6 @@ import ( "math/rand/v2" "slices" "sort" - "sync" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -113,61 +112,6 @@ var ( getDefaultReplicaPlacement = _getDefaultReplicaPlacement ) -type ErrorWaitGroup struct { - maxConcurrency int - wg *sync.WaitGroup - wgSem chan bool - errors []error - errorsMu sync.Mutex -} -type ErrorWaitGroupTask func() error - -func NewErrorWaitGroup(maxConcurrency int) *ErrorWaitGroup { - if maxConcurrency <= 0 { - // No concurrency = one task at the time - maxConcurrency = 1 - } - return &ErrorWaitGroup{ - maxConcurrency: maxConcurrency, - wg: &sync.WaitGroup{}, - wgSem: make(chan bool, maxConcurrency), - } -} - -func (ewg *ErrorWaitGroup) Reset() { - close(ewg.wgSem) - - ewg.wg = &sync.WaitGroup{} - ewg.wgSem = make(chan bool, ewg.maxConcurrency) - ewg.errors = nil -} - -func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) { - if ewg.maxConcurrency <= 1 { - // Keep run order deterministic when parallelization is off - ewg.errors = append(ewg.errors, f()) - return - } - - ewg.wg.Add(1) - go func() { - ewg.wgSem <- true - - err := f() - ewg.errorsMu.Lock() - ewg.errors = append(ewg.errors, err) - ewg.errorsMu.Unlock() - - <-ewg.wgSem - ewg.wg.Done() - }() -} - -func (ewg *ErrorWaitGroup) Wait() error { - ewg.wg.Wait() - return errors.Join(ewg.errors...) -} - func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) { var resp *master_pb.GetMasterConfigurationResponse var err error diff --git a/weed/shell/common.go b/weed/shell/common.go new file mode 100644 index 000000000..9fe190b31 --- /dev/null +++ b/weed/shell/common.go @@ -0,0 +1,66 @@ +package shell + +import ( + "errors" + "sync" +) + +// ErrorWaitGroup implements a goroutine wait group which aggregates errors, if any. +type ErrorWaitGroup struct { + maxConcurrency int + wg *sync.WaitGroup + wgSem chan bool + errors []error + errorsMu sync.Mutex +} + +type ErrorWaitGroupTask func() error + +func NewErrorWaitGroup(maxConcurrency int) *ErrorWaitGroup { + if maxConcurrency <= 0 { + // no concurrency = one task at the time + maxConcurrency = 1 + } + return &ErrorWaitGroup{ + maxConcurrency: maxConcurrency, + wg: &sync.WaitGroup{}, + wgSem: make(chan bool, maxConcurrency), + } +} + +// Reset restarts an ErrorWaitGroup, keeping original settings. Errors and pending goroutines, if any, are flushed. +func (ewg *ErrorWaitGroup) Reset() { + close(ewg.wgSem) + + ewg.wg = &sync.WaitGroup{} + ewg.wgSem = make(chan bool, ewg.maxConcurrency) + ewg.errors = nil +} + +// Add queues an ErrorWaitGroupTask to be executed as a goroutine. +func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) { + if ewg.maxConcurrency <= 1 { + // keep run order deterministic when parallelization is off + ewg.errors = append(ewg.errors, f()) + return + } + + ewg.wg.Add(1) + go func() { + ewg.wgSem <- true + + err := f() + ewg.errorsMu.Lock() + ewg.errors = append(ewg.errors, err) + ewg.errorsMu.Unlock() + + <-ewg.wgSem + ewg.wg.Done() + }() +} + +// Wait sleeps until all ErrorWaitGroupTasks are completed, then returns errors for them. +func (ewg *ErrorWaitGroup) Wait() error { + ewg.wg.Wait() + return errors.Join(ewg.errors...) +}