Browse Source

Support concurrent volume.tier.upload

pull/5631/head
Misty 8 months ago
parent
commit
045908b12e
  1. 17
      weed/shell/command_volume_tier_upload.go

17
weed/shell/command_volume_tier_upload.go

@ -5,6 +5,7 @@ import (
"flag" "flag"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"golang.org/x/sync/errgroup"
"io" "io"
"time" "time"
@ -64,6 +65,7 @@ func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writ
quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period") quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period")
dest := tierCommand.String("dest", "", "the target tier name") dest := tierCommand.String("dest", "", "the target tier name")
keepLocalDatFile := tierCommand.Bool("keepLocalDatFile", false, "whether keep local dat file") keepLocalDatFile := tierCommand.Bool("keepLocalDatFile", false, "whether keep local dat file")
concurrency := tierCommand.Int("concurrency", 1, "concurrency to use when uploading")
if err = tierCommand.Parse(args); err != nil { if err = tierCommand.Parse(args); err != nil {
return nil return nil
} }
@ -86,13 +88,24 @@ func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writ
return err return err
} }
fmt.Printf("tier upload volumes: %v\n", volumeIds) fmt.Printf("tier upload volumes: %v\n", volumeIds)
for _, vid := range volumeIds {
eg, ctx := errgroup.WithContext(context.Background())
eg.SetLimit(*concurrency)
for _, _vid := range volumeIds {
vid := _vid // capture the loop variable
eg.Go(func() error {
if ctx.Err() != nil {
return nil
}
if err = doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile); err != nil { if err = doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile); err != nil {
fmt.Fprintf(writer, "tier upload volume %v error: %v\n", vid, err)
return err return err
} }
return nil
})
} }
return nil
return eg.Wait()
} }
func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, dest string, keepLocalDatFile bool) (err error) { func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, dest string, keepLocalDatFile bool) (err error) {

Loading…
Cancel
Save