package client

import (
	"context"
	"log"
	"sync"
	"sync/atomic"
	"time"

	flatbuffers "github.com/google/flatbuffers/go"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/seaweedfs/seaweedfs/weed/mq/segment"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
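
// batchCountLimit is the number of reusable flatbuffers builders kept in the
// builders pool for encoding message batches.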
const (
	batchCountLimit = 3
)
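
// PublishStreamProcessor collects published messages, batches them into
// flatbuffers message batches, and streams them to a broker over gRPC.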
type PublishStreamProcessor struct {
	// attributes
	ProducerId     int32
	ProducerEpoch  int32
	grpcDialOption grpc.DialOption

	// input
	sync.Mutex
	timeout time.Duration

	// convert into bytes
	messagesChan           chan *Message
	builders               chan *flatbuffers.Builder
	batchMessageCountLimit int
	messagesSequence       int64

	// done channel
	doneChan chan struct{}
}
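
// UploadProcess pairs a flatbuffers builder with the batch builder that encodes into it.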
type UploadProcess struct {
	bufferBuilder *flatbuffers.Builder
	batchBuilder  *segment.MessageBatchBuilder
}
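
// NewPublishStreamProcessor creates a processor with a pool of pre-allocated
// flatbuffers builders and starts the background upload loop.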
func NewPublishStreamProcessor(batchMessageCountLimit int, timeout time.Duration) *PublishStreamProcessor {
	t := &PublishStreamProcessor{
		grpcDialOption:         grpc.WithTransportCredentials(insecure.NewCredentials()),
		batchMessageCountLimit: batchMessageCountLimit,
		builders:               make(chan *flatbuffers.Builder, batchCountLimit),
		messagesChan:           make(chan *Message, 1024),
		doneChan:               make(chan struct{}),
		timeout:                timeout,
	}
	for i := 0; i < batchCountLimit; i++ {
		t.builders <- flatbuffers.NewBuilder(4 * 1024 * 1024)
	}
	go t.doLoopUpload()
	return t
}
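
// AddMessage queues a message for batching; it blocks when the message channel is full.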
func (p *PublishStreamProcessor) AddMessage(m *Message) error {
	p.messagesChan <- m
	return nil
}
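
// Shutdown signals the upload loop to flush any pending messages and stop.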
func (p *PublishStreamProcessor) Shutdown() error {
	p.doneChan <- struct{}{}
	return nil
}
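
// doFlush encodes the buffered messages into a single message batch and sends it
// on the publish stream, returning the builder to the pool afterwards.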
func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMessageClient, messages []*Message) error {
	if len(messages) == 0 {
		return nil
	}
	builder := <-p.builders
	bb := segment.NewMessageBatchBuilder(builder, p.ProducerId, p.ProducerEpoch, 3, 4)
	for _, m := range messages {
		bb.AddMessage(p.messagesSequence, m.Ts.UnixNano(), m.Properties, m.Key, m.Content)
		p.messagesSequence++
	}
	bb.BuildMessageBatch()
	defer func() {
		p.builders <- builder
	}()
	return stream.Send(&mq_pb.PublishRequest{
		Data: &mq_pb.PublishRequest_DataMessage{
			Message: bb.GetBytes(),
		},
	})
}
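
// doLoopUpload connects to the broker and flushes batches whenever the batch size
// limit is reached, the timeout expires, or a shutdown is requested, retrying the
// broker connection forever on failure.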
func (p *PublishStreamProcessor) doLoopUpload() {
	brokerGrpcAddress := "localhost:17777"
	// TODO parallelize the uploading with a separate uploader
	messages := make([]*Message, 0, p.batchMessageCountLimit)
	util.RetryForever("publish message", func() error {
		return pb.WithBrokerGrpcClient(false, brokerGrpcAddress, p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()
			stream, err := client.PublishMessage(ctx)
			if err != nil {
				log.Printf("grpc PublishMessage: %v", err)
				return err
			}

			// consume acknowledgments from the broker until signaled to stop
			var atomicStatus int64
			go func() {
				for atomic.LoadInt64(&atomicStatus) >= 0 {
					resp, err := stream.Recv()
					if err != nil {
						log.Printf("response error: %v", err)
						return
					}
					log.Printf("response: %v", resp.AckSequence)
				}
			}()

			var flushErr error
			// retry previously failed messages
			if len(messages) >= p.batchMessageCountLimit {
				flushErr = p.doFlush(stream, messages)
				if flushErr != nil {
					return flushErr
				}
				messages = messages[:0]
			}

		streamLoop:
			for {
				select {
				case m := <-p.messagesChan:
					messages = append(messages, m)
					if len(messages) >= p.batchMessageCountLimit {
						if flushErr = p.doFlush(stream, messages); flushErr != nil {
							return flushErr
						}
						messages = messages[:0]
					}
				case <-time.After(p.timeout):
					// no new message within the timeout: flush whatever has accumulated
					if flushErr = p.doFlush(stream, messages); flushErr != nil {
						return flushErr
					}
					messages = messages[:0]
				case <-p.doneChan:
					// shutdown requested: flush the remaining messages and exit the loop
					if flushErr = p.doFlush(stream, messages); flushErr != nil {
						return flushErr
					}
					messages = messages[:0]
					println("$ stopping ...")
					break streamLoop
				}
			}

			// stop the response consuming goroutine
			atomic.StoreInt64(&atomicStatus, -1)
			return flushErr
		})
	}, func(err error) (shouldContinue bool) {
		log.Printf("failed with grpc %s: %v", brokerGrpcAddress, err)
		return true
	})
}