From f524d0801e97e0325a51f4963aed918da3adbd60 Mon Sep 17 00:00:00 2001
From: Misty
Date: Sun, 2 Jun 2024 16:44:17 +0800
Subject: [PATCH] Support concurrent volume.configure.replication

---
 .../command_volume_configure_replication.go  | 60 +++++++++++--------
 1 file changed, 36 insertions(+), 24 deletions(-)

diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go
index a6acd6838..5634e9a91 100644
--- a/weed/shell/command_volume_configure_replication.go
+++ b/weed/shell/command_volume_configure_replication.go
@@ -6,6 +6,7 @@ import (
 	"flag"
 	"fmt"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"golang.org/x/sync/errgroup"
 	"io"
 	"path/filepath"
 
@@ -35,7 +36,7 @@ func (c *commandVolumeConfigureReplication) Help() string {
 `
 }
 
-func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, _ io.Writer) (err error) {
+func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
 
 	configureReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
 	volumeIdInt := configureReplicationCommand.Int("volumeId", 0, "the volume id")
@@ -67,39 +68,50 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
 	vid := needle.VolumeId(*volumeIdInt)
 	volumeFilter := getVolumeFilter(replicaPlacement, uint32(vid), *collectionPattern)
 
+	eg, gCtx := errgroup.WithContext(context.Background())
+	_ = gCtx
 	// find all data nodes with volumes that needs replication change
 	eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
-		var targetVolumeIds []uint32
-		for _, diskInfo := range dn.DiskInfos {
-			for _, v := range diskInfo.VolumeInfos {
-				if volumeFilter(v) {
-					targetVolumeIds = append(targetVolumeIds, v.Id)
+		eg.Go(func() error {
+			var targetVolumeIds []uint32
+			for _, diskInfo := range dn.DiskInfos {
+				for _, v := range diskInfo.VolumeInfos {
+					if volumeFilter(v) {
+						targetVolumeIds = append(targetVolumeIds, v.Id)
+					}
 				}
 			}
-		}
-		if len(targetVolumeIds) == 0 {
-			return
-		}
-		err = operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dn), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
-			for _, targetVolumeId := range targetVolumeIds {
-				resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
-					VolumeId:    targetVolumeId,
-					Replication: replicaPlacement.String(),
-				})
-				if configureErr != nil {
-					return configureErr
-				}
-				if resp.Error != "" {
-					return errors.New(resp.Error)
+			if len(targetVolumeIds) == 0 {
+				return nil
+			}
+			fmt.Fprintf(writer, "volume server %s has %d volumes\n", dn.Id, len(targetVolumeIds))
+			err = operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dn), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+				for i, targetVolumeId := range targetVolumeIds {
+					if i%100 == 0 {
+						fmt.Fprintf(writer, "volume marking progress on %s: %.2f (%d/%d)\n", dn.Id, float32(i)/float32(len(targetVolumeIds))*100, i, len(targetVolumeIds))
+					}
+					resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
+						VolumeId:    targetVolumeId,
+						Replication: replicaPlacement.String(),
+					})
+					if configureErr != nil {
+						return configureErr
+					}
+					if resp.Error != "" {
+						return errors.New(resp.Error)
+					}
 				}
+				return nil
+			})
+			if err != nil {
+				return err
 			}
 			return nil
 		})
-		if err != nil {
-			return
-		}
 	})
+	err = eg.Wait()
+	return err
 }
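
For readers unfamiliar with the pattern the patch adopts, below is a minimal, self-contained sketch of fanning out per-data-node work with golang.org/x/sync/errgroup and collecting the first failure via eg.Wait(). It is illustrative only and not part of the patch: the node addresses and the configureNode helper are hypothetical stand-ins for the per-volume-server work (gathering matching volume ids and calling VolumeConfigure on each). Unlike the patch, the sketch returns errors from the goroutine closure rather than assigning to a shared err variable, which keeps error propagation free of data races.

// Minimal sketch of the errgroup fan-out pattern used by the patch.
// configureNode and the node addresses below are hypothetical.
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// configureNode stands in for the per-node work the patch wraps in eg.Go
// (collecting target volume ids and calling VolumeConfigure on each).
func configureNode(node string) error {
	fmt.Printf("configuring volumes on %s\n", node)
	return nil
}

func main() {
	nodes := []string{"10.0.0.1:8080", "10.0.0.2:8080", "10.0.0.3:8080"}

	var eg errgroup.Group
	for _, node := range nodes {
		node := node // capture the loop variable for the goroutine (pre-Go 1.22)
		eg.Go(func() error {
			// Any non-nil error returned here is surfaced by eg.Wait below.
			return configureNode(node)
		})
	}

	// Wait blocks until every goroutine has finished and returns the first
	// non-nil error, if any.
	if err := eg.Wait(); err != nil {
		fmt.Println("volume.configure.replication failed:", err)
	}
}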