From 045908b12e5ff61b003540241e6ad9daf05da278 Mon Sep 17 00:00:00 2001 From: Misty Date: Sun, 2 Jun 2024 16:42:15 +0800 Subject: [PATCH] Support concurrent volume.tier.upload --- weed/shell/command_volume_tier_upload.go | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go index c109d59d8..2b92f5105 100644 --- a/weed/shell/command_volume_tier_upload.go +++ b/weed/shell/command_volume_tier_upload.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" + "golang.org/x/sync/errgroup" "io" "time" @@ -64,6 +65,7 @@ func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writ quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period") dest := tierCommand.String("dest", "", "the target tier name") keepLocalDatFile := tierCommand.Bool("keepLocalDatFile", false, "whether keep local dat file") + concurrency := tierCommand.Int("concurrency", 1, "concurrency to use when uploading") if err = tierCommand.Parse(args); err != nil { return nil } @@ -86,13 +88,24 @@ func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writ return err } fmt.Printf("tier upload volumes: %v\n", volumeIds) - for _, vid := range volumeIds { - if err = doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile); err != nil { - return err - } + + eg, ctx := errgroup.WithContext(context.Background()) + eg.SetLimit(*concurrency) + for _, _vid := range volumeIds { + vid := _vid // capture the loop variable + eg.Go(func() error { + if ctx.Err() != nil { + return nil + } + if err = doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile); err != nil { + fmt.Fprintf(writer, "tier upload volume %v error: %v\n", vid, err) + return err + } + return nil + }) } - return nil + return eg.Wait() } func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, dest string, keepLocalDatFile bool) (err error) {