You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

273 lines
7.7 KiB

11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
  1. package pub_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
  9. "google.golang.org/grpc/codes"
  10. "google.golang.org/grpc/status"
  11. "log"
  12. "sort"
  13. "sync"
  14. "time"
  15. )
  16. type EachPartitionError struct {
  17. *mq_pb.BrokerPartitionAssignment
  18. Err error
  19. generation int
  20. }
  21. type EachPartitionPublishJob struct {
  22. *mq_pb.BrokerPartitionAssignment
  23. stopChan chan bool
  24. wg sync.WaitGroup
  25. generation int
  26. inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
  27. }
  28. func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
  29. if err := p.doConfigureTopic(); err != nil {
  30. return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
  31. }
  32. log.Printf("start scheduler thread for topic %s", p.config.Topic)
  33. generation := 0
  34. var errChan chan EachPartitionError
  35. for {
  36. glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
  37. if assignments, err := p.doLookupTopicPartitions(); err == nil {
  38. generation++
  39. glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments))
  40. if errChan == nil {
  41. errChan = make(chan EachPartitionError, len(assignments))
  42. }
  43. p.onEachAssignments(generation, assignments, errChan)
  44. } else {
  45. glog.Errorf("lookup topic %s: %v", p.config.Topic, err)
  46. time.Sleep(5 * time.Second)
  47. continue
  48. }
  49. if generation == 1 {
  50. wg.Done()
  51. }
  52. // wait for any error to happen. If so, consume all remaining errors, and retry
  53. for {
  54. select {
  55. case eachErr := <-errChan:
  56. glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err)
  57. if eachErr.generation < generation {
  58. continue
  59. }
  60. break
  61. }
  62. }
  63. }
  64. }
  65. func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.BrokerPartitionAssignment, errChan chan EachPartitionError) {
  66. // TODO assuming this is not re-configured so the partitions are fixed.
  67. sort.Slice(assignments, func(i, j int) bool {
  68. return assignments[i].Partition.RangeStart < assignments[j].Partition.RangeStart
  69. })
  70. var jobs []*EachPartitionPublishJob
  71. hasExistingJob := len(p.jobs) == len(assignments)
  72. for i, assignment := range assignments {
  73. if assignment.LeaderBroker == "" {
  74. continue
  75. }
  76. if hasExistingJob {
  77. var existingJob *EachPartitionPublishJob
  78. existingJob = p.jobs[i]
  79. if existingJob.BrokerPartitionAssignment.LeaderBroker == assignment.LeaderBroker {
  80. existingJob.generation = generation
  81. jobs = append(jobs, existingJob)
  82. continue
  83. } else {
  84. if existingJob.LeaderBroker != "" {
  85. close(existingJob.stopChan)
  86. existingJob.LeaderBroker = ""
  87. existingJob.wg.Wait()
  88. }
  89. }
  90. }
  91. // start a go routine to publish to this partition
  92. job := &EachPartitionPublishJob{
  93. BrokerPartitionAssignment: assignment,
  94. stopChan: make(chan bool, 1),
  95. generation: generation,
  96. inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024),
  97. }
  98. job.wg.Add(1)
  99. go func(job *EachPartitionPublishJob) {
  100. defer job.wg.Done()
  101. if err := p.doPublishToPartition(job); err != nil {
  102. errChan <- EachPartitionError{assignment, err, generation}
  103. }
  104. }(job)
  105. jobs = append(jobs, job)
  106. // TODO assuming this is not re-configured so the partitions are fixed.
  107. // better just re-use the existing job
  108. p.partition2Buffer.Insert(assignment.Partition.RangeStart, assignment.Partition.RangeStop, job.inputQueue)
  109. }
  110. p.jobs = jobs
  111. }
  112. func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error {
  113. log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
  114. grpcConnection, err := pb.GrpcDial(context.Background(), job.LeaderBroker, true, p.grpcDialOption)
  115. if err != nil {
  116. return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err)
  117. }
  118. brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
  119. stream, err := brokerClient.PublishMessage(context.Background())
  120. if err != nil {
  121. return fmt.Errorf("create publish client: %v", err)
  122. }
  123. publishClient := &PublishClient{
  124. SeaweedMessaging_PublishMessageClient: stream,
  125. Broker: job.LeaderBroker,
  126. }
  127. if err = publishClient.Send(&mq_pb.PublishMessageRequest{
  128. Message: &mq_pb.PublishMessageRequest_Init{
  129. Init: &mq_pb.PublishMessageRequest_InitMessage{
  130. Topic: p.config.Topic.ToPbTopic(),
  131. Partition: job.Partition,
  132. AckInterval: 128,
  133. FollowerBrokers: job.FollowerBrokers,
  134. },
  135. },
  136. }); err != nil {
  137. return fmt.Errorf("send init message: %v", err)
  138. }
  139. resp, err := stream.Recv()
  140. if err != nil {
  141. return fmt.Errorf("recv init response: %v", err)
  142. }
  143. if resp.Error != "" {
  144. return fmt.Errorf("init response error: %v", resp.Error)
  145. }
  146. go func() {
  147. for {
  148. ackResp, err := publishClient.Recv()
  149. if err != nil {
  150. e, _ := status.FromError(err)
  151. if e.Code() == codes.Unknown && e.Message() == "EOF" {
  152. return
  153. }
  154. publishClient.Err = err
  155. fmt.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
  156. return
  157. }
  158. if ackResp.Error != "" {
  159. publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
  160. fmt.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
  161. return
  162. }
  163. if ackResp.AckSequence > 0 {
  164. log.Printf("ack %d", ackResp.AckSequence)
  165. }
  166. }
  167. }()
  168. publishCounter := 0
  169. for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
  170. if err := publishClient.Send(&mq_pb.PublishMessageRequest{
  171. Message: &mq_pb.PublishMessageRequest_Data{
  172. Data: data,
  173. },
  174. }); err != nil {
  175. return fmt.Errorf("send publish data: %v", err)
  176. }
  177. publishCounter++
  178. }
  179. if err := publishClient.CloseSend(); err != nil {
  180. return fmt.Errorf("close send: %v", err)
  181. }
  182. time.Sleep(3 * time.Second)
  183. log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
  184. return nil
  185. }
  186. func (p *TopicPublisher) doConfigureTopic() (err error) {
  187. if len(p.config.Brokers) == 0 {
  188. return fmt.Errorf("no bootstrap brokers")
  189. }
  190. var lastErr error
  191. for _, brokerAddress := range p.config.Brokers {
  192. err = pb.WithBrokerGrpcClient(false,
  193. brokerAddress,
  194. p.grpcDialOption,
  195. func(client mq_pb.SeaweedMessagingClient) error {
  196. _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
  197. Topic: p.config.Topic.ToPbTopic(),
  198. PartitionCount: p.config.CreateTopicPartitionCount,
  199. })
  200. return err
  201. })
  202. if err == nil {
  203. return nil
  204. } else {
  205. lastErr = fmt.Errorf("%s: %v", brokerAddress, err)
  206. }
  207. }
  208. if lastErr != nil {
  209. return fmt.Errorf("doConfigureTopic %s: %v", p.config.Topic, err)
  210. }
  211. return nil
  212. }
  213. func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
  214. if len(p.config.Brokers) == 0 {
  215. return nil, fmt.Errorf("no bootstrap brokers")
  216. }
  217. var lastErr error
  218. for _, brokerAddress := range p.config.Brokers {
  219. err := pb.WithBrokerGrpcClient(false,
  220. brokerAddress,
  221. p.grpcDialOption,
  222. func(client mq_pb.SeaweedMessagingClient) error {
  223. lookupResp, err := client.LookupTopicBrokers(context.Background(),
  224. &mq_pb.LookupTopicBrokersRequest{
  225. Topic: p.config.Topic.ToPbTopic(),
  226. })
  227. glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp)
  228. if err != nil {
  229. return err
  230. }
  231. if len(lookupResp.BrokerPartitionAssignments) == 0 {
  232. return fmt.Errorf("no broker partition assignments")
  233. }
  234. assignments = lookupResp.BrokerPartitionAssignments
  235. return nil
  236. })
  237. if err == nil {
  238. return assignments, nil
  239. } else {
  240. lastErr = err
  241. }
  242. }
  243. return nil, fmt.Errorf("lookup topic %s: %v", p.config.Topic, lastErr)
  244. }