package shell

import (
	"context"
	"errors"
	"fmt"
	"math/rand/v2"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"golang.org/x/exp/slices"
	"google.golang.org/grpc"
)

type DataCenterId string
type EcNodeId string
type RackId string

type EcNode struct {
	info       *master_pb.DataNodeInfo
	dc         DataCenterId
	rack       RackId
	freeEcSlot int
}

type CandidateEcNode struct {
	ecNode     *EcNode
	shardCount int
}

type EcRack struct {
	ecNodes    map[EcNodeId]*EcNode
	freeEcSlot int
}

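// moveMountedShardToEcNode moves one mounted EC shard from an existing location to a destination node:
// it copies and mounts the shard on the destination, unmounts and deletes it from the source,
// and finally updates the in-memory shard maps of both nodes.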
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {

	if !commandEnv.isLocked() {
		return fmt.Errorf("lock is lost")
	}

	copiedShardIds := []uint32{uint32(shardId)}

	if applyBalancing {

		existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)

		// ask destination node to copy shard and the ecx file from source node, and mount it
		copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
		if err != nil {
			return err
		}

		// unmount the to be deleted shards
		err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingServerAddress, copiedShardIds)
		if err != nil {
			return err
		}

		// ask source node to delete the shard, and maybe the ecx file
		err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingServerAddress, copiedShardIds)
		if err != nil {
			return err
		}

		fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
	}

	destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
	existingLocation.deleteEcVolumeShards(vid, copiedShardIds)

	return nil
}

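// oneServerCopyAndMountEcShardsFromSource asks the target volume server to copy the given shards
// (plus the .ecx/.ecj/.vif files) from the source location and mount them. When source and target
// are the same server, only the mount is performed and no shard ids are reported as copied.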
func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
	targetServer *EcNode, shardIdsToCopy []uint32,
	volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {

	fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)

	targetAddress := pb.NewServerAddressFromDataNode(targetServer.info)
	err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {

		if targetAddress != existingLocation {

			fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
			_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
				VolumeId:       uint32(volumeId),
				Collection:     collection,
				ShardIds:       shardIdsToCopy,
				CopyEcxFile:    true,
				CopyEcjFile:    true,
				CopyVifFile:    true,
				SourceDataNode: string(existingLocation),
			})
			if copyErr != nil {
				return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
			}
		}

		fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
		_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   shardIdsToCopy,
		})
		if mountErr != nil {
			return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
		}

		if targetAddress != existingLocation {
			copiedShardIds = shardIdsToCopy
			glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
		}

		return nil
	})

	if err != nil {
		return
	}

	return
}

func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo)) {
	for _, dc := range topo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, dn := range rack.DataNodeInfos {
				fn(DataCenterId(dc.Id), RackId(rack.Id), dn)
			}
		}
	}
}

func sortEcNodesByFreeslotsDescending(ecNodes []*EcNode) {
	slices.SortFunc(ecNodes, func(a, b *EcNode) int {
		return b.freeEcSlot - a.freeEcSlot
	})
}

func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
	slices.SortFunc(ecNodes, func(a, b *EcNode) int {
		return a.freeEcSlot - b.freeEcSlot
	})
}

// if the node at the given index changed its sort key (e.g. freeEcSlot), keep the slice of EcNodes sorted
func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
	for i := index - 1; i >= 0; i-- {
		if lessThan(i+1, i) {
			swap(data, i, i+1)
		} else {
			break
		}
	}
	for i := index + 1; i < len(data); i++ {
		if lessThan(i, i-1) {
			swap(data, i, i-1)
		} else {
			break
		}
	}
}

func swap(data []*CandidateEcNode, i, j int) {
	t := data[i]
	data[i] = data[j]
	data[j] = t
}

func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
	for _, ecShardInfo := range ecShardInfos {
		shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
		count += shardBits.ShardIdCount()
	}
	return
}

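// countFreeShardSlots estimates how many EC shard slots a data node has left on the given disk type:
// each unused volume slot can hold DataShardsCount EC shards, minus the shards already stored there.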
func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) {
	if dn.DiskInfos == nil {
		return 0
	}
	diskInfo := dn.DiskInfos[string(diskType)]
	if diskInfo == nil {
		return 0
	}
	return int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
}

func (ecNode *EcNode) localShardIdCount(vid uint32) int {
	for _, diskInfo := range ecNode.info.DiskInfos {
		for _, ecShardInfo := range diskInfo.EcShardInfos {
			if vid == ecShardInfo.Id {
				shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
				return shardBits.ShardIdCount()
			}
		}
	}
	return 0
}

func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {

	// list all possible locations
	// collect topology information
	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return
	}

	// find out all volume servers with one slot left.
	ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)

	sortEcNodesByFreeslotsDescending(ecNodes)

	return
}

func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
	eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
		if selectedDataCenter != "" && selectedDataCenter != string(dc) {
			return
		}

		freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
		ecNodes = append(ecNodes, &EcNode{
			info:       dn,
			dc:         dc,
			rack:       rack,
			freeEcSlot: int(freeEcSlots),
		})
		totalFreeEcSlots += freeEcSlots
	})
	return
}

func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []uint32) error {

	fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   toBeDeletedShardIds,
		})
		return deleteErr
	})
}

func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedhardIds []uint32) error {

	fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
			VolumeId: uint32(volumeId),
			ShardIds: toBeUnmountedhardIds,
		})
		return deleteErr
	})
}

func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedhardIds []uint32) error {

	fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   toBeMountedhardIds,
		})
		return mountErr
	})
}

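// ceilDivide returns the integer division of a by b, rounded up.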
func ceilDivide(a, b int) int {
	var r int
	if (a % b) != 0 {
		r = 1
	}
	return (a / b) + r
}

func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {

	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				return erasure_coding.ShardBits(shardInfo.EcIndexBits)
			}
		}
	}

	return 0
}

func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {

	foundVolume := false
	diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
	if found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
				newShardBits := oldShardBits
				for _, shardId := range shardIds {
					newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
				}
				shardInfo.EcIndexBits = uint32(newShardBits)
				ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
				foundVolume = true
				break
			}
		}
	} else {
		diskInfo = &master_pb.DiskInfo{
			Type: string(types.HardDriveType),
		}
		ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo
	}

	if !foundVolume {
		var newShardBits erasure_coding.ShardBits
		for _, shardId := range shardIds {
			newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
		}
		diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
			Id:          uint32(vid),
			Collection:  collection,
			EcIndexBits: uint32(newShardBits),
			DiskType:    string(types.HardDriveType),
		})
		ecNode.freeEcSlot -= len(shardIds)
	}

	return ecNode
}

func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {

	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
				newShardBits := oldShardBits
				for _, shardId := range shardIds {
					newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
				}
				shardInfo.EcIndexBits = uint32(newShardBits)
				ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
			}
		}
	}

	return ecNode
}

func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
	countMap := make(map[string]int)
	for _, d := range data {
		id, count := identifierFn(d)
		countMap[id] += count
	}
	return countMap
}

func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
	groupMap := make(map[string][]*EcNode)
	for _, d := range data {
		id := identifierFn(d)
		groupMap[id] = append(groupMap[id], d)
	}
	return groupMap
}

func collectRacks(allEcNodes []*EcNode) map[RackId]*EcRack {
	// collect racks info
	racks := make(map[RackId]*EcRack)
	for _, ecNode := range allEcNodes {
		if racks[ecNode.rack] == nil {
			racks[ecNode.rack] = &EcRack{
				ecNodes: make(map[EcNodeId]*EcNode),
			}
		}
		racks[ecNode.rack].ecNodes[EcNodeId(ecNode.info.Id)] = ecNode
		racks[ecNode.rack].freeEcSlot += ecNode.freeEcSlot
	}
	return racks
}

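// balanceEcVolumes rebalances all EC shards of one collection in three passes:
// remove duplicated shards, spread shards across racks, then spread them across nodes within each rack.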
func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {

	fmt.Printf("balanceEcVolumes %s\n", collection)

	if err := deleteDuplicatedEcShards(commandEnv, allEcNodes, collection, applyBalancing); err != nil {
		return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err)
	}

	if err := balanceEcShardsAcrossRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
		return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
	}

	if err := balanceEcShardsWithinRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
		return fmt.Errorf("balance within racks collection %s ec shards: %v", collection, err)
	}

	return nil
}

func deleteDuplicatedEcShards(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, applyBalancing bool) error {
	// vid => []ecNode
	vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)
	// deduplicate ec shards
	for vid, locations := range vidLocations {
		if err := doDeduplicateEcShards(commandEnv, collection, vid, locations, applyBalancing); err != nil {
			return err
		}
	}
	return nil
}

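// doDeduplicateEcShards keeps a single copy of each EC shard of the volume: when a shard id is found
// on more than one node, the copy on the node with the fewest free slots is kept and the other copies
// are unmounted and deleted (only when applyBalancing is set).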
func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {

	// check whether this volume has ecNodes that are over average
	shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
	for _, ecNode := range locations {
		shardBits := findEcVolumeShards(ecNode, vid)
		for _, shardId := range shardBits.ShardIds() {
			shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
		}
	}
	for shardId, ecNodes := range shardToLocations {
		if len(ecNodes) <= 1 {
			continue
		}
		sortEcNodesByFreeslotsAscending(ecNodes)
		fmt.Printf("ec shard %d.%d has %d copies, keeping %v\n", vid, shardId, len(ecNodes), ecNodes[0].info.Id)
		if !applyBalancing {
			continue
		}

		duplicatedShardIds := []uint32{uint32(shardId)}
		for _, ecNode := range ecNodes[1:] {
			if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
				return err
			}
			if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
				return err
			}
			ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
		}
	}
	return nil
}

func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
	// collect vid => []ecNode, since previous steps can change the locations
	vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)
	// spread the ec shards evenly
	for vid, locations := range vidLocations {
		if err := doBalanceEcShardsAcrossRacks(commandEnv, collection, vid, locations, racks, applyBalancing); err != nil {
			return err
		}
	}
	return nil
}

func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
	return groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
		shardBits := findEcVolumeShards(ecNode, vid)
		return string(ecNode.rack), shardBits.ShardIdCount()
	})
}

// TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
	// calculate average number of shards an ec rack should have for one volume
	averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))

	// count how many racks the volume's shards span, and how many shards are in each rack
	rackToShardCount := countShardsByRack(vid, locations)
	rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
		return string(ecNode.rack)
	})

	// ecShardsToMove = select overflowing ec shards from racks with ec shard counts > averageShardsPerEcRack
	ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode)
	for rackId, count := range rackToShardCount {
		if count <= averageShardsPerEcRack {
			continue
		}
		possibleEcNodes := rackEcNodesWithVid[rackId]
		for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
			ecShardsToMove[shardId] = ecNode
		}
	}

	for shardId, ecNode := range ecShardsToMove {
		// TODO: consider volume replica info when balancing racks
		rackId := pickRackToBalanceShardsInto(racks, rackToShardCount, nil, averageShardsPerEcRack)
		if rackId == "" {
			fmt.Printf("ec shard %d.%d at %s can not find a destination rack\n", vid, shardId, ecNode.info.Id)
			continue
		}

		var possibleDestinationEcNodes []*EcNode
		for _, n := range racks[rackId].ecNodes {
			possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
		}
		err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
		if err != nil {
			return err
		}

		rackToShardCount[string(rackId)] += 1
		rackToShardCount[string(ecNode.rack)] -= 1
		racks[rackId].freeEcSlot -= 1
		racks[ecNode.rack].freeEcSlot += 1
	}

	return nil
}

// TODO: Return an error with details upon failure to resolve a destination rack.
func pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, replicaPlacement *super_block.ReplicaPlacement, averageShardsPerEcRack int) RackId {
	targets := []RackId{}
	targetShards := -1
	for _, shards := range rackToShardCount {
		if shards > targetShards {
			targetShards = shards
		}
	}

	for rackId, rack := range rackToEcNodes {
		shards := rackToShardCount[string(rackId)]

		if rack.freeEcSlot <= 0 {
			// No EC shard slots left :(
			continue
		}
		if replicaPlacement != nil && shards >= replicaPlacement.DiffRackCount {
			// Don't select racks with more EC shards for the target volume than the replication limit.
			continue
		}
		if shards >= averageShardsPerEcRack {
			// Keep EC shards across racks as balanced as possible.
			continue
		}
		if shards < targetShards {
			// Favor racks with fewer shards, to ensure a uniform distribution.
			targets = nil
			targetShards = shards
		}
		if shards == targetShards {
			targets = append(targets, rackId)
		}
	}

	if len(targets) == 0 {
		return ""
	}
	return targets[rand.IntN(len(targets))]
}

func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
	// collect vid => []ecNode, since previous steps can change the locations
	vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)

	// spread the ec shards evenly
	for vid, locations := range vidLocations {

		// count how many racks the volume's shards span, and how many shards are in each rack
		rackToShardCount := countShardsByRack(vid, locations)
		rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
			return string(ecNode.rack)
		})

		for rackId := range rackToShardCount {

			var possibleDestinationEcNodes []*EcNode
			for _, n := range racks[RackId(rackId)].ecNodes {
				if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found {
					possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
				}
			}
			sourceEcNodes := rackEcNodesWithVid[rackId]
			averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
			if err := doBalanceEcShardsWithinOneRack(commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, applyBalancing); err != nil {
				return err
			}
		}
	}
	return nil
}

func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {

	for _, ecNode := range existingLocations {

		shardBits := findEcVolumeShards(ecNode, vid)
		overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode

		for _, shardId := range shardBits.ShardIds() {

			if overLimitCount <= 0 {
				break
			}

			fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)

			err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
			if err != nil {
				return err
			}

			overLimitCount--
		}
	}

	return nil
}

func balanceEcRacks(commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error {

	// balance one rack for all ec shards
	for _, ecRack := range racks {
		if err := doBalanceEcRack(commandEnv, ecRack, applyBalancing); err != nil {
			return err
		}
	}
	return nil
}

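// doBalanceEcRack levels EC shard counts within one rack: while moves are possible, it takes one shard
// of a volume that the node with the most free slots does not yet hold from the node with the fewest
// free slots, as long as the source is above the per-node average and the destination stays at or below it.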
func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool) error {

	if len(ecRack.ecNodes) <= 1 {
		return nil
	}

	var rackEcNodes []*EcNode
	for _, node := range ecRack.ecNodes {
		rackEcNodes = append(rackEcNodes, node)
	}

	ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
		if !found {
			return
		}
		for _, ecShardInfo := range diskInfo.EcShardInfos {
			count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
		}
		return ecNode.info.Id, count
	})

	var totalShardCount int
	for _, count := range ecNodeIdToShardCount {
		totalShardCount += count
	}

	averageShardCount := ceilDivide(totalShardCount, len(rackEcNodes))

	hasMove := true
	for hasMove {
		hasMove = false
		slices.SortFunc(rackEcNodes, func(a, b *EcNode) int {
			return b.freeEcSlot - a.freeEcSlot
		})
		emptyNode, fullNode := rackEcNodes[0], rackEcNodes[len(rackEcNodes)-1]
		emptyNodeShardCount, fullNodeShardCount := ecNodeIdToShardCount[emptyNode.info.Id], ecNodeIdToShardCount[fullNode.info.Id]
		if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {

			emptyNodeIds := make(map[uint32]bool)
			if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
				for _, shards := range emptyDiskInfo.EcShardInfos {
					emptyNodeIds[shards.Id] = true
				}
			}
			if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
				for _, shards := range fullDiskInfo.EcShardInfos {
					if _, found := emptyNodeIds[shards.Id]; !found {
						for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {

							fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)

							err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
							if err != nil {
								return err
							}

							ecNodeIdToShardCount[emptyNode.info.Id]++
							ecNodeIdToShardCount[fullNode.info.Id]--
							hasMove = true
							break
						}
						break
					}
				}
			}
		}
	}

	return nil
}

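// pickEcNodeToBalanceShardsInto selects a destination node for one shard of the given volume,
// preferring nodes that hold the fewest shards of that volume, have free slots, and stay within
// the per-node average (and, when provided, the replica placement's same-rack limit). Ties are
// broken at random; when no node qualifies, an error describing the skipped candidates is returned.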
func pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode, replicaPlacement *super_block.ReplicaPlacement, averageShardsPerEcNode int) (*EcNode, error) {
	if existingLocation == nil {
		return nil, fmt.Errorf("INTERNAL: missing source nodes")
	}
	if len(possibleDestinations) == 0 {
		return nil, fmt.Errorf("INTERNAL: missing destination nodes")
	}

	nodeShards := map[*EcNode]int{}
	for _, node := range possibleDestinations {
		nodeShards[node] = findEcVolumeShards(node, vid).ShardIdCount()
	}

	targets := []*EcNode{}
	targetShards := -1
	for _, shards := range nodeShards {
		if shards > targetShards {
			targetShards = shards
		}
	}

	details := ""
	for _, node := range possibleDestinations {
		if node.info.Id == existingLocation.info.Id {
			continue
		}
		if node.freeEcSlot <= 0 {
			details += fmt.Sprintf(" Skipped %s because it has no free slots\n", node.info.Id)
			continue
		}

		shards := nodeShards[node]
		if replicaPlacement != nil && shards >= replicaPlacement.SameRackCount {
			details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for the rack (%d)\n", node.info.Id, shards, replicaPlacement.SameRackCount)
			continue
		}
		if shards >= averageShardsPerEcNode {
			details += fmt.Sprintf(" Skipped %s because shards %d >= averageShards (%d)\n",
				node.info.Id, shards, averageShardsPerEcNode)
			continue
		}

		if shards < targetShards {
			// Favor nodes with fewer shards, to ensure a uniform distribution.
			targets = nil
			targetShards = shards
		}
		if shards == targetShards {
			targets = append(targets, node)
		}
	}

	if len(targets) == 0 {
		return nil, errors.New(details)
	}
	return targets[rand.IntN(len(targets))], nil
}

// TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
	// TODO: consider volume replica info when balancing nodes
	destNode, err := pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes, nil, averageShardsPerEcNode)
	if err != nil {
		fmt.Printf("WARNING: Could not find suitable target node for %d.%d:\n%s", vid, shardId, err.Error())
		return nil
	}

	fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
	return moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destNode, applyBalancing)
}

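// pickNEcShardsToMoveFrom picks n shards of the volume to relocate, repeatedly taking one shard
// from whichever candidate node currently holds the most shards, so donors are drained evenly.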
func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
	picked := make(map[erasure_coding.ShardId]*EcNode)
	var candidateEcNodes []*CandidateEcNode
	for _, ecNode := range ecNodes {
		shardBits := findEcVolumeShards(ecNode, vid)
		if shardBits.ShardIdCount() > 0 {
			candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
				ecNode:     ecNode,
				shardCount: shardBits.ShardIdCount(),
			})
		}
	}
	slices.SortFunc(candidateEcNodes, func(a, b *CandidateEcNode) int {
		return b.shardCount - a.shardCount
	})
	for i := 0; i < n; i++ {
		selectedEcNodeIndex := -1
		for i, candidateEcNode := range candidateEcNodes {
			shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
			if shardBits > 0 {
				selectedEcNodeIndex = i
				for _, shardId := range shardBits.ShardIds() {
					candidateEcNode.shardCount--
					picked[shardId] = candidateEcNode.ecNode
					candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
					break
				}
				break
			}
		}
		if selectedEcNodeIndex >= 0 {
			ensureSortedEcNodes(candidateEcNodes, selectedEcNodeIndex, func(i, j int) bool {
				return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
			})
		}
	}

	return picked
}

func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needle.VolumeId][]*EcNode {
	vidLocations := make(map[needle.VolumeId][]*EcNode)
	for _, ecNode := range allEcNodes {
		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
		if !found {
			continue
		}
		for _, shardInfo := range diskInfo.EcShardInfos {
			// ignore if not in current collection
			if shardInfo.Collection == collection {
				vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode)
			}
		}
	}
	return vidLocations
}

// TODO: EC volumes have no replica placement info :( Maybe rely on the master's default?
func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_block.ReplicaPlacement, error) {
	for _, ecNode := range nodes {
		for _, diskInfo := range ecNode.info.DiskInfos {
			for _, volumeInfo := range diskInfo.VolumeInfos {
				if needle.VolumeId(volumeInfo.Id) != vid {
					continue
				}
				return super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
			}
		}
	}

	return nil, fmt.Errorf("failed to resolve replica placement for volume ID %d", vid)
}

func getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
	var resp *master_pb.GetMasterConfigurationResponse
	var err error

	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		resp, err = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
		return err
	})
	if err != nil {
		return nil, err
	}

	return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
}

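// EcBalance rebalances EC shards for the given collections: it collects all EC-capable volume servers
// (optionally restricted to one data center), balances each collection's shards, then balances overall
// shard counts across nodes within each rack.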
func EcBalance(commandEnv *CommandEnv, collections []string, dc string, applyBalancing bool) (err error) {
	if len(collections) == 0 {
		return fmt.Errorf("no collections to balance")
	}

	// collect all ec nodes
	allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, dc)
	if err != nil {
		return err
	}
	if totalFreeEcSlots < 1 {
		return fmt.Errorf("no free ec shard slots. only %d left", totalFreeEcSlots)
	}

	racks := collectRacks(allEcNodes)
	for _, c := range collections {
		if err = balanceEcVolumes(commandEnv, c, allEcNodes, racks, applyBalancing); err != nil {
			return err
		}
	}
	if err := balanceEcRacks(commandEnv, racks, applyBalancing); err != nil {
		return fmt.Errorf("balance ec racks: %v", err)
	}

	return nil
}