diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go index 419f68f42..b49976b5a 100644 --- a/weed/mq/client/cmd/weed_pub/publisher.go +++ b/weed/mq/client/cmd/weed_pub/publisher.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" "log" "strings" "sync" @@ -16,7 +17,7 @@ var ( partitionCount = flag.Int("p", 6, "partition count") namespace = flag.String("ns", "test", "namespace") - topic = flag.String("topic", "test", "topic") + t = flag.String("t", "test", "t") seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") ) @@ -39,10 +40,11 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) { func main() { flag.Parse() config := &pub_client.PublisherConfiguration{ + Topic: topic.NewTopic(*namespace, *t), CreateTopic: true, CreateTopicPartitionCount: int32(*partitionCount), } - publisher := pub_client.NewTopicPublisher(*namespace, *topic, config) + publisher := pub_client.NewTopicPublisher(config) wg := sync.WaitGroup{} wg.Add(1) go func() { diff --git a/weed/mq/client/pub_client/connect.go b/weed/mq/client/pub_client/connect.go index 7f6d62a67..045c9593c 100644 --- a/weed/mq/client/pub_client/connect.go +++ b/weed/mq/client/pub_client/connect.go @@ -32,10 +32,7 @@ func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress str if err = publishClient.Send(&mq_pb.PublishMessageRequest{ Message: &mq_pb.PublishMessageRequest_Init{ Init: &mq_pb.PublishMessageRequest_InitMessage{ - Topic: &mq_pb.Topic{ - Namespace: p.namespace, - Name: p.topic, - }, + Topic: p.config.Topic.ToPbTopic(), Partition: &mq_pb.Partition{ RingSize: partition.RingSize, RangeStart: partition.RangeStart, diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go index 5a134b3c2..1ffbeea46 100644 --- a/weed/mq/client/pub_client/publisher.go +++ b/weed/mq/client/pub_client/publisher.go @@ -3,6 +3,7 @@ package pub_client import ( "github.com/rdleal/intervalst/interval" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue" "google.golang.org/grpc" @@ -12,6 +13,7 @@ import ( ) type PublisherConfiguration struct { + Topic topic.Topic CreateTopic bool CreateTopicPartitionCount int32 } @@ -22,8 +24,6 @@ type PublishClient struct { Err error } type TopicPublisher struct { - namespace string - topic string partition2Buffer *interval.SearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage], int32] grpcDialOption grpc.DialOption sync.Mutex // protects grpc @@ -31,10 +31,8 @@ type TopicPublisher struct { jobs []*EachPartitionPublishJob } -func NewTopicPublisher(namespace, topic string, config *PublisherConfiguration) *TopicPublisher { +func NewTopicPublisher(config *PublisherConfiguration) *TopicPublisher { return &TopicPublisher{ - namespace: namespace, - topic: topic, partition2Buffer: interval.NewSearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage]](func(a, b int32) int { return int(a - b) }), diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index 9d02d5f7b..2b9f186e1 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -31,15 +31,15 @@ type EachPartitionPublishJob struct { func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string, wg *sync.WaitGroup) error { if err := p.doEnsureConfigureTopic(bootstrapBrokers); err != nil { - return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err) + return fmt.Errorf("configure topic %s: %v", p.config.Topic, err) } - log.Printf("start scheduler thread for topic %s/%s", p.namespace, p.topic) + log.Printf("start scheduler thread for topic %s", p.config.Topic) generation := 0 var errChan chan EachPartitionError for { - glog.V(0).Infof("lookup partitions gen %d topic %s/%s", generation+1, p.namespace, p.topic) + glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic) if assignments, err := p.doLookupTopicPartitions(bootstrapBrokers); err == nil { generation++ glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments)) @@ -48,7 +48,7 @@ func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string, wg *syn } p.onEachAssignments(generation, assignments, errChan) } else { - glog.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err) + glog.Errorf("lookup topic %s: %v", p.config.Topic, err) time.Sleep(5 * time.Second) continue } @@ -61,7 +61,7 @@ func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string, wg *syn for { select { case eachErr := <-errChan: - glog.Errorf("gen %d publish to topic %s/%s partition %v: %v", eachErr.generation, p.namespace, p.topic, eachErr.Partition, eachErr.Err) + glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err) if eachErr.generation < generation { continue } @@ -140,10 +140,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro if err = publishClient.Send(&mq_pb.PublishMessageRequest{ Message: &mq_pb.PublishMessageRequest_Init{ Init: &mq_pb.PublishMessageRequest_InitMessage{ - Topic: &mq_pb.Topic{ - Namespace: p.namespace, - Name: p.topic, - }, + Topic: p.config.Topic.ToPbTopic(), Partition: job.Partition, AckInterval: 128, }, @@ -197,10 +194,7 @@ func (p *TopicPublisher) doEnsureConfigureTopic(bootstrapBrokers []string) (err p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ - Topic: &mq_pb.Topic{ - Namespace: p.namespace, - Name: p.topic, - }, + Topic: p.config.Topic.ToPbTopic(), PartitionCount: p.config.CreateTopicPartitionCount, }) return err @@ -213,7 +207,7 @@ func (p *TopicPublisher) doEnsureConfigureTopic(bootstrapBrokers []string) (err } if lastErr != nil { - return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err) + return fmt.Errorf("configure topic %s: %v", p.config.Topic, err) } return nil } @@ -230,12 +224,9 @@ func (p *TopicPublisher) doLookupTopicPartitions(bootstrapBrokers []string) (ass func(client mq_pb.SeaweedMessagingClient) error { lookupResp, err := client.LookupTopicBrokers(context.Background(), &mq_pb.LookupTopicBrokersRequest{ - Topic: &mq_pb.Topic{ - Namespace: p.namespace, - Name: p.topic, - }, + Topic: p.config.Topic.ToPbTopic(), }) - glog.V(0).Infof("lookup topic %s/%s: %v", p.namespace, p.topic, lookupResp) + glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp) if err != nil { return err @@ -256,6 +247,6 @@ func (p *TopicPublisher) doLookupTopicPartitions(bootstrapBrokers []string) (ass } } - return nil, fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, lastErr) + return nil, fmt.Errorf("lookup topic %s: %v", p.config.Topic, lastErr) }