You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

212 lines
6.2 KiB

3 years ago
3 years ago
3 years ago
3 years ago
  1. package broker
  2. import (
  3. "context"
  4. "github.com/chrislusf/seaweedfs/weed/cluster"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/mq"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  9. "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
  10. "sort"
  11. "sync"
  12. )
  13. func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) {
  14. ret := &mq_pb.FindBrokerLeaderResponse{}
  15. err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
  16. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  17. ClientType: cluster.BrokerType,
  18. FilerGroup: request.FilerGroup,
  19. IsLeaderOnly: true,
  20. })
  21. if err != nil {
  22. return err
  23. }
  24. if len(resp.ClusterNodes) == 0 {
  25. return nil
  26. }
  27. ret.Broker = resp.ClusterNodes[0].Address
  28. return nil
  29. })
  30. return ret, err
  31. }
  32. func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) {
  33. ret := &mq_pb.CheckSegmentStatusResponse{}
  34. // TODO add in memory active segment
  35. return ret, nil
  36. }
  37. func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) {
  38. ret := &mq_pb.CheckBrokerLoadResponse{}
  39. // TODO read broker's load
  40. return ret, nil
  41. }
  42. func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, request *mq_pb.AssignSegmentBrokersRequest) (*mq_pb.AssignSegmentBrokersResponse, error) {
  43. ret := &mq_pb.AssignSegmentBrokersResponse{}
  44. segment := mq.FromPbSegment(request.Segment)
  45. // check existing segment locations on filer
  46. existingBrokers, err := broker.checkSegmentOnFiler(segment)
  47. if err != nil {
  48. return ret, err
  49. }
  50. // good if the segment is still on the brokers
  51. isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers)
  52. if err != nil {
  53. return ret, err
  54. }
  55. if isActive {
  56. for _, broker := range existingBrokers {
  57. ret.Brokers = append(ret.Brokers, string(broker))
  58. }
  59. return ret, nil
  60. }
  61. // randomly pick up to 10 brokers, and find the ones with the lightest load
  62. selectedBrokers, err := broker.selectBrokers()
  63. if err != nil {
  64. return ret, err
  65. }
  66. // save the allocated brokers info for this segment on the filer
  67. if err := broker.saveSegmentOnFiler(segment, selectedBrokers); err != nil {
  68. return ret, err
  69. }
  70. for _, broker := range selectedBrokers {
  71. ret.Brokers = append(ret.Brokers, string(broker))
  72. }
  73. return ret, nil
  74. }
  75. func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brokers []pb.ServerAddress, err error) {
  76. return
  77. }
  78. func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *mq.Segment, brokers []pb.ServerAddress) (active bool, err error) {
  79. var wg sync.WaitGroup
  80. for _, candidate := range brokers {
  81. wg.Add(1)
  82. go func(candidate pb.ServerAddress) {
  83. defer wg.Done()
  84. broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error {
  85. resp, checkErr := client.CheckSegmentStatus(context.Background(), &mq_pb.CheckSegmentStatusRequest{
  86. Segment: &mq_pb.Segment{
  87. Namespace: string(segment.Topic.Namespace),
  88. Topic: segment.Topic.Name,
  89. Id: segment.Id,
  90. },
  91. })
  92. if checkErr != nil {
  93. err = checkErr
  94. glog.V(0).Infof("check segment status on %s: %v", candidate, checkErr)
  95. return nil
  96. }
  97. if resp.IsActive == false {
  98. active = false
  99. }
  100. return nil
  101. })
  102. }(candidate)
  103. }
  104. wg.Wait()
  105. return
  106. }
  107. func (broker *MessageQueueBroker) selectBrokers() (brokers []pb.ServerAddress, err error) {
  108. candidates, err := broker.selectCandidatesFromMaster(10)
  109. if err != nil {
  110. return
  111. }
  112. brokers, err = broker.pickLightestCandidates(candidates, 3)
  113. return
  114. }
  115. func (broker *MessageQueueBroker) selectCandidatesFromMaster(limit int32) (candidates []pb.ServerAddress, err error) {
  116. err = broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
  117. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  118. ClientType: cluster.BrokerType,
  119. FilerGroup: broker.option.FilerGroup,
  120. Limit: limit,
  121. })
  122. if err != nil {
  123. return err
  124. }
  125. if len(resp.ClusterNodes) == 0 {
  126. return nil
  127. }
  128. for _, node := range resp.ClusterNodes {
  129. candidates = append(candidates, pb.ServerAddress(node.Address))
  130. }
  131. return nil
  132. })
  133. return
  134. }
  135. type CandidateStatus struct {
  136. address pb.ServerAddress
  137. messageCount int64
  138. bytesCount int64
  139. load int64
  140. }
  141. func (broker *MessageQueueBroker) pickLightestCandidates(candidates []pb.ServerAddress, limit int) (selected []pb.ServerAddress, err error) {
  142. if len(candidates) <= limit {
  143. return candidates, nil
  144. }
  145. candidateStatuses, err := broker.checkBrokerStatus(candidates)
  146. if err != nil {
  147. return nil, err
  148. }
  149. sort.Slice(candidateStatuses, func(i, j int) bool {
  150. return candidateStatuses[i].load < candidateStatuses[j].load
  151. })
  152. for i, candidate := range candidateStatuses {
  153. if i >= limit {
  154. break
  155. }
  156. selected = append(selected, candidate.address)
  157. }
  158. return
  159. }
  160. func (broker *MessageQueueBroker) checkBrokerStatus(candidates []pb.ServerAddress) (candidateStatuses []*CandidateStatus, err error) {
  161. candidateStatuses = make([]*CandidateStatus, len(candidates))
  162. var wg sync.WaitGroup
  163. for i, candidate := range candidates {
  164. wg.Add(1)
  165. go func(i int, candidate pb.ServerAddress) {
  166. defer wg.Done()
  167. err = broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error {
  168. resp, checkErr := client.CheckBrokerLoad(context.Background(), &mq_pb.CheckBrokerLoadRequest{})
  169. if checkErr != nil {
  170. err = checkErr
  171. return err
  172. }
  173. candidateStatuses[i] = &CandidateStatus{
  174. address: candidate,
  175. messageCount: resp.MessageCount,
  176. bytesCount: resp.BytesCount,
  177. load: resp.MessageCount + resp.BytesCount/(64*1024),
  178. }
  179. return nil
  180. })
  181. }(i, candidate)
  182. }
  183. wg.Wait()
  184. return
  185. }
  186. func (broker *MessageQueueBroker) saveSegmentOnFiler(segment *mq.Segment, brokers []pb.ServerAddress) (err error) {
  187. return
  188. }