diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index ad878a88c..7ea1db27d 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -54,9 +54,11 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) localTopicPartition = b.localTopicManager.GetTopicPartition(t, p) if localTopicPartition == nil { - response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) - glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) - return stream.Send(response) + if localTopicPartition, err = b.genLocalPartitionFromFiler(t, p); err != nil { + response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) + glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) + return stream.Send(response) + } } ackInterval = int(initMessage.AckInterval) stream.Send(response) @@ -141,34 +143,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis return nil } -func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) { - self := b.option.BrokerAddress() - - // load local topic partition from configuration on filer if not found - var conf *mq_pb.ConfigureTopicResponse - conf, err = b.readTopicConfFromFiler(t) - if err != nil { - return nil, err - } - - // create local topic partition - var hasCreated bool - for _, assignment := range conf.BrokerPartitionAssignments { - if assignment.LeaderBroker == string(self) && p.Equals(topic.FromPbPartition(assignment.Partition)) { - localTopicPartition = topic.FromPbBrokerPartitionAssignment(self, p, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition)) - b.localTopicManager.AddTopicPartition(t, localTopicPartition) - hasCreated = true - break - } - } - - if !hasCreated { - return nil, fmt.Errorf("topic %v partition %v not assigned to broker %v", t, p, self) - } - - return localTopicPartition, nil -} - // duplicated from master_grpc_server.go func findClientAddress(ctx context.Context) string { // fmt.Printf("FromContext %+v\n", ctx) diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 72101ba86..8a6acadb9 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -22,8 +22,16 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) var localTopicPartition *topic.LocalPartition - localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition) for localTopicPartition == nil { + localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition) + if localTopicPartition == nil { + if localTopicPartition, err = b.genLocalPartitionFromFiler(t, partition); err != nil { + glog.V(1).Infof("topic %v partition %v not setup", t, partition) + } + } + if localTopicPartition != nil { + break + } time.Sleep(337 * time.Millisecond) // Check if the client has disconnected by monitoring the context select { @@ -38,7 +46,6 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest default: // Continue processing the request } - localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition) } localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber()) diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index 710e95b38..0eeefbdf0 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -53,6 +53,23 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb. return conf, nil } +func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, err error) { + self := b.option.BrokerAddress() + conf, err := b.readTopicConfFromFiler(t) + if err != nil { + return nil, err + } + for _, assignment := range conf.BrokerPartitionAssignments { + if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) { + localPartition = topic.FromPbBrokerPartitionAssignment(b.option.BrokerAddress(), partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition)) + b.localTopicManager.AddTopicPartition(t, localPartition) + break + } + } + + return localPartition, nil +} + func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) { // also fix assignee broker if invalid addedAssignments, updatedAssignments := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, conf.BrokerPartitionAssignments)