You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

208 lines
6.7 KiB

  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  7. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  9. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  10. "io"
  11. "sort"
  12. )
  13. func init() {
  14. Commands = append(Commands, &commandVolumeServerEvacuate{})
  15. }
  16. type commandVolumeServerEvacuate struct {
  17. }
  18. func (c *commandVolumeServerEvacuate) Name() string {
  19. return "volumeServer.evacuate"
  20. }
  21. func (c *commandVolumeServerEvacuate) Help() string {
  22. return `move out all data on a volume server
  23. volumeServer.evacuate -node <host:port>
  24. This command moves all data away from the volume server.
  25. The volumes on the volume servers will be redistributed.
  26. Usually this is used to prepare to shutdown or upgrade the volume server.
  27. Sometimes a volume can not be moved because there are no
  28. good destination to meet the replication requirement.
  29. E.g. a volume replication 001 in a cluster with 2 volume servers can not be moved.
  30. You can use "-skipNonMoveable" to move the rest volumes.
  31. `
  32. }
  33. func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  34. if err = commandEnv.confirmIsLocked(); err != nil {
  35. return
  36. }
  37. vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  38. volumeServer := vsEvacuateCommand.String("node", "", "<host>:<port> of the volume server")
  39. skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved")
  40. applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes")
  41. if err = vsEvacuateCommand.Parse(args); err != nil {
  42. return nil
  43. }
  44. if *volumeServer == "" {
  45. return fmt.Errorf("need to specify volume server by -node=<host>:<port>")
  46. }
  47. return volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer)
  48. }
  49. func volumeServerEvacuate(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) (err error) {
  50. // 1. confirm the volume server is part of the cluster
  51. // 2. collect all other volume servers, sort by empty slots
  52. // 3. move to any other volume server as long as it satisfy the replication requirements
  53. // list all the volumes
  54. var resp *master_pb.VolumeListResponse
  55. err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
  56. resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  57. return err
  58. })
  59. if err != nil {
  60. return err
  61. }
  62. if err := evacuateNormalVolumes(commandEnv, resp, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
  63. return err
  64. }
  65. if err := evacuateEcVolumes(commandEnv, resp, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
  66. return err
  67. }
  68. return nil
  69. }
  70. func evacuateNormalVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListResponse, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
  71. // find this volume server
  72. volumeServers := collectVolumeServersByDc(resp.TopologyInfo, "")
  73. thisNode, otherNodes := nodesOtherThan(volumeServers, volumeServer)
  74. if thisNode == nil {
  75. return fmt.Errorf("%s is not found in this cluster", volumeServer)
  76. }
  77. // move away normal volumes
  78. volumeReplicas, _ := collectVolumeReplicaLocations(resp)
  79. for _, vol := range thisNode.info.VolumeInfos {
  80. hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
  81. if err != nil {
  82. return fmt.Errorf("move away volume %d from %s: %v", vol.Id, volumeServer, err)
  83. }
  84. if !hasMoved {
  85. if skipNonMoveable {
  86. replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement))
  87. fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String())
  88. } else {
  89. return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer)
  90. }
  91. }
  92. }
  93. return nil
  94. }
  95. func evacuateEcVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListResponse, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
  96. // find this ec volume server
  97. ecNodes, _ := collectEcVolumeServersByDc(resp.TopologyInfo, "")
  98. thisNode, otherNodes := ecNodesOtherThan(ecNodes, volumeServer)
  99. if thisNode == nil {
  100. return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
  101. }
  102. // move away ec volumes
  103. for _, ecShardInfo := range thisNode.info.EcShardInfos {
  104. hasMoved, err := moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange)
  105. if err != nil {
  106. return fmt.Errorf("move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err)
  107. }
  108. if !hasMoved {
  109. if skipNonMoveable {
  110. fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer)
  111. } else {
  112. return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer)
  113. }
  114. }
  115. }
  116. return nil
  117. }
  118. func moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool) (hasMoved bool, err error) {
  119. for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() {
  120. sort.Slice(otherNodes, func(i, j int) bool {
  121. return otherNodes[i].localShardIdCount(ecShardInfo.Id) < otherNodes[j].localShardIdCount(ecShardInfo.Id)
  122. })
  123. for i := 0; i < len(otherNodes); i++ {
  124. emptyNode := otherNodes[i]
  125. err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange)
  126. if err != nil {
  127. return
  128. } else {
  129. hasMoved = true
  130. break
  131. }
  132. }
  133. if !hasMoved {
  134. return
  135. }
  136. }
  137. return
  138. }
  139. func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) {
  140. sort.Slice(otherNodes, func(i, j int) bool {
  141. return otherNodes[i].localVolumeRatio() < otherNodes[j].localVolumeRatio()
  142. })
  143. for i := 0; i < len(otherNodes); i++ {
  144. emptyNode := otherNodes[i]
  145. hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange)
  146. if err != nil {
  147. return
  148. }
  149. if hasMoved {
  150. break
  151. }
  152. }
  153. return
  154. }
  155. func nodesOtherThan(volumeServers []*Node, thisServer string) (thisNode *Node, otherNodes []*Node) {
  156. for _, node := range volumeServers {
  157. if node.info.Id == thisServer {
  158. thisNode = node
  159. continue
  160. }
  161. otherNodes = append(otherNodes, node)
  162. }
  163. return
  164. }
  165. func ecNodesOtherThan(volumeServers []*EcNode, thisServer string) (thisNode *EcNode, otherNodes []*EcNode) {
  166. for _, node := range volumeServers {
  167. if node.info.Id == thisServer {
  168. thisNode = node
  169. continue
  170. }
  171. otherNodes = append(otherNodes, node)
  172. }
  173. return
  174. }