|
@ -14,6 +14,8 @@ import ( |
|
|
"time" |
|
|
"time" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
const topologyInfoUpdateInterval = 5 * time.Minute |
|
|
|
|
|
|
|
|
func init() { |
|
|
func init() { |
|
|
Commands = append(Commands, &commandVolumeServerEvacuate{}) |
|
|
Commands = append(Commands, &commandVolumeServerEvacuate{}) |
|
|
} |
|
|
} |
|
@ -95,13 +97,22 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn |
|
|
return err |
|
|
return err |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
stopchan := make(chan struct{}) |
|
|
go func() { |
|
|
go func() { |
|
|
for { |
|
|
for { |
|
|
if topologyInfo, _, err := collectTopologyInfo(commandEnv, 5*time.Minute); err != nil { |
|
|
|
|
|
c.topologyInfo = topologyInfo |
|
|
|
|
|
|
|
|
select { |
|
|
|
|
|
default: |
|
|
|
|
|
if topologyInfo, _, err := collectTopologyInfo(commandEnv, topologyInfoUpdateInterval); err != nil { |
|
|
|
|
|
c.topologyInfo = topologyInfo |
|
|
|
|
|
} else { |
|
|
|
|
|
fmt.Fprintf(writer, "update topologyInfo %v", err) |
|
|
|
|
|
} |
|
|
|
|
|
case <-stopchan: |
|
|
|
|
|
return |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
}() |
|
|
}() |
|
|
|
|
|
defer close(stopchan) |
|
|
|
|
|
|
|
|
if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil { |
|
|
if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil { |
|
|
return err |
|
|
return err |
|
@ -127,7 +138,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE |
|
|
for _, diskInfo := range thisNode.info.DiskInfos { |
|
|
for _, diskInfo := range thisNode.info.DiskInfos { |
|
|
volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo) |
|
|
volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo) |
|
|
for _, vol := range diskInfo.VolumeInfos { |
|
|
for _, vol := range diskInfo.VolumeInfos { |
|
|
hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) |
|
|
|
|
|
|
|
|
hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err) |
|
|
fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err) |
|
|
} |
|
|
} |
|
@ -204,7 +215,7 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (c *commandVolumeServerEvacuate) moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) { |
|
|
|
|
|
|
|
|
func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) { |
|
|
fn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType)) |
|
|
fn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType)) |
|
|
for _, n := range otherNodes { |
|
|
for _, n := range otherNodes { |
|
|
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool { |
|
|
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool { |
|
|