Browse Source

Collect, and return, all errors on concurrent tasks for `shell.EcBalance()`.

pull/6351/head
Lisandro Pin 4 weeks ago
parent
commit
1f736ce9c1
  1. 32
      weed/shell/command_ec_common.go

32
weed/shell/command_ec_common.go

@ -560,43 +560,45 @@ type ecBalancer struct {
parallelize bool parallelize bool
wg *sync.WaitGroup wg *sync.WaitGroup
// TODO: Maybe accumulate all errors instead of just the last one.
wgError error
wgErrors []error
} }
type ecBalancerTask func() error type ecBalancerTask func() error
func (ecb *ecBalancer) wgInit() { func (ecb *ecBalancer) wgInit() {
if ecb.wg == nil {
ecb.wg = &sync.WaitGroup{}
ecb.wgError = nil
if ecb.wg != nil {
return
} }
ecb.wg = &sync.WaitGroup{}
ecb.wgErrors = nil
} }
func (ecb *ecBalancer) wgAdd(f ecBalancerTask) { func (ecb *ecBalancer) wgAdd(f ecBalancerTask) {
if ecb.wg == nil || !ecb.parallelize {
wrapper := func() {
if ecb.wg != nil {
defer ecb.wg.Done()
}
if err := f(); err != nil { if err := f(); err != nil {
ecb.wgError = err
ecb.wgErrors = append(ecb.wgErrors, err)
}
} }
if ecb.wg == nil || !ecb.parallelize {
wrapper()
return return
} }
ecb.wg.Add(1) ecb.wg.Add(1)
go func() {
if err := f(); err != nil {
ecb.wgError = err
}
ecb.wg.Done()
}()
go wrapper()
} }
func (ecb *ecBalancer) wgWait() error { func (ecb *ecBalancer) wgWait() error {
if ecb.wg != nil { if ecb.wg != nil {
ecb.wg.Wait() ecb.wg.Wait()
} }
err := ecb.wgError
err := errors.Join(ecb.wgErrors...)
ecb.wg = nil ecb.wg = nil
ecb.wgError = nil
ecb.wgErrors = nil
return err return err
} }

Loading…
Cancel
Save