diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go index b49976b5a..6f5b2312d 100644 --- a/weed/mq/client/cmd/weed_pub/publisher.go +++ b/weed/mq/client/cmd/weed_pub/publisher.go @@ -31,7 +31,7 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) { fmt.Println(err) break } - println("Published", string(key), string(value)) + // println("Published", string(key), string(value)) } elapsed := time.Since(startTime) log.Printf("Publisher %d finished in %s", id, elapsed) @@ -43,22 +43,13 @@ func main() { Topic: topic.NewTopic(*namespace, *t), CreateTopic: true, CreateTopicPartitionCount: int32(*partitionCount), + Brokers: strings.Split(*seedBrokers, ","), } publisher := pub_client.NewTopicPublisher(config) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - brokers := strings.Split(*seedBrokers, ",") - if err := publisher.StartSchedulerThread(brokers, &wg); err != nil { - fmt.Println(err) - return - } - }() - - wg.Wait() startTime := time.Now() + var wg sync.WaitGroup // Start multiple publishers for i := 0; i < *concurrency; i++ { wg.Add(1) diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go index 1ffbeea46..68082a70f 100644 --- a/weed/mq/client/pub_client/publisher.go +++ b/weed/mq/client/pub_client/publisher.go @@ -8,6 +8,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "log" "sync" "time" ) @@ -16,6 +17,7 @@ type PublisherConfiguration struct { Topic topic.Topic CreateTopic bool CreateTopicPartitionCount int32 + Brokers []string } type PublishClient struct { @@ -32,13 +34,26 @@ type TopicPublisher struct { } func NewTopicPublisher(config *PublisherConfiguration) *TopicPublisher { - return &TopicPublisher{ + tp := &TopicPublisher{ partition2Buffer: interval.NewSearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage]](func(a, b int32) int { return int(a - b) }), grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), config: config, } + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + if err := tp.StartSchedulerThread(&wg); err != nil { + log.Println(err) + return + } + }() + + wg.Wait() + + return tp } func (p *TopicPublisher) Shutdown() error { diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index 2b9f186e1..12cbe303d 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -28,9 +28,9 @@ type EachPartitionPublishJob struct { generation int inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage] } -func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string, wg *sync.WaitGroup) error { +func (p *TopicPublisher) StartSchedulerThread(wg *sync.WaitGroup) error { - if err := p.doEnsureConfigureTopic(bootstrapBrokers); err != nil { + if err := p.doEnsureConfigureTopic(); err != nil { return fmt.Errorf("configure topic %s: %v", p.config.Topic, err) } @@ -40,7 +40,7 @@ func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string, wg *syn var errChan chan EachPartitionError for { glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic) - if assignments, err := p.doLookupTopicPartitions(bootstrapBrokers); err == nil { + if assignments, err := p.doLookupTopicPartitions(); err == nil { generation++ glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments)) if errChan == nil { @@ -183,12 +183,12 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro return nil } -func (p *TopicPublisher) doEnsureConfigureTopic(bootstrapBrokers []string) (err error) { - if len(bootstrapBrokers) == 0 { +func (p *TopicPublisher) doEnsureConfigureTopic() (err error) { + if len(p.config.Brokers) == 0 { return fmt.Errorf("no bootstrap brokers") } var lastErr error - for _, brokerAddress := range bootstrapBrokers { + for _, brokerAddress := range p.config.Brokers { err = pb.WithBrokerGrpcClient(false, brokerAddress, p.grpcDialOption, @@ -212,12 +212,12 @@ func (p *TopicPublisher) doEnsureConfigureTopic(bootstrapBrokers []string) (err return nil } -func (p *TopicPublisher) doLookupTopicPartitions(bootstrapBrokers []string) (assignments []*mq_pb.BrokerPartitionAssignment, err error) { - if len(bootstrapBrokers) == 0 { +func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) { + if len(p.config.Brokers) == 0 { return nil, fmt.Errorf("no bootstrap brokers") } var lastErr error - for _, brokerAddress := range bootstrapBrokers { + for _, brokerAddress := range p.config.Brokers { err := pb.WithBrokerGrpcClient(false, brokerAddress, p.grpcDialOption,