diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go index c109d59d8..2b92f5105 100644 --- a/weed/shell/command_volume_tier_upload.go +++ b/weed/shell/command_volume_tier_upload.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" + "golang.org/x/sync/errgroup" "io" "time" @@ -64,6 +65,7 @@ func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writ quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period") dest := tierCommand.String("dest", "", "the target tier name") keepLocalDatFile := tierCommand.Bool("keepLocalDatFile", false, "whether keep local dat file") + concurrency := tierCommand.Int("concurrency", 1, "concurrency to use when uploading") if err = tierCommand.Parse(args); err != nil { return nil } @@ -86,13 +88,24 @@ func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writ return err } fmt.Printf("tier upload volumes: %v\n", volumeIds) - for _, vid := range volumeIds { - if err = doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile); err != nil { - return err - } + + eg, ctx := errgroup.WithContext(context.Background()) + eg.SetLimit(*concurrency) + for _, _vid := range volumeIds { + vid := _vid // capture the loop variable + eg.Go(func() error { + if ctx.Err() != nil { + return nil + } + if err = doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile); err != nil { + fmt.Fprintf(writer, "tier upload volume %v error: %v\n", vid, err) + return err + } + return nil + }) } - return nil + return eg.Wait() } func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, dest string, keepLocalDatFile bool) (err error) {