From 93040b8240259833c1b53e7a0b34906f0a9474aa Mon Sep 17 00:00:00 2001
From: Sakuragawa Misty
Date: Tue, 19 Mar 2024 03:13:33 +0800
Subject: [PATCH] Better volume.tier.move performance

---
 weed/shell/command_volume_tier_move.go | 29 ++++++++++++++++++++++++---
 1 file changed, 26 insertions(+), 3 deletions(-)

diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go
index bf41c2ea0..113274077 100644
--- a/weed/shell/command_volume_tier_move.go
+++ b/weed/shell/command_volume_tier_move.go
@@ -64,6 +64,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
 	applyChange := tierCommand.Bool("force", false, "actually apply the changes")
 	ioBytePerSecond := tierCommand.Int64("ioBytePerSecond", 0, "limit the speed of move")
 	replicationString := tierCommand.String("toReplication", "", "the new target replication setting")
+	disableSrcParallelLimit := tierCommand.Bool("disableSrcParallelLimit", false, "do not limit each source server to a single concurrent moving job")
 
 	if err = tierCommand.Parse(args); err != nil {
 		return nil
@@ -103,7 +104,8 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
 	}
 
 	wg := sync.WaitGroup{}
-	bufferLen := len(allLocations)
+	//bufferLen := len(allLocations)
+	bufferLen := 0
 	c.queues = make(map[pb.ServerAddress]chan volumeTierMoveJob)
 
 	for _, dst := range allLocations {
@@ -114,6 +116,9 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
 		go func(dst location, jobs <-chan volumeTierMoveJob, applyChanges bool) {
 			defer wg.Done()
 			for job := range jobs {
+				if job.src == "" && job.vid == 0 {
+					continue // skip the zero-value probe job used to test for a free worker
+				}
 				fmt.Fprintf(writer, "moving volume %d from %s to %s with disk type %s ...\n", job.vid, job.src, dst.dataNode.Id, toDiskType.ReadableString())
 
 				locations, found := commandEnv.MasterClient.GetLocationsClone(uint32(job.vid))
@@ -122,11 +127,16 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
 					continue
 				}
 
-				unlock := c.Lock(job.src)
+				unlock := func() {}
+				if !*disableSrcParallelLimit { // unless the per-source limit is disabled, allow only one concurrent job per source server
+					unlock = c.Lock(job.src)
+				}
 
 				if applyChanges {
 					if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst, *ioBytePerSecond, replicationString); err != nil {
 						fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", job.vid, job.src, dst.dataNode.Id, err)
+					} else {
+						fmt.Fprintf(writer, "tier moved volume %d %s => %s\n", job.vid, job.src, dst.dataNode.Id)
 					}
 				}
 				unlock()
@@ -193,6 +203,7 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
 
 	// find one server with the most empty volume slots with target disk type
 	hasFoundTarget := false
+	hasAvailableTarget := false
 	fn := capacityByFreeVolumeCount(toDiskType)
 	for _, dst := range allLocations {
 		if fn(dst.dataNode) > 0 && !hasFoundTarget {
@@ -209,6 +220,14 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
 			if sourceVolumeServer == "" {
 				continue
 			}
+			hasAvailableTarget = true
+			select {
+			case c.queues[pb.NewServerAddressFromDataNode(dst.dataNode)] <- volumeTierMoveJob{"", 0}:
+				break
+			default:
+				continue
+			}
+
 			hasFoundTarget = true
 
 			// adjust volume count
@@ -219,8 +238,12 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
 		}
 	}
 
-	if !hasFoundTarget {
+	if !hasAvailableTarget {
 		fmt.Fprintf(writer, "can not find disk type %s for volume %d\n", toDiskType.ReadableString(), vid)
+	} else if !hasFoundTarget {
+		fmt.Fprintf(writer, "waiting for free worker for disk type %s volume %d\n", toDiskType.ReadableString(), vid)
+		time.Sleep(5 * time.Second)
+		return c.doVolumeTierMove(commandEnv, writer, vid, toDiskType, allLocations)
 	}
 
 	return nil