You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

161 lines
4.2 KiB

  1. package messages
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/util"
  4. "google.golang.org/grpc"
  5. "log"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. )
// OnMessageFunc is a callback invoked with a single message.
// NOTE(review): declared but not referenced by the code visible in this file.
type OnMessageFunc func(message *Message)

// MessagePipeline batches incoming messages into MessageBuffers, seals
// full (or timed-out) buffers, and hands them to a bounded worker pool
// that moves them via a MessageBufferMover. Moved buffer references are
// published on movedBuffersChan (see OutputChan).
type MessagePipeline struct {
	// atomic status
	// Observed state machine (see doLoopUpload / Shutdown* methods):
	//   0    running
	//  -1    shutdown requested (graceful)
	//  -2    draining: flush pending, stop after one idle timeout tick
	//  -100  stopped
	atomicPipelineStatus int64 // -1: stop
	// attributes
	ProducerId     int32
	ProducerEpoch  int32
	grpcDialOption grpc.DialOption // NOTE(review): not used in the visible code — confirm before removing
	// emptyBuffersChan recycles drained buffers back to AddMessage;
	// NOTE(review): nothing in the visible code ever sends to it, so
	// AddMessage always allocates — verify intended recycling path.
	emptyBuffersChan  chan *MessageBuffer
	sealedBuffersChan chan *MessageBuffer          // full/flushed buffers awaiting upload
	movedBuffersChan  chan MessageBufferReference  // references of successfully moved buffers
	onMessageFn       OnMessageFunc                // NOTE(review): not used in the visible code
	mover             MessageBufferMover           // performs the actual buffer upload
	moverPool         *util.LimitedConcurrentExecutor // bounds concurrent MoveBuffer calls
	// control pipeline
	doneChan  chan struct{}
	batchSize int           // messages per buffer before sealing
	timeout   time.Duration // idle flush interval for partial buffers
	// incomingMessageLock guards incomingMessageBuffer and messageSequence.
	incomingMessageLock   sync.Mutex
	incomingMessageBuffer *MessageBuffer
	messageSequence       int64 // monotonically increasing per-message sequence
}
  32. func NewMessagePipeline(producerId int32, workerCount int, batchSize int, timeout time.Duration, mover MessageBufferMover) *MessagePipeline {
  33. t := &MessagePipeline{
  34. ProducerId: producerId,
  35. emptyBuffersChan: make(chan *MessageBuffer, workerCount),
  36. sealedBuffersChan: make(chan *MessageBuffer, workerCount),
  37. movedBuffersChan: make(chan MessageBufferReference, workerCount),
  38. doneChan: make(chan struct{}),
  39. batchSize: batchSize,
  40. timeout: timeout,
  41. moverPool: util.NewLimitedConcurrentExecutor(workerCount),
  42. mover: mover,
  43. }
  44. go t.doLoopUpload()
  45. return t
  46. }
// OutputChan returns the channel on which references to successfully
// moved buffers are published by the upload loop. The channel is never
// closed by the visible code, so consumers should not rely on close as
// a termination signal.
func (mp *MessagePipeline) OutputChan() chan MessageBufferReference {
	return mp.movedBuffersChan
}
  50. func (mp *MessagePipeline) AddMessage(message *Message) {
  51. mp.incomingMessageLock.Lock()
  52. defer mp.incomingMessageLock.Unlock()
  53. // get existing message buffer or create a new one
  54. if mp.incomingMessageBuffer == nil {
  55. select {
  56. case mp.incomingMessageBuffer = <-mp.emptyBuffersChan:
  57. default:
  58. mp.incomingMessageBuffer = NewMessageBuffer()
  59. }
  60. mp.incomingMessageBuffer.Reset(mp.messageSequence)
  61. }
  62. // add one message
  63. mp.incomingMessageBuffer.AddMessage(message)
  64. mp.messageSequence++
  65. // seal the message buffer if full
  66. if mp.incomingMessageBuffer.Len() >= mp.batchSize {
  67. mp.incomingMessageBuffer.Seal(mp.ProducerId, mp.ProducerEpoch, 0, 0)
  68. mp.sealedBuffersChan <- mp.incomingMessageBuffer
  69. mp.incomingMessageBuffer = nil
  70. }
  71. }
  72. func (mp *MessagePipeline) doLoopUpload() {
  73. mp.mover.Setup()
  74. defer mp.mover.TearDown()
  75. ticker := time.NewTicker(mp.timeout)
  76. for {
  77. status := atomic.LoadInt64(&mp.atomicPipelineStatus)
  78. if status == -100 {
  79. return
  80. } else if status == -1 {
  81. // entering shutting down mode
  82. atomic.StoreInt64(&mp.atomicPipelineStatus, -2)
  83. mp.incomingMessageLock.Lock()
  84. mp.doFlushIncomingMessages()
  85. mp.incomingMessageLock.Unlock()
  86. }
  87. select {
  88. case messageBuffer := <-mp.sealedBuffersChan:
  89. ticker.Reset(mp.timeout)
  90. mp.moverPool.Execute(func() {
  91. util.RetryForever("message mover", func() error {
  92. if messageReference, flushErr := mp.mover.MoveBuffer(messageBuffer); flushErr != nil {
  93. return flushErr
  94. } else {
  95. mp.movedBuffersChan <- messageReference
  96. }
  97. return nil
  98. }, func(err error) (shouldContinue bool) {
  99. log.Printf("failed: %v", err)
  100. return true
  101. })
  102. })
  103. case <-ticker.C:
  104. if atomic.LoadInt64(&mp.atomicPipelineStatus) == -2 {
  105. atomic.StoreInt64(&mp.atomicPipelineStatus, -100)
  106. return
  107. }
  108. mp.incomingMessageLock.Lock()
  109. mp.doFlushIncomingMessages()
  110. mp.incomingMessageLock.Unlock()
  111. }
  112. }
  113. atomic.StoreInt64(&mp.atomicPipelineStatus, -100)
  114. close(mp.movedBuffersChan)
  115. }
  116. func (mp *MessagePipeline) doFlushIncomingMessages() {
  117. if mp.incomingMessageBuffer == nil || mp.incomingMessageBuffer.Len() == 0 {
  118. return
  119. }
  120. mp.incomingMessageBuffer.Seal(mp.ProducerId, mp.ProducerEpoch, 0, 0)
  121. mp.sealedBuffersChan <- mp.incomingMessageBuffer
  122. mp.incomingMessageBuffer = nil
  123. }
  124. func (mp *MessagePipeline) ShutdownStart() {
  125. if atomic.LoadInt64(&mp.atomicPipelineStatus) == 0 {
  126. atomic.StoreInt64(&mp.atomicPipelineStatus, -1)
  127. }
  128. }
  129. func (mp *MessagePipeline) ShutdownWait() {
  130. for atomic.LoadInt64(&mp.atomicPipelineStatus) != -100 {
  131. time.Sleep(331 * time.Millisecond)
  132. }
  133. }
  134. func (mp *MessagePipeline) ShutdownImmediate() {
  135. if atomic.LoadInt64(&mp.atomicPipelineStatus) == 0 {
  136. atomic.StoreInt64(&mp.atomicPipelineStatus, -100)
  137. }
  138. }