You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1083 lines
33 KiB

2 months ago
4 years ago
4 years ago
  1. package shell
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "math/rand/v2"
  7. "sort"
  8. "sync"
  9. "time"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. "github.com/seaweedfs/seaweedfs/weed/operation"
  12. "github.com/seaweedfs/seaweedfs/weed/pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  16. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  17. "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
  18. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  19. "golang.org/x/exp/slices"
  20. "google.golang.org/grpc"
  21. )
  22. type DataCenterId string
  23. type EcNodeId string
  24. type RackId string
  25. type EcNode struct {
  26. info *master_pb.DataNodeInfo
  27. dc DataCenterId
  28. rack RackId
  29. freeEcSlot int
  30. }
  31. type CandidateEcNode struct {
  32. ecNode *EcNode
  33. shardCount int
  34. }
  35. type EcRack struct {
  36. ecNodes map[EcNodeId]*EcNode
  37. freeEcSlot int
  38. }
var (
	// ecBalanceAlgorithmDescription is a pseudo-code outline of the EC shard
	// balancing steps (de-duplication, balancing across racks, balancing
	// within racks, balancing whole racks) performed by the ecBalancer
	// methods in this file.
	// NOTE(review): its consumer is not visible here — presumably it is
	// printed as command help text; confirm.
	ecBalanceAlgorithmDescription = `
func EcBalance() {
for each collection:
balanceEcVolumes(collectionName)
for each rack:
balanceEcRack(rack)
}
func balanceEcVolumes(collectionName){
for each volume:
doDeduplicateEcShards(volumeId)
tracks rack~shardCount mapping
for each volume:
doBalanceEcShardsAcrossRacks(volumeId)
for each volume:
doBalanceEcShardsWithinRacks(volumeId)
}
// spread ec shards into more racks
func doBalanceEcShardsAcrossRacks(volumeId){
tracks rack~volumeIdShardCount mapping
averageShardsPerEcRack = totalShardNumber / numRacks // totalShardNumber is 14 for now, later could varies for each dc
ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
for each ecShardsToMove {
destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, ecShardReplicaPlacement)
destVolumeServers = volume servers on the destRack
pickOneEcNodeAndMoveOneShard(destVolumeServers)
}
}
func doBalanceEcShardsWithinRacks(volumeId){
racks = collect all racks that the volume id is on
for rack, shards := range racks
doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
}
// move ec shards
func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
tracks volumeServer~volumeIdShardCount mapping
averageShardCount = len(shards) / numVolumeServers
volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardsPerEcRack
ecShardsToMove = select overflown ec shards from volumeServersOverAverage
for each ecShardsToMove {
destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, ecShardReplicaPlacement)
pickOneEcNodeAndMoveOneShard(destVolumeServers)
}
}
// move ec shards while keeping shard distribution for the same volume unchanged or more even
func balanceEcRack(rack){
averageShardCount = total shards / numVolumeServers
for hasMovedOneEcShard {
sort all volume servers ordered by the number of local ec shards
pick the volume server A with the lowest number of ec shards x
pick the volume server B with the highest number of ec shards y
if y > averageShardCount and x +1 <= averageShardCount {
if B has a ec shard with volume id v that A does not have {
move one ec shard v from B to A
hasMovedOneEcShard = true
}
}
}
}
`
	// Overridable functions for testing.
	// getDefaultReplicaPlacement resolves the replica placement to use when
	// none is given explicitly (see parseReplicaPlacementArg); it defaults to
	// _getDefaultReplicaPlacement, which queries the master.
	getDefaultReplicaPlacement = _getDefaultReplicaPlacement
)
  102. func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
  103. var resp *master_pb.GetMasterConfigurationResponse
  104. var err error
  105. err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
  106. resp, err = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  107. return err
  108. })
  109. if err != nil {
  110. return nil, err
  111. }
  112. return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
  113. }
  114. func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super_block.ReplicaPlacement, error) {
  115. if replicaStr != "" {
  116. rp, err := super_block.NewReplicaPlacementFromString(replicaStr)
  117. if err == nil {
  118. fmt.Printf("using replica placement %q for EC volumes\n", rp.String())
  119. }
  120. return rp, err
  121. }
  122. // No replica placement argument provided, resolve from master default settings.
  123. rp, err := getDefaultReplicaPlacement(commandEnv)
  124. if err == nil {
  125. fmt.Printf("using master default replica placement %q for EC volumes\n", rp.String())
  126. }
  127. return rp, err
  128. }
  129. func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
  130. if delayBeforeCollecting > 0 {
  131. time.Sleep(delayBeforeCollecting)
  132. }
  133. var resp *master_pb.VolumeListResponse
  134. err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
  135. resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  136. return err
  137. })
  138. if err != nil {
  139. return
  140. }
  141. return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil
  142. }
  143. func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
  144. // list all possible locations
  145. // collect topology information
  146. topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
  147. if err != nil {
  148. return
  149. }
  150. // find out all volume servers with one slot left.
  151. ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
  152. sortEcNodesByFreeslotsDescending(ecNodes)
  153. return
  154. }
  155. func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
  156. return collectEcNodesForDC(commandEnv, "")
  157. }
  158. func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string {
  159. if len(vids) == 0 {
  160. return nil
  161. }
  162. found := map[string]bool{}
  163. for _, dc := range t.DataCenterInfos {
  164. for _, r := range dc.RackInfos {
  165. for _, dn := range r.DataNodeInfos {
  166. for _, diskInfo := range dn.DiskInfos {
  167. for _, vi := range diskInfo.VolumeInfos {
  168. for _, vid := range vids {
  169. if needle.VolumeId(vi.Id) == vid && vi.Collection != "" {
  170. found[vi.Collection] = true
  171. }
  172. }
  173. }
  174. for _, ecs := range diskInfo.EcShardInfos {
  175. for _, vid := range vids {
  176. if needle.VolumeId(ecs.Id) == vid && ecs.Collection != "" {
  177. found[ecs.Collection] = true
  178. }
  179. }
  180. }
  181. }
  182. }
  183. }
  184. }
  185. if len(found) == 0 {
  186. return nil
  187. }
  188. collections := []string{}
  189. for k, _ := range found {
  190. collections = append(collections, k)
  191. }
  192. sort.Strings(collections)
  193. return collections
  194. }
  195. func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
  196. if !commandEnv.isLocked() {
  197. return fmt.Errorf("lock is lost")
  198. }
  199. copiedShardIds := []uint32{uint32(shardId)}
  200. if applyBalancing {
  201. existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)
  202. // ask destination node to copy shard and the ecx file from source node, and mount it
  203. copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
  204. if err != nil {
  205. return err
  206. }
  207. // unmount the to be deleted shards
  208. err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingServerAddress, copiedShardIds)
  209. if err != nil {
  210. return err
  211. }
  212. // ask source node to delete the shard, and maybe the ecx file
  213. err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingServerAddress, copiedShardIds)
  214. if err != nil {
  215. return err
  216. }
  217. fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
  218. }
  219. destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
  220. existingLocation.deleteEcVolumeShards(vid, copiedShardIds)
  221. return nil
  222. }
  223. func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
  224. targetServer *EcNode, shardIdsToCopy []uint32,
  225. volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {
  226. fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
  227. targetAddress := pb.NewServerAddressFromDataNode(targetServer.info)
  228. err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  229. if targetAddress != existingLocation {
  230. fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
  231. _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
  232. VolumeId: uint32(volumeId),
  233. Collection: collection,
  234. ShardIds: shardIdsToCopy,
  235. CopyEcxFile: true,
  236. CopyEcjFile: true,
  237. CopyVifFile: true,
  238. SourceDataNode: string(existingLocation),
  239. })
  240. if copyErr != nil {
  241. return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
  242. }
  243. }
  244. fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
  245. _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
  246. VolumeId: uint32(volumeId),
  247. Collection: collection,
  248. ShardIds: shardIdsToCopy,
  249. })
  250. if mountErr != nil {
  251. return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
  252. }
  253. if targetAddress != existingLocation {
  254. copiedShardIds = shardIdsToCopy
  255. glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
  256. }
  257. return nil
  258. })
  259. if err != nil {
  260. return
  261. }
  262. return
  263. }
  264. func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo)) {
  265. for _, dc := range topo.DataCenterInfos {
  266. for _, rack := range dc.RackInfos {
  267. for _, dn := range rack.DataNodeInfos {
  268. fn(DataCenterId(dc.Id), RackId(rack.Id), dn)
  269. }
  270. }
  271. }
  272. }
  273. func sortEcNodesByFreeslotsDescending(ecNodes []*EcNode) {
  274. slices.SortFunc(ecNodes, func(a, b *EcNode) int {
  275. return b.freeEcSlot - a.freeEcSlot
  276. })
  277. }
  278. func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
  279. slices.SortFunc(ecNodes, func(a, b *EcNode) int {
  280. return a.freeEcSlot - b.freeEcSlot
  281. })
  282. }
  283. // if the index node changed the freeEcSlot, need to keep every EcNode still sorted
  284. func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
  285. for i := index - 1; i >= 0; i-- {
  286. if lessThan(i+1, i) {
  287. swap(data, i, i+1)
  288. } else {
  289. break
  290. }
  291. }
  292. for i := index + 1; i < len(data); i++ {
  293. if lessThan(i, i-1) {
  294. swap(data, i, i-1)
  295. } else {
  296. break
  297. }
  298. }
  299. }
  300. func swap(data []*CandidateEcNode, i, j int) {
  301. t := data[i]
  302. data[i] = data[j]
  303. data[j] = t
  304. }
  305. func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
  306. for _, ecShardInfo := range ecShardInfos {
  307. shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
  308. count += shardBits.ShardIdCount()
  309. }
  310. return
  311. }
  312. func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) {
  313. if dn.DiskInfos == nil {
  314. return 0
  315. }
  316. diskInfo := dn.DiskInfos[string(diskType)]
  317. if diskInfo == nil {
  318. return 0
  319. }
  320. return int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
  321. }
  322. func (ecNode *EcNode) localShardIdCount(vid uint32) int {
  323. for _, diskInfo := range ecNode.info.DiskInfos {
  324. for _, ecShardInfo := range diskInfo.EcShardInfos {
  325. if vid == ecShardInfo.Id {
  326. shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
  327. return shardBits.ShardIdCount()
  328. }
  329. }
  330. }
  331. return 0
  332. }
  333. func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
  334. eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
  335. if selectedDataCenter != "" && selectedDataCenter != string(dc) {
  336. return
  337. }
  338. freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
  339. ecNodes = append(ecNodes, &EcNode{
  340. info: dn,
  341. dc: dc,
  342. rack: rack,
  343. freeEcSlot: int(freeEcSlots),
  344. })
  345. totalFreeEcSlots += freeEcSlots
  346. })
  347. return
  348. }
  349. func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []uint32) error {
  350. fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
  351. return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  352. _, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
  353. VolumeId: uint32(volumeId),
  354. Collection: collection,
  355. ShardIds: toBeDeletedShardIds,
  356. })
  357. return deleteErr
  358. })
  359. }
  360. func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedhardIds []uint32) error {
  361. fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
  362. return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  363. _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
  364. VolumeId: uint32(volumeId),
  365. ShardIds: toBeUnmountedhardIds,
  366. })
  367. return deleteErr
  368. })
  369. }
  370. func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedhardIds []uint32) error {
  371. fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
  372. return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  373. _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
  374. VolumeId: uint32(volumeId),
  375. Collection: collection,
  376. ShardIds: toBeMountedhardIds,
  377. })
  378. return mountErr
  379. })
  380. }
  381. func ceilDivide(a, b int) int {
  382. var r int
  383. if (a % b) != 0 {
  384. r = 1
  385. }
  386. return (a / b) + r
  387. }
  388. func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
  389. if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
  390. for _, shardInfo := range diskInfo.EcShardInfos {
  391. if needle.VolumeId(shardInfo.Id) == vid {
  392. return erasure_coding.ShardBits(shardInfo.EcIndexBits)
  393. }
  394. }
  395. }
  396. return 0
  397. }
  398. func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
  399. foundVolume := false
  400. diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
  401. if found {
  402. for _, shardInfo := range diskInfo.EcShardInfos {
  403. if needle.VolumeId(shardInfo.Id) == vid {
  404. oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
  405. newShardBits := oldShardBits
  406. for _, shardId := range shardIds {
  407. newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
  408. }
  409. shardInfo.EcIndexBits = uint32(newShardBits)
  410. ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
  411. foundVolume = true
  412. break
  413. }
  414. }
  415. } else {
  416. diskInfo = &master_pb.DiskInfo{
  417. Type: string(types.HardDriveType),
  418. }
  419. ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo
  420. }
  421. if !foundVolume {
  422. var newShardBits erasure_coding.ShardBits
  423. for _, shardId := range shardIds {
  424. newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
  425. }
  426. diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
  427. Id: uint32(vid),
  428. Collection: collection,
  429. EcIndexBits: uint32(newShardBits),
  430. DiskType: string(types.HardDriveType),
  431. })
  432. ecNode.freeEcSlot -= len(shardIds)
  433. }
  434. return ecNode
  435. }
  436. func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
  437. if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
  438. for _, shardInfo := range diskInfo.EcShardInfos {
  439. if needle.VolumeId(shardInfo.Id) == vid {
  440. oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
  441. newShardBits := oldShardBits
  442. for _, shardId := range shardIds {
  443. newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
  444. }
  445. shardInfo.EcIndexBits = uint32(newShardBits)
  446. ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
  447. }
  448. }
  449. }
  450. return ecNode
  451. }
  452. func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
  453. countMap := make(map[string]int)
  454. for _, d := range data {
  455. id, count := identifierFn(d)
  456. countMap[id] += count
  457. }
  458. return countMap
  459. }
  460. func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
  461. groupMap := make(map[string][]*EcNode)
  462. for _, d := range data {
  463. id := identifierFn(d)
  464. groupMap[id] = append(groupMap[id], d)
  465. }
  466. return groupMap
  467. }
  468. type ecBalancer struct {
  469. commandEnv *CommandEnv
  470. ecNodes []*EcNode
  471. replicaPlacement *super_block.ReplicaPlacement
  472. applyBalancing bool
  473. parallelize bool
  474. wg *sync.WaitGroup
  475. wgErrors []error
  476. }
  477. type ecBalancerTask func() error
  478. func (ecb *ecBalancer) wgInit() {
  479. if ecb.wg != nil {
  480. return
  481. }
  482. ecb.wg = &sync.WaitGroup{}
  483. ecb.wgErrors = nil
  484. }
  485. func (ecb *ecBalancer) wgAdd(f ecBalancerTask) {
  486. if ecb.wg == nil || !ecb.parallelize {
  487. if err := f(); err != nil {
  488. ecb.wgErrors = append(ecb.wgErrors, err)
  489. }
  490. return
  491. }
  492. ecb.wg.Add(1)
  493. go func() {
  494. if err := f(); err != nil {
  495. ecb.wgErrors = append(ecb.wgErrors, err)
  496. }
  497. ecb.wg.Done()
  498. }()
  499. }
  500. func (ecb *ecBalancer) wgWait() error {
  501. if ecb.wg != nil {
  502. ecb.wg.Wait()
  503. }
  504. err := errors.Join(ecb.wgErrors...)
  505. ecb.wg = nil
  506. ecb.wgErrors = nil
  507. return err
  508. }
  509. func (ecb *ecBalancer) racks() map[RackId]*EcRack {
  510. racks := make(map[RackId]*EcRack)
  511. for _, ecNode := range ecb.ecNodes {
  512. if racks[ecNode.rack] == nil {
  513. racks[ecNode.rack] = &EcRack{
  514. ecNodes: make(map[EcNodeId]*EcNode),
  515. }
  516. }
  517. racks[ecNode.rack].ecNodes[EcNodeId(ecNode.info.Id)] = ecNode
  518. racks[ecNode.rack].freeEcSlot += ecNode.freeEcSlot
  519. }
  520. return racks
  521. }
  522. func (ecb *ecBalancer) balanceEcVolumes(collection string) error {
  523. fmt.Printf("balanceEcVolumes %s\n", collection)
  524. if err := ecb.deleteDuplicatedEcShards(collection); err != nil {
  525. return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err)
  526. }
  527. if err := ecb.balanceEcShardsAcrossRacks(collection); err != nil {
  528. return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
  529. }
  530. if err := ecb.balanceEcShardsWithinRacks(collection); err != nil {
  531. return fmt.Errorf("balance within racks collection %s ec shards: %v", collection, err)
  532. }
  533. return nil
  534. }
  535. func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error {
  536. vidLocations := ecb.collectVolumeIdToEcNodes(collection)
  537. ecb.wgInit()
  538. for vid, locations := range vidLocations {
  539. ecb.wgAdd(func() error {
  540. return ecb.doDeduplicateEcShards(collection, vid, locations)
  541. })
  542. }
  543. return ecb.wgWait()
  544. }
  545. func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error {
  546. // check whether this volume has ecNodes that are over average
  547. shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
  548. for _, ecNode := range locations {
  549. shardBits := findEcVolumeShards(ecNode, vid)
  550. for _, shardId := range shardBits.ShardIds() {
  551. shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
  552. }
  553. }
  554. for shardId, ecNodes := range shardToLocations {
  555. if len(ecNodes) <= 1 {
  556. continue
  557. }
  558. sortEcNodesByFreeslotsAscending(ecNodes)
  559. fmt.Printf("ec shard %d.%d has %d copies, keeping %v\n", vid, shardId, len(ecNodes), ecNodes[0].info.Id)
  560. if !ecb.applyBalancing {
  561. continue
  562. }
  563. duplicatedShardIds := []uint32{uint32(shardId)}
  564. for _, ecNode := range ecNodes[1:] {
  565. if err := unmountEcShards(ecb.commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
  566. return err
  567. }
  568. if err := sourceServerDeleteEcShards(ecb.commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
  569. return err
  570. }
  571. ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
  572. }
  573. }
  574. return nil
  575. }
  576. func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
  577. // collect vid => []ecNode, since previous steps can change the locations
  578. vidLocations := ecb.collectVolumeIdToEcNodes(collection)
  579. // spread the ec shards evenly
  580. ecb.wgInit()
  581. for vid, locations := range vidLocations {
  582. ecb.wgAdd(func() error {
  583. return ecb.doBalanceEcShardsAcrossRacks(collection, vid, locations)
  584. })
  585. }
  586. return ecb.wgWait()
  587. }
  588. func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
  589. return groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
  590. shardBits := findEcVolumeShards(ecNode, vid)
  591. return string(ecNode.rack), shardBits.ShardIdCount()
  592. })
  593. }
