You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

294 lines
8.5 KiB

11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
7 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
9 months ago
11 months ago
8 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
  1. package pub_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
  9. "google.golang.org/grpc/codes"
  10. "google.golang.org/grpc/status"
  11. "log"
  12. "sort"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. )
  17. type EachPartitionError struct {
  18. *mq_pb.BrokerPartitionAssignment
  19. Err error
  20. generation int
  21. }
  22. type EachPartitionPublishJob struct {
  23. *mq_pb.BrokerPartitionAssignment
  24. stopChan chan bool
  25. wg sync.WaitGroup
  26. generation int
  27. inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
  28. }
  29. func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
  30. if err := p.doConfigureTopic(); err != nil {
  31. return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
  32. }
  33. log.Printf("start scheduler thread for topic %s", p.config.Topic)
  34. generation := 0
  35. var errChan chan EachPartitionError
  36. for {
  37. glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
  38. if assignments, err := p.doLookupTopicPartitions(); err == nil {
  39. generation++
  40. glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments))
  41. if errChan == nil {
  42. errChan = make(chan EachPartitionError, len(assignments))
  43. }
  44. p.onEachAssignments(generation, assignments, errChan)
  45. } else {
  46. glog.Errorf("lookup topic %s: %v", p.config.Topic, err)
  47. time.Sleep(5 * time.Second)
  48. continue
  49. }
  50. if generation == 1 {
  51. wg.Done()
  52. }
  53. // wait for any error to happen. If so, consume all remaining errors, and retry
  54. for {
  55. select {
  56. case eachErr := <-errChan:
  57. glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err)
  58. if eachErr.generation < generation {
  59. continue
  60. }
  61. break
  62. }
  63. }
  64. }
  65. }
  66. func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.BrokerPartitionAssignment, errChan chan EachPartitionError) {
  67. // TODO assuming this is not re-configured so the partitions are fixed.
  68. sort.Slice(assignments, func(i, j int) bool {
  69. return assignments[i].Partition.RangeStart < assignments[j].Partition.RangeStart
  70. })
  71. var jobs []*EachPartitionPublishJob
  72. hasExistingJob := len(p.jobs) == len(assignments)
  73. for i, assignment := range assignments {
  74. if assignment.LeaderBroker == "" {
  75. continue
  76. }
  77. if hasExistingJob {
  78. var existingJob *EachPartitionPublishJob
  79. existingJob = p.jobs[i]
  80. if existingJob.BrokerPartitionAssignment.LeaderBroker == assignment.LeaderBroker {
  81. existingJob.generation = generation
  82. jobs = append(jobs, existingJob)
  83. continue
  84. } else {
  85. if existingJob.LeaderBroker != "" {
  86. close(existingJob.stopChan)
  87. existingJob.LeaderBroker = ""
  88. existingJob.wg.Wait()
  89. }
  90. }
  91. }
  92. // start a go routine to publish to this partition
  93. job := &EachPartitionPublishJob{
  94. BrokerPartitionAssignment: assignment,
  95. stopChan: make(chan bool, 1),
  96. generation: generation,
  97. inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024),
  98. }
  99. job.wg.Add(1)
  100. go func(job *EachPartitionPublishJob) {
  101. defer job.wg.Done()
  102. if err := p.doPublishToPartition(job); err != nil {
  103. errChan <- EachPartitionError{assignment, err, generation}
  104. }
  105. }(job)
  106. jobs = append(jobs, job)
  107. // TODO assuming this is not re-configured so the partitions are fixed.
  108. // better just re-use the existing job
  109. p.partition2Buffer.Insert(assignment.Partition.RangeStart, assignment.Partition.RangeStop, job.inputQueue)
  110. }
  111. p.jobs = jobs
  112. }
  113. func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error {
  114. log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
  115. grpcConnection, err := pb.GrpcDial(context.Background(), job.LeaderBroker, true, p.grpcDialOption)
  116. if err != nil {
  117. return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err)
  118. }
  119. brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
  120. stream, err := brokerClient.PublishMessage(context.Background())
  121. if err != nil {
  122. return fmt.Errorf("create publish client: %v", err)
  123. }
  124. publishClient := &PublishClient{
  125. SeaweedMessaging_PublishMessageClient: stream,
  126. Broker: job.LeaderBroker,
  127. }
  128. if err = publishClient.Send(&mq_pb.PublishMessageRequest{
  129. Message: &mq_pb.PublishMessageRequest_Init{
  130. Init: &mq_pb.PublishMessageRequest_InitMessage{
  131. Topic: p.config.Topic.ToPbTopic(),
  132. Partition: job.Partition,
  133. AckInterval: 128,
  134. FollowerBroker: job.FollowerBroker,
  135. PublisherName: p.config.PublisherName,
  136. },
  137. },
  138. }); err != nil {
  139. return fmt.Errorf("send init message: %v", err)
  140. }
  141. // process the hello message
  142. resp, err := stream.Recv()
  143. if err != nil {
  144. return fmt.Errorf("recv init response: %v", err)
  145. }
  146. if resp.Error != "" {
  147. return fmt.Errorf("init response error: %v", resp.Error)
  148. }
  149. var publishedTsNs int64
  150. hasMoreData := int32(1)
  151. var wg sync.WaitGroup
  152. wg.Add(1)
  153. go func() {
  154. defer wg.Done()
  155. for {
  156. ackResp, err := publishClient.Recv()
  157. if err != nil {
  158. e, _ := status.FromError(err)
  159. if e.Code() == codes.Unknown && e.Message() == "EOF" {
  160. log.Printf("publish to %s EOF", publishClient.Broker)
  161. return
  162. }
  163. publishClient.Err = err
  164. log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
  165. return
  166. }
  167. if ackResp.Error != "" {
  168. publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
  169. log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
  170. return
  171. }
  172. if ackResp.AckSequence > 0 {
  173. log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
  174. }
  175. if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
  176. return
  177. }
  178. }
  179. }()
  180. publishCounter := 0
  181. for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
  182. if data.Ctrl != nil && data.Ctrl.IsClose {
  183. // need to set this before sending to brokers, to avoid timing issue
  184. atomic.StoreInt32(&hasMoreData, 0)
  185. }
  186. if err := publishClient.Send(&mq_pb.PublishMessageRequest{
  187. Message: &mq_pb.PublishMessageRequest_Data{
  188. Data: data,
  189. },
  190. }); err != nil {
  191. return fmt.Errorf("send publish data: %v", err)
  192. }
  193. publishCounter++
  194. atomic.StoreInt64(&publishedTsNs, data.TsNs)
  195. }
  196. if publishCounter > 0 {
  197. wg.Wait()
  198. } else {
  199. // CloseSend would cancel the context on the server side
  200. if err := publishClient.CloseSend(); err != nil {
  201. return fmt.Errorf("close send: %v", err)
  202. }
  203. }
  204. log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
  205. return nil
  206. }
  207. func (p *TopicPublisher) doConfigureTopic() (err error) {
  208. if len(p.config.Brokers) == 0 {
  209. return fmt.Errorf("no bootstrap brokers")
  210. }
  211. var lastErr error
  212. for _, brokerAddress := range p.config.Brokers {
  213. err = pb.WithBrokerGrpcClient(false,
  214. brokerAddress,
  215. p.grpcDialOption,
  216. func(client mq_pb.SeaweedMessagingClient) error {
  217. _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
  218. Topic: p.config.Topic.ToPbTopic(),
  219. PartitionCount: p.config.PartitionCount,
  220. RecordType: p.config.RecordType, // TODO schema upgrade
  221. })
  222. return err
  223. })
  224. if err == nil {
  225. lastErr = nil
  226. return nil
  227. } else {
  228. lastErr = fmt.Errorf("%s: %v", brokerAddress, err)
  229. }
  230. }
  231. if lastErr != nil {
  232. return fmt.Errorf("doConfigureTopic %s: %v", p.config.Topic, err)
  233. }
  234. return nil
  235. }
  236. func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
  237. if len(p.config.Brokers) == 0 {
  238. return nil, fmt.Errorf("no bootstrap brokers")
  239. }
  240. var lastErr error
  241. for _, brokerAddress := range p.config.Brokers {
  242. err := pb.WithBrokerGrpcClient(false,
  243. brokerAddress,
  244. p.grpcDialOption,
  245. func(client mq_pb.SeaweedMessagingClient) error {
  246. lookupResp, err := client.LookupTopicBrokers(context.Background(),
  247. &mq_pb.LookupTopicBrokersRequest{
  248. Topic: p.config.Topic.ToPbTopic(),
  249. })
  250. glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp)
  251. if err != nil {
  252. return err
  253. }
  254. if len(lookupResp.BrokerPartitionAssignments) == 0 {
  255. return fmt.Errorf("no broker partition assignments")
  256. }
  257. assignments = lookupResp.BrokerPartitionAssignments
  258. return nil
  259. })
  260. if err == nil {
  261. return assignments, nil
  262. } else {
  263. lastErr = err
  264. }
  265. }
  266. return nil, fmt.Errorf("lookup topic %s: %v", p.config.Topic, lastErr)
  267. }