From 44c48c929ac2e513a3ad5749744c77ab480ae1fe Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Wed, 18 Dec 2024 20:59:48 +0100 Subject: [PATCH] Parallelize volume replica operations within `ec.encode`. (#6374) --- weed/shell/command_ec_encode.go | 21 +++++++++++++++------ weed/shell/command_volume_move.go | 8 ++++++-- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 62bf7fbbf..2b35c5c79 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -119,7 +119,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr // encode all requested volumes... for _, vid := range volumeIds { - if err = doEcEncode(commandEnv, *collection, vid); err != nil { + if err = doEcEncode(commandEnv, *collection, vid, *parallelize); err != nil { return fmt.Errorf("ec encode for volume %d: %v", vid, err) } } @@ -131,7 +131,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return nil } -func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) error { +func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, parallelize bool) error { if !commandEnv.isLocked() { return fmt.Errorf("lock is lost") } @@ -142,11 +142,20 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) return fmt.Errorf("volume %d not found", vid) } - // fmt.Printf("found ec %d shards on %v\n", vid, locations) - // mark the volume as readonly - if err := markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false); err != nil { - return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) + ewg := ErrorWaitGroup{ + parallelize: parallelize, + } + for _, location := range locations { + ewg.Add(func() error { + if err := markVolumeReplicaWritable(commandEnv.option.GrpcDialOption, vid, location, false, false); err != nil { + return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, location.Url, err) + } + return nil + }) + } + if err := ewg.Wait(); err != nil { + return err } // generate ec shards diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index 2ddd3f625..26fd5fc58 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -227,10 +227,14 @@ func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId }) } +func markVolumeReplicaWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, location wdclient.Location, writable, persist bool) error { + fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url) + return markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable, persist) +} + func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable, persist bool) error { for _, location := range locations { - fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url) - if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable, persist); err != nil { + if err := markVolumeReplicaWritable(grpcDialOption, volumeId, location, writable, persist); err != nil { return err } }