You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

271 lines
7.6 KiB

12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
  1. package pub_client
import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"sort"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)
  16. type EachPartitionError struct {
  17. *mq_pb.BrokerPartitionAssignment
  18. Err error
  19. generation int
  20. }
  21. type EachPartitionPublishJob struct {
  22. *mq_pb.BrokerPartitionAssignment
  23. stopChan chan bool
  24. wg sync.WaitGroup
  25. generation int
  26. inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
  27. }
  28. func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
  29. if err := p.doEnsureConfigureTopic(); err != nil {
  30. return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
  31. }
  32. log.Printf("start scheduler thread for topic %s", p.config.Topic)
  33. generation := 0
  34. var errChan chan EachPartitionError
  35. for {
  36. glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
  37. if assignments, err := p.doLookupTopicPartitions(); err == nil {
  38. generation++
  39. glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments))
  40. if errChan == nil {
  41. errChan = make(chan EachPartitionError, len(assignments))
  42. }
  43. p.onEachAssignments(generation, assignments, errChan)
  44. } else {
  45. glog.Errorf("lookup topic %s: %v", p.config.Topic, err)
  46. time.Sleep(5 * time.Second)
  47. continue
  48. }
  49. if generation == 1 {
  50. wg.Done()
  51. }
  52. // wait for any error to happen. If so, consume all remaining errors, and retry
  53. for {
  54. select {
  55. case eachErr := <-errChan:
  56. glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err)
  57. if eachErr.generation < generation {
  58. continue
  59. }
  60. break
  61. }
  62. }
  63. }
  64. }
  65. func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.BrokerPartitionAssignment, errChan chan EachPartitionError) {
  66. // TODO assuming this is not re-configured so the partitions are fixed.
  67. sort.Slice(assignments, func(i, j int) bool {
  68. return assignments[i].Partition.RangeStart < assignments[j].Partition.RangeStart
  69. })
  70. var jobs []*EachPartitionPublishJob
  71. hasExistingJob := len(p.jobs) == len(assignments)
  72. for i, assignment := range assignments {
  73. if assignment.LeaderBroker == "" {
  74. continue
  75. }
  76. if hasExistingJob {
  77. var existingJob *EachPartitionPublishJob
  78. existingJob = p.jobs[i]
  79. if existingJob.BrokerPartitionAssignment.LeaderBroker == assignment.LeaderBroker {
  80. existingJob.generation = generation
  81. jobs = append(jobs, existingJob)
  82. continue
  83. } else {
  84. if existingJob.LeaderBroker != "" {
  85. close(existingJob.stopChan)
  86. existingJob.LeaderBroker = ""
  87. existingJob.wg.Wait()
  88. }
  89. }
  90. }
  91. // start a go routine to publish to this partition
  92. job := &EachPartitionPublishJob{
  93. BrokerPartitionAssignment: assignment,
  94. stopChan: make(chan bool, 1),
  95. generation: generation,
  96. inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024),
  97. }
  98. job.wg.Add(1)
  99. go func(job *EachPartitionPublishJob) {
  100. defer job.wg.Done()
  101. if err := p.doPublishToPartition(job); err != nil {
  102. errChan <- EachPartitionError{assignment, err, generation}
  103. }
  104. }(job)
  105. jobs = append(jobs, job)
  106. // TODO assuming this is not re-configured so the partitions are fixed.
  107. // better just re-use the existing job
  108. p.partition2Buffer.Insert(assignment.Partition.RangeStart, assignment.Partition.RangeStop, job.inputQueue)
  109. }
  110. p.jobs = jobs
  111. }
  112. func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error {
  113. log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
  114. grpcConnection, err := pb.GrpcDial(context.Background(), job.LeaderBroker, true, p.grpcDialOption)
  115. if err != nil {
  116. return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err)
  117. }
  118. brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
  119. stream, err := brokerClient.PublishMessage(context.Background())
  120. if err != nil {
  121. return fmt.Errorf("create publish client: %v", err)
  122. }
  123. publishClient := &PublishClient{
  124. SeaweedMessaging_PublishMessageClient: stream,
  125. Broker: job.LeaderBroker,
  126. }
  127. if err = publishClient.Send(&mq_pb.PublishMessageRequest{
  128. Message: &mq_pb.PublishMessageRequest_Init{
  129. Init: &mq_pb.PublishMessageRequest_InitMessage{
  130. Topic: p.config.Topic.ToPbTopic(),
  131. Partition: job.Partition,
  132. AckInterval: 128,
  133. },
  134. },
  135. }); err != nil {
  136. return fmt.Errorf("send init message: %v", err)
  137. }
  138. resp, err := stream.Recv()
  139. if err != nil {
  140. return fmt.Errorf("recv init response: %v", err)
  141. }
  142. if resp.Error != "" {
  143. return fmt.Errorf("init response error: %v", resp.Error)
  144. }
  145. go func() {
  146. for {
  147. ackResp, err := publishClient.Recv()
  148. if err != nil {
  149. e, ok := status.FromError(err)
  150. if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
  151. return
  152. }
  153. publishClient.Err = err
  154. fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
  155. return
  156. }
  157. if ackResp.Error != "" {
  158. publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
  159. fmt.Printf("publish to %s error: %v\n", publishClient.Broker, ackResp.Error)
  160. return
  161. }
  162. if ackResp.AckSequence > 0 {
  163. log.Printf("ack %d", ackResp.AckSequence)
  164. }
  165. }
  166. }()
  167. publishCounter := 0
  168. for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
  169. if err := publishClient.Send(&mq_pb.PublishMessageRequest{
  170. Message: &mq_pb.PublishMessageRequest_Data{
  171. Data: data,
  172. },
  173. }); err != nil {
  174. return fmt.Errorf("send publish data: %v", err)
  175. }
  176. publishCounter++
  177. }
  178. if err := publishClient.CloseSend(); err != nil {
  179. return fmt.Errorf("close send: %v", err)
  180. }
  181. time.Sleep(3 * time.Second)
  182. log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
  183. return nil
  184. }
  185. func (p *TopicPublisher) doEnsureConfigureTopic() (err error) {
  186. if len(p.config.Brokers) == 0 {
  187. return fmt.Errorf("no bootstrap brokers")
  188. }
  189. var lastErr error
  190. for _, brokerAddress := range p.config.Brokers {
  191. err = pb.WithBrokerGrpcClient(false,
  192. brokerAddress,
  193. p.grpcDialOption,
  194. func(client mq_pb.SeaweedMessagingClient) error {
  195. _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
  196. Topic: p.config.Topic.ToPbTopic(),
  197. PartitionCount: p.config.CreateTopicPartitionCount,
  198. })
  199. return err
  200. })
  201. if err == nil {
  202. return nil
  203. } else {
  204. lastErr = err
  205. }
  206. }
  207. if lastErr != nil {
  208. return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
  209. }
  210. return nil
  211. }
  212. func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
  213. if len(p.config.Brokers) == 0 {
  214. return nil, fmt.Errorf("no bootstrap brokers")
  215. }
  216. var lastErr error
  217. for _, brokerAddress := range p.config.Brokers {
  218. err := pb.WithBrokerGrpcClient(false,
  219. brokerAddress,
  220. p.grpcDialOption,
  221. func(client mq_pb.SeaweedMessagingClient) error {
  222. lookupResp, err := client.LookupTopicBrokers(context.Background(),
  223. &mq_pb.LookupTopicBrokersRequest{
  224. Topic: p.config.Topic.ToPbTopic(),
  225. })
  226. glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp)
  227. if err != nil {
  228. return err
  229. }
  230. if len(lookupResp.BrokerPartitionAssignments) == 0 {
  231. return fmt.Errorf("no broker partition assignments")
  232. }
  233. assignments = lookupResp.BrokerPartitionAssignments
  234. return nil
  235. })
  236. if err == nil {
  237. return assignments, nil
  238. } else {
  239. lastErr = err
  240. }
  241. }
  242. return nil, fmt.Errorf("lookup topic %s: %v", p.config.Topic, lastErr)
  243. }