You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

279 lines
7.9 KiB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
11 months ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
  1. package pub_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
  9. "google.golang.org/grpc/codes"
  10. "google.golang.org/grpc/status"
  11. "log"
  12. "sort"
  13. "sync"
  14. "time"
  15. )
  16. type EachPartitionError struct {
  17. *mq_pb.BrokerPartitionAssignment
  18. Err error
  19. generation int
  20. }
  21. type EachPartitionPublishJob struct {
  22. *mq_pb.BrokerPartitionAssignment
  23. stopChan chan bool
  24. wg sync.WaitGroup
  25. generation int
  26. inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
  27. }
  28. func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
  29. if err := p.doConfigureTopic(); err != nil {
  30. return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
  31. }
  32. log.Printf("start scheduler thread for topic %s", p.config.Topic)
  33. generation := 0
  34. var errChan chan EachPartitionError
  35. for {
  36. glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
  37. if assignments, err := p.doLookupTopicPartitions(); err == nil {
  38. generation++
  39. glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments))
  40. if errChan == nil {
  41. errChan = make(chan EachPartitionError, len(assignments))
  42. }
  43. p.onEachAssignments(generation, assignments, errChan)
  44. } else {
  45. glog.Errorf("lookup topic %s: %v", p.config.Topic, err)
  46. time.Sleep(5 * time.Second)
  47. continue
  48. }
  49. if generation == 1 {
  50. wg.Done()
  51. }
  52. // wait for any error to happen. If so, consume all remaining errors, and retry
  53. for {
  54. select {
  55. case eachErr := <-errChan:
  56. glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err)
  57. if eachErr.generation < generation {
  58. continue
  59. }
  60. break
  61. }
  62. }
  63. }
  64. }
  65. func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.BrokerPartitionAssignment, errChan chan EachPartitionError) {
  66. // TODO assuming this is not re-configured so the partitions are fixed.
  67. sort.Slice(assignments, func(i, j int) bool {
  68. return assignments[i].Partition.RangeStart < assignments[j].Partition.RangeStart
  69. })
  70. var jobs []*EachPartitionPublishJob
  71. hasExistingJob := len(p.jobs) == len(assignments)
  72. for i, assignment := range assignments {
  73. if assignment.LeaderBroker == "" {
  74. continue
  75. }
  76. if hasExistingJob {
  77. var existingJob *EachPartitionPublishJob
  78. existingJob = p.jobs[i]
  79. if existingJob.BrokerPartitionAssignment.LeaderBroker == assignment.LeaderBroker {
  80. existingJob.generation = generation
  81. jobs = append(jobs, existingJob)
  82. continue
  83. } else {
  84. if existingJob.LeaderBroker != "" {
  85. close(existingJob.stopChan)
  86. existingJob.LeaderBroker = ""
  87. existingJob.wg.Wait()
  88. }
  89. }
  90. }
  91. // start a go routine to publish to this partition
  92. job := &EachPartitionPublishJob{
  93. BrokerPartitionAssignment: assignment,
  94. stopChan: make(chan bool, 1),
  95. generation: generation,
  96. inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024),
  97. }
  98. job.wg.Add(1)
  99. go func(job *EachPartitionPublishJob) {
  100. defer job.wg.Done()
  101. if err := p.doPublishToPartition(job); err != nil {
  102. errChan <- EachPartitionError{assignment, err, generation}
  103. }
  104. }(job)
  105. jobs = append(jobs, job)
  106. // TODO assuming this is not re-configured so the partitions are fixed.
  107. // better just re-use the existing job
  108. p.partition2Buffer.Insert(assignment.Partition.RangeStart, assignment.Partition.RangeStop, job.inputQueue)
  109. }
  110. p.jobs = jobs
  111. }
  112. func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error {
  113. log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
  114. grpcConnection, err := pb.GrpcDial(context.Background(), job.LeaderBroker, true, p.grpcDialOption)
  115. if err != nil {
  116. return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err)
  117. }
  118. brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
  119. stream, err := brokerClient.PublishMessage(context.Background())
  120. if err != nil {
  121. return fmt.Errorf("create publish client: %v", err)
  122. }
  123. publishClient := &PublishClient{
  124. SeaweedMessaging_PublishMessageClient: stream,
  125. Broker: job.LeaderBroker,
  126. }
  127. if err = publishClient.Send(&mq_pb.PublishMessageRequest{
  128. Message: &mq_pb.PublishMessageRequest_Init{
  129. Init: &mq_pb.PublishMessageRequest_InitMessage{
  130. Topic: p.config.Topic.ToPbTopic(),
  131. Partition: job.Partition,
  132. AckInterval: 128,
  133. FollowerBrokers: job.FollowerBrokers,
  134. },
  135. },
  136. }); err != nil {
  137. return fmt.Errorf("send init message: %v", err)
  138. }
  139. // process the hello message
  140. resp, err := stream.Recv()
  141. if err != nil {
  142. return fmt.Errorf("recv init response: %v", err)
  143. }
  144. if resp.Error != "" {
  145. return fmt.Errorf("init response error: %v", resp.Error)
  146. }
  147. var wg sync.WaitGroup
  148. wg.Add(1)
  149. go func() {
  150. defer wg.Done()
  151. for {
  152. ackResp, err := publishClient.Recv()
  153. if err != nil {
  154. e, _ := status.FromError(err)
  155. if e.Code() == codes.Unknown && e.Message() == "EOF" {
  156. log.Printf("publish to %s EOF", publishClient.Broker)
  157. return
  158. }
  159. publishClient.Err = err
  160. log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
  161. return
  162. }
  163. if ackResp.Error != "" {
  164. publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
  165. log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
  166. return
  167. }
  168. if ackResp.AckSequence > 0 {
  169. log.Printf("ack %d", ackResp.AckSequence)
  170. }
  171. }
  172. }()
  173. publishCounter := 0
  174. for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
  175. if err := publishClient.Send(&mq_pb.PublishMessageRequest{
  176. Message: &mq_pb.PublishMessageRequest_Data{
  177. Data: data,
  178. },
  179. }); err != nil {
  180. return fmt.Errorf("send publish data: %v", err)
  181. }
  182. publishCounter++
  183. }
  184. // CloseSend would cancel the context on the server side
  185. //if err := publishClient.CloseSend(); err != nil {
  186. // return fmt.Errorf("close send: %v", err)
  187. //}
  188. wg.Wait()
  189. log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
  190. return nil
  191. }
  192. func (p *TopicPublisher) doConfigureTopic() (err error) {
  193. if len(p.config.Brokers) == 0 {
  194. return fmt.Errorf("no bootstrap brokers")
  195. }
  196. var lastErr error
  197. for _, brokerAddress := range p.config.Brokers {
  198. err = pb.WithBrokerGrpcClient(false,
  199. brokerAddress,
  200. p.grpcDialOption,
  201. func(client mq_pb.SeaweedMessagingClient) error {
  202. _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
  203. Topic: p.config.Topic.ToPbTopic(),
  204. PartitionCount: p.config.CreateTopicPartitionCount,
  205. })
  206. return err
  207. })
  208. if err == nil {
  209. return nil
  210. } else {
  211. lastErr = fmt.Errorf("%s: %v", brokerAddress, err)
  212. }
  213. }
  214. if lastErr != nil {
  215. return fmt.Errorf("doConfigureTopic %s: %v", p.config.Topic, err)
  216. }
  217. return nil
  218. }
  219. func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
  220. if len(p.config.Brokers) == 0 {
  221. return nil, fmt.Errorf("no bootstrap brokers")
  222. }
  223. var lastErr error
  224. for _, brokerAddress := range p.config.Brokers {
  225. err := pb.WithBrokerGrpcClient(false,
  226. brokerAddress,
  227. p.grpcDialOption,
  228. func(client mq_pb.SeaweedMessagingClient) error {
  229. lookupResp, err := client.LookupTopicBrokers(context.Background(),
  230. &mq_pb.LookupTopicBrokersRequest{
  231. Topic: p.config.Topic.ToPbTopic(),
  232. })
  233. glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp)
  234. if err != nil {
  235. return err
  236. }
  237. if len(lookupResp.BrokerPartitionAssignments) == 0 {
  238. return fmt.Errorf("no broker partition assignments")
  239. }
  240. assignments = lookupResp.BrokerPartitionAssignments
  241. return nil
  242. })
  243. if err == nil {
  244. return assignments, nil
  245. } else {
  246. lastErr = err
  247. }
  248. }
  249. return nil, fmt.Errorf("lookup topic %s: %v", p.config.Topic, lastErr)
  250. }