package shell

import (
	"context"
	"errors"
	"fmt"
	"math/rand/v2"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"golang.org/x/exp/slices"
	"google.golang.org/grpc"
)

type DataCenterId string
type EcNodeId string
type RackId string

type EcNode struct {
	info       *master_pb.DataNodeInfo
	dc         DataCenterId
	rack       RackId
	freeEcSlot int
}

type CandidateEcNode struct {
	ecNode     *EcNode
	shardCount int
}

type EcRack struct {
	ecNodes    map[EcNodeId]*EcNode
	freeEcSlot int
}

// TODO: We're shuffling way too many parameters between internal functions. Encapsulate in an ecBalancer{} struct.

var (
	// Overridable functions for testing.
	getDefaultReplicaPlacement = _getDefaultReplicaPlacement
)

func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
	var resp *master_pb.GetMasterConfigurationResponse
	var err error

	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		resp, err = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
		return err
	})
	if err != nil {
		return nil, err
	}

	return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
}

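// parseReplicaPlacementArg resolves the replica placement to use for EC volumes: an explicit
// replica placement string takes precedence, otherwise the master's default replication setting
// is used.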
func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super_block.ReplicaPlacement, error) {
	if replicaStr != "" {
		rp, err := super_block.NewReplicaPlacementFromString(replicaStr)
		if err == nil {
			fmt.Printf("using replica placement %q for EC volumes\n", rp.String())
		}
		return rp, err
	}

	// No replica placement argument provided, resolve from master default settings.
	rp, err := getDefaultReplicaPlacement(commandEnv)
	if err == nil {
		fmt.Printf("using master default replica placement %q for EC volumes\n", rp.String())
	}
	return rp, err
}

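// moveMountedShardToEcNode moves one mounted EC shard from existingLocation to destinationEcNode.
// When applyBalancing is true it copies and mounts the shard on the destination, then unmounts and
// deletes it on the source; in either case the in-memory shard maps of both nodes are updated so
// later planning steps see the move.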
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
	if !commandEnv.isLocked() {
		return fmt.Errorf("lock is lost")
	}

	copiedShardIds := []uint32{uint32(shardId)}

	if applyBalancing {
		existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)

		// ask the destination node to copy the shard and the ecx file from the source node, and mount it
		copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
		if err != nil {
			return err
		}

		// unmount the shards that are about to be deleted
		err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingServerAddress, copiedShardIds)
		if err != nil {
			return err
		}

		// ask the source node to delete the shard, and maybe the ecx file
		err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingServerAddress, copiedShardIds)
		if err != nil {
			return err
		}

		fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
	}

	destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
	existingLocation.deleteEcVolumeShards(vid, copiedShardIds)

	return nil
}

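// oneServerCopyAndMountEcShardsFromSource asks the target volume server to copy the given shards
// (together with the .ecx/.ecj/.vif files) from existingLocation and mount them. The copy step is
// skipped when the target is the source itself; the returned copiedShardIds only lists shards that
// were actually copied from a remote source.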
func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
	targetServer *EcNode, shardIdsToCopy []uint32,
	volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {

	fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)

	targetAddress := pb.NewServerAddressFromDataNode(targetServer.info)
	err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {

		if targetAddress != existingLocation {
			fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
			_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
				VolumeId:       uint32(volumeId),
				Collection:     collection,
				ShardIds:       shardIdsToCopy,
				CopyEcxFile:    true,
				CopyEcjFile:    true,
				CopyVifFile:    true,
				SourceDataNode: string(existingLocation),
			})
			if copyErr != nil {
				return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
			}
		}

		fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
		_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   shardIdsToCopy,
		})
		if mountErr != nil {
			return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
		}

		if targetAddress != existingLocation {
			copiedShardIds = shardIdsToCopy
			glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
		}

		return nil
	})

	if err != nil {
		return
	}

	return
}

func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo)) {
	for _, dc := range topo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, dn := range rack.DataNodeInfos {
				fn(DataCenterId(dc.Id), RackId(rack.Id), dn)
			}
		}
	}
}

func sortEcNodesByFreeslotsDescending(ecNodes []*EcNode) {
	slices.SortFunc(ecNodes, func(a, b *EcNode) int {
		return b.freeEcSlot - a.freeEcSlot
	})
}

func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
	slices.SortFunc(ecNodes, func(a, b *EcNode) int {
		return a.freeEcSlot - b.freeEcSlot
	})
}

// ensureSortedEcNodes restores sort order after the node at index has had its shard count changed,
// bubbling that entry up or down so the slice stays sorted by lessThan.
func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
	for i := index - 1; i >= 0; i-- {
		if lessThan(i+1, i) {
			swap(data, i, i+1)
		} else {
			break
		}
	}
	for i := index + 1; i < len(data); i++ {
		if lessThan(i, i-1) {
			swap(data, i, i-1)
		} else {
			break
		}
	}
}

func swap(data []*CandidateEcNode, i, j int) {
	t := data[i]
	data[i] = data[j]
	data[j] = t
}

func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
	for _, ecShardInfo := range ecShardInfos {
		shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
		count += shardBits.ShardIdCount()
	}
	return
}

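// countFreeShardSlots estimates how many more EC shards a data node can hold on disks of the given
// type: every unused volume slot counts as erasure_coding.DataShardsCount shard slots, minus the EC
// shards already stored on that disk type.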
func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) {
	if dn.DiskInfos == nil {
		return 0
	}
	diskInfo := dn.DiskInfos[string(diskType)]
	if diskInfo == nil {
		return 0
	}
	return int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
}

func (ecNode *EcNode) localShardIdCount(vid uint32) int {
	for _, diskInfo := range ecNode.info.DiskInfos {
		for _, ecShardInfo := range diskInfo.EcShardInfos {
			if vid == ecShardInfo.Id {
				shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
				return shardBits.ShardIdCount()
			}
		}
	}
	return 0
}

func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {

	// list all possible locations
	// collect topology information
	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return
	}

	// find all volume servers with at least one free EC shard slot
	ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)

	sortEcNodesByFreeslotsDescending(ecNodes)

	return
}

func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
	eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
		if selectedDataCenter != "" && selectedDataCenter != string(dc) {
			return
		}

		freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
		ecNodes = append(ecNodes, &EcNode{
			info:       dn,
			dc:         dc,
			rack:       rack,
			freeEcSlot: int(freeEcSlots),
		})
		totalFreeEcSlots += freeEcSlots
	})
	return
}

func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []uint32) error {

	fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   toBeDeletedShardIds,
		})
		return deleteErr
	})
}

func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedShardIds []uint32) error {

	fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, unmountErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
			VolumeId: uint32(volumeId),
			ShardIds: toBeUnmountedShardIds,
		})
		return unmountErr
	})
}

func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedShardIds []uint32) error {

	fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   toBeMountedShardIds,
		})
		return mountErr
	})
}

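// ceilDivide returns the ceiling of a/b for positive inputs, e.g. ceilDivide(14, 4) == 4. It is
// used to compute the average shard counts the balancing passes aim for.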
func ceilDivide(a, b int) int {
	var r int
	if (a % b) != 0 {
		r = 1
	}
	return (a / b) + r
}

func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {

	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				return erasure_coding.ShardBits(shardInfo.EcIndexBits)
			}
		}
	}

	return 0
}

func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {

	foundVolume := false
	diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
	if found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
				newShardBits := oldShardBits
				for _, shardId := range shardIds {
					newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
				}
				shardInfo.EcIndexBits = uint32(newShardBits)
				ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
				foundVolume = true
				break
			}
		}
	} else {
		diskInfo = &master_pb.DiskInfo{
			Type: string(types.HardDriveType),
		}
		ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo
	}

	if !foundVolume {
		var newShardBits erasure_coding.ShardBits
		for _, shardId := range shardIds {
			newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
		}
		diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
			Id:          uint32(vid),
			Collection:  collection,
			EcIndexBits: uint32(newShardBits),
			DiskType:    string(types.HardDriveType),
		})
		ecNode.freeEcSlot -= len(shardIds)
	}

	return ecNode
}

func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {

	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
				newShardBits := oldShardBits
				for _, shardId := range shardIds {
					newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
				}
				shardInfo.EcIndexBits = uint32(newShardBits)
				ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
			}
		}
	}

	return ecNode
}

func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
	countMap := make(map[string]int)
	for _, d := range data {
		id, count := identifierFn(d)
		countMap[id] += count
	}
	return countMap
}

func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
	groupMap := make(map[string][]*EcNode)
	for _, d := range data {
		id := identifierFn(d)
		groupMap[id] = append(groupMap[id], d)
	}
	return groupMap
}

func collectRacks(allEcNodes []*EcNode) map[RackId]*EcRack {
	// collect racks info
	racks := make(map[RackId]*EcRack)
	for _, ecNode := range allEcNodes {
		if racks[ecNode.rack] == nil {
			racks[ecNode.rack] = &EcRack{
				ecNodes: make(map[EcNodeId]*EcNode),
			}
		}
		racks[ecNode.rack].ecNodes[EcNodeId(ecNode.info.Id)] = ecNode
		racks[ecNode.rack].freeEcSlot += ecNode.freeEcSlot
	}
	return racks
}

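// balanceEcVolumes rebalances the EC shards of one collection in three passes: it removes
// duplicate shard copies, spreads shards across racks, and then spreads them across the nodes
// within each rack.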
func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*EcNode, racks map[RackId]*EcRack, rp *super_block.ReplicaPlacement, applyBalancing bool) error {

	fmt.Printf("balanceEcVolumes %s\n", collection)

	if err := deleteDuplicatedEcShards(commandEnv, allEcNodes, collection, applyBalancing); err != nil {
		return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err)
	}

	if err := balanceEcShardsAcrossRacks(commandEnv, allEcNodes, racks, collection, rp, applyBalancing); err != nil {
		return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
	}

	if err := balanceEcShardsWithinRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
		return fmt.Errorf("balance within racks collection %s ec shards: %v", collection, err)
	}

	return nil
}

func deleteDuplicatedEcShards(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, applyBalancing bool) error {
	// vid => []ecNode
	vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)
	// deduplicate ec shards
	for vid, locations := range vidLocations {
		if err := doDeduplicateEcShards(commandEnv, collection, vid, locations, applyBalancing); err != nil {
			return err
		}
	}
	return nil
}

func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {

	// check whether this volume has ecNodes that are over average
	shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
	for _, ecNode := range locations {
		shardBits := findEcVolumeShards(ecNode, vid)
		for _, shardId := range shardBits.ShardIds() {
			shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
		}
	}
	for shardId, ecNodes := range shardToLocations {
		if len(ecNodes) <= 1 {
			continue
		}
		sortEcNodesByFreeslotsAscending(ecNodes)
		fmt.Printf("ec shard %d.%d has %d copies, keeping %v\n", vid, shardId, len(ecNodes), ecNodes[0].info.Id)
		if !applyBalancing {
			continue
		}

		duplicatedShardIds := []uint32{uint32(shardId)}
		for _, ecNode := range ecNodes[1:] {
			if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
				return err
			}
			if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
				return err
			}
			ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
		}
	}
	return nil
}

func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, rp *super_block.ReplicaPlacement, applyBalancing bool) error {
	// collect vid => []ecNode, since previous steps can change the locations
	vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)
	// spread the ec shards evenly
	for vid, locations := range vidLocations {
		if err := doBalanceEcShardsAcrossRacks(commandEnv, collection, vid, locations, racks, rp, applyBalancing); err != nil {
			return err
		}
	}
	return nil
}

func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
	return groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
		shardBits := findEcVolumeShards(ecNode, vid)
		return string(ecNode.rack), shardBits.ShardIdCount()
	})
}

// TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, rp *super_block.ReplicaPlacement, applyBalancing bool) error {
	// calculate the average number of shards an ec rack should have for one volume
	averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))

	// count how many racks hold the volume's shards, and how many shards each rack holds
	rackToShardCount := countShardsByRack(vid, locations)
	rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
		return string(ecNode.rack)
	})

	// ecShardsToMove = select overflowing ec shards from racks with ec shard counts > averageShardsPerEcRack
	ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode)
	for rackId, count := range rackToShardCount {
		if count <= averageShardsPerEcRack {
			continue
		}
		possibleEcNodes := rackEcNodesWithVid[rackId]
		for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
			ecShardsToMove[shardId] = ecNode
		}
	}

	for shardId, ecNode := range ecShardsToMove {
		// TODO: consider volume replica info when balancing racks
		rackId, err := pickRackToBalanceShardsInto(racks, rackToShardCount, rp, averageShardsPerEcRack)
		if err != nil {
			fmt.Printf("ec shard %d.%d at %s can not find a destination rack:\n%s\n", vid, shardId, ecNode.info.Id, err.Error())
			continue
		}

		var possibleDestinationEcNodes []*EcNode
		for _, n := range racks[rackId].ecNodes {
			possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
		}
		err = pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
		if err != nil {
			return err
		}

		rackToShardCount[string(rackId)] += 1
		rackToShardCount[string(ecNode.rack)] -= 1
		racks[rackId].freeEcSlot -= 1
		racks[ecNode.rack].freeEcSlot += 1
	}

	return nil
}

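// pickRackToBalanceShardsInto chooses a destination rack for one shard. Racks without free slots,
// racks at the replica placement limit for other racks, and racks at or above the average shard
// count are skipped; among the remaining candidates, one of the racks holding the fewest shards is
// picked at random.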
func pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, replicaPlacement *super_block.ReplicaPlacement, averageShardsPerEcRack int) (RackId, error) {
	targets := []RackId{}
	targetShards := -1
	for _, shards := range rackToShardCount {
		if shards > targetShards {
			targetShards = shards
		}
	}

	details := ""
	for rackId, rack := range rackToEcNodes {
		shards := rackToShardCount[string(rackId)]

		if rack.freeEcSlot <= 0 {
			details += fmt.Sprintf(" Skipped %s because it has no free slots\n", rackId)
			continue
		}
		if replicaPlacement != nil && shards >= replicaPlacement.DiffRackCount {
			details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for other racks (%d)\n", rackId, shards, replicaPlacement.DiffRackCount)
			continue
		}
		if shards >= averageShardsPerEcRack {
			details += fmt.Sprintf(" Skipped %s because shards %d >= averageShards (%d)\n", rackId, shards, averageShardsPerEcRack)
			continue
		}

		if shards < targetShards {
			// Favor racks with fewer shards, to ensure a uniform distribution.
			targets = nil
			targetShards = shards
		}
		if shards == targetShards {
			targets = append(targets, rackId)
		}
	}

	if len(targets) == 0 {
		return "", errors.New(details)
	}
	return targets[rand.IntN(len(targets))], nil
}

func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
	// collect vid => []ecNode, since previous steps can change the locations
	vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)

	// spread the ec shards evenly
	for vid, locations := range vidLocations {

		// count how many racks hold the volume's shards, and how many shards each rack holds
		rackToShardCount := countShardsByRack(vid, locations)
		rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
			return string(ecNode.rack)
		})

		for rackId := range rackToShardCount {

			var possibleDestinationEcNodes []*EcNode
			for _, n := range racks[RackId(rackId)].ecNodes {
				if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found {
					possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
				}
			}
			sourceEcNodes := rackEcNodesWithVid[rackId]
			averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
			if err := doBalanceEcShardsWithinOneRack(commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, applyBalancing); err != nil {
				return err
			}
		}
	}
	return nil
}

func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {

	for _, ecNode := range existingLocations {

		shardBits := findEcVolumeShards(ecNode, vid)
		overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode

		for _, shardId := range shardBits.ShardIds() {

			if overLimitCount <= 0 {
				break
			}

			fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)

			err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
			if err != nil {
				return err
			}

			overLimitCount--
		}
	}

	return nil
}

func balanceEcRacks(commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error {

	// balance one rack for all ec shards
	for _, ecRack := range racks {
		if err := doBalanceEcRack(commandEnv, ecRack, applyBalancing); err != nil {
			return err
		}
	}
	return nil
}

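// doBalanceEcRack evens out the total EC shard count across the nodes of one rack. Each round it
// takes the node with the most free slots and the node with the fewest; if the latter holds more
// shards than the rack average and the former can still accept one, it moves over a shard of a
// volume the receiving node does not yet hold, looping until no further move is made.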
func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool) error {

	if len(ecRack.ecNodes) <= 1 {
		return nil
	}

	var rackEcNodes []*EcNode
	for _, node := range ecRack.ecNodes {
		rackEcNodes = append(rackEcNodes, node)
	}

	ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
		if !found {
			return
		}
		for _, ecShardInfo := range diskInfo.EcShardInfos {
			count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
		}
		return ecNode.info.Id, count
	})

	var totalShardCount int
	for _, count := range ecNodeIdToShardCount {
		totalShardCount += count
	}

	averageShardCount := ceilDivide(totalShardCount, len(rackEcNodes))

	hasMove := true
	for hasMove {
		hasMove = false
		slices.SortFunc(rackEcNodes, func(a, b *EcNode) int {
			return b.freeEcSlot - a.freeEcSlot
		})
		emptyNode, fullNode := rackEcNodes[0], rackEcNodes[len(rackEcNodes)-1]
		emptyNodeShardCount, fullNodeShardCount := ecNodeIdToShardCount[emptyNode.info.Id], ecNodeIdToShardCount[fullNode.info.Id]
		if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {

			emptyNodeIds := make(map[uint32]bool)
			if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
				for _, shards := range emptyDiskInfo.EcShardInfos {
					emptyNodeIds[shards.Id] = true
				}
			}
			if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
				for _, shards := range fullDiskInfo.EcShardInfos {
					if _, found := emptyNodeIds[shards.Id]; !found {
						for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {

							fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)

							err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
							if err != nil {
								return err
							}

							ecNodeIdToShardCount[emptyNode.info.Id]++
							ecNodeIdToShardCount[fullNode.info.Id]--
							hasMove = true
							break
						}
						break
					}
				}
			}
		}
	}

	return nil
}

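// pickEcNodeToBalanceShardsInto chooses a destination node for one shard of the given volume. It
// skips the source node itself, nodes without free slots, nodes at the replica placement limit for
// the rack, and nodes at or above the average shard count; among the rest, one of the nodes holding
// the fewest shards of this volume is picked at random.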
func pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode, replicaPlacement *super_block.ReplicaPlacement, averageShardsPerEcNode int) (*EcNode, error) {
	if existingLocation == nil {
		return nil, fmt.Errorf("INTERNAL: missing source nodes")
	}
	if len(possibleDestinations) == 0 {
		return nil, fmt.Errorf("INTERNAL: missing destination nodes")
	}

	nodeShards := map[*EcNode]int{}
	for _, node := range possibleDestinations {
		nodeShards[node] = findEcVolumeShards(node, vid).ShardIdCount()
	}

	targets := []*EcNode{}
	targetShards := -1
	for _, shards := range nodeShards {
		if shards > targetShards {
			targetShards = shards
		}
	}

	details := ""
	for _, node := range possibleDestinations {
		if node.info.Id == existingLocation.info.Id {
			continue
		}
		if node.freeEcSlot <= 0 {
			details += fmt.Sprintf(" Skipped %s because it has no free slots\n", node.info.Id)
			continue
		}

		shards := nodeShards[node]
		if replicaPlacement != nil && shards >= replicaPlacement.SameRackCount {
			details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for the rack (%d)\n", node.info.Id, shards, replicaPlacement.SameRackCount)
			continue
		}
		if shards >= averageShardsPerEcNode {
			details += fmt.Sprintf(" Skipped %s because shards %d >= averageShards (%d)\n",
				node.info.Id, shards, averageShardsPerEcNode)
			continue
		}

		if shards < targetShards {
			// Favor nodes with fewer shards, to ensure a uniform distribution.
			targets = nil
			targetShards = shards
		}
		if shards == targetShards {
			targets = append(targets, node)
		}
	}

	if len(targets) == 0 {
		return nil, errors.New(details)
	}
	return targets[rand.IntN(len(targets))], nil
}

// TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
	// TODO: consider volume replica info when balancing nodes
	destNode, err := pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes, nil, averageShardsPerEcNode)
	if err != nil {
		fmt.Printf("WARNING: Could not find a suitable target node for %d.%d:\n%s", vid, shardId, err.Error())
		return nil
	}

	fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
	return moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destNode, applyBalancing)
}

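// pickNEcShardsToMoveFrom selects n shards of the given volume to move away, repeatedly taking the
// next shard from whichever candidate node currently holds the most shards so that donors are
// drained evenly.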
func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
	picked := make(map[erasure_coding.ShardId]*EcNode)
	var candidateEcNodes []*CandidateEcNode
	for _, ecNode := range ecNodes {
		shardBits := findEcVolumeShards(ecNode, vid)
		if shardBits.ShardIdCount() > 0 {
			candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
				ecNode:     ecNode,
				shardCount: shardBits.ShardIdCount(),
			})
		}
	}
	slices.SortFunc(candidateEcNodes, func(a, b *CandidateEcNode) int {
		return b.shardCount - a.shardCount
	})
	for i := 0; i < n; i++ {
		selectedEcNodeIndex := -1
		for i, candidateEcNode := range candidateEcNodes {
			shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
			if shardBits > 0 {
				selectedEcNodeIndex = i
				for _, shardId := range shardBits.ShardIds() {
					candidateEcNode.shardCount--
					picked[shardId] = candidateEcNode.ecNode
					candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
					break
				}
				break
			}
		}
		if selectedEcNodeIndex >= 0 {
			ensureSortedEcNodes(candidateEcNodes, selectedEcNodeIndex, func(i, j int) bool {
				return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
			})
		}
	}
	return picked
}

func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needle.VolumeId][]*EcNode {
	vidLocations := make(map[needle.VolumeId][]*EcNode)
	for _, ecNode := range allEcNodes {
		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
		if !found {
			continue
		}
		for _, shardInfo := range diskInfo.EcShardInfos {
			// ignore if not in current collection
			if shardInfo.Collection == collection {
				vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode)
			}
		}
	}
	return vidLocations
}

// TODO: EC volumes have no topology replica placement info :( We need a better solution to resolve topology, and balancing, for those.
func volumeIdToReplicaPlacement(commandEnv *CommandEnv, vid needle.VolumeId, nodes []*EcNode, ecReplicaPlacement *super_block.ReplicaPlacement) (*super_block.ReplicaPlacement, error) {
	for _, ecNode := range nodes {
		for _, diskInfo := range ecNode.info.DiskInfos {
			for _, volumeInfo := range diskInfo.VolumeInfos {
				if needle.VolumeId(volumeInfo.Id) == vid {
					return super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
				}
			}
			for _, ecShardInfo := range diskInfo.EcShardInfos {
				if needle.VolumeId(ecShardInfo.Id) == vid {
					return ecReplicaPlacement, nil
				}
			}
		}
	}

	return nil, fmt.Errorf("failed to resolve replica placement for volume ID %d", vid)
}

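// EcBalance balances EC shards across racks and volume servers for the given collections,
// optionally restricted to one data center. An illustrative call (the collection name here is just
// an example, not part of this file) might look like:
//
//	err := EcBalance(commandEnv, []string{"pictures"}, "", ecReplicaPlacement, false)
//
// With applyBalancing set to false the planned moves are only printed; the helpers skip their
// volume server RPCs in that case.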
func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, applyBalancing bool) (err error) {
	if len(collections) == 0 {
		return fmt.Errorf("no collections to balance")
	}

	// collect all ec nodes
	allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, dc)
	if err != nil {
		return err
	}
	if totalFreeEcSlots < 1 {
		return fmt.Errorf("no free ec shard slots. only %d left", totalFreeEcSlots)
	}

	racks := collectRacks(allEcNodes)
	for _, c := range collections {
		if err = balanceEcVolumes(commandEnv, c, allEcNodes, racks, ecReplicaPlacement, applyBalancing); err != nil {
			return err
		}
	}

	if err := balanceEcRacks(commandEnv, racks, applyBalancing); err != nil {
		return fmt.Errorf("balance ec racks: %v", err)
	}

	return nil
}