You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

283 lines
8.7 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package broker
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/cluster"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  10. "sort"
  11. "sync"
  12. )
  13. const (
  14. MaxPartitionCount = 1024
  15. )
  16. func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) {
  17. ret := &mq_pb.FindBrokerLeaderResponse{}
  18. err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
  19. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  20. ClientType: cluster.BrokerType,
  21. FilerGroup: request.FilerGroup,
  22. })
  23. if err != nil {
  24. return err
  25. }
  26. if len(resp.ClusterNodes) == 0 {
  27. return nil
  28. }
  29. ret.Broker = resp.ClusterNodes[0].Address
  30. return nil
  31. })
  32. return ret, err
  33. }
  34. func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, request *mq_pb.AssignSegmentBrokersRequest) (*mq_pb.AssignSegmentBrokersResponse, error) {
  35. ret := &mq_pb.AssignSegmentBrokersResponse{}
  36. segment := topic.FromPbSegment(request.Segment)
  37. // check existing segment locations on filer
  38. existingBrokers, err := broker.checkSegmentOnFiler(segment)
  39. if err != nil {
  40. return ret, err
  41. }
  42. if len(existingBrokers) > 0 {
  43. // good if the segment is still on the brokers
  44. isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers)
  45. if err != nil {
  46. return ret, err
  47. }
  48. if isActive {
  49. for _, broker := range existingBrokers {
  50. ret.Brokers = append(ret.Brokers, string(broker))
  51. }
  52. return ret, nil
  53. }
  54. }
  55. // randomly pick up to 10 brokers, and find the ones with the lightest load
  56. selectedBrokers, err := broker.selectBrokers()
  57. if err != nil {
  58. return ret, err
  59. }
  60. // save the allocated brokers info for this segment on the filer
  61. if err := broker.saveSegmentBrokersOnFiler(segment, selectedBrokers); err != nil {
  62. return ret, err
  63. }
  64. for _, broker := range selectedBrokers {
  65. ret.Brokers = append(ret.Brokers, string(broker))
  66. }
  67. return ret, nil
  68. }
  69. func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) {
  70. ret := &mq_pb.CheckSegmentStatusResponse{}
  71. // TODO add in memory active segment
  72. return ret, nil
  73. }
  74. func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) {
  75. ret := &mq_pb.CheckBrokerLoadResponse{}
  76. // TODO read broker's load
  77. return ret, nil
  78. }
  79. // FindTopicBrokers returns the brokers that are serving the topic
  80. //
  81. // 1. lock the topic
  82. //
  83. // 2. find the topic partitions on the filer
  84. // 2.1 if the topic is not found, return error
  85. // 2.2 if the request is_for_publish, create the topic
  86. // 2.2.1 if the request is_for_subscribe, return error not found
  87. // 2.2.2 if the request is_for_publish, create the topic
  88. // 2.2 if the topic is found, return the brokers
  89. //
  90. // 3. unlock the topic
  91. func (broker *MessageQueueBroker) FindTopicBrokers(c context.Context, request *mq_pb.FindTopicBrokersRequest) (*mq_pb.FindTopicBrokersResponse, error) {
  92. ret := &mq_pb.FindTopicBrokersResponse{}
  93. // lock the topic
  94. // find the topic partitions on the filer
  95. // if the topic is not found
  96. // if the request is_for_publish
  97. // create the topic
  98. // if the request is_for_subscribe
  99. // return error not found
  100. return ret, nil
  101. }
  102. // CheckTopicPartitionsStatus check the topic partitions on the broker
  103. func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context, request *mq_pb.CheckTopicPartitionsStatusRequest) (*mq_pb.CheckTopicPartitionsStatusResponse, error) {
  104. ret := &mq_pb.CheckTopicPartitionsStatusResponse{}
  105. return ret, nil
  106. }
  107. // createOrUpdateTopicPartitions creates the topic partitions on the broker
  108. // 1. check
  109. func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment) (err error) {
  110. // create or update each partition
  111. if prevAssignment == nil {
  112. broker.createOrUpdateTopicPartition(topic, nil)
  113. } else {
  114. for _, partitionAssignment := range prevAssignment.BrokerPartitions {
  115. broker.createOrUpdateTopicPartition(topic, partitionAssignment)
  116. }
  117. }
  118. return nil
  119. }
  120. func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (newAssignment *mq_pb.BrokerPartitionsAssignment) {
  121. shouldCreate := broker.confirmBrokerPartitionAssignment(topic, oldAssignment)
  122. if !shouldCreate {
  123. }
  124. return
  125. }
  126. func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (shouldCreate bool) {
  127. if oldAssignment == nil {
  128. return true
  129. }
  130. for _, b := range oldAssignment.FollowerBrokers {
  131. pb.WithBrokerClient(false, pb.ServerAddress(b), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  132. _, err := client.CheckTopicPartitionsStatus(context.Background(), &mq_pb.CheckTopicPartitionsStatusRequest{
  133. Namespace: string(topic.Namespace),
  134. Topic: topic.Name,
  135. BrokerPartitionsAssignment: oldAssignment,
  136. ShouldCancelIfNotMatch: true,
  137. })
  138. if err != nil {
  139. shouldCreate = true
  140. }
  141. return nil
  142. })
  143. }
  144. return
  145. }
  146. func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *topic.Segment, brokers []pb.ServerAddress) (active bool, err error) {
  147. var wg sync.WaitGroup
  148. for _, candidate := range brokers {
  149. wg.Add(1)
  150. go func(candidate pb.ServerAddress) {
  151. defer wg.Done()
  152. broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error {
  153. resp, checkErr := client.CheckSegmentStatus(context.Background(), &mq_pb.CheckSegmentStatusRequest{
  154. Segment: &mq_pb.Segment{
  155. Namespace: string(segment.Topic.Namespace),
  156. Topic: segment.Topic.Name,
  157. Id: segment.Id,
  158. },
  159. })
  160. if checkErr != nil {
  161. err = checkErr
  162. glog.V(0).Infof("check segment status on %s: %v", candidate, checkErr)
  163. return nil
  164. }
  165. if resp.IsActive == false {
  166. active = false
  167. }
  168. return nil
  169. })
  170. }(candidate)
  171. }
  172. wg.Wait()
  173. return
  174. }
  175. func (broker *MessageQueueBroker) selectBrokers() (brokers []pb.ServerAddress, err error) {
  176. candidates, err := broker.selectCandidatesFromMaster(10)
  177. if err != nil {
  178. return
  179. }
  180. brokers, err = broker.pickLightestCandidates(candidates, 3)
  181. return
  182. }
  183. func (broker *MessageQueueBroker) selectCandidatesFromMaster(limit int32) (candidates []pb.ServerAddress, err error) {
  184. err = broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
  185. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  186. ClientType: cluster.BrokerType,
  187. FilerGroup: broker.option.FilerGroup,
  188. Limit: limit,
  189. })
  190. if err != nil {
  191. return err
  192. }
  193. if len(resp.ClusterNodes) == 0 {
  194. return nil
  195. }
  196. for _, node := range resp.ClusterNodes {
  197. candidates = append(candidates, pb.ServerAddress(node.Address))
  198. }
  199. return nil
  200. })
  201. return
  202. }
  203. type CandidateStatus struct {
  204. address pb.ServerAddress
  205. messageCount int64
  206. bytesCount int64
  207. load int64
  208. }
  209. func (broker *MessageQueueBroker) pickLightestCandidates(candidates []pb.ServerAddress, limit int) (selected []pb.ServerAddress, err error) {
  210. if len(candidates) <= limit {
  211. return candidates, nil
  212. }
  213. candidateStatuses, err := broker.checkBrokerStatus(candidates)
  214. if err != nil {
  215. return nil, err
  216. }
  217. sort.Slice(candidateStatuses, func(i, j int) bool {
  218. return candidateStatuses[i].load < candidateStatuses[j].load
  219. })
  220. for i, candidate := range candidateStatuses {
  221. if i >= limit {
  222. break
  223. }
  224. selected = append(selected, candidate.address)
  225. }
  226. return
  227. }
  228. func (broker *MessageQueueBroker) checkBrokerStatus(candidates []pb.ServerAddress) (candidateStatuses []*CandidateStatus, err error) {
  229. candidateStatuses = make([]*CandidateStatus, len(candidates))
  230. var wg sync.WaitGroup
  231. for i, candidate := range candidates {
  232. wg.Add(1)
  233. go func(i int, candidate pb.ServerAddress) {
  234. defer wg.Done()
  235. err = broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error {
  236. resp, checkErr := client.CheckBrokerLoad(context.Background(), &mq_pb.CheckBrokerLoadRequest{})
  237. if checkErr != nil {
  238. err = checkErr
  239. return err
  240. }
  241. candidateStatuses[i] = &CandidateStatus{
  242. address: candidate,
  243. messageCount: resp.MessageCount,
  244. bytesCount: resp.BytesCount,
  245. load: resp.MessageCount + resp.BytesCount/(64*1024),
  246. }
  247. return nil
  248. })
  249. }(i, candidate)
  250. }
  251. wg.Wait()
  252. return
  253. }