From 2190a833d3bc032a6afda3fde9df72e2854f0724 Mon Sep 17 00:00:00 2001
From: dsd
Date: Thu, 9 Jan 2025 15:09:29 +0800
Subject: [PATCH] add parallel for volume.fix.replication

---
Review note: fixUnderReplicatedVolumes now guards fixedVolumes with a mutex
and clamps maxParallelization to at least 1. This patch was adjusted by hand
after review, so the post-image blob id on the index line below predates
that adjustment.

 weed/shell/command_volume_fix_replication.go | 64 +++++++++++++++++---
 1 file changed, 54 insertions(+), 10 deletions(-)

diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index a687a0f23..171465a5c 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -7,6 +7,7 @@ import (
 	"io"
 	"path/filepath"
 	"strconv"
+	"sync"
 	"time"
 
 	"slices"
@@ -69,6 +70,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
 	doCheck := volFixReplicationCommand.Bool("doCheck", true, "Also check synchronization before deleting")
 	retryCount := volFixReplicationCommand.Int("retry", 5, "how many times to retry")
 	volumesPerStep := volFixReplicationCommand.Int("volumesPerStep", 0, "how many volumes to fix in one cycle")
+	maxParallelization := volFixReplicationCommand.Int("maxParallelization", 1, "run up to X tasks in parallel, whenever possible")
 
 	if err = volFixReplicationCommand.Parse(args); err != nil {
 		return nil
@@ -135,7 +137,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
 		underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds)
 		if underReplicatedVolumeIdsCount > 0 {
 			// find the most underpopulated data nodes
-			fixedVolumeReplicas, err = c.fixUnderReplicatedVolumes(commandEnv, writer, *applyChanges, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep)
+			fixedVolumeReplicas, err = c.fixUnderReplicatedVolumes(commandEnv, writer, *applyChanges, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep, *maxParallelization)
 			if err != nil {
 				return err
 			}
@@ -279,23 +281,65 @@ func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, wr
 	return nil
 }
 
-func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (fixedVolumes map[string]int, err error) {
+// fixUnderReplicatedVolumes calls fixOneUnderReplicatedVolume for each id in
+// underReplicatedVolumeIds (truncated to the first volumesPerStep ids when
+// volumesPerStep > 0), working on at most maxParallelization volumes at a time.
+// Each volume is attempted up to retryCount+1 times; a failed attempt is
+// reported on writer and does not abort the other volumes.
+//
+// The returned map is keyed by the decimal volume id and holds
+// len(volumeReplicas[vid]) for every volume fixed while applyChanges is set.
+// The returned error is always nil.
+//
+// NOTE(review): with maxParallelization > 1, fixOneUnderReplicatedVolume runs
+// concurrently with the same writer, volumeReplicas and allLocations; confirm
+// that it is safe for concurrent use before raising the limit.
+func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep, maxParallelization int) (fixedVolumes map[string]int, err error) {
 	fixedVolumes = map[string]int{}
 
 	if len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 {
 		underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep]
 	}
+
+	// A limit below 1 would make the semaphore unbuffered (the first acquire
+	// would block forever) or make() panic on a negative size; run sequentially.
+	if maxParallelization < 1 {
+		maxParallelization = 1
+	}
+
+	// mu guards fixedVolumes, which the workers write to concurrently, and keeps
+	// the failure messages printed below from interleaving.
+	var (
+		wg        sync.WaitGroup
+		mu        sync.Mutex
+		semaphore = make(chan struct{}, maxParallelization)
+	)
+
 	for _, vid := range underReplicatedVolumeIds {
-		for i := 0; i < retryCount+1; i++ {
-			if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, applyChanges, volumeReplicas, vid, allLocations); err == nil {
-				if applyChanges {
-					fixedVolumes[strconv.FormatUint(uint64(vid), 10)] = len(volumeReplicas[vid])
+		wg.Add(1)
+		semaphore <- struct{}{} // acquire a slot; blocks while the limit is reached
+
+		go func(vid uint32) {
+			defer wg.Done()
+			defer func() { <-semaphore }() // release the slot
+
+			for attempt := 0; attempt <= retryCount; attempt++ {
+				fixErr := c.fixOneUnderReplicatedVolume(commandEnv, writer, applyChanges, volumeReplicas, vid, allLocations)
+				if fixErr == nil {
+					if applyChanges {
+						mu.Lock()
+						fixedVolumes[strconv.FormatUint(uint64(vid), 10)] = len(volumeReplicas[vid])
+						mu.Unlock()
+					}
+					return
 				}
-				break
-			} else {
-				fmt.Fprintf(writer, "fixing under replicated volume %d: %v\n", vid, err)
+				mu.Lock()
+				fmt.Fprintf(writer, "Failed to fix volume %d (attempt %d): %v\n", vid, attempt+1, fixErr)
+				mu.Unlock()
 			}
-		}
+		}(vid)
 	}
+
+	wg.Wait()
 	return fixedVolumes, nil
 }