package shell

import (
	"context"
	"errors"
	"fmt"
	"math/rand/v2"
	"sort"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"golang.org/x/exp/slices"
	"google.golang.org/grpc"
)

type DataCenterId string
type EcNodeId string
type RackId string

type EcNode struct {
	info       *master_pb.DataNodeInfo
	dc         DataCenterId
	rack       RackId
	freeEcSlot int
}

type CandidateEcNode struct {
	ecNode     *EcNode
	shardCount int
}

type EcRack struct {
	ecNodes    map[EcNodeId]*EcNode
	freeEcSlot int
}

var (
	ecBalanceAlgorithmDescription = `
	func EcBalance() {
		for each collection:
			balanceEcVolumes(collectionName)
		for each rack:
			balanceEcRack(rack)
	}

	func balanceEcVolumes(collectionName){
		for each volume:
			doDeduplicateEcShards(volumeId)

		tracks rack~shardCount mapping
		for each volume:
			doBalanceEcShardsAcrossRacks(volumeId)

		for each volume:
			doBalanceEcShardsWithinRacks(volumeId)
	}

	// spread ec shards into more racks
	func doBalanceEcShardsAcrossRacks(volumeId){
		tracks rack~volumeIdShardCount mapping
		averageShardsPerEcRack = totalShardNumber / numRacks  // totalShardNumber is 14 for now, but may later vary per dc
		ecShardsToMove = select overflowing ec shards from racks with ec shard counts > averageShardsPerEcRack
		for each ecShardsToMove {
			destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, ecShardReplicaPlacement)
			destVolumeServers = volume servers on the destRack
			pickOneEcNodeAndMoveOneShard(destVolumeServers)
		}
	}

	func doBalanceEcShardsWithinRacks(volumeId){
		racks = collect all racks that the volume id is on
		for rack, shards := range racks
			doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
	}

	// move ec shards
	func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
		tracks volumeServer~volumeIdShardCount mapping
		averageShardCount = len(shards) / numVolumeServers
		volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardsPerEcRack
		ecShardsToMove = select overflowing ec shards from volumeServersOverAverage
		for each ecShardsToMove {
			destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, ecShardReplicaPlacement)
			pickOneEcNodeAndMoveOneShard(destVolumeServers)
		}
	}

	// move ec shards while keeping shard distribution for the same volume unchanged or more even
	func balanceEcRack(rack){
		averageShardCount = total shards / numVolumeServers
		for hasMovedOneEcShard {
			sort all volume servers ordered by the number of local ec shards
			pick the volume server A with the lowest number of ec shards x
			pick the volume server B with the highest number of ec shards y
			if y > averageShardCount and x + 1 <= averageShardCount {
				if B has an ec shard with volume id v that A does not have {
					move one ec shard v from B to A
					hasMovedOneEcShard = true
				}
			}
		}
	}
	`

	// Overridable functions for testing.
	getDefaultReplicaPlacement = _getDefaultReplicaPlacement
)
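
// _getDefaultReplicaPlacement asks the master for its configured default replication setting
// and parses it into a ReplicaPlacement. It is wrapped by the overridable
// getDefaultReplicaPlacement variable above so tests can stub it out.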
func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
	var resp *master_pb.GetMasterConfigurationResponse
	var err error

	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		resp, err = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
		return err
	})
	if err != nil {
		return nil, err
	}

	return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
}
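
// parseReplicaPlacementArg resolves the replica placement to use for EC volumes:
// an explicit replica placement string wins; otherwise the master's default
// replication setting is used.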
func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super_block.ReplicaPlacement, error) {
	if replicaStr != "" {
		rp, err := super_block.NewReplicaPlacementFromString(replicaStr)
		if err == nil {
			fmt.Printf("using replica placement %q for EC volumes\n", rp.String())
		}
		return rp, err
	}

	// No replica placement argument provided, resolve from master default settings.
	rp, err := getDefaultReplicaPlacement(commandEnv)
	if err == nil {
		fmt.Printf("using master default replica placement %q for EC volumes\n", rp.String())
	}
	return rp, err
}
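
// collectTopologyInfo fetches the current cluster topology (and the volume size limit)
// from the master, optionally sleeping first so that recent changes can settle.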
func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
	if delayBeforeCollecting > 0 {
		time.Sleep(delayBeforeCollecting)
	}

	var resp *master_pb.VolumeListResponse
	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		return err
	})
	if err != nil {
		return
	}

	return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil
}

func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
	// list all possible locations
	// collect topology information
	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return
	}

	// find out all volume servers with one slot left.
	ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
	sortEcNodesByFreeslotsDescending(ecNodes)

	return
}

func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
	return collectEcNodesForDC(commandEnv, "")
}

func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string {
	if len(vids) == 0 {
		return nil
	}

	found := map[string]bool{}
	for _, dc := range t.DataCenterInfos {
		for _, r := range dc.RackInfos {
			for _, dn := range r.DataNodeInfos {
				for _, diskInfo := range dn.DiskInfos {
					for _, vi := range diskInfo.VolumeInfos {
						for _, vid := range vids {
							if needle.VolumeId(vi.Id) == vid && vi.Collection != "" {
								found[vi.Collection] = true
							}
						}
					}
					for _, ecs := range diskInfo.EcShardInfos {
						for _, vid := range vids {
							if needle.VolumeId(ecs.Id) == vid && ecs.Collection != "" {
								found[ecs.Collection] = true
							}
						}
					}
				}
			}
		}
	}
	if len(found) == 0 {
		return nil
	}

	collections := []string{}
	for k := range found {
		collections = append(collections, k)
	}
	sort.Strings(collections)
	return collections
}
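
// moveMountedShardToEcNode moves one mounted ec shard from existingLocation to destinationEcNode:
// the destination copies and mounts the shard from the source, the source then unmounts and
// deletes its copy, and the in-memory EcNode bookkeeping is updated either way. When
// applyBalancing is false this is a dry run that only adjusts the in-memory state.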
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
	if !commandEnv.isLocked() {
		return fmt.Errorf("lock is lost")
	}

	copiedShardIds := []uint32{uint32(shardId)}

	if applyBalancing {
		existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)

		// ask destination node to copy shard and the ecx file from source node, and mount it
		copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
		if err != nil {
			return err
		}

		// unmount the to-be-deleted shards
		err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingServerAddress, copiedShardIds)
		if err != nil {
			return err
		}

		// ask source node to delete the shard, and maybe the ecx file
		err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingServerAddress, copiedShardIds)
		if err != nil {
			return err
		}

		fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
	}

	destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
	existingLocation.deleteEcVolumeShards(vid, copiedShardIds)

	return nil
}

func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
	targetServer *EcNode, shardIdsToCopy []uint32,
	volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {

	fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)

	targetAddress := pb.NewServerAddressFromDataNode(targetServer.info)
	err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {

		if targetAddress != existingLocation {

			fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
			_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
				VolumeId:       uint32(volumeId),
				Collection:     collection,
				ShardIds:       shardIdsToCopy,
				CopyEcxFile:    true,
				CopyEcjFile:    true,
				CopyVifFile:    true,
				SourceDataNode: string(existingLocation),
			})
			if copyErr != nil {
				return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
			}
		}

		fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
		_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   shardIdsToCopy,
		})
		if mountErr != nil {
			return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
		}

		if targetAddress != existingLocation {
			copiedShardIds = shardIdsToCopy
			glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
		}

		return nil
	})

	if err != nil {
		return
	}

	return
}

func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo)) {
	for _, dc := range topo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, dn := range rack.DataNodeInfos {
				fn(DataCenterId(dc.Id), RackId(rack.Id), dn)
			}
		}
	}
}

func sortEcNodesByFreeslotsDescending(ecNodes []*EcNode) {
	slices.SortFunc(ecNodes, func(a, b *EcNode) int {
		return b.freeEcSlot - a.freeEcSlot
	})
}

func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
	slices.SortFunc(ecNodes, func(a, b *EcNode) int {
		return a.freeEcSlot - b.freeEcSlot
	})
}

// ensureSortedEcNodes keeps the slice sorted after the entry at index changed its count,
// by bubbling that entry up or down using the provided lessThan comparator.
func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
	for i := index - 1; i >= 0; i-- {
		if lessThan(i+1, i) {
			swap(data, i, i+1)
		} else {
			break
		}
	}
	for i := index + 1; i < len(data); i++ {
		if lessThan(i, i-1) {
			swap(data, i, i-1)
		} else {
			break
		}
	}
}

func swap(data []*CandidateEcNode, i, j int) {
	t := data[i]
	data[i] = data[j]
	data[j] = t
}

func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
	for _, ecShardInfo := range ecShardInfos {
		shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
		count += shardBits.ShardIdCount()
	}
	return
}
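
// countFreeShardSlots estimates how many ec shards a data node can still accept on the given
// disk type: every unused volume slot counts as erasure_coding.DataShardsCount ec shard slots,
// minus the ec shards already stored on that disk.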
func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) {
	if dn.DiskInfos == nil {
		return 0
	}
	diskInfo := dn.DiskInfos[string(diskType)]
	if diskInfo == nil {
		return 0
	}
	return int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
}

func (ecNode *EcNode) localShardIdCount(vid uint32) int {
	for _, diskInfo := range ecNode.info.DiskInfos {
		for _, ecShardInfo := range diskInfo.EcShardInfos {
			if vid == ecShardInfo.Id {
				shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
				return shardBits.ShardIdCount()
			}
		}
	}
	return 0
}

func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
	eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
		if selectedDataCenter != "" && selectedDataCenter != string(dc) {
			return
		}

		freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
		ecNodes = append(ecNodes, &EcNode{
			info:       dn,
			dc:         dc,
			rack:       rack,
			freeEcSlot: int(freeEcSlots),
		})
		totalFreeEcSlots += freeEcSlots
	})
	return
}

func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []uint32) error {
	fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   toBeDeletedShardIds,
		})
		return deleteErr
	})
}

func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedhardIds []uint32) error {
	fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
			VolumeId: uint32(volumeId),
			ShardIds: toBeUnmountedhardIds,
		})
		return deleteErr
	})
}

func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedhardIds []uint32) error {
	fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   toBeMountedhardIds,
		})
		return mountErr
	})
}
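
// ceilDivide returns a/b rounded up, e.g. ceilDivide(14, 3) == 5; it is used to compute the
// maximum number of shards a rack or node should hold when spreading a volume's shards.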
func ceilDivide(a, b int) int {
	var r int
	if (a % b) != 0 {
		r = 1
	}
	return (a / b) + r
}

func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				return erasure_coding.ShardBits(shardInfo.EcIndexBits)
			}
		}
	}

	return 0
}
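
// addEcVolumeShards records the given shard ids for volume vid in this node's in-memory
// topology view and decreases freeEcSlot accordingly; deleteEcVolumeShards below is its inverse.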
func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
	foundVolume := false
	diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
	if found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
				newShardBits := oldShardBits
				for _, shardId := range shardIds {
					newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
				}
				shardInfo.EcIndexBits = uint32(newShardBits)
				ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
				foundVolume = true
				break
			}
		}
	} else {
		diskInfo = &master_pb.DiskInfo{
			Type: string(types.HardDriveType),
		}
		ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo
	}

	if !foundVolume {
		var newShardBits erasure_coding.ShardBits
		for _, shardId := range shardIds {
			newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
		}
		diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
			Id:          uint32(vid),
			Collection:  collection,
			EcIndexBits: uint32(newShardBits),
			DiskType:    string(types.HardDriveType),
		})
		ecNode.freeEcSlot -= len(shardIds)
	}

	return ecNode
}

func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
				newShardBits := oldShardBits
				for _, shardId := range shardIds {
					newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
				}
				shardInfo.EcIndexBits = uint32(newShardBits)
				ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
			}
		}
	}

	return ecNode
}

func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
	countMap := make(map[string]int)
	for _, d := range data {
		id, count := identifierFn(d)
		countMap[id] += count
	}
	return countMap
}

func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
	groupMap := make(map[string][]*EcNode)
	for _, d := range data {
		id := identifierFn(d)
		groupMap[id] = append(groupMap[id], d)
	}
	return groupMap
}
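
// ecBalancer holds the state for one balancing run: the collected ec nodes, the replica
// placement to honor, whether changes are applied or only planned (applyBalancing), and an
// optional wait group used when tasks are parallelized.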
type ecBalancer struct {
	commandEnv       *CommandEnv
	ecNodes          []*EcNode
	replicaPlacement *super_block.ReplicaPlacement
	applyBalancing   bool
	parallelize      bool

	wg       *sync.WaitGroup
	wgErrors []error
}

type ecBalancerTask func() error

func (ecb *ecBalancer) wgInit() {
	if ecb.wg != nil {
		return
	}
	ecb.wg = &sync.WaitGroup{}
	ecb.wgErrors = nil
}

func (ecb *ecBalancer) wgAdd(f ecBalancerTask) {
	wrapper := func() {
		if ecb.wg != nil {
			defer ecb.wg.Done()
		}
		if err := f(); err != nil {
			ecb.wgErrors = append(ecb.wgErrors, err)
		}
	}

	if ecb.wg == nil || !ecb.parallelize {
		wrapper()
		return
	}

	ecb.wg.Add(1)
	go wrapper()
}

func (ecb *ecBalancer) wgWait() error {
	if ecb.wg != nil {
		ecb.wg.Wait()
	}
	err := errors.Join(ecb.wgErrors...)

	ecb.wg = nil
	ecb.wgErrors = nil

	return err
}

func (ecb *ecBalancer) racks() map[RackId]*EcRack {
	racks := make(map[RackId]*EcRack)
	for _, ecNode := range ecb.ecNodes {
		if racks[ecNode.rack] == nil {
			racks[ecNode.rack] = &EcRack{
				ecNodes: make(map[EcNodeId]*EcNode),
			}
		}
		racks[ecNode.rack].ecNodes[EcNodeId(ecNode.info.Id)] = ecNode
		racks[ecNode.rack].freeEcSlot += ecNode.freeEcSlot
	}
	return racks
}
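
// balanceEcVolumes runs the three per-collection phases described in
// ecBalanceAlgorithmDescription: remove duplicated shards, spread shards across racks,
// then spread shards across the volume servers within each rack.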
func (ecb *ecBalancer) balanceEcVolumes(collection string) error {
	fmt.Printf("balanceEcVolumes %s\n", collection)

	if err := ecb.deleteDuplicatedEcShards(collection); err != nil {
		return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err)
	}

	if err := ecb.balanceEcShardsAcrossRacks(collection); err != nil {
		return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
	}

	if err := ecb.balanceEcShardsWithinRacks(collection); err != nil {
		return fmt.Errorf("balance within racks collection %s ec shards: %v", collection, err)
	}

	return nil
}

func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error {
	vidLocations := ecb.collectVolumeIdToEcNodes(collection)

	ecb.wgInit()
	for vid, locations := range vidLocations {
		ecb.wgAdd(func() error {
			return ecb.doDeduplicateEcShards(collection, vid, locations)
		})
	}
	return ecb.wgWait()
}

func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error {
	// find shards of this volume that are stored on more than one ec node
	shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
	for _, ecNode := range locations {
		shardBits := findEcVolumeShards(ecNode, vid)
		for _, shardId := range shardBits.ShardIds() {
			shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
		}
	}

	for shardId, ecNodes := range shardToLocations {
		if len(ecNodes) <= 1 {
			continue
		}
		sortEcNodesByFreeslotsAscending(ecNodes)
		fmt.Printf("ec shard %d.%d has %d copies, keeping %v\n", vid, shardId, len(ecNodes), ecNodes[0].info.Id)
		if !ecb.applyBalancing {
			continue
		}

		duplicatedShardIds := []uint32{uint32(shardId)}
		for _, ecNode := range ecNodes[1:] {
			if err := unmountEcShards(ecb.commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
				return err
			}
			if err := sourceServerDeleteEcShards(ecb.commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
				return err
			}
			ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
		}
	}
	return nil
}

// TODO: enable parallelization
func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
	// collect vid => []ecNode, since previous steps can change the locations
	vidLocations := ecb.collectVolumeIdToEcNodes(collection)

	// spread the ec shards evenly
	for vid, locations := range vidLocations {
		if err := ecb.doBalanceEcShardsAcrossRacks(collection, vid, locations); err != nil {
			return err
		}
	}

	return nil
}

func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
	return groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
		shardBits := findEcVolumeShards(ecNode, vid)
		return string(ecNode.rack), shardBits.ShardIdCount()
	})
}

func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needle.VolumeId, locations []*EcNode) error {
	racks := ecb.racks()

	// calculate average number of shards an ec rack should have for one volume
	averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))

	// see how many racks the volume's shards are spread across, and how many shards are in each rack
	rackToShardCount := countShardsByRack(vid, locations)
	rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
		return string(ecNode.rack)
	})

	// ecShardsToMove = select overflowing ec shards from racks with ec shard counts > averageShardsPerEcRack
	ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode)
	for rackId, count := range rackToShardCount {
		if count <= averageShardsPerEcRack {
			continue
		}
		possibleEcNodes := rackEcNodesWithVid[rackId]
		for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
			ecShardsToMove[shardId] = ecNode
		}
	}

	for shardId, ecNode := range ecShardsToMove {
		rackId, err := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount)
		if err != nil {
			fmt.Printf("ec shard %d.%d at %s can not find a destination rack:\n%s\n", vid, shardId, ecNode.info.Id, err.Error())
			continue
		}

		var possibleDestinationEcNodes []*EcNode
		for _, n := range racks[rackId].ecNodes {
			possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
		}
		err = ecb.pickOneEcNodeAndMoveOneShard(ecNode, collection, vid, shardId, possibleDestinationEcNodes)
		if err != nil {
			return err
		}

		rackToShardCount[string(rackId)] += 1
		rackToShardCount[string(ecNode.rack)] -= 1
		racks[rackId].freeEcSlot -= 1
		racks[ecNode.rack].freeEcSlot += 1
	}

	return nil
}
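
// pickRackToBalanceShardsInto picks a destination rack for one shard of the volume: racks with
// no free slots, or already at the replica placement's DiffRackCount limit, are skipped, and
// among the remaining racks those holding the fewest shards of the volume are preferred, with
// ties broken at random.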
func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int) (RackId, error) {
	targets := []RackId{}
	targetShards := -1
	for _, shards := range rackToShardCount {
		if shards > targetShards {
			targetShards = shards
		}
	}

	details := ""
	for rackId, rack := range rackToEcNodes {
		shards := rackToShardCount[string(rackId)]

		if rack.freeEcSlot <= 0 {
			details += fmt.Sprintf("  Skipped %s because it has no free slots\n", rackId)
			continue
		}
		if ecb.replicaPlacement != nil && shards >= ecb.replicaPlacement.DiffRackCount {
			details += fmt.Sprintf("  Skipped %s because shards %d >= replica placement limit for other racks (%d)\n", rackId, shards, ecb.replicaPlacement.DiffRackCount)
			continue
		}

		if shards < targetShards {
			// Favor racks with fewer shards, to ensure a uniform distribution.
			targets = nil
			targetShards = shards
		}
		if shards == targetShards {
			targets = append(targets, rackId)
		}
	}

	if len(targets) == 0 {
		return "", errors.New(details)
	}
	return targets[rand.IntN(len(targets))], nil
}

// TODO: enable parallelization
func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
	// collect vid => []ecNode, since previous steps can change the locations
	vidLocations := ecb.collectVolumeIdToEcNodes(collection)
	racks := ecb.racks()

	// spread the ec shards evenly
	for vid, locations := range vidLocations {

		// see how many racks the volume's shards are spread across, and how many shards are in each rack
		rackToShardCount := countShardsByRack(vid, locations)
		rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
			return string(ecNode.rack)
		})

		for rackId := range rackToShardCount {
			var possibleDestinationEcNodes []*EcNode
			for _, n := range racks[RackId(rackId)].ecNodes {
				if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found {
					possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
				}
			}
			sourceEcNodes := rackEcNodesWithVid[rackId]
			averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
			if err := ecb.doBalanceEcShardsWithinOneRack(averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes); err != nil {
				return err
			}
		}
	}
	return nil
}

func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
	for _, ecNode := range existingLocations {

		shardBits := findEcVolumeShards(ecNode, vid)
		overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode

		for _, shardId := range shardBits.ShardIds() {

			if overLimitCount <= 0 {
				break
			}

			fmt.Printf("%s has %d shards over the limit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)

			err := ecb.pickOneEcNodeAndMoveOneShard(ecNode, collection, vid, shardId, possibleDestinationEcNodes)
			if err != nil {
				return err
			}

			overLimitCount--
		}
	}

	return nil
}

func (ecb *ecBalancer) balanceEcRacks() error {
	// balance one rack for all ec shards
	for _, ecRack := range ecb.racks() {
		if err := ecb.doBalanceEcRack(ecRack); err != nil {
			return err
		}
	}
	return nil
}

// TODO: enable parallelization
func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
	if len(ecRack.ecNodes) <= 1 {
		return nil
	}

	var rackEcNodes []*EcNode
	for _, node := range ecRack.ecNodes {
		rackEcNodes = append(rackEcNodes, node)
	}

	ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
		if !found {
			return
		}
		for _, ecShardInfo := range diskInfo.EcShardInfos {
			count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
		}
		return ecNode.info.Id, count
	})

	var totalShardCount int
	for _, count := range ecNodeIdToShardCount {
		totalShardCount += count
	}

	averageShardCount := ceilDivide(totalShardCount, len(rackEcNodes))

	hasMove := true
	for hasMove {
		hasMove = false

		slices.SortFunc(rackEcNodes, func(a, b *EcNode) int {
			return b.freeEcSlot - a.freeEcSlot
		})
		emptyNode, fullNode := rackEcNodes[0], rackEcNodes[len(rackEcNodes)-1]
		emptyNodeShardCount, fullNodeShardCount := ecNodeIdToShardCount[emptyNode.info.Id], ecNodeIdToShardCount[fullNode.info.Id]
		if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {

			emptyNodeIds := make(map[uint32]bool)
			if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
				for _, shards := range emptyDiskInfo.EcShardInfos {
					emptyNodeIds[shards.Id] = true
				}
			}
			if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
				for _, shards := range fullDiskInfo.EcShardInfos {
					if _, found := emptyNodeIds[shards.Id]; !found {
						for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {

							fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)

							err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, ecb.applyBalancing)
							if err != nil {
								return err
							}

							ecNodeIdToShardCount[emptyNode.info.Id]++
							ecNodeIdToShardCount[fullNode.info.Id]--
							hasMove = true
							break
						}
						break
					}
				}
			}
		}
	}

	return nil
}
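
// pickEcNodeToBalanceShardsInto is the node-level counterpart of pickRackToBalanceShardsInto:
// it skips the source node, nodes without free slots, and nodes already at the SameRackCount
// limit, then prefers the candidates holding the fewest shards of the volume, breaking ties
// at random.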
func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, error) {
	if existingLocation == nil {
		return nil, fmt.Errorf("INTERNAL: missing source nodes")
	}
	if len(possibleDestinations) == 0 {
		return nil, fmt.Errorf("INTERNAL: missing destination nodes")
	}

	nodeShards := map[*EcNode]int{}
	for _, node := range possibleDestinations {
		nodeShards[node] = findEcVolumeShards(node, vid).ShardIdCount()
	}

	targets := []*EcNode{}
	targetShards := -1
	for _, shards := range nodeShards {
		if shards > targetShards {
			targetShards = shards
		}
	}

	details := ""
	for _, node := range possibleDestinations {
		if node.info.Id == existingLocation.info.Id {
			continue
		}
		if node.freeEcSlot <= 0 {
			details += fmt.Sprintf("  Skipped %s because it has no free slots\n", node.info.Id)
			continue
		}

		shards := nodeShards[node]
		if ecb.replicaPlacement != nil && shards >= ecb.replicaPlacement.SameRackCount {
			details += fmt.Sprintf("  Skipped %s because shards %d >= replica placement limit for the rack (%d)\n", node.info.Id, shards, ecb.replicaPlacement.SameRackCount)
			continue
		}

		if shards < targetShards {
			// Favor nodes with fewer shards, to ensure a uniform distribution.
			targets = nil
			targetShards = shards
		}
		if shards == targetShards {
			targets = append(targets, node)
		}
	}

	if len(targets) == 0 {
		return nil, errors.New(details)
	}
	return targets[rand.IntN(len(targets))], nil
}

func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error {
	destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
	if err != nil {
		fmt.Printf("WARNING: Could not find a suitable target node for %d.%d:\n%s", vid, shardId, err.Error())
		return nil
	}
	fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
	return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, ecb.applyBalancing)
}

func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
	picked := make(map[erasure_coding.ShardId]*EcNode)
	var candidateEcNodes []*CandidateEcNode
	for _, ecNode := range ecNodes {
		shardBits := findEcVolumeShards(ecNode, vid)
		if shardBits.ShardIdCount() > 0 {
			candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
				ecNode:     ecNode,
				shardCount: shardBits.ShardIdCount(),
			})
		}
	}
	slices.SortFunc(candidateEcNodes, func(a, b *CandidateEcNode) int {
		return b.shardCount - a.shardCount
	})
	for i := 0; i < n; i++ {
		selectedEcNodeIndex := -1
		for i, candidateEcNode := range candidateEcNodes {
			shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
			if shardBits > 0 {
				selectedEcNodeIndex = i
				for _, shardId := range shardBits.ShardIds() {
					candidateEcNode.shardCount--
					picked[shardId] = candidateEcNode.ecNode
					candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
					break
				}
				break
			}
		}
		if selectedEcNodeIndex >= 0 {
			ensureSortedEcNodes(candidateEcNodes, selectedEcNodeIndex, func(i, j int) bool {
				return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
			})
		}
	}

	return picked
}

func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.VolumeId][]*EcNode {
	vidLocations := make(map[needle.VolumeId][]*EcNode)
	for _, ecNode := range ecb.ecNodes {
		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
		if !found {
			continue
		}
		for _, shardInfo := range diskInfo.EcShardInfos {
			// ignore if not in current collection
			if shardInfo.Collection == collection {
				vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode)
			}
		}
	}
	return vidLocations
}
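
// EcBalance is the entry point used by the ec.balance shell command: it collects all ec nodes
// (optionally restricted to one data center), balances every listed collection, and finally
// evens out total ec shard counts per rack.
//
// A minimal usage sketch (assuming a locked *CommandEnv named commandEnv and a collection
// named "my_collection"; both names are illustrative, not defined in this file):
//
//	rp, _ := super_block.NewReplicaPlacementFromString("020")
//	if err := EcBalance(commandEnv, []string{"my_collection"}, "", rp, false, false); err != nil {
//		// applyBalancing=false is a dry run that only prints the planned moves
//	}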
func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, parallelize bool, applyBalancing bool) (err error) {
	if len(collections) == 0 {
		return fmt.Errorf("no collections to balance")
	}

	// collect all ec nodes
	allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc)
	if err != nil {
		return err
	}
	if totalFreeEcSlots < 1 {
		return fmt.Errorf("no free ec shard slots. only %d left", totalFreeEcSlots)
	}

	ecb := &ecBalancer{
		commandEnv:       commandEnv,
		ecNodes:          allEcNodes,
		replicaPlacement: ecReplicaPlacement,
		applyBalancing:   applyBalancing,
		parallelize:      parallelize,
	}

	for _, c := range collections {
		if err = ecb.balanceEcVolumes(c); err != nil {
			return err
		}
	}
	if err := ecb.balanceEcRacks(); err != nil {
		return fmt.Errorf("balance ec racks: %v", err)
	}

	return nil
}