diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 8c46ea99d..4d37d5393 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -53,7 +53,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis var p topic.Partition if initMessage != nil { t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) - localTopicPartition, err = b.GetOrGenLocalPartition(t, p) + localTopicPartition, _, err = b.GetOrGenLocalPartition(t, p) if err != nil { response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 5fd4522bd..e6027d26b 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -26,7 +26,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest var localTopicPartition *topic.LocalPartition for localTopicPartition == nil { - localTopicPartition, err = b.GetOrGenLocalPartition(t, partition) + localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition) if err != nil { glog.V(1).Infof("topic %v partition %v not setup", t, partition) } @@ -143,7 +143,7 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe var localTopicPartition *topic.LocalPartition for localTopicPartition == nil { - localTopicPartition, err = b.GetOrGenLocalPartition(t, partition) + localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition) if err != nil { glog.V(1).Infof("topic %v partition %v not setup", t, partition) } diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index 4bcb62931..35d95c0e4 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -56,34 +56,35 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb. return conf, nil } -func (b *MessageQueueBroker) GetOrGenLocalPartition(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, err error) { +func (b *MessageQueueBroker) GetOrGenLocalPartition(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, isGenerated bool, err error) { b.accessLock.Lock() defer b.accessLock.Unlock() if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil { - localPartition, err = b.genLocalPartitionFromFiler(t, partition) + localPartition, isGenerated, err = b.genLocalPartitionFromFiler(t, partition) if err != nil { - return nil, err + return nil, false, err } } - return localPartition, nil + return localPartition, isGenerated, nil } -func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, err error) { +func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, isGenerated bool, err error) { self := b.option.BrokerAddress() conf, err := b.readTopicConfFromFiler(t) if err != nil { - return nil, err + return nil, isGenerated, err } for _, assignment := range conf.BrokerPartitionAssignments { if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) { localPartition = topic.FromPbBrokerPartitionAssignment(b.option.BrokerAddress(), partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition)) b.localTopicManager.AddTopicPartition(t, localPartition) + isGenerated = true break } } - return localPartition, nil + return localPartition, isGenerated, nil } func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) {