package shell

import (
    "context"
    "errors"
    "fmt"
    "math/rand/v2"
    "time"

    "github.com/seaweedfs/seaweedfs/weed/glog"
    "github.com/seaweedfs/seaweedfs/weed/operation"
    "github.com/seaweedfs/seaweedfs/weed/pb"
    "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
    "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
    "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
    "github.com/seaweedfs/seaweedfs/weed/storage/needle"
    "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
    "github.com/seaweedfs/seaweedfs/weed/storage/types"
    "golang.org/x/exp/slices"
    "google.golang.org/grpc"
)

type DataCenterId string
type EcNodeId string
type RackId string

type EcNode struct {
    info       *master_pb.DataNodeInfo
    dc         DataCenterId
    rack       RackId
    freeEcSlot int
}

type CandidateEcNode struct {
    ecNode     *EcNode
    shardCount int
}

type EcRack struct {
    ecNodes    map[EcNodeId]*EcNode
    freeEcSlot int
}

var (
    ecBalanceAlgorithmDescription = `
    func EcBalance() {
        for each collection:
            balanceEcVolumes(collectionName)
        for each rack:
            balanceEcRack(rack)
    }

    func balanceEcVolumes(collectionName){
        for each volume:
            doDeduplicateEcShards(volumeId)

        tracks rack~shardCount mapping
        for each volume:
            doBalanceEcShardsAcrossRacks(volumeId)

        for each volume:
            doBalanceEcShardsWithinRacks(volumeId)
    }

    // spread ec shards into more racks
    func doBalanceEcShardsAcrossRacks(volumeId){
        tracks rack~volumeIdShardCount mapping
        averageShardsPerEcRack = totalShardNumber / numRacks  // totalShardNumber is 14 for now, later could vary for each dc
        ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
        for each ecShardsToMove {
            destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, ecShardReplicaPlacement)
            destVolumeServers = volume servers on the destRack
            pickOneEcNodeAndMoveOneShard(destVolumeServers)
        }
    }

    func doBalanceEcShardsWithinRacks(volumeId){
        racks = collect all racks that the volume id is on
        for rack, shards := range racks
            doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
    }

    // move ec shards
    func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
        tracks volumeServer~volumeIdShardCount mapping
        averageShardCount = len(shards) / numVolumeServers
        volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardsPerEcRack
        ecShardsToMove = select overflown ec shards from volumeServersOverAverage
        for each ecShardsToMove {
            destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, ecShardReplicaPlacement)
            pickOneEcNodeAndMoveOneShard(destVolumeServers)
        }
    }

    // move ec shards while keeping shard distribution for the same volume unchanged or more even
    func balanceEcRack(rack){
        averageShardCount = total shards / numVolumeServers
        for hasMovedOneEcShard {
            sort all volume servers ordered by the number of local ec shards
            pick the volume server A with the lowest number of ec shards x
            pick the volume server B with the highest number of ec shards y
            if y > averageShardCount and x + 1 <= averageShardCount {
                if B has an ec shard with volume id v that A does not have {
                    move one ec shard v from B to A
                    hasMovedOneEcShard = true
                }
            }
        }
    }
    `

    // Overridable functions for testing.
    getDefaultReplicaPlacement = _getDefaultReplicaPlacement
)

func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
    var resp *master_pb.GetMasterConfigurationResponse
    var err error

    err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
        resp, err = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
        return err
    })
    if err != nil {
        return nil, err
    }

    return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
}

func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super_block.ReplicaPlacement, error) {
    if replicaStr != "" {
        rp, err := super_block.NewReplicaPlacementFromString(replicaStr)
        if err == nil {
            fmt.Printf("using replica placement %q for EC volumes\n", rp.String())
        }
        return rp, err
    }

    // No replica placement argument provided, resolve from master default settings.
    rp, err := getDefaultReplicaPlacement(commandEnv)
    if err == nil {
        fmt.Printf("using master default replica placement %q for EC volumes\n", rp.String())
    }
    return rp, err
}

func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
    if delayBeforeCollecting > 0 {
        time.Sleep(delayBeforeCollecting)
    }

    var resp *master_pb.VolumeListResponse
    err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
        resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
        return err
    })
    if err != nil {
        return
    }

    return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil
}

func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
    // list all possible locations
    // collect topology information
    topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
    if err != nil {
        return
    }

    // find out all volume servers with one slot left.
    ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)

    sortEcNodesByFreeslotsDescending(ecNodes)
    return
}

func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
    return collectEcNodesForDC(commandEnv, "")
}

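// moveMountedShardToEcNode moves one mounted ec shard from existingLocation to destinationEcNode.
// When applyBalancing is true it copies and mounts the shard on the destination, then unmounts
// and deletes it from the source; in either case it updates the in-memory shard bookkeeping on
// both nodes.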
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
    if !commandEnv.isLocked() {
        return fmt.Errorf("lock is lost")
    }

    copiedShardIds := []uint32{uint32(shardId)}

    if applyBalancing {
        existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)

        // ask destination node to copy shard and the ecx file from source node, and mount it
        copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
        if err != nil {
            return err
        }

        // unmount the to be deleted shards
        err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingServerAddress, copiedShardIds)
        if err != nil {
            return err
        }

        // ask source node to delete the shard, and maybe the ecx file
        err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingServerAddress, copiedShardIds)
        if err != nil {
            return err
        }

        fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
    }

    destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
    existingLocation.deleteEcVolumeShards(vid, copiedShardIds)
    return nil
}

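// oneServerCopyAndMountEcShardsFromSource asks targetServer to copy the given shards (together
// with the .ecx/.ecj/.vif metadata files) from existingLocation and mount them. The copy step is
// skipped when source and target are the same server, in which case no shard ids are reported
// as copied.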
func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
    targetServer *EcNode, shardIdsToCopy []uint32,
    volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {

    fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)

    targetAddress := pb.NewServerAddressFromDataNode(targetServer.info)
    err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {

        if targetAddress != existingLocation {

            fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
            _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
                VolumeId:       uint32(volumeId),
                Collection:     collection,
                ShardIds:       shardIdsToCopy,
                CopyEcxFile:    true,
                CopyEcjFile:    true,
                CopyVifFile:    true,
                SourceDataNode: string(existingLocation),
            })
            if copyErr != nil {
                return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
            }
        }

        fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
        _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
            VolumeId:   uint32(volumeId),
            Collection: collection,
            ShardIds:   shardIdsToCopy,
        })
        if mountErr != nil {
            return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
        }

        if targetAddress != existingLocation {
            copiedShardIds = shardIdsToCopy
            glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
        }

        return nil
    })

    if err != nil {
        return
    }

    return
}

func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo)) {
    for _, dc := range topo.DataCenterInfos {
        for _, rack := range dc.RackInfos {
            for _, dn := range rack.DataNodeInfos {
                fn(DataCenterId(dc.Id), RackId(rack.Id), dn)
            }
        }
    }
}

func sortEcNodesByFreeslotsDescending(ecNodes []*EcNode) {
    slices.SortFunc(ecNodes, func(a, b *EcNode) int {
        return b.freeEcSlot - a.freeEcSlot
    })
}

func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
    slices.SortFunc(ecNodes, func(a, b *EcNode) int {
        return a.freeEcSlot - b.freeEcSlot
    })
}

// if the index node changed the freeEcSlot, need to keep every EcNode still sorted
func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
    for i := index - 1; i >= 0; i-- {
        if lessThan(i+1, i) {
            swap(data, i, i+1)
        } else {
            break
        }
    }
    for i := index + 1; i < len(data); i++ {
        if lessThan(i, i-1) {
            swap(data, i, i-1)
        } else {
            break
        }
    }
}

func swap(data []*CandidateEcNode, i, j int) {
    t := data[i]
    data[i] = data[j]
    data[j] = t
}

func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
    for _, ecShardInfo := range ecShardInfos {
        shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
        count += shardBits.ShardIdCount()
    }
    return
}

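// countFreeShardSlots estimates how many more ec shards a volume server can hold on the given
// disk type: each unused volume slot counts for DataShardsCount ec shard slots, minus the ec
// shards already present on that disk.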
func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) {
    if dn.DiskInfos == nil {
        return 0
    }
    diskInfo := dn.DiskInfos[string(diskType)]
    if diskInfo == nil {
        return 0
    }
    return int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
}

func (ecNode *EcNode) localShardIdCount(vid uint32) int {
    for _, diskInfo := range ecNode.info.DiskInfos {
        for _, ecShardInfo := range diskInfo.EcShardInfos {
            if vid == ecShardInfo.Id {
                shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
                return shardBits.ShardIdCount()
            }
        }
    }
    return 0
}

func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
    eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
        if selectedDataCenter != "" && selectedDataCenter != string(dc) {
            return
        }

        freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
        ecNodes = append(ecNodes, &EcNode{
            info:       dn,
            dc:         dc,
            rack:       rack,
            freeEcSlot: int(freeEcSlots),
        })
        totalFreeEcSlots += freeEcSlots
    })
    return
}

func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []uint32) error {
    fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)

    return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
        _, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
            VolumeId:   uint32(volumeId),
            Collection: collection,
            ShardIds:   toBeDeletedShardIds,
        })
        return deleteErr
    })
}

func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedShardIds []uint32) error {
    fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedShardIds, sourceLocation)

    return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
        _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
            VolumeId: uint32(volumeId),
            ShardIds: toBeUnmountedShardIds,
        })
        return deleteErr
    })
}

func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedShardIds []uint32) error {
    fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedShardIds, sourceLocation)

    return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
        _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
            VolumeId:   uint32(volumeId),
            Collection: collection,
            ShardIds:   toBeMountedShardIds,
        })
        return mountErr
    })
}

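// ceilDivide returns a divided by b, rounded up to the nearest integer (for positive inputs).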
func ceilDivide(a, b int) int {
    var r int
    if (a % b) != 0 {
        r = 1
    }
    return (a / b) + r
}

func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
    if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
        for _, shardInfo := range diskInfo.EcShardInfos {
            if needle.VolumeId(shardInfo.Id) == vid {
                return erasure_coding.ShardBits(shardInfo.EcIndexBits)
            }
        }
    }
    return 0
}

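// addEcVolumeShards records the given shard ids for volume vid in this node's in-memory
// topology info and decreases freeEcSlot accordingly. It only mutates local bookkeeping;
// no RPCs are issued.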
func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
    foundVolume := false
    diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
    if found {
        for _, shardInfo := range diskInfo.EcShardInfos {
            if needle.VolumeId(shardInfo.Id) == vid {
                oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
                newShardBits := oldShardBits
                for _, shardId := range shardIds {
                    newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
                }
                shardInfo.EcIndexBits = uint32(newShardBits)
                ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
                foundVolume = true
                break
            }
        }
    } else {
        diskInfo = &master_pb.DiskInfo{
            Type: string(types.HardDriveType),
        }
        ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo
    }

    if !foundVolume {
        var newShardBits erasure_coding.ShardBits
        for _, shardId := range shardIds {
            newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
        }
        diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
            Id:          uint32(vid),
            Collection:  collection,
            EcIndexBits: uint32(newShardBits),
            DiskType:    string(types.HardDriveType),
        })
        ecNode.freeEcSlot -= len(shardIds)
    }

    return ecNode
}

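// deleteEcVolumeShards removes the given shard ids for volume vid from this node's in-memory
// topology info and returns the freed capacity to freeEcSlot. Like addEcVolumeShards, it is
// bookkeeping only.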
func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
    if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
        for _, shardInfo := range diskInfo.EcShardInfos {
            if needle.VolumeId(shardInfo.Id) == vid {
                oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
                newShardBits := oldShardBits
                for _, shardId := range shardIds {
                    newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
                }
                shardInfo.EcIndexBits = uint32(newShardBits)
                ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
            }
        }
    }
    return ecNode
}

func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
    countMap := make(map[string]int)
    for _, d := range data {
        id, count := identifierFn(d)
        countMap[id] += count
    }
    return countMap
}

func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
    groupMap := make(map[string][]*EcNode)
    for _, d := range data {
        id := identifierFn(d)
        groupMap[id] = append(groupMap[id], d)
    }
    return groupMap
}

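// ecBalancer carries the state of one balancing run: the shell command environment, the
// candidate ec nodes, the replica placement to respect, and whether moves are actually
// applied or only simulated (dry run).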
type ecBalancer struct {
    commandEnv       *CommandEnv
    ecNodes          []*EcNode
    replicaPlacement *super_block.ReplicaPlacement
    applyBalancing   bool
}

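// racks groups the balancer's ec nodes by rack id and aggregates each rack's free slot count.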
func (ecb *ecBalancer) racks() map[RackId]*EcRack {
    racks := make(map[RackId]*EcRack)
    for _, ecNode := range ecb.ecNodes {
        if racks[ecNode.rack] == nil {
            racks[ecNode.rack] = &EcRack{
                ecNodes: make(map[EcNodeId]*EcNode),
            }
        }
        racks[ecNode.rack].ecNodes[EcNodeId(ecNode.info.Id)] = ecNode
        racks[ecNode.rack].freeEcSlot += ecNode.freeEcSlot
    }
    return racks
}

func (ecb *ecBalancer) balanceEcVolumes(collection string) error {
    fmt.Printf("balanceEcVolumes %s\n", collection)

    if err := ecb.deleteDuplicatedEcShards(collection); err != nil {
        return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err)
    }

    if err := ecb.balanceEcShardsAcrossRacks(collection); err != nil {
        return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
    }

    if err := ecb.balanceEcShardsWithinRacks(collection); err != nil {
        return fmt.Errorf("balance within racks collection %s ec shards: %v", collection, err)
    }

    return nil
}

func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error {
    // vid => []ecNode
    vidLocations := ecb.collectVolumeIdToEcNodes(collection)

    // deduplicate ec shards
    for vid, locations := range vidLocations {
        if err := ecb.doDeduplicateEcShards(collection, vid, locations); err != nil {
            return err
        }
    }
    return nil
}

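// doDeduplicateEcShards removes redundant copies of each ec shard of one volume, keeping the
// copy on the node with the fewest free slots and unmounting then deleting the rest.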
func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error {
    // check whether this volume has ecNodes that are over average
    shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
    for _, ecNode := range locations {
        shardBits := findEcVolumeShards(ecNode, vid)
        for _, shardId := range shardBits.ShardIds() {
            shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
        }
    }
    for shardId, ecNodes := range shardToLocations {
        if len(ecNodes) <= 1 {
            continue
        }
        sortEcNodesByFreeslotsAscending(ecNodes)
        fmt.Printf("ec shard %d.%d has %d copies, keeping %v\n", vid, shardId, len(ecNodes), ecNodes[0].info.Id)
        if !ecb.applyBalancing {
            continue
        }

        duplicatedShardIds := []uint32{uint32(shardId)}
        for _, ecNode := range ecNodes[1:] {
            if err := unmountEcShards(ecb.commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
                return err
            }
            if err := sourceServerDeleteEcShards(ecb.commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
                return err
            }
            ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
        }
    }
    return nil
}

func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
    // collect vid => []ecNode, since previous steps can change the locations
    vidLocations := ecb.collectVolumeIdToEcNodes(collection)

    // spread the ec shards evenly
    for vid, locations := range vidLocations {
        if err := ecb.doBalanceEcShardsAcrossRacks(collection, vid, locations); err != nil {
            return err
        }
    }
    return nil
}

func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
    return groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
        shardBits := findEcVolumeShards(ecNode, vid)
        return string(ecNode.rack), shardBits.ShardIdCount()
    })
}

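// doBalanceEcShardsAcrossRacks moves shards of one volume out of racks holding more than the
// per-rack average, into destination racks chosen by pickRackToBalanceShardsInto, updating the
// shard and free-slot counters as it goes.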
func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needle.VolumeId, locations []*EcNode) error {
    racks := ecb.racks()

    // calculate average number of shards an ec rack should have for one volume
    averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))

    // see the volume's shards are in how many racks, and how many in each rack
    rackToShardCount := countShardsByRack(vid, locations)
    rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
        return string(ecNode.rack)
    })

    // ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
    ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode)
    for rackId, count := range rackToShardCount {
        if count <= averageShardsPerEcRack {
            continue
        }
        possibleEcNodes := rackEcNodesWithVid[rackId]
        for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
            ecShardsToMove[shardId] = ecNode
        }
    }

    for shardId, ecNode := range ecShardsToMove {
        rackId, err := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount)
        if err != nil {
            fmt.Printf("ec shard %d.%d at %s can not find a destination rack:\n%s\n", vid, shardId, ecNode.info.Id, err.Error())
            continue
        }

        var possibleDestinationEcNodes []*EcNode
        for _, n := range racks[rackId].ecNodes {
            possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
        }
        err = ecb.pickOneEcNodeAndMoveOneShard(ecNode, collection, vid, shardId, possibleDestinationEcNodes)
        if err != nil {
            return err
        }

        rackToShardCount[string(rackId)] += 1
        rackToShardCount[string(ecNode.rack)] -= 1
        racks[rackId].freeEcSlot -= 1
        racks[ecNode.rack].freeEcSlot += 1
    }

    return nil
}

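// pickRackToBalanceShardsInto selects a destination rack for one more shard of a volume. Racks
// without free slots or already at the replica placement limit for other racks are skipped; among
// the remaining racks, those holding the fewest shards of this volume are preferred, with ties
// broken at random.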
func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int) (RackId, error) {
    targets := []RackId{}
    targetShards := -1
    for _, shards := range rackToShardCount {
        if shards > targetShards {
            targetShards = shards
        }
    }

    details := ""
    for rackId, rack := range rackToEcNodes {
        shards := rackToShardCount[string(rackId)]

        if rack.freeEcSlot <= 0 {
            details += fmt.Sprintf(" Skipped %s because it has no free slots\n", rackId)
            continue
        }
        if ecb.replicaPlacement != nil && shards >= ecb.replicaPlacement.DiffRackCount {
            details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for other racks (%d)\n", rackId, shards, ecb.replicaPlacement.DiffRackCount)
            continue
        }

        if shards < targetShards {
            // Favor racks with fewer shards, to ensure a uniform distribution.
            targets = nil
            targetShards = shards
        }
        if shards == targetShards {
            targets = append(targets, rackId)
        }
    }

    if len(targets) == 0 {
        return "", errors.New(details)
    }
    return targets[rand.IntN(len(targets))], nil
}

func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
    // collect vid => []ecNode, since previous steps can change the locations
    vidLocations := ecb.collectVolumeIdToEcNodes(collection)
    racks := ecb.racks()

    // spread the ec shards evenly
    for vid, locations := range vidLocations {

        // see the volume's shards are in how many racks, and how many in each rack
        rackToShardCount := countShardsByRack(vid, locations)
        rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
            return string(ecNode.rack)
        })

        for rackId := range rackToShardCount {
            var possibleDestinationEcNodes []*EcNode
            for _, n := range racks[RackId(rackId)].ecNodes {
                if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found {
                    possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
                }
            }
            sourceEcNodes := rackEcNodesWithVid[rackId]
            averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
            if err := ecb.doBalanceEcShardsWithinOneRack(averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes); err != nil {
                return err
            }
        }
    }
    return nil
}

func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
    for _, ecNode := range existingLocations {

        shardBits := findEcVolumeShards(ecNode, vid)
        overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode

        for _, shardId := range shardBits.ShardIds() {

            if overLimitCount <= 0 {
                break
            }

            fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)

            err := ecb.pickOneEcNodeAndMoveOneShard(ecNode, collection, vid, shardId, possibleDestinationEcNodes)
            if err != nil {
                return err
            }

            overLimitCount--
        }
    }
    return nil
}

func (ecb *ecBalancer) balanceEcRacks() error {
    // balance one rack for all ec shards
    for _, ecRack := range ecb.racks() {
        if err := ecb.doBalanceEcRack(ecRack); err != nil {
            return err
        }
    }
    return nil
}

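// doBalanceEcRack evens out the total ec shard count across the volume servers of one rack.
// It repeatedly moves a single shard, for a volume id the receiving node does not yet hold,
// from the node with the fewest free slots to the node with the most, as long as the donor
// holds more than the average number of shards and the receiver stays at or below the average.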
func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
    if len(ecRack.ecNodes) <= 1 {
        return nil
    }

    var rackEcNodes []*EcNode
    for _, node := range ecRack.ecNodes {
        rackEcNodes = append(rackEcNodes, node)
    }

    ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
        diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
        if !found {
            return
        }
        for _, ecShardInfo := range diskInfo.EcShardInfos {
            count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
        }
        return ecNode.info.Id, count
    })

    var totalShardCount int
    for _, count := range ecNodeIdToShardCount {
        totalShardCount += count
    }

    averageShardCount := ceilDivide(totalShardCount, len(rackEcNodes))

    hasMove := true
    for hasMove {
        hasMove = false
        slices.SortFunc(rackEcNodes, func(a, b *EcNode) int {
            return b.freeEcSlot - a.freeEcSlot
        })
        emptyNode, fullNode := rackEcNodes[0], rackEcNodes[len(rackEcNodes)-1]
        emptyNodeShardCount, fullNodeShardCount := ecNodeIdToShardCount[emptyNode.info.Id], ecNodeIdToShardCount[fullNode.info.Id]
        if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {

            emptyNodeIds := make(map[uint32]bool)
            if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
                for _, shards := range emptyDiskInfo.EcShardInfos {
                    emptyNodeIds[shards.Id] = true
                }
            }
            if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
                for _, shards := range fullDiskInfo.EcShardInfos {
                    if _, found := emptyNodeIds[shards.Id]; !found {
                        for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {

                            fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)

                            err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, ecb.applyBalancing)
                            if err != nil {
                                return err
                            }

                            ecNodeIdToShardCount[emptyNode.info.Id]++
                            ecNodeIdToShardCount[fullNode.info.Id]--
                            hasMove = true
                            break
                        }
                        break
                    }
                }
            }
        }
    }

    return nil
}

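// pickEcNodeToBalanceShardsInto selects a destination node for one shard of volume vid. The
// source node itself, nodes without free slots, and nodes already at the same-rack replica
// placement limit are skipped; among the rest, nodes holding the fewest shards of this volume
// are preferred, with ties broken at random.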
func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, error) {
    if existingLocation == nil {
        return nil, fmt.Errorf("INTERNAL: missing source nodes")
    }
    if len(possibleDestinations) == 0 {
        return nil, fmt.Errorf("INTERNAL: missing destination nodes")
    }

    nodeShards := map[*EcNode]int{}
    for _, node := range possibleDestinations {
        nodeShards[node] = findEcVolumeShards(node, vid).ShardIdCount()
    }

    targets := []*EcNode{}
    targetShards := -1
    for _, shards := range nodeShards {
        if shards > targetShards {
            targetShards = shards
        }
    }

    details := ""
    for _, node := range possibleDestinations {
        if node.info.Id == existingLocation.info.Id {
            continue
        }
        if node.freeEcSlot <= 0 {
            details += fmt.Sprintf(" Skipped %s because it has no free slots\n", node.info.Id)
            continue
        }

        shards := nodeShards[node]
        if ecb.replicaPlacement != nil && shards >= ecb.replicaPlacement.SameRackCount {
            details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for the rack (%d)\n", node.info.Id, shards, ecb.replicaPlacement.SameRackCount)
            continue
        }

        if shards < targetShards {
            // Favor nodes with fewer shards, to ensure a uniform distribution.
            targets = nil
            targetShards = shards
        }
        if shards == targetShards {
            targets = append(targets, node)
        }
    }

    if len(targets) == 0 {
        return nil, errors.New(details)
    }
    return targets[rand.IntN(len(targets))], nil
}

func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error {
    destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
    if err != nil {
        fmt.Printf("WARNING: Could not find suitable target node for %d.%d:\n%s", vid, shardId, err.Error())
        return nil
    }

    fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
    return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, ecb.applyBalancing)
}

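// pickNEcShardsToMoveFrom selects up to n shards of volume vid to relocate, repeatedly taking
// one shard from the candidate node that currently holds the most shards of that volume, so the
// remaining distribution stays as even as possible. Selected shards are removed from the node's
// in-memory bookkeeping.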
func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
    picked := make(map[erasure_coding.ShardId]*EcNode)
    var candidateEcNodes []*CandidateEcNode
    for _, ecNode := range ecNodes {
        shardBits := findEcVolumeShards(ecNode, vid)
        if shardBits.ShardIdCount() > 0 {
            candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
                ecNode:     ecNode,
                shardCount: shardBits.ShardIdCount(),
            })
        }
    }
    slices.SortFunc(candidateEcNodes, func(a, b *CandidateEcNode) int {
        return b.shardCount - a.shardCount
    })
    for i := 0; i < n; i++ {
        selectedEcNodeIndex := -1
        for i, candidateEcNode := range candidateEcNodes {
            shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
            if shardBits > 0 {
                selectedEcNodeIndex = i
                for _, shardId := range shardBits.ShardIds() {
                    candidateEcNode.shardCount--
                    picked[shardId] = candidateEcNode.ecNode
                    candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
                    break
                }
                break
            }
        }
        if selectedEcNodeIndex >= 0 {
            ensureSortedEcNodes(candidateEcNodes, selectedEcNodeIndex, func(i, j int) bool {
                return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
            })
        }
    }

    return picked
}

func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.VolumeId][]*EcNode {
    vidLocations := make(map[needle.VolumeId][]*EcNode)
    for _, ecNode := range ecb.ecNodes {
        diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
        if !found {
            continue
        }
        for _, shardInfo := range diskInfo.EcShardInfos {
            // ignore if not in current collection
            if shardInfo.Collection == collection {
                vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode)
            }
        }
    }
    return vidLocations
}

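// EcBalance balances erasure-coded shards for the given collections: for each collection it
// deduplicates shard copies, spreads each volume's shards across racks and across the volume
// servers within each rack, and finally evens out per-rack shard totals. With applyBalancing
// set to false the plan is printed but no shards are moved.
//
// A minimal usage sketch (assumptions: the shell lock is already held, as the shard moves
// require it, and "pictures" is a hypothetical collection name):
//
//    rp, err := parseReplicaPlacementArg(commandEnv, "") // fall back to the master default
//    if err != nil {
//        return err
//    }
//    // dry run across all data centers for one collection
//    err = EcBalance(commandEnv, []string{"pictures"}, "", rp, false)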
func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, applyBalancing bool) (err error) {
    if len(collections) == 0 {
        return fmt.Errorf("no collections to balance")
    }

    // collect all ec nodes
    allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc)
    if err != nil {
        return err
    }
    if totalFreeEcSlots < 1 {
        return fmt.Errorf("no free ec shard slots. only %d left", totalFreeEcSlots)
    }

    ecb := &ecBalancer{
        commandEnv:       commandEnv,
        ecNodes:          allEcNodes,
        replicaPlacement: ecReplicaPlacement,
        applyBalancing:   applyBalancing,
    }

    for _, c := range collections {
        if err = ecb.balanceEcVolumes(c); err != nil {
            return err
        }
    }
    if err := ecb.balanceEcRacks(); err != nil {
        return fmt.Errorf("balance ec racks: %v", err)
    }

    return nil
}