package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"os"
	"sort"
	"time"

	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)

func init() {
	Commands = append(Commands, &commandVolumeBalance{})
}

type commandVolumeBalance struct {
}

func (c *commandVolumeBalance) Name() string {
	return "volume.balance"
}

func (c *commandVolumeBalance) Help() string {
	return `balance all volumes among volume servers

	volume.balance [-collection ALL|EACH_COLLECTION|<collection_name>] [-force] [-dataCenter=<data_center_name>]

	Algorithm:

	For each type of volume server (different max volume count limit){
		for each collection {
			balanceWritableVolumes()
			balanceReadOnlyVolumes()
		}
	}

	func balanceWritableVolumes(){
		idealWritableVolumes = totalWritableVolumes / numVolumeServers
		for hasMovedOneVolume {
			sort all volume servers ordered by the number of local writable volumes
			pick the volume server B with the highest number of writable volumes y
			for any volume server A with the number of writable volumes x, where x+1 <= idealWritableVolumes {
				if y > idealWritableVolumes and x+1 <= idealWritableVolumes {
					if B has a writable volume id v that A does not have, and the move satisfies v's replication requirements {
						move writable volume v from B to A
					}
				}
			}
		}
	}

	func balanceReadOnlyVolumes(){
		//similar to balanceWritableVolumes
	}

`
}
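
// Do collects the cluster topology from the master, groups volume servers by
// their max volume count, and balances volumes within each group: per
// collection, across all collections, or for a single named collection.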
func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	if err = commandEnv.confirmIsLocked(); err != nil {
		return
	}

	balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection")
	dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
	applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan.")
	if err = balanceCommand.Parse(args); err != nil {
		return nil
	}

	var resp *master_pb.VolumeListResponse
	err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
		resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		return err
	})
	if err != nil {
		return err
	}

	typeToNodes := collectVolumeServersByType(resp.TopologyInfo, *dc)

	volumeReplicas, _ := collectVolumeReplicaLocations(resp)

	for maxVolumeCount, volumeServers := range typeToNodes {
		if len(volumeServers) < 2 {
			fmt.Printf("only 1 node is configured max %d volumes, skipping balancing\n", maxVolumeCount)
			continue
		}
		if *collection == "EACH_COLLECTION" {
			collections, err := ListCollectionNames(commandEnv, true, false)
			if err != nil {
				return err
			}
			for _, c := range collections {
				if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil {
					return err
				}
			}
		} else if *collection == "ALL_COLLECTIONS" {
			if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil {
				return err
			}
		} else {
			if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil {
				return err
			}
		}
	}
	return nil
}
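
// balanceVolumeServers runs two balancing passes over the given nodes:
// first for writable volumes (not read-only and below the size limit),
// then for read-only or full volumes.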
func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error {

	// balance writable volumes
	for _, n := range nodes {
		n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
			if collection != "ALL_COLLECTIONS" {
				if v.Collection != collection {
					return false
				}
			}
			return !v.ReadOnly && v.Size < volumeSizeLimit
		})
	}
	if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortWritableVolumes, applyBalancing); err != nil {
		return err
	}

	// balance readable volumes
	for _, n := range nodes {
		n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
			if collection != "ALL_COLLECTIONS" {
				if v.Collection != collection {
					return false
				}
			}
			return v.ReadOnly || v.Size >= volumeSizeLimit
		})
	}
	if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortReadOnlyVolumes, applyBalancing); err != nil {
		return err
	}

	return nil
}
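
// collectVolumeServersByType groups data nodes by their max volume count,
// optionally restricted to a single data center.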
func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter string) (typeToNodes map[uint64][]*Node) {
	typeToNodes = make(map[uint64][]*Node)
	for _, dc := range t.DataCenterInfos {
		if selectedDataCenter != "" && dc.Id != selectedDataCenter {
			continue
		}
		for _, r := range dc.RackInfos {
			for _, dn := range r.DataNodeInfos {
				typeToNodes[dn.MaxVolumeCount] = append(typeToNodes[dn.MaxVolumeCount], &Node{
					info: dn,
					dc:   dc.Id,
					rack: r.Id,
				})
			}
		}
	}
	return
}
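
// Node wraps a data node together with its data center, rack, and the
// subset of volumes currently selected for balancing.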
type Node struct {
	info            *master_pb.DataNodeInfo
	selectedVolumes map[uint32]*master_pb.VolumeInformationMessage
	dc              string
	rack            string
}

func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) {
	sort.Slice(volumes, func(i, j int) bool {
		return volumes[i].Size < volumes[j].Size
	})
}

func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) {
	sort.Slice(volumes, func(i, j int) bool {
		return volumes[i].Id < volumes[j].Id
	})
}
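
// balanceSelectedVolume repeatedly sorts the nodes by the number of selected
// volumes and tries to move one volume from the fullest node to an
// under-loaded node, until no further move is possible.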
func balanceSelectedVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) {
	selectedVolumeCount := 0
	for _, dn := range nodes {
		selectedVolumeCount += len(dn.selectedVolumes)
	}

	idealSelectedVolumes := ceilDivide(selectedVolumeCount, len(nodes))

	hasMoved := true

	for hasMoved {
		hasMoved = false
		sort.Slice(nodes, func(i, j int) bool {
			// TODO sort by free volume slots???
			return len(nodes[i].selectedVolumes) < len(nodes[j].selectedVolumes)
		})

		fullNode := nodes[len(nodes)-1]
		var candidateVolumes []*master_pb.VolumeInformationMessage
		for _, v := range fullNode.selectedVolumes {
			candidateVolumes = append(candidateVolumes, v)
		}
		sortCandidatesFn(candidateVolumes)

		for i := 0; i < len(nodes)-1; i++ {
			emptyNode := nodes[i]
			if !(len(fullNode.selectedVolumes) > idealSelectedVolumes && len(emptyNode.selectedVolumes)+1 <= idealSelectedVolumes) {
				// no more volume servers with empty slots
				break
			}
			hasMoved, err = attemptToMoveOneVolume(commandEnv, volumeReplicas, fullNode, candidateVolumes, emptyNode, applyBalancing)
			if err != nil {
				return
			}
			if hasMoved {
				// moved one volume
				break
			}
		}
	}
	return nil
}
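
// attemptToMoveOneVolume picks the first candidate volume that the target
// node does not already hold and whose replica placement allows the move,
// and moves it from fullNode to emptyNode.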
func attemptToMoveOneVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, fullNode *Node, candidateVolumes []*master_pb.VolumeInformationMessage, emptyNode *Node, applyBalancing bool) (hasMoved bool, err error) {

	for _, v := range candidateVolumes {
		if v.ReplicaPlacement > 0 {
			replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(v.ReplicaPlacement))
			if !isGoodMove(replicaPlacement, volumeReplicas[v.Id], fullNode, emptyNode) {
				continue
			}
		}
		if _, found := emptyNode.selectedVolumes[v.Id]; !found {
			if err = moveVolume(commandEnv, v, fullNode, emptyNode, applyBalancing); err == nil {
				delete(fullNode.selectedVolumes, v.Id)
				emptyNode.selectedVolumes[v.Id] = v
				hasMoved = true
				break
			} else {
				return
			}
		}
	}

	return
}
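
// moveVolume prints the planned move and, when applyBalancing is set,
// performs a live move of the volume between the two volume servers.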
func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, fullNode *Node, emptyNode *Node, applyBalancing bool) error {
	collectionPrefix := v.Collection + "_"
	if v.Collection == "" {
		collectionPrefix = ""
	}
	fmt.Fprintf(os.Stdout, "moving volume %s%d %s => %s\n", collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
	if applyBalancing {
		return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second)
	}
	return nil
}
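
// selectVolumes rebuilds the node's selectedVolumes map with the volumes
// that pass the given filter.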
func (node *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) {
	node.selectedVolumes = make(map[uint32]*master_pb.VolumeInformationMessage)
	for _, v := range node.info.VolumeInfos {
		if fn(v) {
			node.selectedVolumes[v.Id] = v
		}
	}
}
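
// isGoodMove checks that moving a replica from sourceNode to targetNode does
// not land on a node that already has a replica, and does not exceed the
// data-center, rack, and same-rack limits of the volume's replica placement.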
func isGoodMove(placement *super_block.ReplicaPlacement, existingReplicas []*VolumeReplica, sourceNode, targetNode *Node) bool {
	for _, replica := range existingReplicas {
		if replica.location.dataNode.Id == targetNode.info.Id &&
			replica.location.rack == targetNode.rack &&
			replica.location.dc == targetNode.dc {
			// never move to existing nodes
			return false
		}
	}
	dcs, racks := make(map[string]bool), make(map[string]int)
	for _, replica := range existingReplicas {
		if replica.location.dataNode.Id != sourceNode.info.Id {
			dcs[replica.location.DataCenter()] = true
			racks[replica.location.Rack()]++
		}
	}

	dcs[targetNode.dc] = true
	racks[fmt.Sprintf("%s %s", targetNode.dc, targetNode.rack)]++

	if len(dcs) > placement.DiffDataCenterCount+1 {
		return false
	}

	if len(racks) > placement.DiffRackCount+placement.DiffDataCenterCount+1 {
		return false
	}

	for _, sameRackCount := range racks {
		if sameRackCount > placement.SameRackCount+1 {
			return false
		}
	}

	return true
}