You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

252 lines
8.6 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package shell
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  6. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  7. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  8. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  9. "github.com/chrislusf/seaweedfs/weed/storage/types"
  10. "golang.org/x/exp/slices"
  11. "io"
  12. "os"
  13. )
// init registers this command in the shell's global command list so it is
// discoverable as "volumeServer.evacuate".
func init() {
	Commands = append(Commands, &commandVolumeServerEvacuate{})
}
// commandVolumeServerEvacuate holds the optional constraints parsed from the
// command-line flags and reused by the helper methods.
type commandVolumeServerEvacuate struct {
	// targetServer, when non-empty, restricts move destinations to this
	// single <host>:<port> (see nodesOtherThan / ecNodesOtherThan).
	targetServer string
	// volumeRack, when non-empty, evacuates every volume server in this rack
	// in addition to the one named by -node.
	volumeRack string
}
// Name returns the shell command name used to invoke this command.
func (c *commandVolumeServerEvacuate) Name() string {
	return "volumeServer.evacuate"
}
  24. func (c *commandVolumeServerEvacuate) Help() string {
  25. return `move out all data on a volume server
  26. volumeServer.evacuate -node <host:port>
  27. This command moves all data away from the volume server.
  28. The volumes on the volume servers will be redistributed.
  29. Usually this is used to prepare to shutdown or upgrade the volume server.
  30. Sometimes a volume can not be moved because there are no
  31. good destination to meet the replication requirement.
  32. E.g. a volume replication 001 in a cluster with 2 volume servers can not be moved.
  33. You can use "-skipNonMoveable" to move the rest volumes.
  34. `
  35. }
  36. func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  37. vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  38. volumeServer := vsEvacuateCommand.String("node", "", "<host>:<port> of the volume server")
  39. volumeRack := vsEvacuateCommand.String("rack", "", "rack for then volume servers")
  40. targetServer := vsEvacuateCommand.String("target", "", "<host>:<port> of target volume")
  41. skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved")
  42. applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes")
  43. retryCount := vsEvacuateCommand.Int("retry", 0, "how many times to retry")
  44. if err = vsEvacuateCommand.Parse(args); err != nil {
  45. return nil
  46. }
  47. infoAboutSimulationMode(writer, *applyChange, "-force")
  48. if err = commandEnv.confirmIsLocked(args); err != nil {
  49. return
  50. }
  51. if *volumeServer == "" {
  52. return fmt.Errorf("need to specify volume server by -node=<host>:<port>")
  53. }
  54. if *targetServer != "" {
  55. c.targetServer = *targetServer
  56. }
  57. if *volumeRack != "" {
  58. c.volumeRack = *volumeRack
  59. }
  60. for i := 0; i < *retryCount+1; i++ {
  61. if err = c.volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer); err == nil {
  62. return nil
  63. }
  64. }
  65. return
  66. }
  67. func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) (err error) {
  68. // 1. confirm the volume server is part of the cluster
  69. // 2. collect all other volume servers, sort by empty slots
  70. // 3. move to any other volume server as long as it satisfy the replication requirements
  71. // list all the volumes
  72. // collect topology information
  73. topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
  74. if err != nil {
  75. return err
  76. }
  77. if err := c.evacuateNormalVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
  78. return err
  79. }
  80. if err := c.evacuateEcVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
  81. return err
  82. }
  83. return nil
  84. }
  85. func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
  86. // find this volume server
  87. volumeServers := collectVolumeServersByDc(topologyInfo, "")
  88. thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer)
  89. if len(thisNodes) == 0 {
  90. return fmt.Errorf("%s is not found in this cluster", volumeServer)
  91. }
  92. // move away normal volumes
  93. for _, thisNode := range thisNodes {
  94. for _, diskInfo := range thisNode.info.DiskInfos {
  95. volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
  96. for _, vol := range diskInfo.VolumeInfos {
  97. hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
  98. if err != nil {
  99. fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err)
  100. }
  101. if !hasMoved {
  102. if skipNonMoveable {
  103. replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement))
  104. fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String())
  105. } else {
  106. return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer)
  107. }
  108. }
  109. }
  110. }
  111. }
  112. return nil
  113. }
  114. func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
  115. // find this ec volume server
  116. ecNodes, _ := collectEcVolumeServersByDc(topologyInfo, "")
  117. thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
  118. if len(thisNodes) == 0 {
  119. return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
  120. }
  121. // move away ec volumes
  122. for _, thisNode := range thisNodes {
  123. for _, diskInfo := range thisNode.info.DiskInfos {
  124. for _, ecShardInfo := range diskInfo.EcShardInfos {
  125. hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange)
  126. if err != nil {
  127. fmt.Fprintf(writer, "move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err)
  128. }
  129. if !hasMoved {
  130. if skipNonMoveable {
  131. fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer)
  132. } else {
  133. return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer)
  134. }
  135. }
  136. }
  137. }
  138. }
  139. return nil
  140. }
  141. func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool) (hasMoved bool, err error) {
  142. for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() {
  143. slices.SortFunc(otherNodes, func(a, b *EcNode) bool {
  144. return a.localShardIdCount(ecShardInfo.Id) < b.localShardIdCount(ecShardInfo.Id)
  145. })
  146. for i := 0; i < len(otherNodes); i++ {
  147. emptyNode := otherNodes[i]
  148. collectionPrefix := ""
  149. if ecShardInfo.Collection != "" {
  150. collectionPrefix = ecShardInfo.Collection + "_"
  151. }
  152. fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
  153. err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange)
  154. if err != nil {
  155. return
  156. } else {
  157. hasMoved = true
  158. break
  159. }
  160. }
  161. if !hasMoved {
  162. return
  163. }
  164. }
  165. return
  166. }
  167. func (c *commandVolumeServerEvacuate) moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) {
  168. fn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType))
  169. for _, n := range otherNodes {
  170. n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
  171. return v.DiskType == vol.DiskType
  172. })
  173. }
  174. slices.SortFunc(otherNodes, func(a, b *Node) bool {
  175. return a.localVolumeRatio(fn) < b.localVolumeRatio(fn)
  176. })
  177. for i := 0; i < len(otherNodes); i++ {
  178. emptyNode := otherNodes[i]
  179. hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange)
  180. if err != nil {
  181. return
  182. }
  183. if hasMoved {
  184. break
  185. }
  186. }
  187. return
  188. }
  189. func (c *commandVolumeServerEvacuate) nodesOtherThan(volumeServers []*Node, thisServer string) (thisNodes []*Node, otherNodes []*Node) {
  190. for _, node := range volumeServers {
  191. if node.info.Id == thisServer || (c.volumeRack != "" && node.rack == c.volumeRack) {
  192. thisNodes = append(thisNodes, node)
  193. continue
  194. }
  195. if c.volumeRack != "" && c.volumeRack == node.rack {
  196. continue
  197. }
  198. if c.targetServer != "" && c.targetServer != node.info.Id {
  199. continue
  200. }
  201. otherNodes = append(otherNodes, node)
  202. }
  203. return
  204. }
  205. func (c *commandVolumeServerEvacuate) ecNodesOtherThan(volumeServers []*EcNode, thisServer string) (thisNodes []*EcNode, otherNodes []*EcNode) {
  206. for _, node := range volumeServers {
  207. if node.info.Id == thisServer || (c.volumeRack != "" && string(node.rack) == c.volumeRack) {
  208. thisNodes = append(thisNodes, node)
  209. continue
  210. }
  211. if c.volumeRack != "" && c.volumeRack == string(node.rack) {
  212. continue
  213. }
  214. if c.targetServer != "" && c.targetServer != node.info.Id {
  215. continue
  216. }
  217. otherNodes = append(otherNodes, node)
  218. }
  219. return
  220. }