package shell

import (
	"context"
	"errors"
	"fmt"
	"math/rand/v2"
	"sort"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"golang.org/x/exp/slices"
	"google.golang.org/grpc"
)

type DataCenterId string
type EcNodeId string
type RackId string

type EcNode struct {
	info       *master_pb.DataNodeInfo
	dc         DataCenterId
	rack       RackId
	freeEcSlot int
}

type CandidateEcNode struct {
	ecNode     *EcNode
	shardCount int
}

type EcRack struct {
	ecNodes    map[EcNodeId]*EcNode
	freeEcSlot int
}

var (
	ecBalanceAlgorithmDescription = `
	func EcBalance() {
		for each collection:
			balanceEcVolumes(collectionName)
		for each rack:
			balanceEcRack(rack)
	}

	func balanceEcVolumes(collectionName){
		for each volume:
			doDeduplicateEcShards(volumeId)

		tracks rack~shardCount mapping
		for each volume:
			doBalanceEcShardsAcrossRacks(volumeId)

		for each volume:
			doBalanceEcShardsWithinRacks(volumeId)
	}

	// spread ec shards into more racks
	func doBalanceEcShardsAcrossRacks(volumeId){
		tracks rack~volumeIdShardCount mapping
		averageShardsPerEcRack = totalShardNumber / numRacks // totalShardNumber is 14 for now, later could vary for each dc
		ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
		for each ecShardsToMove {
			destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, ecShardReplicaPlacement)
			destVolumeServers = volume servers on the destRack
			pickOneEcNodeAndMoveOneShard(destVolumeServers)
		}
	}

	func doBalanceEcShardsWithinRacks(volumeId){
		racks = collect all racks that the volume id is on
		for rack, shards := range racks
			doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
	}

	// move ec shards
	func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
		tracks volumeServer~volumeIdShardCount mapping
		averageShardCount = len(shards) / numVolumeServers
		volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardsPerEcRack
		ecShardsToMove = select overflown ec shards from volumeServersOverAverage
		for each ecShardsToMove {
			destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, ecShardReplicaPlacement)
			pickOneEcNodeAndMoveOneShard(destVolumeServers)
		}
	}

	// move ec shards while keeping shard distribution for the same volume unchanged or more even
	func balanceEcRack(rack){
		averageShardCount = total shards / numVolumeServers
		for hasMovedOneEcShard {
			sort all volume servers ordered by the number of local ec shards
			pick the volume server A with the lowest number of ec shards x
			pick the volume server B with the highest number of ec shards y
			if y > averageShardCount and x + 1 <= averageShardCount {
				if B has an ec shard with volume id v that A does not have {
					move one ec shard v from B to A
					hasMovedOneEcShard = true
				}
			}
		}
	}
	`

	// Overridable functions for testing.
	getDefaultReplicaPlacement = _getDefaultReplicaPlacement
)
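
// _getDefaultReplicaPlacement asks the master for its configured default replication setting
// and parses it into a ReplicaPlacement.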
func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
	var resp *master_pb.GetMasterConfigurationResponse
	var err error

	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		resp, err = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
		return err
	})
	if err != nil {
		return nil, err
	}

	return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
}
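
// parseReplicaPlacementArg resolves the replica placement to use for EC volumes: it parses
// replicaStr when non-empty, otherwise it falls back to the master's default replication.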
func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super_block.ReplicaPlacement, error) {
	if replicaStr != "" {
		rp, err := super_block.NewReplicaPlacementFromString(replicaStr)
		if err == nil {
			fmt.Printf("using replica placement %q for EC volumes\n", rp.String())
		}
		return rp, err
	}

	// No replica placement argument provided, resolve from master default settings.
	rp, err := getDefaultReplicaPlacement(commandEnv)
	if err == nil {
		fmt.Printf("using master default replica placement %q for EC volumes\n", rp.String())
	}
	return rp, err
}
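
// collectTopologyInfo fetches the current topology and the volume size limit (in MB) from the
// master, optionally sleeping first so that recent changes have time to propagate.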
func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
	if delayBeforeCollecting > 0 {
		time.Sleep(delayBeforeCollecting)
	}

	var resp *master_pb.VolumeListResponse
	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		return err
	})
	if err != nil {
		return
	}

	return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil
}
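
// collectEcNodesForDC returns all volume servers in the selected data center as EcNodes,
// sorted by free EC shard slots in descending order, together with the total free slot count.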
func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
	// list all possible locations
	// collect topology information
	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return
	}

	// find out all volume servers with one slot left.
	ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)

	sortEcNodesByFreeslotsDescending(ecNodes)

	return
}

func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
	return collectEcNodesForDC(commandEnv, "")
}
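
// collectCollectionsForVolumeIds returns the sorted, de-duplicated collection names that own
// any of the given volume ids, considering both normal volumes and EC shards.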
func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string {
	if len(vids) == 0 {
		return nil
	}

	found := map[string]bool{}
	for _, dc := range t.DataCenterInfos {
		for _, r := range dc.RackInfos {
			for _, dn := range r.DataNodeInfos {
				for _, diskInfo := range dn.DiskInfos {
					for _, vi := range diskInfo.VolumeInfos {
						for _, vid := range vids {
							if needle.VolumeId(vi.Id) == vid && vi.Collection != "" {
								found[vi.Collection] = true
							}
						}
					}
					for _, ecs := range diskInfo.EcShardInfos {
						for _, vid := range vids {
							if needle.VolumeId(ecs.Id) == vid && ecs.Collection != "" {
								found[ecs.Collection] = true
							}
						}
					}
				}
			}
		}
	}
	if len(found) == 0 {
		return nil
	}

	collections := []string{}
	for k := range found {
		collections = append(collections, k)
	}
	sort.Strings(collections)
	return collections
}
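
// moveMountedShardToEcNode moves one mounted EC shard from its current location to the
// destination node: the destination copies and mounts the shard, then the source unmounts and
// deletes it. When applyBalancing is false, only the in-memory bookkeeping is updated.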
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
	if !commandEnv.isLocked() {
		return fmt.Errorf("lock is lost")
	}

	copiedShardIds := []uint32{uint32(shardId)}

	if applyBalancing {
		existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)

		// ask destination node to copy shard and the ecx file from source node, and mount it
		copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
		if err != nil {
			return err
		}

		// unmount the to be deleted shards
		err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingServerAddress, copiedShardIds)
		if err != nil {
			return err
		}

		// ask source node to delete the shard, and maybe the ecx file
		err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingServerAddress, copiedShardIds)
		if err != nil {
			return err
		}

		fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
	}

	destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
	existingLocation.deleteEcVolumeShards(vid, copiedShardIds)

	return nil
}
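
// oneServerCopyAndMountEcShardsFromSource asks the target volume server to copy the given EC
// shards (together with the .ecx/.ecj/.vif files) from the source server and mount them. It
// returns the shard ids that were actually copied, which is empty when source and target are
// the same server.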
func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
	targetServer *EcNode, shardIdsToCopy []uint32,
	volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {

	fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)

	targetAddress := pb.NewServerAddressFromDataNode(targetServer.info)
	err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {

		if targetAddress != existingLocation {

			fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
			_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
				VolumeId:       uint32(volumeId),
				Collection:     collection,
				ShardIds:       shardIdsToCopy,
				CopyEcxFile:    true,
				CopyEcjFile:    true,
				CopyVifFile:    true,
				SourceDataNode: string(existingLocation),
			})
			if copyErr != nil {
				return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
			}
		}

		fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
		_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   shardIdsToCopy,
		})
		if mountErr != nil {
			return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
		}

		if targetAddress != existingLocation {
			copiedShardIds = shardIdsToCopy
			glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
		}

		return nil
	})

	if err != nil {
		return
	}

	return
}
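
// eachDataNode walks the topology and invokes fn for every data node, passing along the data
// center and rack that the node belongs to.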
func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo)) {
	for _, dc := range topo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, dn := range rack.DataNodeInfos {
				fn(DataCenterId(dc.Id), RackId(rack.Id), dn)
			}
		}
	}
}

func sortEcNodesByFreeslotsDescending(ecNodes []*EcNode) {
	slices.SortFunc(ecNodes, func(a, b *EcNode) int {
		return b.freeEcSlot - a.freeEcSlot
	})
}

func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
	slices.SortFunc(ecNodes, func(a, b *EcNode) int {
		return a.freeEcSlot - b.freeEcSlot
	})
}

// if the index node changed the freeEcSlot, need to keep every EcNode still sorted
func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
	for i := index - 1; i >= 0; i-- {
		if lessThan(i+1, i) {
			swap(data, i, i+1)
		} else {
			break
		}
	}
	for i := index + 1; i < len(data); i++ {
		if lessThan(i, i-1) {
			swap(data, i, i-1)
		} else {
			break
		}
	}
}

func swap(data []*CandidateEcNode, i, j int) {
	t := data[i]
	data[i] = data[j]
	data[j] = t
}

func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
	for _, ecShardInfo := range ecShardInfos {
		shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
		count += shardBits.ShardIdCount()
	}
	return
}
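
// countFreeShardSlots estimates how many more EC shards the node can host on the given disk
// type: each unused volume slot is worth DataShardsCount shard slots, minus the EC shards the
// node already stores.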
func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) {
	if dn.DiskInfos == nil {
		return 0
	}
	diskInfo := dn.DiskInfos[string(diskType)]
	if diskInfo == nil {
		return 0
	}
	return int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
}

func (ecNode *EcNode) localShardIdCount(vid uint32) int {
	for _, diskInfo := range ecNode.info.DiskInfos {
		for _, ecShardInfo := range diskInfo.EcShardInfos {
			if vid == ecShardInfo.Id {
				shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
				return shardBits.ShardIdCount()
			}
		}
	}
	return 0
}
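
// collectEcVolumeServersByDc flattens the topology into a list of EcNodes, optionally
// restricted to one data center, and sums up their free EC shard slots.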
func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
	eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
		if selectedDataCenter != "" && selectedDataCenter != string(dc) {
			return
		}

		freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
		ecNodes = append(ecNodes, &EcNode{
			info:       dn,
			dc:         dc,
			rack:       rack,
			freeEcSlot: int(freeEcSlots),
		})
		totalFreeEcSlots += freeEcSlots
	})
	return
}

func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []uint32) error {

	fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   toBeDeletedShardIds,
		})
		return deleteErr
	})
}

func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedhardIds []uint32) error {

	fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
			VolumeId: uint32(volumeId),
			ShardIds: toBeUnmountedhardIds,
		})
		return deleteErr
	})
}

func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedhardIds []uint32) error {

	fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   toBeMountedhardIds,
		})
		return mountErr
	})
}
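
// ceilDivide returns a divided by b, rounded up to the nearest integer; b must be non-zero.
// For example, ceilDivide(14, 4) == 4.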
func ceilDivide(a, b int) int {
	var r int
	if (a % b) != 0 {
		r = 1
	}
	return (a / b) + r
}
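
// findEcVolumeShards returns the bitmap of EC shards that the node holds for the given volume
// id on its hard-drive disk, or 0 if the node has none.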
func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {

	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				return erasure_coding.ShardBits(shardInfo.EcIndexBits)
			}
		}
	}

	return 0
}
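
// addEcVolumeShards records the given shard ids as present on this node, updating the shard
// bitmap (and creating the disk/volume entries if needed) and decreasing the node's free slot
// count accordingly.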
func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
	foundVolume := false
	diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
	if found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
				newShardBits := oldShardBits
				for _, shardId := range shardIds {
					newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
				}
				shardInfo.EcIndexBits = uint32(newShardBits)
				ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
				foundVolume = true
				break
			}
		}
	} else {
		diskInfo = &master_pb.DiskInfo{
			Type: string(types.HardDriveType),
		}
		ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo
	}

	if !foundVolume {
		var newShardBits erasure_coding.ShardBits
		for _, shardId := range shardIds {
			newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
		}
		diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
			Id:          uint32(vid),
			Collection:  collection,
			EcIndexBits: uint32(newShardBits),
			DiskType:    string(types.HardDriveType),
		})
		ecNode.freeEcSlot -= len(shardIds)
	}

	return ecNode
}
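
// deleteEcVolumeShards removes the given shard ids from this node's in-memory shard bitmap for
// the volume and returns the freed slots to the node's free slot count.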
func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {

	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
				newShardBits := oldShardBits
				for _, shardId := range shardIds {
					newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
				}
				shardInfo.EcIndexBits = uint32(newShardBits)
				ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
			}
		}
	}

	return ecNode
}

func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
	countMap := make(map[string]int)
	for _, d := range data {
		id, count := identifierFn(d)
		countMap[id] += count
	}
	return countMap
}

func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
	groupMap := make(map[string][]*EcNode)
	for _, d := range data {
		id := identifierFn(d)
		groupMap[id] = append(groupMap[id], d)
	}
	return groupMap
}
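
// ecBalancer carries the state shared by the EC balancing passes: the shell environment, the
// candidate nodes, the desired replica placement, and whether changes should actually be applied.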
type ecBalancer struct {
	commandEnv       *CommandEnv
	ecNodes          []*EcNode
	replicaPlacement *super_block.ReplicaPlacement
	applyBalancing   bool
}

func (ecb *ecBalancer) racks() map[RackId]*EcRack {
	racks := make(map[RackId]*EcRack)
	for _, ecNode := range ecb.ecNodes {
		if racks[ecNode.rack] == nil {
			racks[ecNode.rack] = &EcRack{
				ecNodes: make(map[EcNodeId]*EcNode),
			}
		}
		racks[ecNode.rack].ecNodes[EcNodeId(ecNode.info.Id)] = ecNode
		racks[ecNode.rack].freeEcSlot += ecNode.freeEcSlot
	}
	return racks
}

func (ecb *ecBalancer) balanceEcVolumes(collection string) error {
	fmt.Printf("balanceEcVolumes %s\n", collection)

	if err := ecb.deleteDuplicatedEcShards(collection); err != nil {
		return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err)
	}

	if err := ecb.balanceEcShardsAcrossRacks(collection); err != nil {
		return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
	}

	if err := ecb.balanceEcShardsWithinRacks(collection); err != nil {
		return fmt.Errorf("balance within racks collection %s ec shards: %v", collection, err)
	}

	return nil
}

func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error {
	// vid => []ecNode
	vidLocations := ecb.collectVolumeIdToEcNodes(collection)
	// deduplicate ec shards
	for vid, locations := range vidLocations {
		if err := ecb.doDeduplicateEcShards(collection, vid, locations); err != nil {
			return err
		}
	}
	return nil
}
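
// doDeduplicateEcShards removes redundant copies of each EC shard of one volume, keeping the
// copy on the node with the fewest free slots and unmounting/deleting the rest when applying.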
func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error {
	// check whether this volume has ecNodes that are over average
	shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
	for _, ecNode := range locations {
		shardBits := findEcVolumeShards(ecNode, vid)
		for _, shardId := range shardBits.ShardIds() {
			shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
		}
	}
	for shardId, ecNodes := range shardToLocations {
		if len(ecNodes) <= 1 {
			continue
		}
		sortEcNodesByFreeslotsAscending(ecNodes)
		fmt.Printf("ec shard %d.%d has %d copies, keeping %v\n", vid, shardId, len(ecNodes), ecNodes[0].info.Id)
		if !ecb.applyBalancing {
			continue
		}

		duplicatedShardIds := []uint32{uint32(shardId)}
		for _, ecNode := range ecNodes[1:] {
			if err := unmountEcShards(ecb.commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
				return err
			}
			if err := sourceServerDeleteEcShards(ecb.commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
				return err
			}
			ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
		}
	}
	return nil
}

func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
	// collect vid => []ecNode, since previous steps can change the locations
	vidLocations := ecb.collectVolumeIdToEcNodes(collection)
	// spread the ec shards evenly
	for vid, locations := range vidLocations {
		if err := ecb.doBalanceEcShardsAcrossRacks(collection, vid, locations); err != nil {
			return err
		}
	}
	return nil
}

func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
	return groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
		shardBits := findEcVolumeShards(ecNode, vid)
		return string(ecNode.rack), shardBits.ShardIdCount()
	})
}
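
// doBalanceEcShardsAcrossRacks spreads one volume's EC shards across racks: racks holding more
// than the per-rack average give up their excess shards, which are then moved to racks picked
// by pickRackToBalanceShardsInto.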
func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needle.VolumeId, locations []*EcNode) error {
	racks := ecb.racks()

	// calculate average number of shards an ec rack should have for one volume
	averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))

	// see the volume's shards are in how many racks, and how many in each rack
	rackToShardCount := countShardsByRack(vid, locations)
	rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
		return string(ecNode.rack)
	})

	// ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
	ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode)
	for rackId, count := range rackToShardCount {
		if count <= averageShardsPerEcRack {
			continue
		}
		possibleEcNodes := rackEcNodesWithVid[rackId]
		for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
			ecShardsToMove[shardId] = ecNode
		}
	}

	for shardId, ecNode := range ecShardsToMove {
		rackId, err := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount)
		if err != nil {
			fmt.Printf("ec shard %d.%d at %s can not find a destination rack:\n%s\n", vid, shardId, ecNode.info.Id, err.Error())
			continue
		}

		var possibleDestinationEcNodes []*EcNode
		for _, n := range racks[rackId].ecNodes {
			possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
		}
		err = ecb.pickOneEcNodeAndMoveOneShard(ecNode, collection, vid, shardId, possibleDestinationEcNodes)
		if err != nil {
			return err
		}

		rackToShardCount[string(rackId)] += 1
		rackToShardCount[string(ecNode.rack)] -= 1
		racks[rackId].freeEcSlot -= 1
		racks[ecNode.rack].freeEcSlot += 1
	}

	return nil
}
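
// pickRackToBalanceShardsInto chooses a destination rack for one shard: racks with no free
// slots, or already at the replica placement limit for other racks, are skipped; among the
// remaining racks, one with the fewest shards of this volume is picked at random.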
func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int) (RackId, error) {
	targets := []RackId{}
	targetShards := -1
	for _, shards := range rackToShardCount {
		if shards > targetShards {
			targetShards = shards
		}
	}

	details := ""
	for rackId, rack := range rackToEcNodes {
		shards := rackToShardCount[string(rackId)]

		if rack.freeEcSlot <= 0 {
			details += fmt.Sprintf(" Skipped %s because it has no free slots\n", rackId)
			continue
		}
		if ecb.replicaPlacement != nil && shards >= ecb.replicaPlacement.DiffRackCount {
			details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for other racks (%d)\n", rackId, shards, ecb.replicaPlacement.DiffRackCount)
			continue
		}

		if shards < targetShards {
			// Favor racks with fewer shards, to ensure a uniform distribution.
			targets = nil
			targetShards = shards
		}
		if shards == targetShards {
			targets = append(targets, rackId)
		}
	}

	if len(targets) == 0 {
		return "", errors.New(details)
	}
	return targets[rand.IntN(len(targets))], nil
}
func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
	// collect vid => []ecNode, since previous steps can change the locations
	vidLocations := ecb.collectVolumeIdToEcNodes(collection)
	racks := ecb.racks()

	// spread the ec shards evenly
	for vid, locations := range vidLocations {

		// see the volume's shards are in how many racks, and how many in each rack
		rackToShardCount := countShardsByRack(vid, locations)
		rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
			return string(ecNode.rack)
		})

		for rackId := range rackToShardCount {

			var possibleDestinationEcNodes []*EcNode
			for _, n := range racks[RackId(rackId)].ecNodes {
				if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found {
					possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
				}
			}
			sourceEcNodes := rackEcNodesWithVid[rackId]
			averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
			if err := ecb.doBalanceEcShardsWithinOneRack(averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes); err != nil {
				return err
			}
		}
	}
	return nil
}
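
// doBalanceEcShardsWithinOneRack moves a volume's excess shards off nodes holding more than
// the per-node average, one shard at a time, onto other nodes within the same rack.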
func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
	for _, ecNode := range existingLocations {

		shardBits := findEcVolumeShards(ecNode, vid)
		overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode

		for _, shardId := range shardBits.ShardIds() {

			if overLimitCount <= 0 {
				break
			}

			fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)

			err := ecb.pickOneEcNodeAndMoveOneShard(ecNode, collection, vid, shardId, possibleDestinationEcNodes)
			if err != nil {
				return err
			}

			overLimitCount--
		}
	}

	return nil
}

func (ecb *ecBalancer) balanceEcRacks() error {
	// balance one rack for all ec shards
	for _, ecRack := range ecb.racks() {
		if err := ecb.doBalanceEcRack(ecRack); err != nil {
			return err
		}
	}
	return nil
}
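
// doBalanceEcRack levels the total EC shard count across the nodes of one rack: it repeatedly
// moves one shard from the node with the fewest free slots to the node with the most free
// slots, but only while the sender is above the rack average, the receiver stays at or below
// it, and the receiver does not already hold shards of that volume.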
func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
	if len(ecRack.ecNodes) <= 1 {
		return nil
	}

	var rackEcNodes []*EcNode
	for _, node := range ecRack.ecNodes {
		rackEcNodes = append(rackEcNodes, node)
	}

	ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
		if !found {
			return
		}
		for _, ecShardInfo := range diskInfo.EcShardInfos {
			count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
		}
		return ecNode.info.Id, count
	})

	var totalShardCount int
	for _, count := range ecNodeIdToShardCount {
		totalShardCount += count
	}

	averageShardCount := ceilDivide(totalShardCount, len(rackEcNodes))

	hasMove := true
	for hasMove {
		hasMove = false
		slices.SortFunc(rackEcNodes, func(a, b *EcNode) int {
			return b.freeEcSlot - a.freeEcSlot
		})
		emptyNode, fullNode := rackEcNodes[0], rackEcNodes[len(rackEcNodes)-1]
		emptyNodeShardCount, fullNodeShardCount := ecNodeIdToShardCount[emptyNode.info.Id], ecNodeIdToShardCount[fullNode.info.Id]
		if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {

			emptyNodeIds := make(map[uint32]bool)
			if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
				for _, shards := range emptyDiskInfo.EcShardInfos {
					emptyNodeIds[shards.Id] = true
				}
			}
			if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
				for _, shards := range fullDiskInfo.EcShardInfos {
					if _, found := emptyNodeIds[shards.Id]; !found {
						for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {

							fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)

							err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, ecb.applyBalancing)
							if err != nil {
								return err
							}

							ecNodeIdToShardCount[emptyNode.info.Id]++
							ecNodeIdToShardCount[fullNode.info.Id]--
							hasMove = true
							break
						}
						break
					}
				}
			}
		}
	}

	return nil
}
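
// pickEcNodeToBalanceShardsInto chooses a destination node for one shard of a volume: the
// source node, nodes without free slots, and nodes at the replica placement limit for the rack
// are skipped; among the rest, a node with the fewest shards of this volume is picked at random.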
func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, error) {
	if existingLocation == nil {
		return nil, fmt.Errorf("INTERNAL: missing source nodes")
	}
	if len(possibleDestinations) == 0 {
		return nil, fmt.Errorf("INTERNAL: missing destination nodes")
	}

	nodeShards := map[*EcNode]int{}
	for _, node := range possibleDestinations {
		nodeShards[node] = findEcVolumeShards(node, vid).ShardIdCount()
	}

	targets := []*EcNode{}
	targetShards := -1
	for _, shards := range nodeShards {
		if shards > targetShards {
			targetShards = shards
		}
	}

	details := ""
	for _, node := range possibleDestinations {
		if node.info.Id == existingLocation.info.Id {
			continue
		}
		if node.freeEcSlot <= 0 {
			details += fmt.Sprintf(" Skipped %s because it has no free slots\n", node.info.Id)
			continue
		}

		shards := nodeShards[node]
		if ecb.replicaPlacement != nil && shards >= ecb.replicaPlacement.SameRackCount {
			details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for the rack (%d)\n", node.info.Id, shards, ecb.replicaPlacement.SameRackCount)
			continue
		}

		if shards < targetShards {
			// Favor nodes with fewer shards, to ensure a uniform distribution.
			targets = nil
			targetShards = shards
		}
		if shards == targetShards {
			targets = append(targets, node)
		}
	}

	if len(targets) == 0 {
		return nil, errors.New(details)
	}
	return targets[rand.IntN(len(targets))], nil
}
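
// pickOneEcNodeAndMoveOneShard selects a destination node and moves one shard there; if no
// suitable destination exists, it only logs a warning and leaves the shard where it is.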
func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error {
	destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
	if err != nil {
		fmt.Printf("WARNING: Could not find suitable target node for %d.%d:\n%s", vid, shardId, err.Error())
		return nil
	}
	fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
	return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, ecb.applyBalancing)
}
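
// pickNEcShardsToMoveFrom selects n shards of the volume to move away, always taking the next
// shard from whichever candidate node currently holds the most shards of that volume.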
func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
	picked := make(map[erasure_coding.ShardId]*EcNode)
	var candidateEcNodes []*CandidateEcNode
	for _, ecNode := range ecNodes {
		shardBits := findEcVolumeShards(ecNode, vid)
		if shardBits.ShardIdCount() > 0 {
			candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
				ecNode:     ecNode,
				shardCount: shardBits.ShardIdCount(),
			})
		}
	}
	slices.SortFunc(candidateEcNodes, func(a, b *CandidateEcNode) int {
		return b.shardCount - a.shardCount
	})
	for i := 0; i < n; i++ {
		selectedEcNodeIndex := -1
		for i, candidateEcNode := range candidateEcNodes {
			shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
			if shardBits > 0 {
				selectedEcNodeIndex = i
				for _, shardId := range shardBits.ShardIds() {
					candidateEcNode.shardCount--
					picked[shardId] = candidateEcNode.ecNode
					candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
					break
				}
				break
			}
		}
		if selectedEcNodeIndex >= 0 {
			ensureSortedEcNodes(candidateEcNodes, selectedEcNodeIndex, func(i, j int) bool {
				return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
			})
		}
	}
	return picked
}
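
// collectVolumeIdToEcNodes maps each EC volume id in the collection to the nodes that
// currently hold at least one of its shards.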
func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.VolumeId][]*EcNode {
	vidLocations := make(map[needle.VolumeId][]*EcNode)
	for _, ecNode := range ecb.ecNodes {
		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
		if !found {
			continue
		}
		for _, shardInfo := range diskInfo.EcShardInfos {
			// ignore if not in current collection
			if shardInfo.Collection == collection {
				vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode)
			}
		}
	}
	return vidLocations
}
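
// EcBalance is the entry point for EC shard balancing: it collects all EC nodes in the target
// data center, deduplicates and balances shards for each collection, and finally levels the
// shard load within each rack. When applyBalancing is false, no shards are moved on the
// volume servers and the planned moves are only printed.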
func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, applyBalancing bool) (err error) {
	if len(collections) == 0 {
		return fmt.Errorf("no collections to balance")
	}

	// collect all ec nodes
	allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc)
	if err != nil {
		return err
	}
	if totalFreeEcSlots < 1 {
		return fmt.Errorf("no free ec shard slots. only %d left", totalFreeEcSlots)
	}

	ecb := &ecBalancer{
		commandEnv:       commandEnv,
		ecNodes:          allEcNodes,
		replicaPlacement: ecReplicaPlacement,
		applyBalancing:   applyBalancing,
	}

	for _, c := range collections {
		if err = ecb.balanceEcVolumes(c); err != nil {
			return err
		}
	}
	if err := ecb.balanceEcRacks(); err != nil {
		return fmt.Errorf("balance ec racks: %v", err)
	}

	return nil
}