You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

298 lines
8.8 KiB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
10 months ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
11 months ago
1 year ago
10 months ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
  1. package pub_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/codes"
  11. "google.golang.org/grpc/credentials/insecure"
  12. "google.golang.org/grpc/status"
  13. "log"
  14. "sort"
  15. "sync"
  16. "sync/atomic"
  17. "time"
  18. )
  19. type EachPartitionError struct {
  20. *mq_pb.BrokerPartitionAssignment
  21. Err error
  22. generation int
  23. }
  24. type EachPartitionPublishJob struct {
  25. *mq_pb.BrokerPartitionAssignment
  26. stopChan chan bool
  27. wg sync.WaitGroup
  28. generation int
  29. inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
  30. }
  31. func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
  32. if err := p.doConfigureTopic(); err != nil {
  33. wg.Done()
  34. return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
  35. }
  36. log.Printf("start scheduler thread for topic %s", p.config.Topic)
  37. generation := 0
  38. var errChan chan EachPartitionError
  39. for {
  40. glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
  41. if assignments, err := p.doLookupTopicPartitions(); err == nil {
  42. generation++
  43. glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments))
  44. if errChan == nil {
  45. errChan = make(chan EachPartitionError, len(assignments))
  46. }
  47. p.onEachAssignments(generation, assignments, errChan)
  48. } else {
  49. glog.Errorf("lookup topic %s: %v", p.config.Topic, err)
  50. time.Sleep(5 * time.Second)
  51. continue
  52. }
  53. if generation == 1 {
  54. wg.Done()
  55. }
  56. // wait for any error to happen. If so, consume all remaining errors, and retry
  57. for {
  58. select {
  59. case eachErr := <-errChan:
  60. glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err)
  61. if eachErr.generation < generation {
  62. continue
  63. }
  64. break
  65. }
  66. }
  67. }
  68. }
  69. func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.BrokerPartitionAssignment, errChan chan EachPartitionError) {
  70. // TODO assuming this is not re-configured so the partitions are fixed.
  71. sort.Slice(assignments, func(i, j int) bool {
  72. return assignments[i].Partition.RangeStart < assignments[j].Partition.RangeStart
  73. })
  74. var jobs []*EachPartitionPublishJob
  75. hasExistingJob := len(p.jobs) == len(assignments)
  76. for i, assignment := range assignments {
  77. if assignment.LeaderBroker == "" {
  78. continue
  79. }
  80. if hasExistingJob {
  81. var existingJob *EachPartitionPublishJob
  82. existingJob = p.jobs[i]
  83. if existingJob.BrokerPartitionAssignment.LeaderBroker == assignment.LeaderBroker {
  84. existingJob.generation = generation
  85. jobs = append(jobs, existingJob)
  86. continue
  87. } else {
  88. if existingJob.LeaderBroker != "" {
  89. close(existingJob.stopChan)
  90. existingJob.LeaderBroker = ""
  91. existingJob.wg.Wait()
  92. }
  93. }
  94. }
  95. // start a go routine to publish to this partition
  96. job := &EachPartitionPublishJob{
  97. BrokerPartitionAssignment: assignment,
  98. stopChan: make(chan bool, 1),
  99. generation: generation,
  100. inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024),
  101. }
  102. job.wg.Add(1)
  103. go func(job *EachPartitionPublishJob) {
  104. defer job.wg.Done()
  105. if err := p.doPublishToPartition(job); err != nil {
  106. log.Printf("publish to %s partition %v: %v", p.config.Topic, job.Partition, err)
  107. errChan <- EachPartitionError{assignment, err, generation}
  108. }
  109. }(job)
  110. jobs = append(jobs, job)
  111. // TODO assuming this is not re-configured so the partitions are fixed.
  112. // better just re-use the existing job
  113. p.partition2Buffer.Insert(assignment.Partition.RangeStart, assignment.Partition.RangeStop, job.inputQueue)
  114. }
  115. p.jobs = jobs
  116. }
  117. func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error {
  118. log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
  119. grpcConnection, err := grpc.NewClient(job.LeaderBroker, grpc.WithTransportCredentials(insecure.NewCredentials()), p.grpcDialOption)
  120. if err != nil {
  121. return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err)
  122. }
  123. brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
  124. stream, err := brokerClient.PublishMessage(context.Background())
  125. if err != nil {
  126. return fmt.Errorf("create publish client: %v", err)
  127. }
  128. publishClient := &PublishClient{
  129. SeaweedMessaging_PublishMessageClient: stream,
  130. Broker: job.LeaderBroker,
  131. }
  132. if err = publishClient.Send(&mq_pb.PublishMessageRequest{
  133. Message: &mq_pb.PublishMessageRequest_Init{
  134. Init: &mq_pb.PublishMessageRequest_InitMessage{
  135. Topic: p.config.Topic.ToPbTopic(),
  136. Partition: job.Partition,
  137. AckInterval: 128,
  138. FollowerBroker: job.FollowerBroker,
  139. PublisherName: p.config.PublisherName,
  140. },
  141. },
  142. }); err != nil {
  143. return fmt.Errorf("send init message: %v", err)
  144. }
  145. // process the hello message
  146. resp, err := stream.Recv()
  147. if err != nil {
  148. return fmt.Errorf("recv init response: %v", err)
  149. }
  150. if resp.Error != "" {
  151. return fmt.Errorf("init response error: %v", resp.Error)
  152. }
  153. var publishedTsNs int64
  154. hasMoreData := int32(1)
  155. var wg sync.WaitGroup
  156. wg.Add(1)
  157. go func() {
  158. defer wg.Done()
  159. for {
  160. ackResp, err := publishClient.Recv()
  161. if err != nil {
  162. e, _ := status.FromError(err)
  163. if e.Code() == codes.Unknown && e.Message() == "EOF" {
  164. log.Printf("publish to %s EOF", publishClient.Broker)
  165. return
  166. }
  167. publishClient.Err = err
  168. log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
  169. return
  170. }
  171. if ackResp.Error != "" {
  172. publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
  173. log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
  174. return
  175. }
  176. if ackResp.AckSequence > 0 {
  177. log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
  178. }
  179. if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
  180. return
  181. }
  182. }
  183. }()
  184. publishCounter := 0
  185. for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
  186. if data.Ctrl != nil && data.Ctrl.IsClose {
  187. // need to set this before sending to brokers, to avoid timing issue
  188. atomic.StoreInt32(&hasMoreData, 0)
  189. }
  190. if err := publishClient.Send(&mq_pb.PublishMessageRequest{
  191. Message: &mq_pb.PublishMessageRequest_Data{
  192. Data: data,
  193. },
  194. }); err != nil {
  195. return fmt.Errorf("send publish data: %v", err)
  196. }
  197. publishCounter++
  198. atomic.StoreInt64(&publishedTsNs, data.TsNs)
  199. }
  200. if publishCounter > 0 {
  201. wg.Wait()
  202. } else {
  203. // CloseSend would cancel the context on the server side
  204. if err := publishClient.CloseSend(); err != nil {
  205. return fmt.Errorf("close send: %v", err)
  206. }
  207. }
  208. log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
  209. return nil
  210. }
  211. func (p *TopicPublisher) doConfigureTopic() (err error) {
  212. if len(p.config.Brokers) == 0 {
  213. return fmt.Errorf("topic configuring found no bootstrap brokers")
  214. }
  215. var lastErr error
  216. for _, brokerAddress := range p.config.Brokers {
  217. err = pb.WithBrokerGrpcClient(false,
  218. brokerAddress,
  219. p.grpcDialOption,
  220. func(client mq_pb.SeaweedMessagingClient) error {
  221. _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
  222. Topic: p.config.Topic.ToPbTopic(),
  223. PartitionCount: p.config.PartitionCount,
  224. RecordType: p.config.RecordType, // TODO schema upgrade
  225. })
  226. return err
  227. })
  228. if err == nil {
  229. lastErr = nil
  230. return nil
  231. } else {
  232. lastErr = fmt.Errorf("%s: %v", brokerAddress, err)
  233. }
  234. }
  235. if lastErr != nil {
  236. return fmt.Errorf("doConfigureTopic %s: %v", p.config.Topic, err)
  237. }
  238. return nil
  239. }
  240. func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
  241. if len(p.config.Brokers) == 0 {
  242. return nil, fmt.Errorf("lookup found no bootstrap brokers")
  243. }
  244. var lastErr error
  245. for _, brokerAddress := range p.config.Brokers {
  246. err := pb.WithBrokerGrpcClient(false,
  247. brokerAddress,
  248. p.grpcDialOption,
  249. func(client mq_pb.SeaweedMessagingClient) error {
  250. lookupResp, err := client.LookupTopicBrokers(context.Background(),
  251. &mq_pb.LookupTopicBrokersRequest{
  252. Topic: p.config.Topic.ToPbTopic(),
  253. })
  254. glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp)
  255. if err != nil {
  256. return err
  257. }
  258. if len(lookupResp.BrokerPartitionAssignments) == 0 {
  259. return fmt.Errorf("no broker partition assignments")
  260. }
  261. assignments = lookupResp.BrokerPartitionAssignments
  262. return nil
  263. })
  264. if err == nil {
  265. return assignments, nil
  266. } else {
  267. lastErr = err
  268. }
  269. }
  270. return nil, fmt.Errorf("lookup topic %s: %v", p.config.Topic, lastErr)
  271. }