You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

272 lines
9.2 KiB

3 years ago
4 years ago
4 years ago
4 years ago
  1. package shell
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  6. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  7. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  8. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  9. "github.com/chrislusf/seaweedfs/weed/storage/types"
  10. "golang.org/x/exp/slices"
  11. "io"
  12. "os"
  13. "time"
  14. )
// topologyInfoUpdateInterval is how often the cached cluster topology is
// refreshed while an evacuation is actively applying changes.
const topologyInfoUpdateInterval = 5 * time.Minute
// init registers this command with the shell's global command registry.
func init() {
	Commands = append(Commands, &commandVolumeServerEvacuate{})
}
// commandVolumeServerEvacuate moves all normal volumes and EC shards off a
// volume server (or every server in a rack), typically in preparation for a
// shutdown or upgrade.
type commandVolumeServerEvacuate struct {
	topologyInfo *master_pb.TopologyInfo // cached cluster topology, refreshed periodically while applying changes
	targetServer string                  // optional: restrict destinations to this <host>:<port>
	volumeRack   string                  // optional: evacuate every volume server in this rack
}
// Name returns the shell command name used to invoke this command.
func (c *commandVolumeServerEvacuate) Name() string {
	return "volumeServer.evacuate"
}
// Help returns the usage text displayed by the shell's help command.
func (c *commandVolumeServerEvacuate) Help() string {
	return `move out all data on a volume server
volumeServer.evacuate -node <host:port>
This command moves all data away from the volume server.
The volumes on the volume servers will be redistributed.
Usually this is used to prepare to shutdown or upgrade the volume server.
Sometimes a volume can not be moved because there are no
good destination to meet the replication requirement.
E.g. a volume replication 001 in a cluster with 2 volume servers can not be moved.
You can use "-skipNonMoveable" to move the rest volumes.
`
}
  39. func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  40. vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  41. volumeServer := vsEvacuateCommand.String("node", "", "<host>:<port> of the volume server")
  42. volumeRack := vsEvacuateCommand.String("rack", "", "source rack for the volume servers")
  43. targetServer := vsEvacuateCommand.String("target", "", "<host>:<port> of target volume")
  44. skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved")
  45. applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes")
  46. retryCount := vsEvacuateCommand.Int("retry", 0, "how many times to retry")
  47. if err = vsEvacuateCommand.Parse(args); err != nil {
  48. return nil
  49. }
  50. infoAboutSimulationMode(writer, *applyChange, "-force")
  51. if err = commandEnv.confirmIsLocked(args); err != nil && *applyChange {
  52. return
  53. }
  54. if *volumeServer == "" && *volumeRack == "" {
  55. return fmt.Errorf("need to specify volume server by -node=<host>:<port> or source rack")
  56. }
  57. if *targetServer != "" {
  58. c.targetServer = *targetServer
  59. }
  60. if *volumeRack != "" {
  61. c.volumeRack = *volumeRack
  62. }
  63. for i := 0; i < *retryCount+1; i++ {
  64. if err = c.volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer); err == nil {
  65. return nil
  66. }
  67. }
  68. return
  69. }
  70. func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) (err error) {
  71. // 1. confirm the volume server is part of the cluster
  72. // 2. collect all other volume servers, sort by empty slots
  73. // 3. move to any other volume server as long as it satisfy the replication requirements
  74. // list all the volumes
  75. // collect topology information
  76. c.topologyInfo, _, err = collectTopologyInfo(commandEnv, 0)
  77. if err != nil {
  78. return err
  79. }
  80. if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
  81. return err
  82. }
  83. if err := c.evacuateEcVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
  84. return err
  85. }
  86. return nil
  87. }
  88. func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
  89. // find this volume server
  90. volumeServers := collectVolumeServersByDc(c.topologyInfo, "")
  91. thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer)
  92. if len(thisNodes) == 0 {
  93. return fmt.Errorf("%s is not found in this cluster", volumeServer)
  94. }
  95. // move away normal volumes
  96. ticker := time.NewTicker(topologyInfoUpdateInterval)
  97. for _, thisNode := range thisNodes {
  98. for _, diskInfo := range thisNode.info.DiskInfos {
  99. if applyChange {
  100. select {
  101. case <-ticker.C:
  102. if topologyInfo, _, err := collectTopologyInfo(commandEnv, 0); err != nil {
  103. fmt.Fprintf(writer, "update topologyInfo %v", err)
  104. } else {
  105. _, otherNodesNew := c.nodesOtherThan(
  106. collectVolumeServersByDc(topologyInfo, ""), volumeServer)
  107. if len(otherNodesNew) > 0 {
  108. otherNodes = otherNodesNew
  109. c.topologyInfo = topologyInfo
  110. fmt.Fprintf(writer, "topologyInfo updated %v\n", len(otherNodes))
  111. }
  112. }
  113. }
  114. }
  115. volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo)
  116. for _, vol := range diskInfo.VolumeInfos {
  117. hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
  118. if err != nil {
  119. fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err)
  120. }
  121. if !hasMoved {
  122. if skipNonMoveable {
  123. replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement))
  124. fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String())
  125. } else {
  126. return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer)
  127. }
  128. }
  129. }
  130. }
  131. }
  132. return nil
  133. }
  134. func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
  135. // find this ec volume server
  136. ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "")
  137. thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
  138. if len(thisNodes) == 0 {
  139. return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
  140. }
  141. // move away ec volumes
  142. for _, thisNode := range thisNodes {
  143. for _, diskInfo := range thisNode.info.DiskInfos {
  144. for _, ecShardInfo := range diskInfo.EcShardInfos {
  145. hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange)
  146. if err != nil {
  147. fmt.Fprintf(writer, "move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err)
  148. }
  149. if !hasMoved {
  150. if skipNonMoveable {
  151. fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer)
  152. } else {
  153. return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer)
  154. }
  155. }
  156. }
  157. }
  158. }
  159. return nil
  160. }
  161. func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool) (hasMoved bool, err error) {
  162. for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() {
  163. slices.SortFunc(otherNodes, func(a, b *EcNode) bool {
  164. return a.localShardIdCount(ecShardInfo.Id) < b.localShardIdCount(ecShardInfo.Id)
  165. })
  166. for i := 0; i < len(otherNodes); i++ {
  167. emptyNode := otherNodes[i]
  168. collectionPrefix := ""
  169. if ecShardInfo.Collection != "" {
  170. collectionPrefix = ecShardInfo.Collection + "_"
  171. }
  172. fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
  173. err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange)
  174. if err != nil {
  175. return
  176. } else {
  177. hasMoved = true
  178. break
  179. }
  180. }
  181. if !hasMoved {
  182. return
  183. }
  184. }
  185. return
  186. }
  187. func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) {
  188. fn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType))
  189. for _, n := range otherNodes {
  190. n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
  191. return v.DiskType == vol.DiskType
  192. })
  193. }
  194. slices.SortFunc(otherNodes, func(a, b *Node) bool {
  195. return a.localVolumeRatio(fn) < b.localVolumeRatio(fn)
  196. })
  197. for i := 0; i < len(otherNodes); i++ {
  198. emptyNode := otherNodes[i]
  199. hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange)
  200. if err != nil {
  201. return
  202. }
  203. if hasMoved {
  204. break
  205. }
  206. }
  207. return
  208. }
  209. func (c *commandVolumeServerEvacuate) nodesOtherThan(volumeServers []*Node, thisServer string) (thisNodes []*Node, otherNodes []*Node) {
  210. for _, node := range volumeServers {
  211. if node.info.Id == thisServer || (c.volumeRack != "" && node.rack == c.volumeRack) {
  212. thisNodes = append(thisNodes, node)
  213. continue
  214. }
  215. if c.volumeRack != "" && c.volumeRack == node.rack {
  216. continue
  217. }
  218. if c.targetServer != "" && c.targetServer != node.info.Id {
  219. continue
  220. }
  221. otherNodes = append(otherNodes, node)
  222. }
  223. return
  224. }
  225. func (c *commandVolumeServerEvacuate) ecNodesOtherThan(volumeServers []*EcNode, thisServer string) (thisNodes []*EcNode, otherNodes []*EcNode) {
  226. for _, node := range volumeServers {
  227. if node.info.Id == thisServer || (c.volumeRack != "" && string(node.rack) == c.volumeRack) {
  228. thisNodes = append(thisNodes, node)
  229. continue
  230. }
  231. if c.volumeRack != "" && c.volumeRack == string(node.rack) {
  232. continue
  233. }
  234. if c.targetServer != "" && c.targetServer != node.info.Id {
  235. continue
  236. }
  237. otherNodes = append(otherNodes, node)
  238. }
  239. return
  240. }