diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 72ddf30f3..6cb907794 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -39,7 +39,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis // 2. find the topic metadata owning filer // 3. write to the filer - var localTopicPartition *topic.LocalPartition req, err := stream.Recv() if err != nil { return err @@ -56,18 +55,13 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis // get or generate a local partition t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) - conf, readConfErr := b.readTopicConfFromFiler(t) - if readConfErr != nil { - response.Error = fmt.Sprintf("topic %v not found: %v", initMessage.Topic, readConfErr) - glog.Errorf("topic %v not found: %v", initMessage.Topic, readConfErr) - return stream.Send(response) - } - localTopicPartition, _, err = b.GetOrGenLocalPartition(t, p, conf) - if err != nil { - response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) - glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) + localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p) + if getOrGenErr != nil { + response.Error = fmt.Sprintf("topic %v not found: %v", t, getOrGenErr) + glog.Errorf("topic %v not found: %v", t, getOrGenErr) return stream.Send(response) } + ackInterval = int(initMessage.AckInterval) // connect to follower brokers diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 724a34f7e..e07c7b67f 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -27,17 +27,9 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) - // get or generate a local partition - var localTopicPartition *topic.LocalPartition - conf, readConfErr := b.readTopicConfFromFiler(t) - if readConfErr != nil { - glog.Errorf("topic %v not found: %v", initMessage.Topic, readConfErr) - return fmt.Errorf("topic %v not found: %v", initMessage.Topic, readConfErr) - } - localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition, conf) - if err != nil { - glog.Errorf("topic %v partition %v not setup", initMessage.Topic, partition) - return fmt.Errorf("topic %v partition %v not setup", initMessage.Topic, partition) + localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition) + if getOrGenErr != nil { + return getOrGenErr } localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber()) @@ -105,6 +97,21 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest }) } +func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error) { + // get or generate a local partition + conf, readConfErr := b.readTopicConfFromFiler(t) + if readConfErr != nil { + glog.Errorf("topic %v not found: %v", t, readConfErr) + return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr) + } + localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf) + if getOrGenError != nil { + glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError) + return nil, fmt.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError) + } + return localTopicPartition, nil +} + func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer.MessagePosition) { if offset.StartTsNs != 0 { startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2) diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index b5d47e1c9..abe25a240 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -56,7 +56,7 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb. return conf, nil } -func (b *MessageQueueBroker) GetOrGenLocalPartition(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) { +func (b *MessageQueueBroker) doGetOrGenLocalPartition(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) { b.accessLock.Lock() defer b.accessLock.Unlock()