You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

262 lines
8.8 KiB

3 years ago
4 years ago
4 years ago
4 years ago
  1. package shell
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  6. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  7. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  8. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  9. "github.com/chrislusf/seaweedfs/weed/storage/types"
  10. "golang.org/x/exp/slices"
  11. "io"
  12. "os"
  13. "time"
  14. )
// init registers the volumeServer.evacuate command with the shell's global
// command list so it is available from `weed shell`.
func init() {
	Commands = append(Commands, &commandVolumeServerEvacuate{})
}
// commandVolumeServerEvacuate moves all normal volumes and EC shards off one
// volume server (or a whole source rack) so the server(s) can be shut down or
// upgraded safely.
type commandVolumeServerEvacuate struct {
	topologyInfo *master_pb.TopologyInfo // cached cluster topology; refreshed in the background during evacuation
	targetServer string                  // optional: restrict moves to this destination <host>:<port>
	volumeRack   string                  // optional: evacuate every volume server in this source rack
}
  23. func (c *commandVolumeServerEvacuate) Name() string {
  24. return "volumeServer.evacuate"
  25. }
// Help returns the long-form usage text printed by `help volumeServer.evacuate`.
func (c *commandVolumeServerEvacuate) Help() string {
	return `move out all data on a volume server

	volumeServer.evacuate -node <host:port>

	This command moves all data away from the volume server.
	The volumes on the volume servers will be redistributed.

	Usually this is used to prepare to shutdown or upgrade the volume server.

	Sometimes a volume can not be moved because there are no
	good destination to meet the replication requirement.
	E.g. a volume replication 001 in a cluster with 2 volume servers can not be moved.
	You can use "-skipNonMoveable" to move the rest volumes.

`
}
  38. func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  39. vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  40. volumeServer := vsEvacuateCommand.String("node", "", "<host>:<port> of the volume server")
  41. volumeRack := vsEvacuateCommand.String("rack", "", "source rack for the volume servers")
  42. targetServer := vsEvacuateCommand.String("target", "", "<host>:<port> of target volume")
  43. skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved")
  44. applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes")
  45. retryCount := vsEvacuateCommand.Int("retry", 0, "how many times to retry")
  46. if err = vsEvacuateCommand.Parse(args); err != nil {
  47. return nil
  48. }
  49. infoAboutSimulationMode(writer, *applyChange, "-force")
  50. if err = commandEnv.confirmIsLocked(args); err != nil && *applyChange {
  51. return
  52. }
  53. if *volumeServer == "" && *volumeRack == "" {
  54. return fmt.Errorf("need to specify volume server by -node=<host>:<port> or source rack")
  55. }
  56. if *targetServer != "" {
  57. c.targetServer = *targetServer
  58. }
  59. if *volumeRack != "" {
  60. c.volumeRack = *volumeRack
  61. }
  62. for i := 0; i < *retryCount+1; i++ {
  63. if err = c.volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer); err == nil {
  64. return nil
  65. }
  66. }
  67. return
  68. }
  69. func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) (err error) {
  70. // 1. confirm the volume server is part of the cluster
  71. // 2. collect all other volume servers, sort by empty slots
  72. // 3. move to any other volume server as long as it satisfy the replication requirements
  73. // list all the volumes
  74. // collect topology information
  75. c.topologyInfo, _, err = collectTopologyInfo(commandEnv, 0)
  76. if err != nil {
  77. return err
  78. }
  79. go func() {
  80. for {
  81. if topologyInfo, _, err := collectTopologyInfo(commandEnv, 5*time.Minute); err != nil {
  82. c.topologyInfo = topologyInfo
  83. }
  84. }
  85. }()
  86. if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
  87. return err
  88. }
  89. if err := c.evacuateEcVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
  90. return err
  91. }
  92. return nil
  93. }
