|
|
@ -12,6 +12,7 @@ import ( |
|
|
|
"log" |
|
|
|
"sort" |
|
|
|
"sync" |
|
|
|
"sync/atomic" |
|
|
|
"time" |
|
|
|
) |
|
|
|
|
|
|
@ -159,6 +160,8 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro |
|
|
|
return fmt.Errorf("init response error: %v", resp.Error) |
|
|
|
} |
|
|
|
|
|
|
|
var publishedTsNs int64 |
|
|
|
hasMoreData := int32(1) |
|
|
|
var wg sync.WaitGroup |
|
|
|
wg.Add(1) |
|
|
|
go func() { |
|
|
@ -183,6 +186,9 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro |
|
|
|
if ackResp.AckSequence > 0 { |
|
|
|
log.Printf("ack %d", ackResp.AckSequence) |
|
|
|
} |
|
|
|
if atomic.LoadInt64(&publishedTsNs) == ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 { |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
}() |
|
|
|
|
|
|
@ -196,14 +202,17 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro |
|
|
|
return fmt.Errorf("send publish data: %v", err) |
|
|
|
} |
|
|
|
publishCounter++ |
|
|
|
atomic.StoreInt64(&publishedTsNs, data.TsNs) |
|
|
|
} |
|
|
|
atomic.StoreInt32(&hasMoreData, 0) |
|
|
|
if publishCounter > 0 { |
|
|
|
wg.Wait() |
|
|
|
} else { |
|
|
|
// CloseSend would cancel the context on the server side
|
|
|
|
if err := publishClient.CloseSend(); err != nil { |
|
|
|
return fmt.Errorf("close send: %v", err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// CloseSend would cancel the context on the server side
|
|
|
|
//if err := publishClient.CloseSend(); err != nil {
|
|
|
|
// return fmt.Errorf("close send: %v", err)
|
|
|
|
//}
|
|
|
|
|
|
|
|
wg.Wait() |
|
|
|
|
|
|
|
log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition) |
|
|
|
|
|
|
|