Browse Source

renaming

mq-subscribe
chrislu 10 months ago
parent
commit
e568e742c9
  1. 6
      weed/mq/broker/broker_grpc_assign.go
  2. 2
      weed/mq/broker/broker_grpc_pub.go
  3. 2
      weed/mq/broker/broker_grpc_sub.go
  4. 4
      weed/mq/broker/broker_topic_conf_read_write.go
  5. 10
      weed/mq/topic/local_manager.go

6
weed/mq/broker/broker_grpc_assign.go

@ -22,12 +22,12 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
b.accessLock.Lock() b.accessLock.Lock()
if request.IsDraining { if request.IsDraining {
// TODO drain existing topic partition subscriptions // TODO drain existing topic partition subscriptions
b.localTopicManager.RemoveTopicPartition(t, partition)
b.localTopicManager.RemoveLocalPartition(t, partition)
} else { } else {
var localPartition *topic.LocalPartition var localPartition *topic.LocalPartition
if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil {
localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition)) localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
b.localTopicManager.AddTopicPartition(t, localPartition)
b.localTopicManager.AddLocalPartition(t, localPartition)
} }
} }
b.accessLock.Unlock() b.accessLock.Unlock()

2
weed/mq/broker/broker_grpc_pub.go

@ -110,7 +110,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
// remove the publisher // remove the publisher
localTopicPartition.Publishers.RemovePublisher(clientName) localTopicPartition.Publishers.RemovePublisher(clientName)
if localTopicPartition.MaybeShutdownLocalPartition() { if localTopicPartition.MaybeShutdownLocalPartition() {
b.localTopicManager.RemoveTopicPartition(t, p)
b.localTopicManager.RemoveLocalPartition(t, p)
glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition) glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
} }
}() }()

2
weed/mq/broker/broker_grpc_sub.go

@ -43,7 +43,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
localTopicPartition.Subscribers.RemoveSubscriber(clientName) localTopicPartition.Subscribers.RemoveSubscriber(clientName)
glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter) glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
if localTopicPartition.MaybeShutdownLocalPartition() { if localTopicPartition.MaybeShutdownLocalPartition() {
b.localTopicManager.RemoveTopicPartition(t, partition)
b.localTopicManager.RemoveLocalPartition(t, partition)
} }
}() }()

4
weed/mq/broker/broker_topic_conf_read_write.go

@ -75,7 +75,7 @@ func (b *MessageQueueBroker) doGetOrGenLocalPartition(t topic.Topic, partition t
b.accessLock.Lock() b.accessLock.Lock()
defer b.accessLock.Unlock() defer b.accessLock.Unlock()
if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil {
localPartition, isGenerated, err = b.genLocalPartitionFromFiler(t, partition, conf) localPartition, isGenerated, err = b.genLocalPartitionFromFiler(t, partition, conf)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
@ -89,7 +89,7 @@ func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition
for _, assignment := range conf.BrokerPartitionAssignments { for _, assignment := range conf.BrokerPartitionAssignments {
if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) { if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) {
localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition)) localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
b.localTopicManager.AddTopicPartition(t, localPartition)
b.localTopicManager.AddLocalPartition(t, localPartition)
isGenerated = true isGenerated = true
break break
} }

10
weed/mq/topic/local_manager.go

@ -19,8 +19,8 @@ func NewLocalTopicManager() *LocalTopicManager {
} }
} }
// AddTopicPartition adds a topic to the local topic manager
func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition *LocalPartition) {
// AddLocalPartition adds a topic to the local topic manager
func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition *LocalPartition) {
localTopic, ok := manager.topics.Get(topic.String()) localTopic, ok := manager.topics.Get(topic.String())
if !ok { if !ok {
localTopic = NewLocalTopic(topic) localTopic = NewLocalTopic(topic)
@ -34,8 +34,8 @@ func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition
localTopic.Partitions = append(localTopic.Partitions, localPartition) localTopic.Partitions = append(localTopic.Partitions, localPartition)
} }
// GetTopicPartition gets a topic from the local topic manager
func (manager *LocalTopicManager) GetTopicPartition(topic Topic, partition Partition) *LocalPartition {
// GetLocalPartition gets a topic from the local topic manager
func (manager *LocalTopicManager) GetLocalPartition(topic Topic, partition Partition) *LocalPartition {
localTopic, ok := manager.topics.Get(topic.String()) localTopic, ok := manager.topics.Get(topic.String())
if !ok { if !ok {
return nil return nil
@ -48,7 +48,7 @@ func (manager *LocalTopicManager) RemoveTopic(topic Topic) {
manager.topics.Remove(topic.String()) manager.topics.Remove(topic.String())
} }
func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Partition) (removed bool) {
func (manager *LocalTopicManager) RemoveLocalPartition(topic Topic, partition Partition) (removed bool) {
localTopic, ok := manager.topics.Get(topic.String()) localTopic, ok := manager.topics.Get(topic.String())
if !ok { if !ok {
return false return false

Loading…
Cancel
Save