diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go index 3ac037973..419f68f42 100644 --- a/weed/mq/client/cmd/weed_pub/publisher.go +++ b/weed/mq/client/cmd/weed_pub/publisher.go @@ -43,18 +43,21 @@ func main() { CreateTopicPartitionCount: int32(*partitionCount), } publisher := pub_client.NewTopicPublisher(*namespace, *topic, config) + wg := sync.WaitGroup{} + wg.Add(1) go func() { brokers := strings.Split(*seedBrokers, ",") - if err := publisher.StartSchedulerThread(brokers); err != nil { + if err := publisher.StartSchedulerThread(brokers, &wg); err != nil { fmt.Println(err) return } }() + wg.Wait() + startTime := time.Now() // Start multiple publishers - var wg sync.WaitGroup for i := 0; i < *concurrency; i++ { wg.Add(1) go func(id int) { diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index e617af09f..9d02d5f7b 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -28,7 +28,7 @@ type EachPartitionPublishJob struct { generation int inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage] } -func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string) error { +func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string, wg *sync.WaitGroup) error { if err := p.doEnsureConfigureTopic(bootstrapBrokers); err != nil { return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err) @@ -39,10 +39,10 @@ func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string) error { generation := 0 var errChan chan EachPartitionError for { - glog.V(0).Infof("lookup partitions gen %d topic %s/%s", generation, p.namespace, p.topic) + glog.V(0).Infof("lookup partitions gen %d topic %s/%s", generation+1, p.namespace, p.topic) if assignments, err := p.doLookupTopicPartitions(bootstrapBrokers); err == nil { generation++ - glog.V(0).Infof("start generation %d", generation) + glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments)) if errChan == nil { errChan = make(chan EachPartitionError, len(assignments)) } @@ -53,6 +53,10 @@ func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string) error { continue } + if generation == 1 { + wg.Done() + } + // wait for any error to happen. If so, consume all remaining errors, and retry for { select { @@ -237,6 +241,10 @@ func (p *TopicPublisher) doLookupTopicPartitions(bootstrapBrokers []string) (ass return err } + if len(lookupResp.BrokerPartitionAssignments) == 0 { + return fmt.Errorf("no broker partition assignments") + } + assignments = lookupResp.BrokerPartitionAssignments return nil