You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

243 lines
8.1 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package shell
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  6. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  7. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  8. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  9. "github.com/chrislusf/seaweedfs/weed/storage/types"
  10. "golang.org/x/exp/slices"
  11. "io"
  12. "os"
  13. )
  14. func init() {
  15. Commands = append(Commands, &commandVolumeServerEvacuate{})
  16. }
  17. type commandVolumeServerEvacuate struct {
  18. targetServer string
  19. }
  20. func (c *commandVolumeServerEvacuate) Name() string {
  21. return "volumeServer.evacuate"
  22. }
  23. func (c *commandVolumeServerEvacuate) Help() string {
  24. return `move out all data on a volume server
  25. volumeServer.evacuate -node <host:port>
  26. This command moves all data away from the volume server.
  27. The volumes on the volume servers will be redistributed.
  28. Usually this is used to prepare to shutdown or upgrade the volume server.
  29. Sometimes a volume can not be moved because there are no
  30. good destination to meet the replication requirement.
  31. E.g. a volume replication 001 in a cluster with 2 volume servers can not be moved.
  32. You can use "-skipNonMoveable" to move the rest volumes.
  33. `
  34. }
  35. func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  36. vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  37. volumeServer := vsEvacuateCommand.String("node", "", "<host>:<port> of the volume server")
  38. targetServer := vsEvacuateCommand.String("target", "", "<host>:<port> of target volume")
  39. skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved")
  40. applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes")
  41. retryCount := vsEvacuateCommand.Int("retry", 0, "how many times to retry")
  42. if err = vsEvacuateCommand.Parse(args); err != nil {
  43. return nil
  44. }
  45. infoAboutSimulationMode(writer, *applyChange, "-force")
  46. if err = commandEnv.confirmIsLocked(args); err != nil {
  47. return
  48. }
  49. if *volumeServer == "" {
  50. return fmt.Errorf("need to specify volume server by -node=<host>:<port>")
  51. }
  52. if *targetServer != "" {
  53. c.targetServer = *targetServer
  54. }
  55. for i := 0; i < *retryCount+1; i++ {
  56. if err = c.volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer); err == nil {
  57. return nil
  58. }
  59. }
  60. return
  61. }
  62. func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) (err error) {
  63. // 1. confirm the volume server is part of the cluster
  64. // 2. collect all other volume servers, sort by empty slots
  65. // 3. move to any other volume server as long as it satisfy the replication requirements
  66. // list all the volumes
  67. // collect topology information
  68. topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
  69. if err != nil {
  70. return err
  71. }
  72. if err := c.evacuateNormalVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
  73. return err
  74. }
  75. if err := c.evacuateEcVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
  76. return err
  77. }
  78. return nil
  79. }
  80. func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
  81. // find this volume server
  82. volumeServers := collectVolumeServersByDc(topologyInfo, "")
  83. thisNode, otherNodes := nodesOtherThan(volumeServers, volumeServer)
  84. if thisNode == nil {
  85. return fmt.Errorf("%s is not found in this cluster", volumeServer)
  86. }
  87. if c.targetServer != "" {
  88. targetServerFound := false
  89. for _, otherNode := range otherNodes {
  90. if otherNode.info.Id == c.targetServer {
  91. otherNodes = []*Node{otherNode}
  92. targetServerFound = true
  93. break
  94. }
  95. }
  96. if !targetServerFound {
  97. return fmt.Errorf("target %s is not found in this cluster", c.targetServer)
  98. }
  99. }
  100. // move away normal volumes
  101. volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
  102. for _, diskInfo := range thisNode.info.DiskInfos {
  103. for _, vol := range diskInfo.VolumeInfos {
  104. hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
  105. if err != nil {
  106. fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err)
  107. }
  108. if !hasMoved {
  109. if skipNonMoveable {
  110. replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement))
  111. fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String())
  112. } else {
  113. return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer)
  114. }
  115. }
  116. }
  117. }
  118. return nil
  119. }
  120. func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
  121. // find this ec volume server
  122. ecNodes, _ := collectEcVolumeServersByDc(topologyInfo, "")
  123. thisNode, otherNodes := ecNodesOtherThan(ecNodes, volumeServer)
  124. if thisNode == nil {
  125. return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
  126. }
  127. // move away ec volumes
  128. for _, diskInfo := range thisNode.info.DiskInfos {
  129. for _, ecShardInfo := range diskInfo.EcShardInfos {
  130. hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange)
  131. if err != nil {
  132. fmt.Fprintf(writer, "move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err)
  133. }
  134. if !hasMoved {
  135. if skipNonMoveable {
  136. fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer)
  137. } else {
  138. return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer)
  139. }
  140. }
  141. }
  142. }
  143. return nil
  144. }
  145. func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool) (hasMoved bool, err error) {
  146. for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() {
  147. slices.SortFunc(otherNodes, func(a, b *EcNode) bool {
  148. return a.localShardIdCount(ecShardInfo.Id) < b.localShardIdCount(ecShardInfo.Id)
  149. })
  150. for i := 0; i < len(otherNodes); i++ {
  151. emptyNode := otherNodes[i]
  152. collectionPrefix := ""
  153. if ecShardInfo.Collection != "" {
  154. collectionPrefix = ecShardInfo.Collection + "_"
  155. }
  156. fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
  157. err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange)
  158. if err != nil {
  159. return
  160. } else {
  161. hasMoved = true
  162. break
  163. }
  164. }
  165. if !hasMoved {
  166. return
  167. }
  168. }
  169. return
  170. }
  171. func (c *commandVolumeServerEvacuate) moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) {
  172. fn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType))
  173. for _, n := range otherNodes {
  174. n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
  175. return v.DiskType == vol.DiskType
  176. })
  177. }
  178. slices.SortFunc(otherNodes, func(a, b *Node) bool {
  179. return a.localVolumeRatio(fn) < b.localVolumeRatio(fn)
  180. })
  181. for i := 0; i < len(otherNodes); i++ {
  182. emptyNode := otherNodes[i]
  183. hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange)
  184. if err != nil {
  185. return
  186. }
  187. if hasMoved {
  188. break
  189. }
  190. }
  191. return
  192. }
  193. func nodesOtherThan(volumeServers []*Node, thisServer string) (thisNode *Node, otherNodes []*Node) {
  194. for _, node := range volumeServers {
  195. if node.info.Id == thisServer {
  196. thisNode = node
  197. continue
  198. }
  199. otherNodes = append(otherNodes, node)
  200. }
  201. return
  202. }
  203. func ecNodesOtherThan(volumeServers []*EcNode, thisServer string) (thisNode *EcNode, otherNodes []*EcNode) {
  204. for _, node := range volumeServers {
  205. if node.info.Id == thisServer {
  206. thisNode = node
  207. continue
  208. }
  209. otherNodes = append(otherNodes, node)
  210. }
  211. return
  212. }