@ -18,7 +18,9 @@ func init() {
}
}
type commandVolumeServerEvacuate struct {
type commandVolumeServerEvacuate struct {
topologyInfo * master_pb . TopologyInfo
targetServer string
targetServer string
volumeRack string
}
}
func ( c * commandVolumeServerEvacuate ) Name ( ) string {
func ( c * commandVolumeServerEvacuate ) Name ( ) string {
@ -47,7 +49,8 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv,
vsEvacuateCommand := flag . NewFlagSet ( c . Name ( ) , flag . ContinueOnError )
vsEvacuateCommand := flag . NewFlagSet ( c . Name ( ) , flag . ContinueOnError )
volumeServer := vsEvacuateCommand . String ( "node" , "" , "<host>:<port> of the volume server" )
volumeServer := vsEvacuateCommand . String ( "node" , "" , "<host>:<port> of the volume server" )
c . targetServer = * vsEvacuateCommand . String ( "target" , "" , "<host>:<port> of target volume" )
volumeRack := vsEvacuateCommand . String ( "rack" , "" , "source rack for the volume servers" )
targetServer := vsEvacuateCommand . String ( "target" , "" , "<host>:<port> of target volume" )
skipNonMoveable := vsEvacuateCommand . Bool ( "skipNonMoveable" , false , "skip volumes that can not be moved" )
skipNonMoveable := vsEvacuateCommand . Bool ( "skipNonMoveable" , false , "skip volumes that can not be moved" )
applyChange := vsEvacuateCommand . Bool ( "force" , false , "actually apply the changes" )
applyChange := vsEvacuateCommand . Bool ( "force" , false , "actually apply the changes" )
retryCount := vsEvacuateCommand . Int ( "retry" , 0 , "how many times to retry" )
retryCount := vsEvacuateCommand . Int ( "retry" , 0 , "how many times to retry" )
@ -56,12 +59,18 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv,
}
}
infoAboutSimulationMode ( writer , * applyChange , "-force" )
infoAboutSimulationMode ( writer , * applyChange , "-force" )
if err = commandEnv . confirmIsLocked ( args ) ; err != nil {
if err = commandEnv . confirmIsLocked ( args ) ; err != nil && * applyChange {
return
return
}
}
if * volumeServer == "" {
return fmt . Errorf ( "need to specify volume server by -node=<host>:<port>" )
if * volumeServer == "" && * volumeRack == "" {
return fmt . Errorf ( "need to specify volume server by -node=<host>:<port> or source rack" )
}
if * targetServer != "" {
c . targetServer = * targetServer
}
if * volumeRack != "" {
c . volumeRack = * volumeRack
}
}
for i := 0 ; i < * retryCount + 1 ; i ++ {
for i := 0 ; i < * retryCount + 1 ; i ++ {
if err = c . volumeServerEvacuate ( commandEnv , * volumeServer , * skipNonMoveable , * applyChange , writer ) ; err == nil {
if err = c . volumeServerEvacuate ( commandEnv , * volumeServer , * skipNonMoveable , * applyChange , writer ) ; err == nil {
@ -80,37 +89,51 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn
// list all the volumes
// list all the volumes
// collect topology information
// collect topology information
topologyInfo , _ , err : = collectTopologyInfo ( commandEnv , 0 )
c . topologyInfo , _ , err = collectTopologyInfo ( commandEnv , 0 )
if err != nil {
if err != nil {
return err
return err
}
}
if err := c . evacuateNormalVolumes ( commandEnv , topologyInfo , volumeServer , skipNonMoveable , applyChange , writer ) ; err != nil {
if err := c . evacuateNormalVolumes ( commandEnv , volumeServer , skipNonMoveable , applyChange , writer ) ; err != nil {
return err
return err
}
}
if err := c . evacuateEcVolumes ( commandEnv , topologyInfo , volumeServer , skipNonMoveable , applyChange , writer ) ; err != nil {
if err := c . evacuateEcVolumes ( commandEnv , volumeServer , skipNonMoveable , applyChange , writer ) ; err != nil {
return err
return err
}
}
return nil
return nil
}
}
func ( c * commandVolumeServerEvacuate ) evacuateNormalVolumes ( commandEnv * CommandEnv , topologyInfo * master_pb . TopologyInfo , volumeServer string , skipNonMoveable , applyChange bool , writer io . Writer ) error {
func ( c * commandVolumeServerEvacuate ) evacuateNormalVolumes ( commandEnv * CommandEnv , volumeServer string , skipNonMoveable , applyChange bool , writer io . Writer ) error {
// find this volume server
// find this volume server
volumeServers := collectVolumeServersByDc ( topologyInfo , "" )
thisNode , otherNodes := nodesOtherThan ( volumeServers , volumeServer )
if thisNode == nil {
volumeServers := collectVolumeServersByDc ( c . topologyInfo , "" )
thisNodes , otherNodes := c . nodesOtherThan ( volumeServers , volumeServer )
if len ( thisNodes ) == 0 {
return fmt . Errorf ( "%s is not found in this cluster" , volumeServer )
return fmt . Errorf ( "%s is not found in this cluster" , volumeServer )
}
}
// move away normal volumes
// move away normal volumes
volumeReplicas , _ := collectVolumeReplicaLocations ( topologyInfo )
for _ , thisNode := range thisNodes {
for _ , diskInfo := range thisNode . info . DiskInfos {
for _ , diskInfo := range thisNode . info . DiskInfos {
if applyChange {
if topologyInfo , _ , err := collectTopologyInfo ( commandEnv , 0 ) ; err != nil {
fmt . Fprintf ( writer , "update topologyInfo %v" , err )
} else {
_ , otherNodesNew := c . nodesOtherThan (
collectVolumeServersByDc ( topologyInfo , "" ) , volumeServer )
if len ( otherNodesNew ) > 0 {
otherNodes = otherNodesNew
c . topologyInfo = topologyInfo
fmt . Fprintf ( writer , "topologyInfo updated %v\n" , len ( otherNodes ) )
}
}
}
volumeReplicas , _ := collectVolumeReplicaLocations ( c . topologyInfo )
for _ , vol := range diskInfo . VolumeInfos {
for _ , vol := range diskInfo . VolumeInfos {
hasMoved , err := moveAwayOneNormalVolume ( commandEnv , volumeReplicas , vol , thisNode , otherNodes , applyChange )
hasMoved , err := moveAwayOneNormalVolume ( commandEnv , volumeReplicas , vol , thisNode , otherNodes , applyChange )
if err != nil {
if err != nil {
return fmt . Errorf ( "move away volume %d from %s: %v" , vol . Id , volumeServer , err )
fmt . Fprintf ( writer , "move away volume %d from %s: %v" , vol . Id , volumeServer , err )
}
}
if ! hasMoved {
if ! hasMoved {
if skipNonMoveable {
if skipNonMoveable {
@ -122,23 +145,25 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE
}
}
}
}
}
}
}
return nil
return nil
}
}
func ( c * commandVolumeServerEvacuate ) evacuateEcVolumes ( commandEnv * CommandEnv , topologyInfo * master_pb . TopologyInfo , volumeServer string , skipNonMoveable , applyChange bool , writer io . Writer ) error {
func ( c * commandVolumeServerEvacuate ) evacuateEcVolumes ( commandEnv * CommandEnv , volumeServer string , skipNonMoveable , applyChange bool , writer io . Writer ) error {
// find this ec volume server
// find this ec volume server
ecNodes , _ := collectEcVolumeServersByDc ( topologyInfo , "" )
thisNode , otherNodes := ecNodesOtherThan ( ecNodes , volumeServer )
if thisNode == nil {
ecNodes , _ := collectEcVolumeServersByDc ( c . topologyInfo , "" )
thisNodes , otherNodes := c . ecNodesOtherThan ( ecNodes , volumeServer )
if len ( thisNodes ) == 0 {
return fmt . Errorf ( "%s is not found in this cluster\n" , volumeServer )
return fmt . Errorf ( "%s is not found in this cluster\n" , volumeServer )
}
}
// move away ec volumes
// move away ec volumes
for _ , thisNode := range thisNodes {
for _ , diskInfo := range thisNode . info . DiskInfos {
for _ , diskInfo := range thisNode . info . DiskInfos {
for _ , ecShardInfo := range diskInfo . EcShardInfos {
for _ , ecShardInfo := range diskInfo . EcShardInfos {
hasMoved , err := c . moveAwayOneEcVolume ( commandEnv , ecShardInfo , thisNode , otherNodes , applyChange )
hasMoved , err := c . moveAwayOneEcVolume ( commandEnv , ecShardInfo , thisNode , otherNodes , applyChange )
if err != nil {
if err != nil {
return fmt . Errorf ( "move away volume %d from %s: %v" , ecShardInfo . Id , volumeServer , err )
fmt . Fprintf ( writer , "move away volume %d from %s: %v" , ecShardInfo . Id , volumeServer , err )
}
}
if ! hasMoved {
if ! hasMoved {
if skipNonMoveable {
if skipNonMoveable {
@ -149,6 +174,7 @@ func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv,
}
}
}
}
}
}
}
return nil
return nil
}
}
@ -160,9 +186,6 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv
} )
} )
for i := 0 ; i < len ( otherNodes ) ; i ++ {
for i := 0 ; i < len ( otherNodes ) ; i ++ {
emptyNode := otherNodes [ i ]
emptyNode := otherNodes [ i ]
if c . targetServer != "" && c . targetServer != emptyNode . info . Id {
continue
}
collectionPrefix := ""
collectionPrefix := ""
if ecShardInfo . Collection != "" {
if ecShardInfo . Collection != "" {
collectionPrefix = ecShardInfo . Collection + "_"
collectionPrefix = ecShardInfo . Collection + "_"
@ -207,10 +230,16 @@ func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][
return
return
}
}
func nodesOtherThan ( volumeServers [ ] * Node , thisServer string ) ( thisNode * Node , otherNodes [ ] * Node ) {
func ( c * commandVolumeServerEvacuate ) nodesOtherThan ( volumeServers [ ] * Node , thisServer string ) ( thisNodes [ ] * Node , otherNodes [ ] * Node ) {
for _ , node := range volumeServers {
for _ , node := range volumeServers {
if node . info . Id == thisServer {
thisNode = node
if node . info . Id == thisServer || ( c . volumeRack != "" && node . rack == c . volumeRack ) {
thisNodes = append ( thisNodes , node )
continue
}
if c . volumeRack != "" && c . volumeRack == node . rack {
continue
}
if c . targetServer != "" && c . targetServer != node . info . Id {
continue
continue
}
}
otherNodes = append ( otherNodes , node )
otherNodes = append ( otherNodes , node )
@ -218,10 +247,16 @@ func nodesOtherThan(volumeServers []*Node, thisServer string) (thisNode *Node, o
return
return
}
}
func ecNodesOtherThan ( volumeServers [ ] * EcNode , thisServer string ) ( thisNode * EcNode , otherNodes [ ] * EcNode ) {
func ( c * commandVolumeServerEvacuate ) ecNodesOtherThan ( volumeServers [ ] * EcNode , thisServer string ) ( thisNodes [ ] * EcNode , otherNodes [ ] * EcNode ) {
for _ , node := range volumeServers {
for _ , node := range volumeServers {
if node . info . Id == thisServer {
thisNode = node
if node . info . Id == thisServer || ( c . volumeRack != "" && string ( node . rack ) == c . volumeRack ) {
thisNodes = append ( thisNodes , node )
continue
}
if c . volumeRack != "" && c . volumeRack == string ( node . rack ) {
continue
}
if c . targetServer != "" && c . targetServer != node . info . Id {
continue
continue
}
}
otherNodes = append ( otherNodes , node )
otherNodes = append ( otherNodes , node )