From 2b4112e462d48af926caf6431a3de6ef256afcae Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Mon, 18 Jul 2022 11:32:28 +0500 Subject: [PATCH] update otherNodes --- weed/shell/command_volume_server_evacuate.go | 40 +++++++++++--------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 0595ef308..7d50b7f81 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -24,6 +24,7 @@ type commandVolumeServerEvacuate struct { topologyInfo *master_pb.TopologyInfo targetServer string volumeRack string + otherNodes []*Node } func (c *commandVolumeServerEvacuate) Name() string { @@ -97,22 +98,27 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn return err } - stopchan := make(chan struct{}) - go func() { - for { - select { - default: - if topologyInfo, _, err := collectTopologyInfo(commandEnv, topologyInfoUpdateInterval); err != nil { - fmt.Fprintf(writer, "update topologyInfo %v", err) - } else { - c.topologyInfo = topologyInfo + if applyChange { + stopchan := make(chan struct{}) + go func() { + for { + select { + default: + if topologyInfo, _, err := collectTopologyInfo(commandEnv, topologyInfoUpdateInterval); err != nil { + fmt.Fprintf(writer, "update topologyInfo %v", err) + } else { + c.topologyInfo = topologyInfo + _, c.otherNodes = c.nodesOtherThan( + collectVolumeServersByDc(c.topologyInfo, ""), volumeServer) + fmt.Fprintf(writer, "topologyInfo updated %v\n", len(c.otherNodes)) + } + case <-stopchan: + return } - case <-stopchan: - return } - } - }() - defer close(stopchan) + }() + defer close(stopchan) + } if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil { return err @@ -128,7 +134,8 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this volume server volumeServers := collectVolumeServersByDc(c.topologyInfo, "") - thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer) + var thisNodes []*Node + thisNodes, c.otherNodes = c.nodesOtherThan(volumeServers, volumeServer) if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster", volumeServer) } @@ -138,7 +145,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE for _, diskInfo := range thisNode.info.DiskInfos { volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo) for _, vol := range diskInfo.VolumeInfos { - hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) + hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, c.otherNodes, applyChange) if err != nil { fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err) } @@ -152,7 +159,6 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE } } } - } return nil }