// doBalanceEcShardsAcrossRacks spreads the shards of one EC volume (vid) over
// more racks.
//
// The per-rack target is ceil(TotalShardsCount / number of racks). Every rack
// holding more than that many shards of vid asks pickNEcShardsToMoveFrom for
// its surplus (count - target) shards. For each such shard,
// pickRackToBalanceShardsInto chooses a destination rack, and
// pickOneEcNodeAndMoveOneShard is given that rack's nodes as candidates.
// The local rackToShardCount and rack freeEcSlot tallies are updated after
// each move, so later picks see the new distribution.
//
// locations are the nodes holding shards of vid. A shard with no eligible
// destination rack is reported and skipped; a failed move aborts with its
// error.
//
// NOTE(review): racks() reads ecNode.freeEcSlot, and moves mutate shared
// EcNodes. This runs concurrently per volume when ecb.parallelize is set —
// confirm that is safe.
func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needle.VolumeId, locations []*EcNode) error {
	racks := ecb.racks()

	// calculate average number of shards an ec rack should have for one volume
	averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))

	// see the volume's shards are in how many racks, and how many in each rack
	rackToShardCount := countShardsByRack(vid, locations)
	rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
		return string(ecNode.rack)
	})

	// ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
	ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode)
	for rackId, count := range rackToShardCount {
		if count <= averageShardsPerEcRack {
			continue
		}
		possibleEcNodes := rackEcNodesWithVid[rackId]
		for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
			ecShardsToMove[shardId] = ecNode
		}
	}

	// Map iteration order is random, so the surplus shards are moved in no fixed order.
	for shardId, ecNode := range ecShardsToMove {
		rackId, err := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount)
		if err != nil {
			// Not fatal: print why no rack qualified and continue with the next shard.
			fmt.Printf("ec shard %d.%d at %s can not find a destination rack:\n%s\n", vid, shardId, ecNode.info.Id, err.Error())
			continue
		}

		var possibleDestinationEcNodes []*EcNode
		for _, n := range racks[rackId].ecNodes {
			possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
		}
		err = ecb.pickOneEcNodeAndMoveOneShard(ecNode, collection, vid, shardId, possibleDestinationEcNodes)
		if err != nil {
			return err
		}

		// Keep the local tallies in sync so the next pickRackToBalanceShardsInto call sees this move.
		rackToShardCount[string(rackId)] += 1
		rackToShardCount[string(ecNode.rack)] -= 1
		racks[rackId].freeEcSlot -= 1
		racks[ecNode.rack].freeEcSlot += 1
	}

	return nil
}
  635. func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int) (RackId, error) {
  636. targets := []RackId{}
  637. targetShards := -1
  638. for _, shards := range rackToShardCount {
  639. if shards > targetShards {
  640. targetShards = shards
  641. }
  642. }
  643. details := ""
  644. for rackId, rack := range rackToEcNodes {
  645. shards := rackToShardCount[string(rackId)]
  646. if rack.freeEcSlot <= 0 {
  647. details += fmt.Sprintf(" Skipped %s because it has no free slots\n", rackId)
  648. continue
  649. }
  650. if ecb.replicaPlacement != nil && shards >= ecb.replicaPlacement.DiffRackCount {
  651. details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for other racks (%d)\n", rackId, shards, ecb.replicaPlacement.DiffRackCount)
  652. continue
  653. }
  654. if shards < targetShards {
  655. // Favor racks with less shards, to ensure an uniform distribution.
  656. targets = nil
  657. targetShards = shards
  658. }
  659. if shards == targetShards {
  660. targets = append(targets, rackId)
  661. }
  662. }
  663. if len(targets) == 0 {
  664. return "", errors.New(details)
  665. }
  666. return targets[rand.IntN(len(targets))], nil
  667. }
  668. func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
  669. // collect vid => []ecNode, since previous steps can change the locations
  670. vidLocations := ecb.collectVolumeIdToEcNodes(collection)
  671. racks := ecb.racks()
  672. // spread the ec shards evenly
  673. ecb.wgInit()
  674. for vid, locations := range vidLocations {
  675. // see the volume's shards are in how many racks, and how many in each rack
  676. rackToShardCount := countShardsByRack(vid, locations)
  677. rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
  678. return string(ecNode.rack)
  679. })
  680. for rackId, _ := range rackToShardCount {
  681. var possibleDestinationEcNodes []*EcNode
  682. for _, n := range racks[RackId(rackId)].ecNodes {
  683. if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found {
  684. possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
  685. }
  686. }
  687. sourceEcNodes := rackEcNodesWithVid[rackId]
  688. averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
  689. ecb.wgAdd(func() error {
  690. return ecb.doBalanceEcShardsWithinOneRack(averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes)
  691. })
  692. }
  693. }
  694. return ecb.wgWait()
  695. }
  696. func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
  697. for _, ecNode := range existingLocations {
  698. shardBits := findEcVolumeShards(ecNode, vid)
  699. overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode
  700. for _, shardId := range shardBits.ShardIds() {
  701. if overLimitCount <= 0 {
  702. break
  703. }
  704. fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)
  705. err := ecb.pickOneEcNodeAndMoveOneShard(ecNode, collection, vid, shardId, possibleDestinationEcNodes)
  706. if err != nil {
  707. return err
  708. }
  709. overLimitCount--
  710. }
  711. }
  712. return nil
  713. }
  714. func (ecb *ecBalancer) balanceEcRacks() error {
  715. // balance one rack for all ec shards
  716. ecb.wgInit()
  717. for _, ecRack := range ecb.racks() {
  718. ecb.wgAdd(func() error {
  719. return ecb.doBalanceEcRack(ecRack)
  720. })
  721. }
  722. return ecb.wgWait()
  723. }
