You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

181 lines
4.8 KiB

1 year ago
  1. package pub_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "sort"
  9. "sync"
  10. "time"
  11. )
  12. type EachPartitionError struct {
  13. *mq_pb.BrokerPartitionAssignment
  14. Err error
  15. generation int
  16. }
  17. type EachPartitionPublishJob struct {
  18. *mq_pb.BrokerPartitionAssignment
  19. stopChan chan bool
  20. wg sync.WaitGroup
  21. generation int
  22. }
  23. func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string) error {
  24. if err := p.doEnsureConfigureTopic(bootstrapBrokers); err != nil {
  25. return err
  26. }
  27. generation := 0
  28. var errChan chan EachPartitionError
  29. for {
  30. glog.V(0).Infof("lookup partitions gen %d topic %s/%s", generation, p.namespace, p.topic)
  31. if assignments, err := p.doLookupTopicPartitions(bootstrapBrokers); err == nil {
  32. generation++
  33. glog.V(0).Infof("start generation %d", generation)
  34. if errChan == nil {
  35. errChan = make(chan EachPartitionError, len(assignments))
  36. }
  37. p.onEachAssignments(generation, assignments, errChan)
  38. } else {
  39. glog.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err)
  40. time.Sleep(5 * time.Second)
  41. continue
  42. }
  43. // wait for any error to happen. If so, consume all remaining errors, and retry
  44. for {
  45. select {
  46. case eachErr := <-errChan:
  47. glog.Errorf("gen %d publish to topic %s/%s partition %v: %v", eachErr.generation, p.namespace, p.topic, eachErr.Partition, eachErr.Err)
  48. if eachErr.generation < generation {
  49. continue
  50. }
  51. break
  52. }
  53. }
  54. }
  55. }
  56. func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.BrokerPartitionAssignment, errChan chan EachPartitionError) {
  57. // TODO assuming this is not re-configured so the partitions are fixed.
  58. sort.Slice(assignments, func(i, j int) bool {
  59. return assignments[i].Partition.RangeStart < assignments[j].Partition.RangeStart
  60. })
  61. var jobs []*EachPartitionPublishJob
  62. hasExistingJob := len(p.jobs) == len(assignments)
  63. for i, assignment := range assignments {
  64. if assignment.LeaderBroker == "" {
  65. continue
  66. }
  67. if hasExistingJob {
  68. var existingJob *EachPartitionPublishJob
  69. existingJob = p.jobs[i]
  70. if existingJob.BrokerPartitionAssignment.LeaderBroker == assignment.LeaderBroker {
  71. existingJob.generation = generation
  72. jobs = append(jobs, existingJob)
  73. continue
  74. } else {
  75. if existingJob.LeaderBroker != "" {
  76. close(existingJob.stopChan)
  77. existingJob.LeaderBroker = ""
  78. existingJob.wg.Wait()
  79. }
  80. }
  81. }
  82. // start a go routine to publish to this partition
  83. job := &EachPartitionPublishJob{
  84. BrokerPartitionAssignment: assignment,
  85. stopChan: make(chan bool, 1),
  86. generation: generation,
  87. }
  88. job.wg.Add(1)
  89. go func(job *EachPartitionPublishJob) {
  90. defer job.wg.Done()
  91. if err := p.doPublishToPartition(job); err != nil {
  92. errChan <- EachPartitionError{assignment, err, generation}
  93. }
  94. }(job)
  95. jobs = append(jobs, job)
  96. }
  97. p.jobs = jobs
  98. }
  99. func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error {
  100. return nil
  101. }
  102. func (p *TopicPublisher) doEnsureConfigureTopic(bootstrapBrokers []string) (err error) {
  103. if len(bootstrapBrokers) == 0 {
  104. return fmt.Errorf("no bootstrap brokers")
  105. }
  106. var lastErr error
  107. for _, brokerAddress := range bootstrapBrokers {
  108. err = pb.WithBrokerGrpcClient(false,
  109. brokerAddress,
  110. p.grpcDialOption,
  111. func(client mq_pb.SeaweedMessagingClient) error {
  112. _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
  113. Topic: &mq_pb.Topic{
  114. Namespace: p.namespace,
  115. Name: p.topic,
  116. },
  117. PartitionCount: p.config.CreateTopicPartitionCount,
  118. })
  119. return err
  120. })
  121. if err == nil {
  122. return nil
  123. } else {
  124. lastErr = err
  125. }
  126. }
  127. if lastErr != nil {
  128. return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err)
  129. }
  130. return nil
  131. }
  132. func (p *TopicPublisher) doLookupTopicPartitions(bootstrapBrokers []string) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
  133. if len(bootstrapBrokers) == 0 {
  134. return nil, fmt.Errorf("no bootstrap brokers")
  135. }
  136. var lastErr error
  137. for _, brokerAddress := range bootstrapBrokers {
  138. err := pb.WithBrokerGrpcClient(false,
  139. brokerAddress,
  140. p.grpcDialOption,
  141. func(client mq_pb.SeaweedMessagingClient) error {
  142. lookupResp, err := client.LookupTopicBrokers(context.Background(),
  143. &mq_pb.LookupTopicBrokersRequest{
  144. Topic: &mq_pb.Topic{
  145. Namespace: p.namespace,
  146. Name: p.topic,
  147. },
  148. })
  149. glog.V(0).Infof("lookup topic %s/%s: %v", p.namespace, p.topic, lookupResp)
  150. if err != nil {
  151. return err
  152. }
  153. assignments = lookupResp.BrokerPartitionAssignments
  154. return nil
  155. })
  156. if err == nil {
  157. return assignments, nil
  158. } else {
  159. lastErr = err
  160. }
  161. }
  162. return nil, fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, lastErr)
  163. }