// evacuateNormalVolumes moves every normal (non-EC) volume off the evacuating
// node(s) onto other servers in the cluster.
//
// When a volume cannot be moved: with skipNonMoveable it is reported and
// skipped; otherwise the evacuation aborts with an error. Per-volume move
// errors are written to writer but do not stop the loop on their own.
func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
	// find this volume server
	volumeServers := collectVolumeServersByDc(c.topologyInfo, "")
	thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer)
	if len(thisNodes) == 0 {
		return fmt.Errorf("%s is not found in this cluster", volumeServer)
	}
	// move away normal volumes
	for _, thisNode := range thisNodes {
		for _, diskInfo := range thisNode.info.DiskInfos {
			// replica locations are re-collected per disk so moves performed
			// for earlier disks are reflected in later placement decisions
			volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo)
			for _, vol := range diskInfo.VolumeInfos {
				hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
				if err != nil {
					fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err)
				}
				if !hasMoved {
					if skipNonMoveable {
						replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement))
						fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String())
					} else {
						return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer)
					}
				}
			}
		}
	}
	return nil
}
  123. func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
  124. // find this ec volume server
  125. ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "")
  126. thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
  127. if len(thisNodes) == 0 {
  128. return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
  129. }
  130. // move away ec volumes
  131. for _, thisNode := range thisNodes {
  132. for _, diskInfo := range thisNode.info.DiskInfos {
  133. for _, ecShardInfo := range diskInfo.EcShardInfos {
  134. hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange)
  135. if err != nil {
  136. fmt.Fprintf(writer, "move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err)
  137. }
  138. if !hasMoved {
  139. if skipNonMoveable {
  140. fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer)
  141. } else {
  142. return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer)
  143. }
  144. }
  145. }
  146. }
  147. }
  148. return nil
  149. }
// moveAwayOneEcVolume relocates every shard of one EC volume from thisNode to
// other nodes, preferring destinations that currently hold the fewest shards
// of this volume (to keep shards spread out).
//
// Returns hasMoved=true only if every shard found a destination; it aborts
// early (hasMoved=false) as soon as any shard cannot be placed or a move
// returns an error.
func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool) (hasMoved bool, err error) {
	for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() {
		// re-sort per shard: earlier moves change each node's shard count
		slices.SortFunc(otherNodes, func(a, b *EcNode) bool {
			return a.localShardIdCount(ecShardInfo.Id) < b.localShardIdCount(ecShardInfo.Id)
		})
		for i := 0; i < len(otherNodes); i++ {
			emptyNode := otherNodes[i]
			collectionPrefix := ""
			if ecShardInfo.Collection != "" {
				collectionPrefix = ecShardInfo.Collection + "_"
			}
			// NOTE(review): progress goes to os.Stdout rather than the shell's
			// writer, unlike the surrounding functions — presumably historical;
			// changing it would require plumbing writer through.
			fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
			err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange)
			if err != nil {
				return
			} else {
				// first candidate is attempted and, on success, we stop looking
				hasMoved = true
				break
			}
		}
		// a shard with no destination means the volume cannot be fully evacuated
		if !hasMoved {
			return
		}
	}
	return
}
  176. func (c *commandVolumeServerEvacuate) moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) {
  177. fn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType))
  178. for _, n := range otherNodes {
  179. n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
  180. return v.DiskType == vol.DiskType
  181. })
  182. }
  183. slices.SortFunc(otherNodes, func(a, b *Node) bool {
  184. return a.localVolumeRatio(fn) < b.localVolumeRatio(fn)
  185. })
  186. for i := 0; i < len(otherNodes); i++ {
  187. emptyNode := otherNodes[i]
  188. hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange)
  189. if err != nil {
  190. return
  191. }
  192. if hasMoved {
  193. break
  194. }
  195. }
  196. return
  197. }
  198. func (c *commandVolumeServerEvacuate) nodesOtherThan(volumeServers []*Node, thisServer string) (thisNodes []*Node, otherNodes []*Node) {
  199. for _, node := range volumeServers {
  200. if node.info.Id == thisServer || (c.volumeRack != "" && node.rack == c.volumeRack) {
  201. thisNodes = append(thisNodes, node)
  202. continue
  203. }
  204. if c.volumeRack != "" && c.volumeRack == node.rack {
  205. continue
  206. }
  207. if c.targetServer != "" && c.targetServer != node.info.Id {
  208. continue
  209. }
  210. otherNodes = append(otherNodes, node)
  211. }
  212. return
  213. }
  214. func (c *commandVolumeServerEvacuate) ecNodesOtherThan(volumeServers []*EcNode, thisServer string) (thisNodes []*EcNode, otherNodes []*EcNode) {
  215. for _, node := range volumeServers {
  216. if node.info.Id == thisServer || (c.volumeRack != "" && string(node.rack) == c.volumeRack) {
  217. thisNodes = append(thisNodes, node)
  218. continue
  219. }
  220. if c.volumeRack != "" && c.volumeRack == string(node.rack) {
  221. continue
  222. }
  223. if c.targetServer != "" && c.targetServer != node.info.Id {
  224. continue
  225. }
  226. otherNodes = append(otherNodes, node)
  227. }
  228. return
  229. }