You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

206 lines
5.9 KiB

3 years ago
3 years ago
3 years ago
3 years ago
  1. package broker
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/cluster"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  10. "sort"
  11. "sync"
  12. )
  13. func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) {
  14. ret := &mq_pb.FindBrokerLeaderResponse{}
  15. err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
  16. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  17. ClientType: cluster.BrokerType,
  18. FilerGroup: request.FilerGroup,
  19. })
  20. if err != nil {
  21. return err
  22. }
  23. if len(resp.ClusterNodes) == 0 {
  24. return nil
  25. }
  26. ret.Broker = resp.ClusterNodes[0].Address
  27. return nil
  28. })
  29. return ret, err
  30. }
  31. func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) {
  32. ret := &mq_pb.CheckSegmentStatusResponse{}
  33. // TODO add in memory active segment
  34. return ret, nil
  35. }
  36. func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) {
  37. ret := &mq_pb.CheckBrokerLoadResponse{}
  38. // TODO read broker's load
  39. return ret, nil
  40. }
  41. func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, request *mq_pb.AssignSegmentBrokersRequest) (*mq_pb.AssignSegmentBrokersResponse, error) {
  42. ret := &mq_pb.AssignSegmentBrokersResponse{}
  43. segment := mq.FromPbSegment(request.Segment)
  44. // check existing segment locations on filer
  45. existingBrokers, err := broker.checkSegmentOnFiler(segment)
  46. if err != nil {
  47. return ret, err
  48. }
  49. if len(existingBrokers) > 0 {
  50. // good if the segment is still on the brokers
  51. isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers)
  52. if err != nil {
  53. return ret, err
  54. }
  55. if isActive {
  56. for _, broker := range existingBrokers {
  57. ret.Brokers = append(ret.Brokers, string(broker))
  58. }
  59. return ret, nil
  60. }
  61. }
  62. // randomly pick up to 10 brokers, and find the ones with the lightest load
  63. selectedBrokers, err := broker.selectBrokers()
  64. if err != nil {
  65. return ret, err
  66. }
  67. // save the allocated brokers info for this segment on the filer
  68. if err := broker.saveSegmentBrokersOnFiler(segment, selectedBrokers); err != nil {
  69. return ret, err
  70. }
  71. for _, broker := range selectedBrokers {
  72. ret.Brokers = append(ret.Brokers, string(broker))
  73. }
  74. return ret, nil
  75. }
  76. func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *mq.Segment, brokers []pb.ServerAddress) (active bool, err error) {
  77. var wg sync.WaitGroup
  78. for _, candidate := range brokers {
  79. wg.Add(1)
  80. go func(candidate pb.ServerAddress) {
  81. defer wg.Done()
  82. broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error {
  83. resp, checkErr := client.CheckSegmentStatus(context.Background(), &mq_pb.CheckSegmentStatusRequest{
  84. Segment: &mq_pb.Segment{
  85. Namespace: string(segment.Topic.Namespace),
  86. Topic: segment.Topic.Name,
  87. Id: segment.Id,
  88. },
  89. })
  90. if checkErr != nil {
  91. err = checkErr
  92. glog.V(0).Infof("check segment status on %s: %v", candidate, checkErr)
  93. return nil
  94. }
  95. if resp.IsActive == false {
  96. active = false
  97. }
  98. return nil
  99. })
  100. }(candidate)
  101. }
  102. wg.Wait()
  103. return
  104. }
  105. func (broker *MessageQueueBroker) selectBrokers() (brokers []pb.ServerAddress, err error) {
  106. candidates, err := broker.selectCandidatesFromMaster(10)
  107. if err != nil {
  108. return
  109. }
  110. brokers, err = broker.pickLightestCandidates(candidates, 3)
  111. return
  112. }
  113. func (broker *MessageQueueBroker) selectCandidatesFromMaster(limit int32) (candidates []pb.ServerAddress, err error) {
  114. err = broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
  115. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  116. ClientType: cluster.BrokerType,
  117. FilerGroup: broker.option.FilerGroup,
  118. Limit: limit,
  119. })
  120. if err != nil {
  121. return err
  122. }
  123. if len(resp.ClusterNodes) == 0 {
  124. return nil
  125. }
  126. for _, node := range resp.ClusterNodes {
  127. candidates = append(candidates, pb.ServerAddress(node.Address))
  128. }
  129. return nil
  130. })
  131. return
  132. }
  133. type CandidateStatus struct {
  134. address pb.ServerAddress
  135. messageCount int64
  136. bytesCount int64
  137. load int64
  138. }
  139. func (broker *MessageQueueBroker) pickLightestCandidates(candidates []pb.ServerAddress, limit int) (selected []pb.ServerAddress, err error) {
  140. if len(candidates) <= limit {
  141. return candidates, nil
  142. }
  143. candidateStatuses, err := broker.checkBrokerStatus(candidates)
  144. if err != nil {
  145. return nil, err
  146. }
  147. sort.Slice(candidateStatuses, func(i, j int) bool {
  148. return candidateStatuses[i].load < candidateStatuses[j].load
  149. })
  150. for i, candidate := range candidateStatuses {
  151. if i >= limit {
  152. break
  153. }
  154. selected = append(selected, candidate.address)
  155. }
  156. return
  157. }
  158. func (broker *MessageQueueBroker) checkBrokerStatus(candidates []pb.ServerAddress) (candidateStatuses []*CandidateStatus, err error) {
  159. candidateStatuses = make([]*CandidateStatus, len(candidates))
  160. var wg sync.WaitGroup
  161. for i, candidate := range candidates {
  162. wg.Add(1)
  163. go func(i int, candidate pb.ServerAddress) {
  164. defer wg.Done()
  165. err = broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error {
  166. resp, checkErr := client.CheckBrokerLoad(context.Background(), &mq_pb.CheckBrokerLoadRequest{})
  167. if checkErr != nil {
  168. err = checkErr
  169. return err
  170. }
  171. candidateStatuses[i] = &CandidateStatus{
  172. address: candidate,
  173. messageCount: resp.MessageCount,
  174. bytesCount: resp.BytesCount,
  175. load: resp.MessageCount + resp.BytesCount/(64*1024),
  176. }
  177. return nil
  178. })
  179. }(i, candidate)
  180. }
  181. wg.Wait()
  182. return
  183. }