You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

226 lines
7.4 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package shell
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  6. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  7. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  8. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  9. "github.com/chrislusf/seaweedfs/weed/storage/types"
  10. "golang.org/x/exp/slices"
  11. "io"
  12. "os"
  13. )
  14. func init() {
  15. Commands = append(Commands, &commandVolumeServerEvacuate{})
  16. }
  17. type commandVolumeServerEvacuate struct {
  18. }
  19. func (c *commandVolumeServerEvacuate) Name() string {
  20. return "volumeServer.evacuate"
  21. }
  22. func (c *commandVolumeServerEvacuate) Help() string {
  23. return `move out all data on a volume server
  24. volumeServer.evacuate -node <host:port>
  25. This command moves all data away from the volume server.
  26. The volumes on the volume servers will be redistributed.
  27. Usually this is used to prepare to shutdown or upgrade the volume server.
  28. Sometimes a volume can not be moved because there are no
  29. good destination to meet the replication requirement.
  30. E.g. a volume replication 001 in a cluster with 2 volume servers can not be moved.
  31. You can use "-skipNonMoveable" to move the rest volumes.
  32. `
  33. }
  34. func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  35. vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  36. volumeServer := vsEvacuateCommand.String("node", "", "<host>:<port> of the volume server")
  37. skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved")
  38. applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes")
  39. retryCount := vsEvacuateCommand.Int("retry", 0, "how many times to retry")
  40. if err = vsEvacuateCommand.Parse(args); err != nil {
  41. return nil
  42. }
  43. infoAboutSimulationMode(writer, *applyChange, "-force")
  44. if err = commandEnv.confirmIsLocked(args); err != nil {
  45. return
  46. }
  47. if *volumeServer == "" {
  48. return fmt.Errorf("need to specify volume server by -node=<host>:<port>")
  49. }
  50. for i := 0; i < *retryCount+1; i++ {
  51. if err = volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer); err == nil {
  52. return nil
  53. }
  54. }
  55. return
  56. }
  57. func volumeServerEvacuate(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) (err error) {
  58. // 1. confirm the volume server is part of the cluster
  59. // 2. collect all other volume servers, sort by empty slots
  60. // 3. move to any other volume server as long as it satisfy the replication requirements
  61. // list all the volumes
  62. // collect topology information
  63. topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
  64. if err != nil {
  65. return err
  66. }
  67. if err := evacuateNormalVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
  68. return err
  69. }
  70. if err := evacuateEcVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
  71. return err
  72. }
  73. return nil
  74. }
  75. func evacuateNormalVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
  76. // find this volume server
  77. volumeServers := collectVolumeServersByDc(topologyInfo, "")
  78. thisNode, otherNodes := nodesOtherThan(volumeServers, volumeServer)
  79. if thisNode == nil {
  80. return fmt.Errorf("%s is not found in this cluster", volumeServer)
  81. }
  82. // move away normal volumes
  83. volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
  84. for _, diskInfo := range thisNode.info.DiskInfos {
  85. for _, vol := range diskInfo.VolumeInfos {
  86. hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
  87. if err != nil {
  88. return fmt.Errorf("move away volume %d from %s: %v", vol.Id, volumeServer, err)
  89. }
  90. if !hasMoved {
  91. if skipNonMoveable {
  92. replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement))
  93. fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String())
  94. } else {
  95. return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer)
  96. }
  97. }
  98. }
  99. }
  100. return nil
  101. }
  102. func evacuateEcVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
  103. // find this ec volume server
  104. ecNodes, _ := collectEcVolumeServersByDc(topologyInfo, "")
  105. thisNode, otherNodes := ecNodesOtherThan(ecNodes, volumeServer)
  106. if thisNode == nil {
  107. return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
  108. }
  109. // move away ec volumes
  110. for _, diskInfo := range thisNode.info.DiskInfos {
  111. for _, ecShardInfo := range diskInfo.EcShardInfos {
  112. hasMoved, err := moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange)
  113. if err != nil {
  114. return fmt.Errorf("move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err)
  115. }
  116. if !hasMoved {
  117. if skipNonMoveable {
  118. fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer)
  119. } else {
  120. return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer)
  121. }
  122. }
  123. }
  124. }
  125. return nil
  126. }
  127. func moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool) (hasMoved bool, err error) {
  128. for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() {
  129. slices.SortFunc(otherNodes, func(a, b *EcNode) bool {
  130. return a.localShardIdCount(ecShardInfo.Id) < b.localShardIdCount(ecShardInfo.Id)
  131. })
  132. for i := 0; i < len(otherNodes); i++ {
  133. emptyNode := otherNodes[i]
  134. collectionPrefix := ""
  135. if ecShardInfo.Collection != "" {
  136. collectionPrefix = ecShardInfo.Collection + "_"
  137. }
  138. fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
  139. err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange)
  140. if err != nil {
  141. return
  142. } else {
  143. hasMoved = true
  144. break
  145. }
  146. }
  147. if !hasMoved {
  148. return
  149. }
  150. }
  151. return
  152. }
  153. func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) {
  154. fn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType))
  155. for _, n := range otherNodes {
  156. n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
  157. return v.DiskType == vol.DiskType
  158. })
  159. }
  160. slices.SortFunc(otherNodes, func(a, b *Node) bool {
  161. return a.localVolumeRatio(fn) > b.localVolumeRatio(fn)
  162. })
  163. for i := 0; i < len(otherNodes); i++ {
  164. emptyNode := otherNodes[i]
  165. hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange)
  166. if err != nil {
  167. return
  168. }
  169. if hasMoved {
  170. break
  171. }
  172. }
  173. return
  174. }
  175. func nodesOtherThan(volumeServers []*Node, thisServer string) (thisNode *Node, otherNodes []*Node) {
  176. for _, node := range volumeServers {
  177. if node.info.Id == thisServer {
  178. thisNode = node
  179. continue
  180. }
  181. otherNodes = append(otherNodes, node)
  182. }
  183. return
  184. }
  185. func ecNodesOtherThan(volumeServers []*EcNode, thisServer string) (thisNode *EcNode, otherNodes []*EcNode) {
  186. for _, node := range volumeServers {
  187. if node.info.Id == thisServer {
  188. thisNode = node
  189. continue
  190. }
  191. otherNodes = append(otherNodes, node)
  192. }
  193. return
  194. }