diff --git a/weed/mq/messages/message_pipeline.go b/weed/mq/messages/message_pipeline.go index d8e3a85b8..be967b32e 100644 --- a/weed/mq/messages/message_pipeline.go +++ b/weed/mq/messages/message_pipeline.go @@ -25,7 +25,7 @@ type MessagePipeline struct { movedBuffersChan chan MessageBufferReference onMessageFn OnMessageFunc mover MessageBufferMover - moverPool *util.LimitedConcurrentExecutor + moverPool *util.LimitedAsyncExecutor // control pipeline doneChan chan struct{} @@ -47,15 +47,15 @@ func NewMessagePipeline(producerId int32, workerCount int, batchSize int, timeou doneChan: make(chan struct{}), batchSize: batchSize, timeout: timeout, - moverPool: util.NewLimitedConcurrentExecutor(workerCount), + moverPool: util.NewLimitedAsyncExecutor(workerCount), mover: mover, } go t.doLoopUpload() return t } -func (mp *MessagePipeline) OutputChan() chan MessageBufferReference { - return mp.movedBuffersChan +func (mp *MessagePipeline) NextMessageBufferReference() MessageBufferReference { + return mp.moverPool.NextFuture().Await().(MessageBufferReference) } func (mp *MessagePipeline) AddMessage(message *Message) { @@ -105,18 +105,20 @@ func (mp *MessagePipeline) doLoopUpload() { select { case messageBuffer := <-mp.sealedBuffersChan: ticker.Reset(mp.timeout) - mp.moverPool.Execute(func() { + mp.moverPool.Execute(func() any { + var output MessageBufferReference util.RetryForever("message mover", func() error { if messageReference, flushErr := mp.mover.MoveBuffer(messageBuffer); flushErr != nil { return flushErr } else { - mp.movedBuffersChan <- messageReference + output = messageReference } return nil }, func(err error) (shouldContinue bool) { log.Printf("failed: %v", err) return true }) + return output }) case <-ticker.C: if atomic.LoadInt64(&mp.atomicPipelineStatus) == -2 { diff --git a/weed/mq/messages/message_pipeline_test.go b/weed/mq/messages/message_pipeline_test.go index aaa25e0fd..e7ecfa8f6 100644 --- a/weed/mq/messages/message_pipeline_test.go +++ b/weed/mq/messages/message_pipeline_test.go @@ -5,11 +5,11 @@ import ( "time" ) -func TestAddMessage(t *testing.T) { +func TestAddMessage1(t *testing.T) { mp := NewMessagePipeline(0, 3, 10, time.Second, &EmptyMover{}) go func() { - outChan := mp.OutputChan() - for mr := range outChan { + for i := 0; i < 100; i++ { + mr := mp.NextMessageBufferReference() println(mr.sequence, mr.fileId) } }() @@ -26,4 +26,28 @@ func TestAddMessage(t *testing.T) { mp.ShutdownStart() mp.ShutdownWait() + +} + +func TestAddMessage2(t *testing.T) { + mp := NewMessagePipeline(0, 3, 10, time.Second, &EmptyMover{}) + + for i := 0; i < 100; i++ { + message := &Message{ + Key: []byte("key"), + Content: []byte("data"), + Properties: nil, + Ts: time.Now(), + } + mp.AddMessage(message) + } + + mp.ShutdownStart() + mp.ShutdownWait() + + for i := 0; i < 100; i++ { + mr := mp.NextMessageBufferReference() + println(mr.sequence, mr.fileId) + } + }