diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 7ea1db27d..e4861e9bc 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -52,13 +52,11 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis var p topic.Partition if initMessage != nil { t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) - localTopicPartition = b.localTopicManager.GetTopicPartition(t, p) - if localTopicPartition == nil { - if localTopicPartition, err = b.genLocalPartitionFromFiler(t, p); err != nil { - response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) - glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) - return stream.Send(response) - } + localTopicPartition, err = b.GetOrGenLocalPartition(t, p) + if err != nil { + response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) + glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) + return stream.Send(response) } ackInterval = int(initMessage.AckInterval) stream.Send(response) diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 1a34d49e7..ddd6786d0 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -22,13 +22,12 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) waitIntervalCount := 0 + var localTopicPartition *topic.LocalPartition for localTopicPartition == nil { - localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition) - if localTopicPartition == nil { - if localTopicPartition, err = b.genLocalPartitionFromFiler(t, partition); err != nil { - glog.V(1).Infof("topic %v partition %v not setup", t, partition) - } + localTopicPartition, err = b.GetOrGenLocalPartition(t, partition) + if err != nil { + glog.V(1).Infof("topic %v partition %v not setup", t, partition) } if localTopicPartition != nil { break @@ -75,9 +74,9 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2) } if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST { - startPosition = log_buffer.NewMessagePosition(1, -2) + startPosition = log_buffer.NewMessagePosition(1, -3) } else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST { - startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -2) + startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4) } } diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index c5d8e3b78..dbd0d97c7 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -56,6 +56,19 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb. return conf, nil } +func (b *MessageQueueBroker) GetOrGenLocalPartition(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, err error) { + b.accessLock.Lock() + defer b.accessLock.Unlock() + + if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil { + localPartition, err = b.genLocalPartitionFromFiler(t, partition) + if err != nil { + return nil, err + } + } + return localPartition, nil +} + func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, err error) { self := b.option.BrokerAddress() conf, err := b.readTopicConfFromFiler(t)