Browse Source

Move `shell.ErrorWaitGroup` into a common file, to cleanly reuse across `weed shell` commands. (#6780)

Move `shell.ErrorWaitGroup` into a dedicated common file, to cleanly reuse across `weed shell` commands.
master
Lisandro Pin 22 hours ago
committed by GitHub
parent
commit
ba1d82db90
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 56
      weed/shell/command_ec_common.go
  2. 66
      weed/shell/common.go

56
weed/shell/command_ec_common.go

@ -7,7 +7,6 @@ import (
"math/rand/v2" "math/rand/v2"
"slices" "slices"
"sort" "sort"
"sync"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
@ -113,61 +112,6 @@ var (
getDefaultReplicaPlacement = _getDefaultReplicaPlacement getDefaultReplicaPlacement = _getDefaultReplicaPlacement
) )
type ErrorWaitGroup struct {
maxConcurrency int
wg *sync.WaitGroup
wgSem chan bool
errors []error
errorsMu sync.Mutex
}
type ErrorWaitGroupTask func() error
func NewErrorWaitGroup(maxConcurrency int) *ErrorWaitGroup {
if maxConcurrency <= 0 {
// No concurrency = one task at the time
maxConcurrency = 1
}
return &ErrorWaitGroup{
maxConcurrency: maxConcurrency,
wg: &sync.WaitGroup{},
wgSem: make(chan bool, maxConcurrency),
}
}
func (ewg *ErrorWaitGroup) Reset() {
close(ewg.wgSem)
ewg.wg = &sync.WaitGroup{}
ewg.wgSem = make(chan bool, ewg.maxConcurrency)
ewg.errors = nil
}
func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) {
if ewg.maxConcurrency <= 1 {
// Keep run order deterministic when parallelization is off
ewg.errors = append(ewg.errors, f())
return
}
ewg.wg.Add(1)
go func() {
ewg.wgSem <- true
err := f()
ewg.errorsMu.Lock()
ewg.errors = append(ewg.errors, err)
ewg.errorsMu.Unlock()
<-ewg.wgSem
ewg.wg.Done()
}()
}
func (ewg *ErrorWaitGroup) Wait() error {
ewg.wg.Wait()
return errors.Join(ewg.errors...)
}
func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) { func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
var resp *master_pb.GetMasterConfigurationResponse var resp *master_pb.GetMasterConfigurationResponse
var err error var err error

66
weed/shell/common.go

@ -0,0 +1,66 @@
package shell
import (
"errors"
"sync"
)
// ErrorWaitGroup implements a goroutine wait group which aggregates errors, if any.
type ErrorWaitGroup struct {
maxConcurrency int
wg *sync.WaitGroup
wgSem chan bool
errors []error
errorsMu sync.Mutex
}
type ErrorWaitGroupTask func() error
func NewErrorWaitGroup(maxConcurrency int) *ErrorWaitGroup {
if maxConcurrency <= 0 {
// no concurrency = one task at the time
maxConcurrency = 1
}
return &ErrorWaitGroup{
maxConcurrency: maxConcurrency,
wg: &sync.WaitGroup{},
wgSem: make(chan bool, maxConcurrency),
}
}
// Reset restarts an ErrorWaitGroup, keeping original settings. Errors and pending goroutines, if any, are flushed.
func (ewg *ErrorWaitGroup) Reset() {
close(ewg.wgSem)
ewg.wg = &sync.WaitGroup{}
ewg.wgSem = make(chan bool, ewg.maxConcurrency)
ewg.errors = nil
}
// Add queues an ErrorWaitGroupTask to be executed as a goroutine.
func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) {
if ewg.maxConcurrency <= 1 {
// keep run order deterministic when parallelization is off
ewg.errors = append(ewg.errors, f())
return
}
ewg.wg.Add(1)
go func() {
ewg.wgSem <- true
err := f()
ewg.errorsMu.Lock()
ewg.errors = append(ewg.errors, err)
ewg.errorsMu.Unlock()
<-ewg.wgSem
ewg.wg.Done()
}()
}
// Wait sleeps until all ErrorWaitGroupTasks are completed, then returns errors for them.
func (ewg *ErrorWaitGroup) Wait() error {
ewg.wg.Wait()
return errors.Join(ewg.errors...)
}
Loading…
Cancel
Save