diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index 80a1ac9ef..e91127522 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -12,6 +12,7 @@ import ( "log" "sort" "sync" + "sync/atomic" "time" ) @@ -159,6 +160,8 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro return fmt.Errorf("init response error: %v", resp.Error) } + var publishedTsNs int64 + hasMoreData := int32(1) var wg sync.WaitGroup wg.Add(1) go func() { @@ -183,6 +186,9 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro if ackResp.AckSequence > 0 { log.Printf("ack %d", ackResp.AckSequence) } + if atomic.LoadInt64(&publishedTsNs) == ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 { + return + } } }() @@ -196,14 +202,17 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro return fmt.Errorf("send publish data: %v", err) } publishCounter++ + atomic.StoreInt64(&publishedTsNs, data.TsNs) + } + atomic.StoreInt32(&hasMoreData, 0) + if publishCounter > 0 { + wg.Wait() + } else { + // CloseSend would cancel the context on the server side + if err := publishClient.CloseSend(); err != nil { + return fmt.Errorf("close send: %v", err) + } } - - // CloseSend would cancel the context on the server side - //if err := publishClient.CloseSend(); err != nil { - // return fmt.Errorf("close send: %v", err) - //} - - wg.Wait() log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)