// doBalanceEcRack evens out the total number of EC shards (of all volumes)
// held on the hard-drive disks of the nodes in one rack. Racks with at most
// one node are left alone.
//
// Each iteration:
//   - sorts the nodes by freeEcSlot (descending);
//   - takes the node with the most free slots ("empty") and the one with the
//     fewest ("full");
//   - checks that full holds more than the rack's ceil-average shard count
//     and that empty could take one more without exceeding it.
//
// If both hold, it finds the first volume record on full that empty has no
// record for at all, and moves that volume's first shard from full to empty
// via moveMountedShardToEcNode. Iterations repeat until one makes no move.
//
// NOTE(review): the algorithm description sorts by shard count, but this code
// sorts by freeEcSlot — confirm that is intended. Also, if the first such
// volume record has no shard bits set, nothing moves and the loop ends.
func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
	if len(ecRack.ecNodes) <= 1 {
		return nil
	}

	var rackEcNodes []*EcNode
	for _, node := range ecRack.ecNodes {
		rackEcNodes = append(rackEcNodes, node)
	}

	// Total shards per node id. Nodes without a hard-drive disk all map to id "" with count 0.
	ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
		if !found {
			return
		}
		for _, ecShardInfo := range diskInfo.EcShardInfos {
			count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
		}
		return ecNode.info.Id, count
	})

	var totalShardCount int
	for _, count := range ecNodeIdToShardCount {
		totalShardCount += count
	}

	averageShardCount := ceilDivide(totalShardCount, len(rackEcNodes))

	hasMove := true
	for hasMove {
		hasMove = false
		// Re-sort each round: moveMountedShardToEcNode updates freeEcSlot on both nodes.
		slices.SortFunc(rackEcNodes, func(a, b *EcNode) int {
			return b.freeEcSlot - a.freeEcSlot
		})
		emptyNode, fullNode := rackEcNodes[0], rackEcNodes[len(rackEcNodes)-1]
		emptyNodeShardCount, fullNodeShardCount := ecNodeIdToShardCount[emptyNode.info.Id], ecNodeIdToShardCount[fullNode.info.Id]
		if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {

			// Volumes that already have a shard record on the empty node.
			emptyNodeIds := make(map[uint32]bool)
			if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
				for _, shards := range emptyDiskInfo.EcShardInfos {
					emptyNodeIds[shards.Id] = true
				}
			}
			if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
				for _, shards := range fullDiskInfo.EcShardInfos {
					if _, found := emptyNodeIds[shards.Id]; !found {
						// Move only the first shard of this volume, then leave both loops.
						for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {

							fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)

							err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, ecb.applyBalancing)
							if err != nil {
								return err
							}

							ecNodeIdToShardCount[emptyNode.info.Id]++
							ecNodeIdToShardCount[fullNode.info.Id]--
							hasMove = true
							break
						}
						break
					}
				}
			}
		}
	}

	return nil
}
  784. func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, error) {
  785. if existingLocation == nil {
  786. return nil, fmt.Errorf("INTERNAL: missing source nodes")
  787. }
  788. if len(possibleDestinations) == 0 {
  789. return nil, fmt.Errorf("INTERNAL: missing destination nodes")
  790. }
  791. nodeShards := map[*EcNode]int{}
  792. for _, node := range possibleDestinations {
  793. nodeShards[node] = findEcVolumeShards(node, vid).ShardIdCount()
  794. }
  795. targets := []*EcNode{}
  796. targetShards := -1
  797. for _, shards := range nodeShards {
  798. if shards > targetShards {
  799. targetShards = shards
  800. }
  801. }
  802. details := ""
  803. for _, node := range possibleDestinations {
  804. if node.info.Id == existingLocation.info.Id {
  805. continue
  806. }
  807. if node.freeEcSlot <= 0 {
  808. details += fmt.Sprintf(" Skipped %s because it has no free slots\n", node.info.Id)
  809. continue
  810. }
  811. shards := nodeShards[node]
  812. if ecb.replicaPlacement != nil && shards >= ecb.replicaPlacement.SameRackCount {
  813. details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for the rack (%d)\n", node.info.Id, shards, ecb.replicaPlacement.SameRackCount)
  814. continue
  815. }
  816. if shards < targetShards {
  817. // Favor nodes with less shards, to ensure an uniform distribution.
  818. targets = nil
  819. targetShards = shards
  820. }
  821. if shards == targetShards {
  822. targets = append(targets, node)
  823. }
  824. }
  825. if len(targets) == 0 {
  826. return nil, errors.New(details)
  827. }
  828. return targets[rand.IntN(len(targets))], nil
  829. }
  830. func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error {
  831. destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
  832. if err != nil {
  833. fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, err.Error())
  834. return nil
  835. }
  836. fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
  837. return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, ecb.applyBalancing)
  838. }
  839. func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
  840. picked := make(map[erasure_coding.ShardId]*EcNode)
  841. var candidateEcNodes []*CandidateEcNode
  842. for _, ecNode := range ecNodes {
  843. shardBits := findEcVolumeShards(ecNode, vid)
  844. if shardBits.ShardIdCount() > 0 {
  845. candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
  846. ecNode: ecNode,
  847. shardCount: shardBits.ShardIdCount(),
  848. })
  849. }
  850. }
  851. slices.SortFunc(candidateEcNodes, func(a, b *CandidateEcNode) int {
  852. return b.shardCount - a.shardCount
  853. })
  854. for i := 0; i < n; i++ {
  855. selectedEcNodeIndex := -1
  856. for i, candidateEcNode := range candidateEcNodes {
  857. shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
  858. if shardBits > 0 {
  859. selectedEcNodeIndex = i
  860. for _, shardId := range shardBits.ShardIds() {
  861. candidateEcNode.shardCount--
  862. picked[shardId] = candidateEcNode.ecNode
  863. candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
  864. break
  865. }
  866. break
  867. }
  868. }
  869. if selectedEcNodeIndex >= 0 {
  870. ensureSortedEcNodes(candidateEcNodes, selectedEcNodeIndex, func(i, j int) bool {
  871. return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
  872. })
  873. }
  874. }
  875. return picked
  876. }
  877. func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.VolumeId][]*EcNode {
  878. vidLocations := make(map[needle.VolumeId][]*EcNode)
  879. for _, ecNode := range ecb.ecNodes {
  880. diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
  881. if !found {
  882. continue
  883. }
  884. for _, shardInfo := range diskInfo.EcShardInfos {
  885. // ignore if not in current collection
  886. if shardInfo.Collection == collection {
  887. vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode)
  888. }
  889. }
  890. }
  891. return vidLocations
  892. }
  893. func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, parallelize bool, applyBalancing bool) (err error) {
  894. if len(collections) == 0 {
  895. return fmt.Errorf("no collections to balance")
  896. }
  897. // collect all ec nodes
  898. allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc)
  899. if err != nil {
  900. return err
  901. }
  902. if totalFreeEcSlots < 1 {
  903. return fmt.Errorf("no free ec shard slots. only %d left", totalFreeEcSlots)
  904. }
  905. ecb := &ecBalancer{
  906. commandEnv: commandEnv,
  907. ecNodes: allEcNodes,
  908. replicaPlacement: ecReplicaPlacement,
  909. applyBalancing: applyBalancing,
  910. parallelize: parallelize,
  911. }
  912. for _, c := range collections {
  913. if err = ecb.balanceEcVolumes(c); err != nil {
  914. return err
  915. }
  916. }
  917. if err := ecb.balanceEcRacks(); err != nil {
  918. return fmt.Errorf("balance ec racks: %v", err)
  919. }
  920. return nil
  921. }