From 1f736ce9c137b8a438d08847c85d5c5d1ea87080 Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Fri, 13 Dec 2024 10:54:52 +0100 Subject: [PATCH] Collect, and return, all errors on concurrent tasks for `shell.EcBalance()`. --- weed/shell/command_ec_common.go | 34 +++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index b5ba657e1..03988bf7d 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -559,44 +559,46 @@ type ecBalancer struct { applyBalancing bool parallelize bool - wg *sync.WaitGroup - // TODO: Maybe accumulate all errors instead of just the last one. - wgError error + wg *sync.WaitGroup + wgErrors []error } type ecBalancerTask func() error func (ecb *ecBalancer) wgInit() { - if ecb.wg == nil { - ecb.wg = &sync.WaitGroup{} - ecb.wgError = nil + if ecb.wg != nil { + return } + ecb.wg = &sync.WaitGroup{} + ecb.wgErrors = nil } func (ecb *ecBalancer) wgAdd(f ecBalancerTask) { - if ecb.wg == nil || !ecb.parallelize { + wrapper := func() { + if ecb.wg != nil { + defer ecb.wg.Done() + } if err := f(); err != nil { - ecb.wgError = err + ecb.wgErrors = append(ecb.wgErrors, err) } + } + + if ecb.wg == nil || !ecb.parallelize { + wrapper() return } ecb.wg.Add(1) - go func() { - if err := f(); err != nil { - ecb.wgError = err - } - ecb.wg.Done() - }() + go wrapper() } func (ecb *ecBalancer) wgWait() error { if ecb.wg != nil { ecb.wg.Wait() } - err := ecb.wgError + err := errors.Join(ecb.wgErrors...) ecb.wg = nil - ecb.wgError = nil + ecb.wgErrors = nil return err }