From 64198dad8346fe284cbef944fe01ff0d062c147d Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Wed, 30 Jul 2025 19:58:30 +0200 Subject: [PATCH] Parallelize operations for `weed shell`'s `volume.fix.replication`. (#6789) Parallelize operations for `weed shell`'s `volume.fix.replication`. --- weed/shell/command_volume_fix_replication.go | 58 ++++++++++++-------- 1 file changed, 35 insertions(+), 23 deletions(-) diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index a687a0f23..65e212444 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -29,6 +29,7 @@ func init() { type commandVolumeFixReplication struct { collectionPattern *string + // TODO: move parameter flags here so we don't shuffle them around via function calls. } func (c *commandVolumeFixReplication) Name() string { @@ -67,6 +68,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, applyChanges := volFixReplicationCommand.Bool("force", false, "apply the fix") doDelete := volFixReplicationCommand.Bool("doDelete", true, "Also delete over-replicated volumes besides fixing under-replication") doCheck := volFixReplicationCommand.Bool("doCheck", true, "Also check synchronization before deleting") + maxParallelization := volFixReplicationCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible") retryCount := volFixReplicationCommand.Int("retry", 5, "how many times to retry") volumesPerStep := volFixReplicationCommand.Int("volumesPerStep", 0, "how many volumes to fix in one cycle") @@ -81,6 +83,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, return } + ewg := NewErrorWaitGroup(*maxParallelization) underReplicatedVolumeIdsCount := 1 for underReplicatedVolumeIdsCount > 0 { fixedVolumeReplicas := map[string]int{} @@ -107,6 +110,7 @@ func (c *commandVolumeFixReplication) Do(args []string, 
commandEnv *CommandEnv, switch { case replicaPlacement.GetCopyCount() > len(replicas) || !satisfyReplicaCurrentLocation(replicaPlacement, replicas): underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid) + fmt.Fprintf(writer, "volume %d replication %s, but under replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas)) case isMisplaced(replicas, replicaPlacement): misplacedVolumeIds = append(misplacedVolumeIds, vid) fmt.Fprintf(writer, "volume %d replication %s is not well placed %s\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id) @@ -115,30 +119,28 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas)) } } + underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds) if !commandEnv.isLocked() { return fmt.Errorf("lock is lost") } - if len(overReplicatedVolumeIds) > 0 && *doDelete { - if err := c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, overReplicatedVolumeIds, volumeReplicas, allLocations, pickOneReplicaToDelete); err != nil { - return err - } - } - - if len(misplacedVolumeIds) > 0 && *doDelete { - if err := c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, misplacedVolumeIds, volumeReplicas, allLocations, pickOneMisplacedVolume); err != nil { - return err - } - } - - underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds) - if underReplicatedVolumeIdsCount > 0 { + ewg.Reset() + ewg.Add(func() error { // find the most underpopulated data nodes fixedVolumeReplicas, err = c.fixUnderReplicatedVolumes(commandEnv, writer, *applyChanges, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep) - if err != nil { - return err - } + return err + }) + if *doDelete { + ewg.Add(func() error { + return c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, overReplicatedVolumeIds, volumeReplicas, allLocations, 
pickOneReplicaToDelete) + }) + ewg.Add(func() error { + return c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, misplacedVolumeIds, volumeReplicas, allLocations, pickOneMisplacedVolume) + }) + } + if err := ewg.Wait(); err != nil { + return nil } if !*applyChanges { @@ -219,8 +221,13 @@ func checkOneVolume(a *VolumeReplica, b *VolumeReplica, writer io.Writer, grpcDi return } -func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, doCheck bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, selectOneVolumeFn SelectOneVolumeFunc) error { - for _, vid := range overReplicatedVolumeIds { +func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, doCheck bool, volumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, selectOneVolumeFn SelectOneVolumeFunc) error { + if len(volumeIds) == 0 { + // nothing to do + return nil + } + + for _, vid := range volumeIds { replicas := volumeReplicas[vid] replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement)) @@ -279,12 +286,17 @@ func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, wr return nil } -func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (fixedVolumes map[string]int, err error) { +func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, volumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (fixedVolumes map[string]int, err error) { fixedVolumes = map[string]int{} - if 
len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 { - underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep] + + if len(volumeIds) == 0 { + return fixedVolumes, nil + } + + if len(volumeIds) > volumesPerStep && volumesPerStep > 0 { + volumeIds = volumeIds[0:volumesPerStep] } - for _, vid := range underReplicatedVolumeIds { + for _, vid := range volumeIds { for i := 0; i < retryCount+1; i++ { if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, applyChanges, volumeReplicas, vid, allLocations); err == nil { if applyChanges {