You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

535 lines
17 KiB

4 years ago
6 years ago
  1. package shell
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb"
  6. "github.com/chrislusf/seaweedfs/weed/storage/types"
  7. "io"
  8. "sort"
  9. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  10. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  11. )
  12. func init() {
  13. Commands = append(Commands, &commandEcBalance{})
  14. }
  15. type commandEcBalance struct {
  16. }
  17. func (c *commandEcBalance) Name() string {
  18. return "ec.balance"
  19. }
  20. func (c *commandEcBalance) Help() string {
  21. return `balance all ec shards among all racks and volume servers
  22. ec.balance [-c EACH_COLLECTION|<collection_name>] [-force] [-dataCenter <data_center>]
  23. Algorithm:
  24. func EcBalance() {
  25. for each collection:
  26. balanceEcVolumes(collectionName)
  27. for each rack:
  28. balanceEcRack(rack)
  29. }
  30. func balanceEcVolumes(collectionName){
  31. for each volume:
  32. doDeduplicateEcShards(volumeId)
  33. tracks rack~shardCount mapping
  34. for each volume:
  35. doBalanceEcShardsAcrossRacks(volumeId)
  36. for each volume:
  37. doBalanceEcShardsWithinRacks(volumeId)
  38. }
  39. // spread ec shards into more racks
  40. func doBalanceEcShardsAcrossRacks(volumeId){
  41. tracks rack~volumeIdShardCount mapping
  42. averageShardsPerEcRack = totalShardNumber / numRacks // totalShardNumber is 14 for now, later could varies for each dc
  43. ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
  44. for each ecShardsToMove {
  45. destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, averageShardsPerEcRack)
  46. destVolumeServers = volume servers on the destRack
  47. pickOneEcNodeAndMoveOneShard(destVolumeServers)
  48. }
  49. }
  50. func doBalanceEcShardsWithinRacks(volumeId){
  51. racks = collect all racks that the volume id is on
  52. for rack, shards := range racks
  53. doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
  54. }
  55. // move ec shards
  56. func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
  57. tracks volumeServer~volumeIdShardCount mapping
  58. averageShardCount = len(shards) / numVolumeServers
  59. volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardsPerEcRack
  60. ecShardsToMove = select overflown ec shards from volumeServersOverAverage
  61. for each ecShardsToMove {
  62. destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, averageShardCount)
  63. pickOneEcNodeAndMoveOneShard(destVolumeServers)
  64. }
  65. }
  66. // move ec shards while keeping shard distribution for the same volume unchanged or more even
  67. func balanceEcRack(rack){
  68. averageShardCount = total shards / numVolumeServers
  69. for hasMovedOneEcShard {
  70. sort all volume servers ordered by the number of local ec shards
  71. pick the volume server A with the lowest number of ec shards x
  72. pick the volume server B with the highest number of ec shards y
  73. if y > averageShardCount and x +1 <= averageShardCount {
  74. if B has a ec shard with volume id v that A does not have {
  75. move one ec shard v from B to A
  76. hasMovedOneEcShard = true
  77. }
  78. }
  79. }
  80. }
  81. `
  82. }
  83. func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  84. balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  85. collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
  86. dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
  87. applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan")
  88. if err = balanceCommand.Parse(args); err != nil {
  89. return nil
  90. }
  91. if err = commandEnv.confirmIsLocked(); err != nil {
  92. return
  93. }
  94. // collect all ec nodes
  95. allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, *dc)
  96. if err != nil {
  97. return err
  98. }
  99. if totalFreeEcSlots < 1 {
  100. return fmt.Errorf("no free ec shard slots. only %d left", totalFreeEcSlots)
  101. }
  102. racks := collectRacks(allEcNodes)
  103. if *collection == "EACH_COLLECTION" {
  104. collections, err := ListCollectionNames(commandEnv, false, true)
  105. if err != nil {
  106. return err
  107. }
  108. fmt.Printf("balanceEcVolumes collections %+v\n", len(collections))
  109. for _, c := range collections {
  110. fmt.Printf("balanceEcVolumes collection %+v\n", c)
  111. if err = balanceEcVolumes(commandEnv, c, allEcNodes, racks, *applyBalancing); err != nil {
  112. return err
  113. }
  114. }
  115. } else {
  116. if err = balanceEcVolumes(commandEnv, *collection, allEcNodes, racks, *applyBalancing); err != nil {
  117. return err
  118. }
  119. }
  120. if err := balanceEcRacks(commandEnv, racks, *applyBalancing); err != nil {
  121. return fmt.Errorf("balance ec racks: %v", err)
  122. }
  123. return nil
  124. }
  125. func collectRacks(allEcNodes []*EcNode) map[RackId]*EcRack {
  126. // collect racks info
  127. racks := make(map[RackId]*EcRack)
  128. for _, ecNode := range allEcNodes {
  129. if racks[ecNode.rack] == nil {
  130. racks[ecNode.rack] = &EcRack{
  131. ecNodes: make(map[EcNodeId]*EcNode),
  132. }
  133. }
  134. racks[ecNode.rack].ecNodes[EcNodeId(ecNode.info.Id)] = ecNode
  135. racks[ecNode.rack].freeEcSlot += ecNode.freeEcSlot
  136. }
  137. return racks
  138. }
  139. func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
  140. fmt.Printf("balanceEcVolumes %s\n", collection)
  141. if err := deleteDuplicatedEcShards(commandEnv, allEcNodes, collection, applyBalancing); err != nil {
  142. return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err)
  143. }
  144. if err := balanceEcShardsAcrossRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
  145. return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
  146. }
  147. if err := balanceEcShardsWithinRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
  148. return fmt.Errorf("balance within racks collection %s ec shards: %v", collection, err)
  149. }
  150. return nil
  151. }
  152. func deleteDuplicatedEcShards(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, applyBalancing bool) error {
  153. // vid => []ecNode
  154. vidLocations := collectVolumeIdToEcNodes(allEcNodes)
  155. // deduplicate ec shards
  156. for vid, locations := range vidLocations {
  157. if err := doDeduplicateEcShards(commandEnv, collection, vid, locations, applyBalancing); err != nil {
  158. return err
  159. }
  160. }
  161. return nil
  162. }
  163. func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {
  164. // check whether this volume has ecNodes that are over average
  165. shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
  166. for _, ecNode := range locations {
  167. shardBits := findEcVolumeShards(ecNode, vid)
  168. for _, shardId := range shardBits.ShardIds() {
  169. shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
  170. }
  171. }
  172. for shardId, ecNodes := range shardToLocations {
  173. if len(ecNodes) <= 1 {
  174. continue
  175. }
  176. sortEcNodesByFreeslotsAscending(ecNodes)
  177. fmt.Printf("ec shard %d.%d has %d copies, keeping %v\n", vid, shardId, len(ecNodes), ecNodes[0].info.Id)
  178. if !applyBalancing {
  179. continue
  180. }
  181. duplicatedShardIds := []uint32{uint32(shardId)}
  182. for _, ecNode := range ecNodes[1:] {
  183. if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
  184. return err
  185. }
  186. if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
  187. return err
  188. }
  189. ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
  190. }
  191. }
  192. return nil
  193. }
  194. func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
  195. // collect vid => []ecNode, since previous steps can change the locations
  196. vidLocations := collectVolumeIdToEcNodes(allEcNodes)
  197. // spread the ec shards evenly
  198. for vid, locations := range vidLocations {
  199. if err := doBalanceEcShardsAcrossRacks(commandEnv, collection, vid, locations, racks, applyBalancing); err != nil {
  200. return err
  201. }
  202. }
  203. return nil
  204. }
  205. func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
  206. // calculate average number of shards an ec rack should have for one volume
  207. averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))
  208. // see the volume's shards are in how many racks, and how many in each rack
  209. rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
  210. shardBits := findEcVolumeShards(ecNode, vid)
  211. return string(ecNode.rack), shardBits.ShardIdCount()
  212. })
  213. rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
  214. return string(ecNode.rack)
  215. })
  216. // ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
  217. ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode)
  218. for rackId, count := range rackToShardCount {
  219. if count > averageShardsPerEcRack {
  220. possibleEcNodes := rackEcNodesWithVid[rackId]
  221. for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
  222. ecShardsToMove[shardId] = ecNode
  223. }
  224. }
  225. }
  226. for shardId, ecNode := range ecShardsToMove {
  227. rackId := pickOneRack(racks, rackToShardCount, averageShardsPerEcRack)
  228. if rackId == "" {
  229. fmt.Printf("ec shard %d.%d at %s can not find a destination rack\n", vid, shardId, ecNode.info.Id)
  230. continue
  231. }
  232. var possibleDestinationEcNodes []*EcNode
  233. for _, n := range racks[rackId].ecNodes {
  234. possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
  235. }
  236. err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
  237. if err != nil {
  238. return err
  239. }
  240. rackToShardCount[string(rackId)] += 1
  241. rackToShardCount[string(ecNode.rack)] -= 1
  242. racks[rackId].freeEcSlot -= 1
  243. racks[ecNode.rack].freeEcSlot += 1
  244. }
  245. return nil
  246. }
  247. func pickOneRack(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, averageShardsPerEcRack int) RackId {
  248. // TODO later may need to add some randomness
  249. for rackId, rack := range rackToEcNodes {
  250. if rackToShardCount[string(rackId)] >= averageShardsPerEcRack {
  251. continue
  252. }
  253. if rack.freeEcSlot <= 0 {
  254. continue
  255. }
  256. return rackId
  257. }
  258. return ""
  259. }
  260. func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
  261. // collect vid => []ecNode, since previous steps can change the locations
  262. vidLocations := collectVolumeIdToEcNodes(allEcNodes)
  263. // spread the ec shards evenly
  264. for vid, locations := range vidLocations {
  265. // see the volume's shards are in how many racks, and how many in each rack
  266. rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
  267. shardBits := findEcVolumeShards(ecNode, vid)
  268. return string(ecNode.rack), shardBits.ShardIdCount()
  269. })
  270. rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
  271. return string(ecNode.rack)
  272. })
  273. for rackId, _ := range rackToShardCount {
  274. var possibleDestinationEcNodes []*EcNode
  275. for _, n := range racks[RackId(rackId)].ecNodes {
  276. if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found {
  277. possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
  278. }
  279. }
  280. sourceEcNodes := rackEcNodesWithVid[rackId]
  281. averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
  282. if err := doBalanceEcShardsWithinOneRack(commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, applyBalancing); err != nil {
  283. return err
  284. }
  285. }
  286. }
  287. return nil
  288. }
  289. func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
  290. for _, ecNode := range existingLocations {
  291. shardBits := findEcVolumeShards(ecNode, vid)
  292. overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode
  293. for _, shardId := range shardBits.ShardIds() {
  294. if overLimitCount <= 0 {
  295. break
  296. }
  297. fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)
  298. err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
  299. if err != nil {
  300. return err
  301. }
  302. overLimitCount--
  303. }
  304. }
  305. return nil
  306. }
  307. func balanceEcRacks(commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error {
  308. // balance one rack for all ec shards
  309. for _, ecRack := range racks {
  310. if err := doBalanceEcRack(commandEnv, ecRack, applyBalancing); err != nil {
  311. return err
  312. }
  313. }
  314. return nil
  315. }
  316. func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool) error {
  317. if len(ecRack.ecNodes) <= 1 {
  318. return nil
  319. }
  320. var rackEcNodes []*EcNode
  321. for _, node := range ecRack.ecNodes {
  322. rackEcNodes = append(rackEcNodes, node)
  323. }
  324. ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
  325. diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
  326. if !found {
  327. return
  328. }
  329. for _, ecShardInfo := range diskInfo.EcShardInfos {
  330. count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
  331. }
  332. return ecNode.info.Id, count
  333. })
  334. var totalShardCount int
  335. for _, count := range ecNodeIdToShardCount {
  336. totalShardCount += count
  337. }
  338. averageShardCount := ceilDivide(totalShardCount, len(rackEcNodes))
  339. hasMove := true
  340. for hasMove {
  341. hasMove = false
  342. sort.Slice(rackEcNodes, func(i, j int) bool {
  343. return rackEcNodes[i].freeEcSlot > rackEcNodes[j].freeEcSlot
  344. })
  345. emptyNode, fullNode := rackEcNodes[0], rackEcNodes[len(rackEcNodes)-1]
  346. emptyNodeShardCount, fullNodeShardCount := ecNodeIdToShardCount[emptyNode.info.Id], ecNodeIdToShardCount[fullNode.info.Id]
  347. if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {
  348. emptyNodeIds := make(map[uint32]bool)
  349. if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
  350. for _, shards := range emptyDiskInfo.EcShardInfos {
  351. emptyNodeIds[shards.Id] = true
  352. }
  353. }
  354. if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
  355. for _, shards := range fullDiskInfo.EcShardInfos {
  356. if _, found := emptyNodeIds[shards.Id]; !found {
  357. for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
  358. fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
  359. err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
  360. if err != nil {
  361. return err
  362. }
  363. ecNodeIdToShardCount[emptyNode.info.Id]++
  364. ecNodeIdToShardCount[fullNode.info.Id]--
  365. hasMove = true
  366. break
  367. }
  368. break
  369. }
  370. }
  371. }
  372. }
  373. }
  374. return nil
  375. }
  376. func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
  377. sortEcNodesByFreeslotsDecending(possibleDestinationEcNodes)
  378. for _, destEcNode := range possibleDestinationEcNodes {
  379. if destEcNode.info.Id == existingLocation.info.Id {
  380. continue
  381. }
  382. if destEcNode.freeEcSlot <= 0 {
  383. continue
  384. }
  385. if findEcVolumeShards(destEcNode, vid).ShardIdCount() >= averageShardsPerEcNode {
  386. continue
  387. }
  388. fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destEcNode.info.Id)
  389. err := moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing)
  390. if err != nil {
  391. return err
  392. }
  393. return nil
  394. }
  395. return nil
  396. }
  397. func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
  398. picked := make(map[erasure_coding.ShardId]*EcNode)
  399. var candidateEcNodes []*CandidateEcNode
  400. for _, ecNode := range ecNodes {
  401. shardBits := findEcVolumeShards(ecNode, vid)
  402. if shardBits.ShardIdCount() > 0 {
  403. candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
  404. ecNode: ecNode,
  405. shardCount: shardBits.ShardIdCount(),
  406. })
  407. }
  408. }
  409. sort.Slice(candidateEcNodes, func(i, j int) bool {
  410. return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
  411. })
  412. for i := 0; i < n; i++ {
  413. selectedEcNodeIndex := -1
  414. for i, candidateEcNode := range candidateEcNodes {
  415. shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
  416. if shardBits > 0 {
  417. selectedEcNodeIndex = i
  418. for _, shardId := range shardBits.ShardIds() {
  419. candidateEcNode.shardCount--
  420. picked[shardId] = candidateEcNode.ecNode
  421. candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
  422. break
  423. }
  424. break
  425. }
  426. }
  427. if selectedEcNodeIndex >= 0 {
  428. ensureSortedEcNodes(candidateEcNodes, selectedEcNodeIndex, func(i, j int) bool {
  429. return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
  430. })
  431. }
  432. }
  433. return picked
  434. }
  435. func collectVolumeIdToEcNodes(allEcNodes []*EcNode) map[needle.VolumeId][]*EcNode {
  436. vidLocations := make(map[needle.VolumeId][]*EcNode)
  437. for _, ecNode := range allEcNodes {
  438. diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
  439. if !found {
  440. continue
  441. }
  442. for _, shardInfo := range diskInfo.EcShardInfos {
  443. vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode)
  444. }
  445. }
  446. return vidLocations
  447. }