You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

253 lines
7.1 KiB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
  1. package pub_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
  9. "google.golang.org/grpc/codes"
  10. "google.golang.org/grpc/status"
  11. "log"
  12. "sort"
  13. "sync"
  14. "time"
  15. )
  16. type EachPartitionError struct {
  17. *mq_pb.BrokerPartitionAssignment
  18. Err error
  19. generation int
  20. }
  21. type EachPartitionPublishJob struct {
  22. *mq_pb.BrokerPartitionAssignment
  23. stopChan chan bool
  24. wg sync.WaitGroup
  25. generation int
  26. inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
  27. }
  28. func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string) error {
  29. if err := p.doEnsureConfigureTopic(bootstrapBrokers); err != nil {
  30. return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err)
  31. }
  32. log.Printf("start scheduler thread for topic %s/%s", p.namespace, p.topic)
  33. generation := 0
  34. var errChan chan EachPartitionError
  35. for {
  36. glog.V(0).Infof("lookup partitions gen %d topic %s/%s", generation, p.namespace, p.topic)
  37. if assignments, err := p.doLookupTopicPartitions(bootstrapBrokers); err == nil {
  38. generation++
  39. glog.V(0).Infof("start generation %d", generation)
  40. if errChan == nil {
  41. errChan = make(chan EachPartitionError, len(assignments))
  42. }
  43. p.onEachAssignments(generation, assignments, errChan)
  44. } else {
  45. glog.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err)
  46. time.Sleep(5 * time.Second)
  47. continue
  48. }
  49. // wait for any error to happen. If so, consume all remaining errors, and retry
  50. for {
  51. select {
  52. case eachErr := <-errChan:
  53. glog.Errorf("gen %d publish to topic %s/%s partition %v: %v", eachErr.generation, p.namespace, p.topic, eachErr.Partition, eachErr.Err)
  54. if eachErr.generation < generation {
  55. continue
  56. }
  57. break
  58. }
  59. }
  60. }
  61. }
  62. func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.BrokerPartitionAssignment, errChan chan EachPartitionError) {
  63. // TODO assuming this is not re-configured so the partitions are fixed.
  64. sort.Slice(assignments, func(i, j int) bool {
  65. return assignments[i].Partition.RangeStart < assignments[j].Partition.RangeStart
  66. })
  67. var jobs []*EachPartitionPublishJob
  68. hasExistingJob := len(p.jobs) == len(assignments)
  69. for i, assignment := range assignments {
  70. if assignment.LeaderBroker == "" {
  71. continue
  72. }
  73. if hasExistingJob {
  74. var existingJob *EachPartitionPublishJob
  75. existingJob = p.jobs[i]
  76. if existingJob.BrokerPartitionAssignment.LeaderBroker == assignment.LeaderBroker {
  77. existingJob.generation = generation
  78. jobs = append(jobs, existingJob)
  79. continue
  80. } else {
  81. if existingJob.LeaderBroker != "" {
  82. close(existingJob.stopChan)
  83. existingJob.LeaderBroker = ""
  84. existingJob.wg.Wait()
  85. }
  86. }
  87. }
  88. // start a go routine to publish to this partition
  89. job := &EachPartitionPublishJob{
  90. BrokerPartitionAssignment: assignment,
  91. stopChan: make(chan bool, 1),
  92. generation: generation,
  93. inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024, true),
  94. }
  95. job.wg.Add(1)
  96. go func(job *EachPartitionPublishJob) {
  97. defer job.wg.Done()
  98. if err := p.doPublishToPartition(job); err != nil {
  99. errChan <- EachPartitionError{assignment, err, generation}
  100. }
  101. }(job)
  102. jobs = append(jobs, job)
  103. // TODO assuming this is not re-configured so the partitions are fixed.
  104. // better just re-use the existing job
  105. p.partition2Buffer.Insert(assignment.Partition.RangeStart, assignment.Partition.RangeStop, job.inputQueue)
  106. }
  107. p.jobs = jobs
  108. }
  109. func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error {
  110. log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
  111. grpcConnection, err := pb.GrpcDial(context.Background(), job.LeaderBroker, true, p.grpcDialOption)
  112. if err != nil {
  113. return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err)
  114. }
  115. brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
  116. stream, err := brokerClient.PublishMessage(context.Background())
  117. if err != nil {
  118. return fmt.Errorf("create publish client: %v", err)
  119. }
  120. publishClient := &PublishClient{
  121. SeaweedMessaging_PublishMessageClient: stream,
  122. Broker: job.LeaderBroker,
  123. }
  124. if err = publishClient.Send(&mq_pb.PublishMessageRequest{
  125. Message: &mq_pb.PublishMessageRequest_Init{
  126. Init: &mq_pb.PublishMessageRequest_InitMessage{
  127. Topic: &mq_pb.Topic{
  128. Namespace: p.namespace,
  129. Name: p.topic,
  130. },
  131. Partition: job.Partition,
  132. AckInterval: 128,
  133. },
  134. },
  135. }); err != nil {
  136. return fmt.Errorf("send init message: %v", err)
  137. }
  138. resp, err := stream.Recv()
  139. if err != nil {
  140. return fmt.Errorf("recv init response: %v", err)
  141. }
  142. if resp.Error != "" {
  143. return fmt.Errorf("init response error: %v", resp.Error)
  144. }
  145. go func() {
  146. for {
  147. _, err := publishClient.Recv()
  148. if err != nil {
  149. e, ok := status.FromError(err)
  150. if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
  151. return
  152. }
  153. publishClient.Err = err
  154. fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
  155. return
  156. }
  157. }
  158. }()
  159. for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
  160. if err := publishClient.Send(&mq_pb.PublishMessageRequest{
  161. Message: &mq_pb.PublishMessageRequest_Data{
  162. Data: data,
  163. },
  164. }); err != nil {
  165. return fmt.Errorf("send publish data: %v", err)
  166. }
  167. }
  168. return nil
  169. }
  170. func (p *TopicPublisher) doEnsureConfigureTopic(bootstrapBrokers []string) (err error) {
  171. if len(bootstrapBrokers) == 0 {
  172. return fmt.Errorf("no bootstrap brokers")
  173. }
  174. var lastErr error
  175. for _, brokerAddress := range bootstrapBrokers {
  176. err = pb.WithBrokerGrpcClient(false,
  177. brokerAddress,
  178. p.grpcDialOption,
  179. func(client mq_pb.SeaweedMessagingClient) error {
  180. _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
  181. Topic: &mq_pb.Topic{
  182. Namespace: p.namespace,
  183. Name: p.topic,
  184. },
  185. PartitionCount: p.config.CreateTopicPartitionCount,
  186. })
  187. return err
  188. })
  189. if err == nil {
  190. return nil
  191. } else {
  192. lastErr = err
  193. }
  194. }
  195. if lastErr != nil {
  196. return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err)
  197. }
  198. return nil
  199. }
  200. func (p *TopicPublisher) doLookupTopicPartitions(bootstrapBrokers []string) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
  201. if len(bootstrapBrokers) == 0 {
  202. return nil, fmt.Errorf("no bootstrap brokers")
  203. }
  204. var lastErr error
  205. for _, brokerAddress := range bootstrapBrokers {
  206. err := pb.WithBrokerGrpcClient(false,
  207. brokerAddress,
  208. p.grpcDialOption,
  209. func(client mq_pb.SeaweedMessagingClient) error {
  210. lookupResp, err := client.LookupTopicBrokers(context.Background(),
  211. &mq_pb.LookupTopicBrokersRequest{
  212. Topic: &mq_pb.Topic{
  213. Namespace: p.namespace,
  214. Name: p.topic,
  215. },
  216. })
  217. glog.V(0).Infof("lookup topic %s/%s: %v", p.namespace, p.topic, lookupResp)
  218. if err != nil {
  219. return err
  220. }
  221. assignments = lookupResp.BrokerPartitionAssignments
  222. return nil
  223. })
  224. if err == nil {
  225. return assignments, nil
  226. } else {
  227. lastErr = err
  228. }
  229. }
  230. return nil, fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, lastErr)
  231. }