package shell

import (
	"context"
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"golang.org/x/exp/slices"
	"google.golang.org/grpc"
)
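
// Typed identifiers and bookkeeping structs used while balancing EC shards across the topology.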
type DataCenterId string
type EcNodeId string
type RackId string

type EcNode struct {
	info       *master_pb.DataNodeInfo
	dc         DataCenterId
	rack       RackId
	freeEcSlot int
}

type CandidateEcNode struct {
	ecNode     *EcNode
	shardCount int
}

type EcRack struct {
	ecNodes    map[EcNodeId]*EcNode
	freeEcSlot int
}
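
// moveMountedShardToEcNode copies one mounted EC shard from existingLocation to destinationEcNode,
// mounts it on the destination, unmounts and deletes it on the source, and updates the in-memory
// bookkeeping on both nodes. When applyBalancing is false, only the bookkeeping is updated.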
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
	if !commandEnv.isLocked() {
		return fmt.Errorf("lock is lost")
	}

	copiedShardIds := []uint32{uint32(shardId)}

	if applyBalancing {
		existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)

		// ask the destination node to copy the shard and the ecx file from the source node, and mount it
		copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
		if err != nil {
			return err
		}

		// unmount the to-be-deleted shards
		err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingServerAddress, copiedShardIds)
		if err != nil {
			return err
		}

		// ask the source node to delete the shard, and maybe the ecx file
		err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingServerAddress, copiedShardIds)
		if err != nil {
			return err
		}

		fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
	}

	destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
	existingLocation.deleteEcVolumeShards(vid, copiedShardIds)

	return nil
}
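
// oneServerCopyAndMountEcShardsFromSource asks targetServer to copy the given shards (plus the
// .ecx/.ecj/.vif files) from existingLocation and mount them. If the target is already the source,
// only the mount is performed and no shard ids are reported as copied.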
func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
	targetServer *EcNode, shardIdsToCopy []uint32,
	volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {

	fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)

	targetAddress := pb.NewServerAddressFromDataNode(targetServer.info)
	err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {

		if targetAddress != existingLocation {

			fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
			_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
				VolumeId:       uint32(volumeId),
				Collection:     collection,
				ShardIds:       shardIdsToCopy,
				CopyEcxFile:    true,
				CopyEcjFile:    true,
				CopyVifFile:    true,
				SourceDataNode: string(existingLocation),
			})
			if copyErr != nil {
				return fmt.Errorf("copy %d.%v %s => %s : %v", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
			}
		}

		fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
		_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   shardIdsToCopy,
		})
		if mountErr != nil {
			return fmt.Errorf("mount %d.%v on %s : %v", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
		}

		if targetAddress != existingLocation {
			copiedShardIds = shardIdsToCopy
			glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
		}

		return nil
	})

	if err != nil {
		return
	}

	return
}
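
// eachDataNode visits every data node in the topology, calling fn with its data center id, rack id, and node info.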
// TODO: Make dc a DataCenterId instead of string.
func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, dn *master_pb.DataNodeInfo)) {
	for _, dc := range topo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, dn := range rack.DataNodeInfos {
				fn(dc.Id, RackId(rack.Id), dn)
			}
		}
	}
}
func sortEcNodesByFreeslotsDescending(ecNodes []*EcNode) {
	slices.SortFunc(ecNodes, func(a, b *EcNode) int {
		return b.freeEcSlot - a.freeEcSlot
	})
}

func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
	slices.SortFunc(ecNodes, func(a, b *EcNode) int {
		return a.freeEcSlot - b.freeEcSlot
	})
}
// ensureSortedEcNodes restores sorted order after the element at index has changed its sort key,
// bubbling it toward the front or the back of the slice as needed.
func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
	for i := index - 1; i >= 0; i-- {
		if lessThan(i+1, i) {
			swap(data, i, i+1)
		} else {
			break
		}
	}
	for i := index + 1; i < len(data); i++ {
		if lessThan(i, i-1) {
			swap(data, i, i-1)
		} else {
			break
		}
	}
}
func swap(data []*CandidateEcNode, i, j int) {
	t := data[i]
	data[i] = data[j]
	data[j] = t
}

func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
	for _, ecShardInfo := range ecShardInfos {
		shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
		count += shardBits.ShardIdCount()
	}
	return
}
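
// countFreeShardSlots estimates how many more EC shards a data node can hold on the given disk type:
// each unused volume slot is assumed to fit DataShardsCount shards, minus the EC shards already stored.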
func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) {
	if dn.DiskInfos == nil {
		return 0
	}
	diskInfo := dn.DiskInfos[string(diskType)]
	if diskInfo == nil {
		return 0
	}
	return int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
}

func (ecNode *EcNode) localShardIdCount(vid uint32) int {
	for _, diskInfo := range ecNode.info.DiskInfos {
		for _, ecShardInfo := range diskInfo.EcShardInfos {
			if vid == ecShardInfo.Id {
				shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
				return shardBits.ShardIdCount()
			}
		}
	}
	return 0
}
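
// collectEcNodes collects all EC-capable volume servers (optionally restricted to one data center),
// sorted by free EC shard slots in descending order, together with the total number of free slots.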
func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {

	// collect topology information
	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return
	}

	// find all volume servers and their free EC shard slots
	ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)

	sortEcNodesByFreeslotsDescending(ecNodes)

	return
}
func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
	eachDataNode(topo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
		if selectedDataCenter != "" && selectedDataCenter != dc {
			return
		}

		freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
		ecNodes = append(ecNodes, &EcNode{
			info:       dn,
			dc:         DataCenterId(dc),
			rack:       rack,
			freeEcSlot: freeEcSlots,
		})
		totalFreeEcSlots += freeEcSlots
	})
	return
}
func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []uint32) error {

	fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   toBeDeletedShardIds,
		})
		return deleteErr
	})
}
func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedShardIds []uint32) error {

	fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, unmountErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
			VolumeId: uint32(volumeId),
			ShardIds: toBeUnmountedShardIds,
		})
		return unmountErr
	})
}

func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedShardIds []uint32) error {

	fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   toBeMountedShardIds,
		})
		return mountErr
	})
}
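
// ceilDivide returns the ceiling of a/b for positive integers.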
func ceilDivide(a, b int) int {
	var r int
	if (a % b) != 0 {
		r = 1
	}
	return (a / b) + r
}
func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {

	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				return erasure_coding.ShardBits(shardInfo.EcIndexBits)
			}
		}
	}

	return 0
}
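
// addEcVolumeShards records the given shard ids for volume vid on this node's hard-drive disk info
// and decreases freeEcSlot by the number of newly added shards.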
func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {

	foundVolume := false
	diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
	if found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
				newShardBits := oldShardBits
				for _, shardId := range shardIds {
					newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
				}
				shardInfo.EcIndexBits = uint32(newShardBits)
				ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
				foundVolume = true
				break
			}
		}
	} else {
		diskInfo = &master_pb.DiskInfo{
			Type: string(types.HardDriveType),
		}
		ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo
	}

	if !foundVolume {
		var newShardBits erasure_coding.ShardBits
		for _, shardId := range shardIds {
			newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
		}
		diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
			Id:          uint32(vid),
			Collection:  collection,
			EcIndexBits: uint32(newShardBits),
			DiskType:    string(types.HardDriveType),
		})
		ecNode.freeEcSlot -= len(shardIds)
	}

	return ecNode
}
func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {

	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
				newShardBits := oldShardBits
				for _, shardId := range shardIds {
					newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
				}
				shardInfo.EcIndexBits = uint32(newShardBits)
				ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
			}
		}
	}

	return ecNode
}
func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
	countMap := make(map[string]int)
	for _, d := range data {
		id, count := identifierFn(d)
		countMap[id] += count
	}
	return countMap
}

func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
	groupMap := make(map[string][]*EcNode)
	for _, d := range data {
		id := identifierFn(d)
		groupMap[id] = append(groupMap[id], d)
	}
	return groupMap
}
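
// collectRacks groups the EC nodes by rack and sums each rack's free EC shard slots.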
func collectRacks(allEcNodes []*EcNode) map[RackId]*EcRack {
	// collect racks info
	racks := make(map[RackId]*EcRack)
	for _, ecNode := range allEcNodes {
		if racks[ecNode.rack] == nil {
			racks[ecNode.rack] = &EcRack{
				ecNodes: make(map[EcNodeId]*EcNode),
			}
		}
		racks[ecNode.rack].ecNodes[EcNodeId(ecNode.info.Id)] = ecNode
		racks[ecNode.rack].freeEcSlot += ecNode.freeEcSlot
	}
	return racks
}
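
// balanceEcVolumes balances all EC shards of one collection in three passes:
// drop duplicated shard copies, spread shards evenly across racks, then spread them evenly within each rack.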
func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {

	fmt.Printf("balanceEcVolumes %s\n", collection)

	if err := deleteDuplicatedEcShards(commandEnv, allEcNodes, collection, applyBalancing); err != nil {
		return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err)
	}

	if err := balanceEcShardsAcrossRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
		return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
	}

	if err := balanceEcShardsWithinRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
		return fmt.Errorf("balance within racks collection %s ec shards: %v", collection, err)
	}

	return nil
}
func deleteDuplicatedEcShards(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, applyBalancing bool) error {
	// vid => []ecNode
	vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)
	// deduplicate ec shards
	for vid, locations := range vidLocations {
		if err := doDeduplicateEcShards(commandEnv, collection, vid, locations, applyBalancing); err != nil {
			return err
		}
	}
	return nil
}
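
// doDeduplicateEcShards finds shard ids of volume vid that exist on more than one node, keeps the
// copy on the node with the fewest free slots, and unmounts and deletes the remaining copies.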
func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {

	// map each shard id to the nodes that hold a copy of it
	shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
	for _, ecNode := range locations {
		shardBits := findEcVolumeShards(ecNode, vid)
		for _, shardId := range shardBits.ShardIds() {
			shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
		}
	}
	for shardId, ecNodes := range shardToLocations {
		if len(ecNodes) <= 1 {
			continue
		}
		sortEcNodesByFreeslotsAscending(ecNodes)
		fmt.Printf("ec shard %d.%d has %d copies, keeping %v\n", vid, shardId, len(ecNodes), ecNodes[0].info.Id)
		if !applyBalancing {
			continue
		}

		duplicatedShardIds := []uint32{uint32(shardId)}
		for _, ecNode := range ecNodes[1:] {
			if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
				return err
			}
			if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
				return err
			}
			ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
		}
	}
	return nil
}
func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
	// collect vid => []ecNode, since previous steps can change the locations
	vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)
	// spread the ec shards evenly
	for vid, locations := range vidLocations {
		if err := doBalanceEcShardsAcrossRacks(commandEnv, collection, vid, locations, racks, applyBalancing); err != nil {
			return err
		}
	}
	return nil
}
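
// doBalanceEcShardsAcrossRacks moves shards of volume vid out of racks that hold more than the
// per-rack average into racks that still have room, one shard at a time.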
func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {

	// calculate the average number of shards an ec rack should have for one volume
	averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))

	// count how many racks the volume's shards span, and how many shards are in each rack
	rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
		shardBits := findEcVolumeShards(ecNode, vid)
		return string(ecNode.rack), shardBits.ShardIdCount()
	})
	rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
		return string(ecNode.rack)
	})

	// ecShardsToMove = select overflowing ec shards from racks with ec shard counts > averageShardsPerEcRack
	ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode)
	for rackId, count := range rackToShardCount {
		if count > averageShardsPerEcRack {
			possibleEcNodes := rackEcNodesWithVid[rackId]
			for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
				ecShardsToMove[shardId] = ecNode
			}
		}
	}

	for shardId, ecNode := range ecShardsToMove {
		rackId := pickOneRack(racks, rackToShardCount, averageShardsPerEcRack)
		if rackId == "" {
			fmt.Printf("ec shard %d.%d at %s can not find a destination rack\n", vid, shardId, ecNode.info.Id)
			continue
		}
		var possibleDestinationEcNodes []*EcNode
		for _, n := range racks[rackId].ecNodes {
			possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
		}
		err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
		if err != nil {
			return err
		}

		rackToShardCount[string(rackId)] += 1
		rackToShardCount[string(ecNode.rack)] -= 1
		racks[rackId].freeEcSlot -= 1
		racks[ecNode.rack].freeEcSlot += 1
	}

	return nil
}
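
// pickOneRack returns a rack that still has free EC slots and fewer shards of this volume than the
// per-rack average, or "" if none qualifies.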
func pickOneRack(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, averageShardsPerEcRack int) RackId {
	// TODO later may need to add some randomness
	for rackId, rack := range rackToEcNodes {
		if rackToShardCount[string(rackId)] >= averageShardsPerEcRack {
			continue
		}
		if rack.freeEcSlot <= 0 {
			continue
		}
		return rackId
	}
	return ""
}
func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
	// collect vid => []ecNode, since previous steps can change the locations
	vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)

	// spread the ec shards evenly
	for vid, locations := range vidLocations {

		// count how many racks the volume's shards span, and how many shards are in each rack
		rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
			shardBits := findEcVolumeShards(ecNode, vid)
			return string(ecNode.rack), shardBits.ShardIdCount()
		})
		rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
			return string(ecNode.rack)
		})

		for rackId := range rackToShardCount {

			var possibleDestinationEcNodes []*EcNode
			for _, n := range racks[RackId(rackId)].ecNodes {
				if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found {
					possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
				}
			}
			sourceEcNodes := rackEcNodesWithVid[rackId]
			averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
			if err := doBalanceEcShardsWithinOneRack(commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, applyBalancing); err != nil {
				return err
			}
		}
	}
	return nil
}
func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {

	for _, ecNode := range existingLocations {

		shardBits := findEcVolumeShards(ecNode, vid)
		overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode

		for _, shardId := range shardBits.ShardIds() {

			if overLimitCount <= 0 {
				break
			}

			fmt.Printf("%s has %d shards over the limit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)

			err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
			if err != nil {
				return err
			}

			overLimitCount--
		}
	}

	return nil
}
func balanceEcRacks(commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error {
	// balance one rack for all ec shards
	for _, ecRack := range racks {
		if err := doBalanceEcRack(commandEnv, ecRack, applyBalancing); err != nil {
			return err
		}
	}
	return nil
}
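
// doBalanceEcRack evens out the total EC shard count across the nodes of one rack, repeatedly moving
// one shard from the node with the fewest free slots to the node with the most free slots, as long
// as that keeps both within the average shard count.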
func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool) error {

	if len(ecRack.ecNodes) <= 1 {
		return nil
	}

	var rackEcNodes []*EcNode
	for _, node := range ecRack.ecNodes {
		rackEcNodes = append(rackEcNodes, node)
	}

	ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
		if !found {
			return
		}
		for _, ecShardInfo := range diskInfo.EcShardInfos {
			count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
		}
		return ecNode.info.Id, count
	})

	var totalShardCount int
	for _, count := range ecNodeIdToShardCount {
		totalShardCount += count
	}

	averageShardCount := ceilDivide(totalShardCount, len(rackEcNodes))

	hasMove := true
	for hasMove {
		hasMove = false
		slices.SortFunc(rackEcNodes, func(a, b *EcNode) int {
			return b.freeEcSlot - a.freeEcSlot
		})
		emptyNode, fullNode := rackEcNodes[0], rackEcNodes[len(rackEcNodes)-1]
		emptyNodeShardCount, fullNodeShardCount := ecNodeIdToShardCount[emptyNode.info.Id], ecNodeIdToShardCount[fullNode.info.Id]
		if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {

			emptyNodeIds := make(map[uint32]bool)
			if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
				for _, shards := range emptyDiskInfo.EcShardInfos {
					emptyNodeIds[shards.Id] = true
				}
			}
			if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
				for _, shards := range fullDiskInfo.EcShardInfos {
					if _, found := emptyNodeIds[shards.Id]; !found {
						for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {

							fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)

							err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
							if err != nil {
								return err
							}

							ecNodeIdToShardCount[emptyNode.info.Id]++
							ecNodeIdToShardCount[fullNode.info.Id]--
							hasMove = true
							break
						}
						break
					}
				}
			}
		}
	}

	return nil
}
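
// pickOneEcNodeAndMoveOneShard moves one shard of volume vid from existingLocation to the destination
// candidate with the most free slots that still holds fewer than averageShardsPerEcNode shards of this volume.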
func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {

	sortEcNodesByFreeslotsDescending(possibleDestinationEcNodes)
	skipReason := ""
	for _, destEcNode := range possibleDestinationEcNodes {

		if destEcNode.info.Id == existingLocation.info.Id {
			continue
		}

		if destEcNode.freeEcSlot <= 0 {
			skipReason += fmt.Sprintf(" Skipping %s because it has no free slots\n", destEcNode.info.Id)
			continue
		}
		if findEcVolumeShards(destEcNode, vid).ShardIdCount() >= averageShardsPerEcNode {
			skipReason += fmt.Sprintf(" Skipping %s because it already has %d shards >= averageShardsPerEcNode (%d)\n",
				destEcNode.info.Id, findEcVolumeShards(destEcNode, vid).ShardIdCount(), averageShardsPerEcNode)
			continue
		}

		fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destEcNode.info.Id)

		err := moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing)
		if err != nil {
			return err
		}

		return nil
	}

	fmt.Printf("WARNING: Could not find a suitable target node for %d.%d:\n%s", vid, shardId, skipReason)
	return nil
}
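
// pickNEcShardsToMoveFrom selects n shards of volume vid to move away, always taking the next shard
// from whichever candidate node currently holds the most shards of that volume.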
func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
	picked := make(map[erasure_coding.ShardId]*EcNode)
	var candidateEcNodes []*CandidateEcNode
	for _, ecNode := range ecNodes {
		shardBits := findEcVolumeShards(ecNode, vid)
		if shardBits.ShardIdCount() > 0 {
			candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
				ecNode:     ecNode,
				shardCount: shardBits.ShardIdCount(),
			})
		}
	}
	slices.SortFunc(candidateEcNodes, func(a, b *CandidateEcNode) int {
		return b.shardCount - a.shardCount
	})
	for i := 0; i < n; i++ {
		selectedEcNodeIndex := -1
		for j, candidateEcNode := range candidateEcNodes {
			shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
			if shardBits > 0 {
				selectedEcNodeIndex = j
				for _, shardId := range shardBits.ShardIds() {
					candidateEcNode.shardCount--
					picked[shardId] = candidateEcNode.ecNode
					candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
					break
				}
				break
			}
		}
		if selectedEcNodeIndex >= 0 {
			ensureSortedEcNodes(candidateEcNodes, selectedEcNodeIndex, func(i, j int) bool {
				return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
			})
		}
	}

	return picked
}
func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needle.VolumeId][]*EcNode {
	vidLocations := make(map[needle.VolumeId][]*EcNode)
	for _, ecNode := range allEcNodes {
		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
		if !found {
			continue
		}
		for _, shardInfo := range diskInfo.EcShardInfos {
			// ignore shards that are not in the current collection
			if shardInfo.Collection == collection {
				vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode)
			}
		}
	}
	return vidLocations
}
func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_block.ReplicaPlacement, error) {
	for _, ecNode := range nodes {
		for _, diskInfo := range ecNode.info.DiskInfos {
			for _, volumeInfo := range diskInfo.VolumeInfos {
				if needle.VolumeId(volumeInfo.Id) != vid {
					continue
				}
				return super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
			}
		}
	}

	return nil, fmt.Errorf("failed to resolve replica placement for volume ID %d", vid)
}
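
// EcBalance balances the EC shards of the given collections across the cluster (optionally within one
// data center), then balances the overall shard counts within each rack.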
func EcBalance(commandEnv *CommandEnv, collections []string, dc string, applyBalancing bool) (err error) {
	if len(collections) == 0 {
		return fmt.Errorf("no collections to balance")
	}

	// collect all ec nodes
	allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, dc)
	if err != nil {
		return err
	}
	if totalFreeEcSlots < 1 {
		return fmt.Errorf("no free ec shard slots. only %d left", totalFreeEcSlots)
	}

	racks := collectRacks(allEcNodes)
	for _, c := range collections {
		if err = balanceEcVolumes(commandEnv, c, allEcNodes, racks, applyBalancing); err != nil {
			return err
		}
	}
	if err := balanceEcRacks(commandEnv, racks, applyBalancing); err != nil {
		return fmt.Errorf("balance ec racks: %v", err)
	}

	return nil
}