You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

515 lines
17 KiB

6 years ago
  1. package shell
  2. import (
  3. "flag"
  4. "fmt"
  5. "io"
  6. "sort"
  7. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  9. )
  10. func init() {
  11. Commands = append(Commands, &commandEcBalance{})
  12. }
  13. type commandEcBalance struct {
  14. }
  15. func (c *commandEcBalance) Name() string {
  16. return "ec.balance"
  17. }
  18. func (c *commandEcBalance) Help() string {
  19. return `balance all ec shards among all racks and volume servers
  20. ec.balance [-c EACH_COLLECTION|<collection_name>] [-force] [-dataCenter <data_center>]
  21. Algorithm:
  22. For each type of volume server (different max volume count limit){
  23. for each collection:
  24. balanceEcVolumes(collectionName)
  25. for each rack:
  26. balanceEcRack(rack)
  27. }
  28. func balanceEcVolumes(collectionName){
  29. for each volume:
  30. doDeduplicateEcShards(volumeId)
  31. tracks rack~shardCount mapping
  32. for each volume:
  33. doBalanceEcShardsAcrossRacks(volumeId)
  34. for each volume:
  35. doBalanceEcShardsWithinRacks(volumeId)
  36. }
  37. // spread ec shards into more racks
  38. func doBalanceEcShardsAcrossRacks(volumeId){
  39. tracks rack~volumeIdShardCount mapping
  40. averageShardsPerEcRack = totalShardNumber / numRacks // totalShardNumber is 14 for now, later could varies for each dc
  41. ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
  42. for each ecShardsToMove {
  43. destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, averageShardsPerEcRack)
  44. destVolumeServers = volume servers on the destRack
  45. pickOneEcNodeAndMoveOneShard(destVolumeServers)
  46. }
  47. }
  48. func doBalanceEcShardsWithinRacks(volumeId){
  49. racks = collect all racks that the volume id is on
  50. for rack, shards := range racks
  51. doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
  52. }
  53. // move ec shards
  54. func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
  55. tracks volumeServer~volumeIdShardCount mapping
  56. averageShardCount = len(shards) / numVolumeServers
  57. volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardsPerEcRack
  58. ecShardsToMove = select overflown ec shards from volumeServersOverAverage
  59. for each ecShardsToMove {
  60. destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, averageShardCount)
  61. pickOneEcNodeAndMoveOneShard(destVolumeServers)
  62. }
  63. }
  64. // move ec shards while keeping shard distribution for the same volume unchanged or more even
  65. func balanceEcRack(rack){
  66. averageShardCount = total shards / numVolumeServers
  67. for hasMovedOneEcShard {
  68. sort all volume servers ordered by the number of local ec shards
  69. pick the volume server A with the lowest number of ec shards x
  70. pick the volume server B with the highest number of ec shards y
  71. if y > averageShardCount and x +1 <= averageShardCount {
  72. if B has a ec shard with volume id v that A does not have {
  73. move one ec shard v from B to A
  74. hasMovedOneEcShard = true
  75. }
  76. }
  77. }
  78. }
  79. `
  80. }
  81. func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  82. balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  83. collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
  84. dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
  85. applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan")
  86. if err = balanceCommand.Parse(args); err != nil {
  87. return nil
  88. }
  89. // collect all ec nodes
  90. allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, *dc)
  91. if err != nil {
  92. return err
  93. }
  94. if totalFreeEcSlots < 1 {
  95. return fmt.Errorf("no free ec shard slots. only %d left", totalFreeEcSlots)
  96. }
  97. racks := collectRacks(allEcNodes)
  98. if *collection == "EACH_COLLECTION" {
  99. collections, err := ListCollectionNames(commandEnv, false, true)
  100. if err != nil {
  101. return err
  102. }
  103. fmt.Printf("balanceEcVolumes collections %+v\n", len(collections))
  104. for _, c := range collections {
  105. fmt.Printf("balanceEcVolumes collection %+v\n", c)
  106. if err = balanceEcVolumes(commandEnv, c, allEcNodes, racks, *applyBalancing); err != nil {
  107. return err
  108. }
  109. }
  110. } else {
  111. if err = balanceEcVolumes(commandEnv, *collection, allEcNodes, racks, *applyBalancing); err != nil {
  112. return err
  113. }
  114. }
  115. if err := balanceEcRacks(commandEnv, racks, *applyBalancing); err != nil {
  116. return fmt.Errorf("balance ec racks: %v", err)
  117. }
  118. return nil
  119. }
  120. func collectRacks(allEcNodes []*EcNode) map[RackId]*EcRack {
  121. // collect racks info
  122. racks := make(map[RackId]*EcRack)
  123. for _, ecNode := range allEcNodes {
  124. if racks[ecNode.rack] == nil {
  125. racks[ecNode.rack] = &EcRack{
  126. ecNodes: make(map[EcNodeId]*EcNode),
  127. }
  128. }
  129. racks[ecNode.rack].ecNodes[EcNodeId(ecNode.info.Id)] = ecNode
  130. racks[ecNode.rack].freeEcSlot += ecNode.freeEcSlot
  131. }
  132. return racks
  133. }
  134. func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
  135. fmt.Printf("balanceEcVolumes %s\n", collection)
  136. if err := deleteDuplicatedEcShards(commandEnv, allEcNodes, collection, applyBalancing); err != nil {
  137. return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err)
  138. }
  139. if err := balanceEcShardsAcrossRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
  140. return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
  141. }
  142. if err := balanceEcShardsWithinRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
  143. return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
  144. }
  145. return nil
  146. }
  147. func deleteDuplicatedEcShards(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, applyBalancing bool) error {
  148. // vid => []ecNode
  149. vidLocations := collectVolumeIdToEcNodes(allEcNodes)
  150. // deduplicate ec shards
  151. for vid, locations := range vidLocations {
  152. if err := doDeduplicateEcShards(commandEnv, collection, vid, locations, applyBalancing); err != nil {
  153. return err
  154. }
  155. }
  156. return nil
  157. }
  158. func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {
  159. // check whether this volume has ecNodes that are over average
  160. shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
  161. for _, ecNode := range locations {
  162. shardBits := findEcVolumeShards(ecNode, vid)
  163. for _, shardId := range shardBits.ShardIds() {
  164. shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
  165. }
  166. }
  167. for shardId, ecNodes := range shardToLocations {
  168. if len(ecNodes) <= 1 {
  169. continue
  170. }
  171. sortEcNodesByFreeslotsAscending(ecNodes)
  172. fmt.Printf("ec shard %d.%d has %d copies, keeping %v\n", vid, shardId, len(ecNodes), ecNodes[0].info.Id)
  173. if !applyBalancing {
  174. continue
  175. }
  176. duplicatedShardIds := []uint32{uint32(shardId)}
  177. for _, ecNode := range ecNodes[1:] {
  178. if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
  179. return err
  180. }
  181. if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
  182. return err
  183. }
  184. ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
  185. }
  186. }
  187. return nil
  188. }
  189. func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
  190. // collect vid => []ecNode, since previous steps can change the locations
  191. vidLocations := collectVolumeIdToEcNodes(allEcNodes)
  192. // spread the ec shards evenly
  193. for vid, locations := range vidLocations {
  194. if err := doBalanceEcShardsAcrossRacks(commandEnv, collection, vid, locations, racks, applyBalancing); err != nil {
  195. return err
  196. }
  197. }
  198. return nil
  199. }
  200. func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
  201. // calculate average number of shards an ec rack should have for one volume
  202. averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))
  203. // see the volume's shards are in how many racks, and how many in each rack
  204. rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
  205. shardBits := findEcVolumeShards(ecNode, vid)
  206. return string(ecNode.rack), shardBits.ShardIdCount()
  207. })
  208. rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
  209. return string(ecNode.rack)
  210. })
  211. // ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
  212. ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode)
  213. for rackId, count := range rackToShardCount {
  214. if count > averageShardsPerEcRack {
  215. possibleEcNodes := rackEcNodesWithVid[rackId]
  216. for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
  217. ecShardsToMove[shardId] = ecNode
  218. }
  219. }
  220. }
  221. for shardId, ecNode := range ecShardsToMove {
  222. rackId := pickOneRack(racks, rackToShardCount, averageShardsPerEcRack)
  223. if rackId == "" {
  224. fmt.Printf("ec shard %d.%d at %s can not find a destination rack\n", vid, shardId, ecNode.info.Id)
  225. continue
  226. }
  227. var possibleDestinationEcNodes []*EcNode
  228. for _, n := range racks[rackId].ecNodes {
  229. possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
  230. }
  231. err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
  232. if err != nil {
  233. return err
  234. }
  235. rackToShardCount[string(rackId)] += 1
  236. rackToShardCount[string(ecNode.rack)] -= 1
  237. racks[rackId].freeEcSlot -= 1
  238. racks[ecNode.rack].freeEcSlot += 1
  239. }
  240. return nil
  241. }
  242. func pickOneRack(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, averageShardsPerEcRack int) RackId {
  243. // TODO later may need to add some randomness
  244. for rackId, rack := range rackToEcNodes {
  245. if rackToShardCount[string(rackId)] >= averageShardsPerEcRack {
  246. continue
  247. }
  248. if rack.freeEcSlot <= 0 {
  249. continue
  250. }
  251. return rackId
  252. }
  253. return ""
  254. }
  255. func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
  256. // collect vid => []ecNode, since previous steps can change the locations
  257. vidLocations := collectVolumeIdToEcNodes(allEcNodes)
  258. // spread the ec shards evenly
  259. for vid, locations := range vidLocations {
  260. // see the volume's shards are in how many racks, and how many in each rack
  261. rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
  262. shardBits := findEcVolumeShards(ecNode, vid)
  263. return string(ecNode.rack), shardBits.ShardIdCount()
  264. })
  265. rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
  266. return string(ecNode.rack)
  267. })
  268. for rackId, _ := range rackToShardCount {
  269. var possibleDestinationEcNodes []*EcNode
  270. for _, n := range racks[RackId(rackId)].ecNodes {
  271. possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
  272. }
  273. sourceEcNodes := rackEcNodesWithVid[rackId]
  274. averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
  275. if err := doBalanceEcShardsWithinOneRack(commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, applyBalancing); err != nil {
  276. return err
  277. }
  278. }
  279. }
  280. return nil
  281. }
  282. func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
  283. for _, ecNode := range existingLocations {
  284. shardBits := findEcVolumeShards(ecNode, vid)
  285. overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode
  286. for _, shardId := range shardBits.ShardIds() {
  287. if overLimitCount <= 0 {
  288. break
  289. }
  290. fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)
  291. err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
  292. if err != nil {
  293. return err
  294. }
  295. overLimitCount--
  296. }
  297. }
  298. return nil
  299. }
  300. func balanceEcRacks(commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error {
  301. // balance one rack for all ec shards
  302. for _, ecRack := range racks {
  303. if err := doBalanceEcRack(commandEnv, ecRack, applyBalancing); err != nil {
  304. return err
  305. }
  306. }
  307. return nil
  308. }
  309. func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool) error {
  310. if len(ecRack.ecNodes) <= 1 {
  311. return nil
  312. }
  313. var rackEcNodes []*EcNode
  314. for _, node := range ecRack.ecNodes {
  315. rackEcNodes = append(rackEcNodes, node)
  316. }
  317. ecNodeIdToShardCount := groupByCount(rackEcNodes, func(node *EcNode) (id string, count int) {
  318. for _, ecShardInfo := range node.info.EcShardInfos {
  319. count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
  320. }
  321. return node.info.Id, count
  322. })
  323. var totalShardCount int
  324. for _, count := range ecNodeIdToShardCount {
  325. totalShardCount += count
  326. }
  327. averageShardCount := ceilDivide(totalShardCount, len(rackEcNodes))
  328. hasMove := true
  329. for hasMove {
  330. hasMove = false
  331. sort.Slice(rackEcNodes, func(i, j int) bool {
  332. return rackEcNodes[i].freeEcSlot > rackEcNodes[j].freeEcSlot
  333. })
  334. emptyNode, fullNode := rackEcNodes[0], rackEcNodes[len(rackEcNodes)-1]
  335. emptyNodeShardCount, fullNodeShardCount := ecNodeIdToShardCount[emptyNode.info.Id], ecNodeIdToShardCount[fullNode.info.Id]
  336. if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {
  337. emptyNodeIds := make(map[uint32]bool)
  338. for _, shards := range emptyNode.info.EcShardInfos {
  339. emptyNodeIds[shards.Id] = true
  340. }
  341. for _, shards := range fullNode.info.EcShardInfos {
  342. if _, found := emptyNodeIds[shards.Id]; !found {
  343. for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
  344. fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
  345. err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
  346. if err != nil {
  347. return err
  348. }
  349. ecNodeIdToShardCount[emptyNode.info.Id]++
  350. ecNodeIdToShardCount[fullNode.info.Id]--
  351. hasMove = true
  352. break
  353. }
  354. break
  355. }
  356. }
  357. }
  358. }
  359. return nil
  360. }
  361. func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
  362. sortEcNodesByFreeslotsDecending(possibleDestinationEcNodes)
  363. for _, destEcNode := range possibleDestinationEcNodes {
  364. if destEcNode.info.Id == existingLocation.info.Id {
  365. continue
  366. }
  367. if destEcNode.freeEcSlot <= 0 {
  368. continue
  369. }
  370. if findEcVolumeShards(destEcNode, vid).ShardIdCount() >= averageShardsPerEcNode {
  371. continue
  372. }
  373. fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destEcNode.info.Id)
  374. err := moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing)
  375. if err != nil {
  376. return err
  377. }
  378. return nil
  379. }
  380. return nil
  381. }
  382. func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
  383. picked := make(map[erasure_coding.ShardId]*EcNode)
  384. var candidateEcNodes []*CandidateEcNode
  385. for _, ecNode := range ecNodes {
  386. shardBits := findEcVolumeShards(ecNode, vid)
  387. if shardBits.ShardIdCount() > 0 {
  388. candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
  389. ecNode: ecNode,
  390. shardCount: shardBits.ShardIdCount(),
  391. })
  392. }
  393. }
  394. sort.Slice(candidateEcNodes, func(i, j int) bool {
  395. return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
  396. })
  397. for i := 0; i < n; i++ {
  398. selectedEcNodeIndex := -1
  399. for i, candidateEcNode := range candidateEcNodes {
  400. shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
  401. if shardBits > 0 {
  402. selectedEcNodeIndex = i
  403. for _, shardId := range shardBits.ShardIds() {
  404. candidateEcNode.shardCount--
  405. picked[shardId] = candidateEcNode.ecNode
  406. candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
  407. break
  408. }
  409. break
  410. }
  411. }
  412. if selectedEcNodeIndex >= 0 {
  413. ensureSortedEcNodes(candidateEcNodes, selectedEcNodeIndex, func(i, j int) bool {
  414. return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
  415. })
  416. }
  417. }
  418. return picked
  419. }
  420. func collectVolumeIdToEcNodes(allEcNodes []*EcNode) map[needle.VolumeId][]*EcNode {
  421. vidLocations := make(map[needle.VolumeId][]*EcNode)
  422. for _, ecNode := range allEcNodes {
  423. for _, shardInfo := range ecNode.info.EcShardInfos {
  424. vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode)
  425. }
  426. }
  427. return vidLocations
  428. }