diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go index 48a20e334..ee69db30d 100644 --- a/weed/mq/broker/broker_grpc_assign.go +++ b/weed/mq/broker/broker_grpc_assign.go @@ -22,12 +22,12 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m b.accessLock.Lock() if request.IsDraining { // TODO drain existing topic partition subscriptions - b.localTopicManager.RemoveTopicPartition(t, partition) + b.localTopicManager.RemoveLocalPartition(t, partition) } else { var localPartition *topic.LocalPartition - if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil { + if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil { localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition)) - b.localTopicManager.AddTopicPartition(t, localPartition) + b.localTopicManager.AddLocalPartition(t, localPartition) } } b.accessLock.Unlock() diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 9c004e31f..fb0e3a11f 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -110,7 +110,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis // remove the publisher localTopicPartition.Publishers.RemovePublisher(clientName) if localTopicPartition.MaybeShutdownLocalPartition() { - b.localTopicManager.RemoveTopicPartition(t, p) + b.localTopicManager.RemoveLocalPartition(t, p) glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition) } }() diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 6c03ba409..02488b2b0 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -43,7 +43,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest localTopicPartition.Subscribers.RemoveSubscriber(clientName) glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter) if localTopicPartition.MaybeShutdownLocalPartition() { - b.localTopicManager.RemoveTopicPartition(t, partition) + b.localTopicManager.RemoveLocalPartition(t, partition) } }() diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index dfdf9a708..cddd6cf1c 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -75,7 +75,7 @@ func (b *MessageQueueBroker) doGetOrGenLocalPartition(t topic.Topic, partition t b.accessLock.Lock() defer b.accessLock.Unlock() - if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil { + if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil { localPartition, isGenerated, err = b.genLocalPartitionFromFiler(t, partition, conf) if err != nil { return nil, false, err @@ -89,7 +89,7 @@ func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition for _, assignment := range conf.BrokerPartitionAssignments { if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) { localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition)) - b.localTopicManager.AddTopicPartition(t, localPartition) + b.localTopicManager.AddLocalPartition(t, localPartition) isGenerated = true break } diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go index 02334a27e..79a84561c 100644 --- a/weed/mq/topic/local_manager.go +++ b/weed/mq/topic/local_manager.go @@ -19,8 +19,8 @@ func NewLocalTopicManager() *LocalTopicManager { } } -// AddTopicPartition adds a topic to the local topic manager -func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition *LocalPartition) { +// AddLocalPartition adds a topic to the local topic manager +func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition *LocalPartition) { localTopic, ok := manager.topics.Get(topic.String()) if !ok { localTopic = NewLocalTopic(topic) @@ -34,8 +34,8 @@ func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition localTopic.Partitions = append(localTopic.Partitions, localPartition) } -// GetTopicPartition gets a topic from the local topic manager -func (manager *LocalTopicManager) GetTopicPartition(topic Topic, partition Partition) *LocalPartition { +// GetLocalPartition gets a topic from the local topic manager +func (manager *LocalTopicManager) GetLocalPartition(topic Topic, partition Partition) *LocalPartition { localTopic, ok := manager.topics.Get(topic.String()) if !ok { return nil @@ -48,7 +48,7 @@ func (manager *LocalTopicManager) RemoveTopic(topic Topic) { manager.topics.Remove(topic.String()) } -func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Partition) (removed bool) { +func (manager *LocalTopicManager) RemoveLocalPartition(topic Topic, partition Partition) (removed bool) { localTopic, ok := manager.topics.Get(topic.String()) if !ok { return false