Browse Source

wait for publishing clients

pull/5637/head
chrislu 1 year ago
parent
commit
b51dfe2bff
  1. 7
      weed/mq/client/cmd/weed_pub/publisher.go
  2. 14
      weed/mq/client/pub_client/scheduler.go

7
weed/mq/client/cmd/weed_pub/publisher.go

@ -43,18 +43,21 @@ func main() {
CreateTopicPartitionCount: int32(*partitionCount),
}
publisher := pub_client.NewTopicPublisher(*namespace, *topic, config)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
brokers := strings.Split(*seedBrokers, ",")
if err := publisher.StartSchedulerThread(brokers); err != nil {
if err := publisher.StartSchedulerThread(brokers, &wg); err != nil {
fmt.Println(err)
return
}
}()
wg.Wait()
startTime := time.Now()
// Start multiple publishers
var wg sync.WaitGroup
for i := 0; i < *concurrency; i++ {
wg.Add(1)
go func(id int) {

14
weed/mq/client/pub_client/scheduler.go

@ -28,7 +28,7 @@ type EachPartitionPublishJob struct {
generation int
inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
}
func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string) error {
func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string, wg *sync.WaitGroup) error {
if err := p.doEnsureConfigureTopic(bootstrapBrokers); err != nil {
return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err)
@ -39,10 +39,10 @@ func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string) error {
generation := 0
var errChan chan EachPartitionError
for {
glog.V(0).Infof("lookup partitions gen %d topic %s/%s", generation, p.namespace, p.topic)
glog.V(0).Infof("lookup partitions gen %d topic %s/%s", generation+1, p.namespace, p.topic)
if assignments, err := p.doLookupTopicPartitions(bootstrapBrokers); err == nil {
generation++
glog.V(0).Infof("start generation %d", generation)
glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments))
if errChan == nil {
errChan = make(chan EachPartitionError, len(assignments))
}
@ -53,6 +53,10 @@ func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string) error {
continue
}
if generation == 1 {
wg.Done()
}
// wait for any error to happen. If so, consume all remaining errors, and retry
for {
select {
@ -237,6 +241,10 @@ func (p *TopicPublisher) doLookupTopicPartitions(bootstrapBrokers []string) (ass
return err
}
if len(lookupResp.BrokerPartitionAssignments) == 0 {
return fmt.Errorf("no broker partition assignments")
}
assignments = lookupResp.BrokerPartitionAssignments
return nil

Loading…
Cancel
Save