From 6f8333b2e5f927033d70b205c930731954f3d0d3 Mon Sep 17 00:00:00 2001 From: Misty Date: Mon, 27 May 2024 21:14:26 +0800 Subject: [PATCH] support concurrent ec.decode --- weed/shell/command_ec_decode.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index aa0ca5045..890466614 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/storage/types" + "golang.org/x/sync/errgroup" "io" "time" @@ -41,6 +42,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr decodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) volumeId := decodeCommand.Int("volumeId", 0, "the volume id") collection := decodeCommand.String("collection", "", "the collection name") + concurrency := decodeCommand.Int("concurrency", 0, "number of parallel ec decode threads") if err = decodeCommand.Parse(args); err != nil { return nil } @@ -65,13 +67,18 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr // apply to all volumes in the collection volumeIds := collectEcShardIds(topologyInfo, *collection) fmt.Printf("ec encode volumes: %v\n", volumeIds) - for _, vid := range volumeIds { - if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil { - return err - } + eg, _ := errgroup.WithContext(context.Background()) + eg.SetLimit(*concurrency) + for _, _vid := range volumeIds { + vid := _vid + eg.Go(func() error { + if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil { + return err + } + return nil + }) } - - return nil + return eg.Wait() } func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {