207 lines
6.0 KiB

3 years ago
3 years ago
3 years ago
3 years ago
  1. package broker
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/cluster"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  10. "sort"
  11. "sync"
  12. )
  13. func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) {
  14. ret := &mq_pb.FindBrokerLeaderResponse{}
  15. err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
  16. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  17. ClientType: cluster.BrokerType,
  18. FilerGroup: request.FilerGroup,
  19. IsLeaderOnly: true,
  20. })
  21. if err != nil {
  22. return err
  23. }
  24. if len(resp.ClusterNodes) == 0 {
  25. return nil
  26. }
  27. ret.Broker = resp.ClusterNodes[0].Address
  28. return nil
  29. })
  30. return ret, err
  31. }
  32. func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) {
  33. ret := &mq_pb.CheckSegmentStatusResponse{}
  34. // TODO add in memory active segment
  35. return ret, nil
  36. }
  37. func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) {
  38. ret := &mq_pb.CheckBrokerLoadResponse{}
  39. // TODO read broker's load
  40. return ret, nil
  41. }
  42. func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, request *mq_pb.AssignSegmentBrokersRequest) (*mq_pb.AssignSegmentBrokersResponse, error) {
  43. ret := &mq_pb.AssignSegmentBrokersResponse{}
  44. segment := mq.FromPbSegment(request.Segment)
  45. // check existing segment locations on filer
  46. existingBrokers, err := broker.checkSegmentOnFiler(segment)
  47. if err != nil {
  48. return ret, err
  49. }
  50. if len(existingBrokers) > 0 {
  51. // good if the segment is still on the brokers
  52. isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers)
  53. if err != nil {
  54. return ret, err
  55. }
  56. if isActive {
  57. for _, broker := range existingBrokers {
  58. ret.Brokers = append(ret.Brokers, string(broker))
  59. }
  60. return ret, nil
  61. }
  62. }
  63. // randomly pick up to 10 brokers, and find the ones with the lightest load
  64. selectedBrokers, err := broker.selectBrokers()
  65. if err != nil {
  66. return ret, err
  67. }
  68. // save the allocated brokers info for this segment on the filer
  69. if err := broker.saveSegmentBrokersOnFiler(segment, selectedBrokers); err != nil {
  70. return ret, err
  71. }
  72. for _, broker := range selectedBrokers {
  73. ret.Brokers = append(ret.Brokers, string(broker))
  74. }
  75. return ret, nil
  76. }
  77. func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *mq.Segment, brokers []pb.ServerAddress) (active bool, err error) {
  78. var wg sync.WaitGroup
  79. for _, candidate := range brokers {
  80. wg.Add(1)
  81. go func(candidate pb.ServerAddress) {
  82. defer wg.Done()
  83. broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error {
  84. resp, checkErr := client.CheckSegmentStatus(context.Background(), &mq_pb.CheckSegmentStatusRequest{
  85. Segment: &mq_pb.Segment{
  86. Namespace: string(segment.Topic.Namespace),
  87. Topic: segment.Topic.Name,
  88. Id: segment.Id,
  89. },
  90. })
  91. if checkErr != nil {
  92. err = checkErr
  93. glog.V(0).Infof("check segment status on %s: %v", candidate, checkErr)
  94. return nil
  95. }
  96. if resp.IsActive == false {
  97. active = false
  98. }
  99. return nil
  100. })
  101. }(candidate)
  102. }
  103. wg.Wait()
  104. return
  105. }
  106. func (broker *MessageQueueBroker) selectBrokers() (brokers []pb.ServerAddress, err error) {
  107. candidates, err := broker.selectCandidatesFromMaster(10)
  108. if err != nil {
  109. return
  110. }
  111. brokers, err = broker.pickLightestCandidates(candidates, 3)
  112. return
  113. }
  114. func (broker *MessageQueueBroker) selectCandidatesFromMaster(limit int32) (candidates []pb.ServerAddress, err error) {
  115. err = broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
  116. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  117. ClientType: cluster.BrokerType,
  118. FilerGroup: broker.option.FilerGroup,
  119. Limit: limit,
  120. })
  121. if err != nil {
  122. return err
  123. }
  124. if len(resp.ClusterNodes) == 0 {
  125. return nil
  126. }
  127. for _, node := range resp.ClusterNodes {
  128. candidates = append(candidates, pb.ServerAddress(node.Address))
  129. }
  130. return nil
  131. })
  132. return
  133. }
  134. type CandidateStatus struct {
  135. address pb.ServerAddress
  136. messageCount int64
  137. bytesCount int64
  138. load int64
  139. }
  140. func (broker *MessageQueueBroker) pickLightestCandidates(candidates []pb.ServerAddress, limit int) (selected []pb.ServerAddress, err error) {
  141. if len(candidates) <= limit {
  142. return candidates, nil
  143. }
  144. candidateStatuses, err := broker.checkBrokerStatus(candidates)
  145. if err != nil {
  146. return nil, err
  147. }
  148. sort.Slice(candidateStatuses, func(i, j int) bool {
  149. return candidateStatuses[i].load < candidateStatuses[j].load
  150. })
  151. for i, candidate := range candidateStatuses {
  152. if i >= limit {
  153. break
  154. }
  155. selected = append(selected, candidate.address)
  156. }
  157. return
  158. }
  159. func (broker *MessageQueueBroker) checkBrokerStatus(candidates []pb.ServerAddress) (candidateStatuses []*CandidateStatus, err error) {
  160. candidateStatuses = make([]*CandidateStatus, len(candidates))
  161. var wg sync.WaitGroup
  162. for i, candidate := range candidates {
  163. wg.Add(1)
  164. go func(i int, candidate pb.ServerAddress) {
  165. defer wg.Done()
  166. err = broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error {
  167. resp, checkErr := client.CheckBrokerLoad(context.Background(), &mq_pb.CheckBrokerLoadRequest{})
  168. if checkErr != nil {
  169. err = checkErr
  170. return err
  171. }
  172. candidateStatuses[i] = &CandidateStatus{
  173. address: candidate,
  174. messageCount: resp.MessageCount,
  175. bytesCount: resp.BytesCount,
  176. load: resp.MessageCount + resp.BytesCount/(64*1024),
  177. }
  178. return nil
  179. })
  180. }(i, candidate)
  181. }
  182. wg.Wait()
  183. return
  184. }