You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

288 lines
8.2 KiB

12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
11 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
  1. package pub_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
  9. "google.golang.org/grpc/codes"
  10. "google.golang.org/grpc/status"
  11. "log"
  12. "sort"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. )
  17. type EachPartitionError struct {
  18. *mq_pb.BrokerPartitionAssignment
  19. Err error
  20. generation int
  21. }
  22. type EachPartitionPublishJob struct {
  23. *mq_pb.BrokerPartitionAssignment
  24. stopChan chan bool
  25. wg sync.WaitGroup
  26. generation int
  27. inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
  28. }
  29. func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
  30. if err := p.doConfigureTopic(); err != nil {
  31. return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
  32. }
  33. log.Printf("start scheduler thread for topic %s", p.config.Topic)
  34. generation := 0
  35. var errChan chan EachPartitionError
  36. for {
  37. glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
  38. if assignments, err := p.doLookupTopicPartitions(); err == nil {
  39. generation++
  40. glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments))
  41. if errChan == nil {
  42. errChan = make(chan EachPartitionError, len(assignments))
  43. }
  44. p.onEachAssignments(generation, assignments, errChan)
  45. } else {
  46. glog.Errorf("lookup topic %s: %v", p.config.Topic, err)
  47. time.Sleep(5 * time.Second)
  48. continue
  49. }
  50. if generation == 1 {
  51. wg.Done()
  52. }
  53. // wait for any error to happen. If so, consume all remaining errors, and retry
  54. for {
  55. select {
  56. case eachErr := <-errChan:
  57. glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err)
  58. if eachErr.generation < generation {
  59. continue
  60. }
  61. break
  62. }
  63. }
  64. }
  65. }
  66. func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.BrokerPartitionAssignment, errChan chan EachPartitionError) {
  67. // TODO assuming this is not re-configured so the partitions are fixed.
  68. sort.Slice(assignments, func(i, j int) bool {
  69. return assignments[i].Partition.RangeStart < assignments[j].Partition.RangeStart
  70. })
  71. var jobs []*EachPartitionPublishJob
  72. hasExistingJob := len(p.jobs) == len(assignments)
  73. for i, assignment := range assignments {
  74. if assignment.LeaderBroker == "" {
  75. continue
  76. }
  77. if hasExistingJob {
  78. var existingJob *EachPartitionPublishJob
  79. existingJob = p.jobs[i]
  80. if existingJob.BrokerPartitionAssignment.LeaderBroker == assignment.LeaderBroker {
  81. existingJob.generation = generation
  82. jobs = append(jobs, existingJob)
  83. continue
  84. } else {
  85. if existingJob.LeaderBroker != "" {
  86. close(existingJob.stopChan)
  87. existingJob.LeaderBroker = ""
  88. existingJob.wg.Wait()
  89. }
  90. }
  91. }
  92. // start a go routine to publish to this partition
  93. job := &EachPartitionPublishJob{
  94. BrokerPartitionAssignment: assignment,
  95. stopChan: make(chan bool, 1),
  96. generation: generation,
  97. inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024),
  98. }
  99. job.wg.Add(1)
  100. go func(job *EachPartitionPublishJob) {
  101. defer job.wg.Done()
  102. if err := p.doPublishToPartition(job); err != nil {
  103. errChan <- EachPartitionError{assignment, err, generation}
  104. }
  105. }(job)
  106. jobs = append(jobs, job)
  107. // TODO assuming this is not re-configured so the partitions are fixed.
  108. // better just re-use the existing job
  109. p.partition2Buffer.Insert(assignment.Partition.RangeStart, assignment.Partition.RangeStop, job.inputQueue)
  110. }
  111. p.jobs = jobs
  112. }
  113. func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error {
  114. log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
  115. grpcConnection, err := pb.GrpcDial(context.Background(), job.LeaderBroker, true, p.grpcDialOption)
  116. if err != nil {
  117. return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err)
  118. }
  119. brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
  120. stream, err := brokerClient.PublishMessage(context.Background())
  121. if err != nil {
  122. return fmt.Errorf("create publish client: %v", err)
  123. }
  124. publishClient := &PublishClient{
  125. SeaweedMessaging_PublishMessageClient: stream,
  126. Broker: job.LeaderBroker,
  127. }
  128. if err = publishClient.Send(&mq_pb.PublishMessageRequest{
  129. Message: &mq_pb.PublishMessageRequest_Init{
  130. Init: &mq_pb.PublishMessageRequest_InitMessage{
  131. Topic: p.config.Topic.ToPbTopic(),
  132. Partition: job.Partition,
  133. AckInterval: 128,
  134. FollowerBrokers: job.FollowerBrokers,
  135. },
  136. },
  137. }); err != nil {
  138. return fmt.Errorf("send init message: %v", err)
  139. }
  140. // process the hello message
  141. resp, err := stream.Recv()
  142. if err != nil {
  143. return fmt.Errorf("recv init response: %v", err)
  144. }
  145. if resp.Error != "" {
  146. return fmt.Errorf("init response error: %v", resp.Error)
  147. }
  148. var publishedTsNs int64
  149. hasMoreData := int32(1)
  150. var wg sync.WaitGroup
  151. wg.Add(1)
  152. go func() {
  153. defer wg.Done()
  154. for {
  155. ackResp, err := publishClient.Recv()
  156. if err != nil {
  157. e, _ := status.FromError(err)
  158. if e.Code() == codes.Unknown && e.Message() == "EOF" {
  159. log.Printf("publish to %s EOF", publishClient.Broker)
  160. return
  161. }
  162. publishClient.Err = err
  163. log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
  164. return
  165. }
  166. if ackResp.Error != "" {
  167. publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
  168. log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
  169. return
  170. }
  171. if ackResp.AckSequence > 0 {
  172. log.Printf("ack %d", ackResp.AckSequence)
  173. }
  174. if atomic.LoadInt64(&publishedTsNs) == ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
  175. return
  176. }
  177. }
  178. }()
  179. publishCounter := 0
  180. for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
  181. if err := publishClient.Send(&mq_pb.PublishMessageRequest{
  182. Message: &mq_pb.PublishMessageRequest_Data{
  183. Data: data,
  184. },
  185. }); err != nil {
  186. return fmt.Errorf("send publish data: %v", err)
  187. }
  188. publishCounter++
  189. atomic.StoreInt64(&publishedTsNs, data.TsNs)
  190. }
  191. atomic.StoreInt32(&hasMoreData, 0)
  192. if publishCounter > 0 {
  193. wg.Wait()
  194. } else {
  195. // CloseSend would cancel the context on the server side
  196. if err := publishClient.CloseSend(); err != nil {
  197. return fmt.Errorf("close send: %v", err)
  198. }
  199. }
  200. log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
  201. return nil
  202. }
  203. func (p *TopicPublisher) doConfigureTopic() (err error) {
  204. if len(p.config.Brokers) == 0 {
  205. return fmt.Errorf("no bootstrap brokers")
  206. }
  207. var lastErr error
  208. for _, brokerAddress := range p.config.Brokers {
  209. err = pb.WithBrokerGrpcClient(false,
  210. brokerAddress,
  211. p.grpcDialOption,
  212. func(client mq_pb.SeaweedMessagingClient) error {
  213. _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
  214. Topic: p.config.Topic.ToPbTopic(),
  215. PartitionCount: p.config.CreateTopicPartitionCount,
  216. })
  217. return err
  218. })
  219. if err == nil {
  220. return nil
  221. } else {
  222. lastErr = fmt.Errorf("%s: %v", brokerAddress, err)
  223. }
  224. }
  225. if lastErr != nil {
  226. return fmt.Errorf("doConfigureTopic %s: %v", p.config.Topic, err)
  227. }
  228. return nil
  229. }
  230. func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
  231. if len(p.config.Brokers) == 0 {
  232. return nil, fmt.Errorf("no bootstrap brokers")
  233. }
  234. var lastErr error
  235. for _, brokerAddress := range p.config.Brokers {
  236. err := pb.WithBrokerGrpcClient(false,
  237. brokerAddress,
  238. p.grpcDialOption,
  239. func(client mq_pb.SeaweedMessagingClient) error {
  240. lookupResp, err := client.LookupTopicBrokers(context.Background(),
  241. &mq_pb.LookupTopicBrokersRequest{
  242. Topic: p.config.Topic.ToPbTopic(),
  243. })
  244. glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp)
  245. if err != nil {
  246. return err
  247. }
  248. if len(lookupResp.BrokerPartitionAssignments) == 0 {
  249. return fmt.Errorf("no broker partition assignments")
  250. }
  251. assignments = lookupResp.BrokerPartitionAssignments
  252. return nil
  253. })
  254. if err == nil {
  255. return assignments, nil
  256. } else {
  257. lastErr = err
  258. }
  259. }
  260. return nil, fmt.Errorf("lookup topic %s: %v", p.config.Topic, lastErr)
  261. }