Browse Source

refactor

pull/5890/head
chrislu 5 months ago
parent
commit
b6fd1ff4ce
  1. 5
      weed/mq/topic/local_manager.go
  2. 6
      weed/mq/topic/local_topic.go

5
weed/mq/topic/local_manager.go

@ -28,10 +28,7 @@ func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition
if !manager.topics.SetIfAbsent(topic.String(), localTopic) { if !manager.topics.SetIfAbsent(topic.String(), localTopic) {
localTopic, _ = manager.topics.Get(topic.String()) localTopic, _ = manager.topics.Get(topic.String())
} }
if localTopic.findPartition(localPartition.Partition) != nil {
return
}
localTopic.Partitions = append(localTopic.Partitions, localPartition)
localTopic.AddPartition(localPartition)
} }
// GetLocalPartition gets a topic from the local topic manager // GetLocalPartition gets a topic from the local topic manager

6
weed/mq/topic/local_topic.go

@ -37,6 +37,12 @@ func (localTopic *LocalTopic) removePartition(partition Partition) bool {
localTopic.Partitions = append(localTopic.Partitions[:foundPartitionIndex], localTopic.Partitions[foundPartitionIndex+1:]...) localTopic.Partitions = append(localTopic.Partitions[:foundPartitionIndex], localTopic.Partitions[foundPartitionIndex+1:]...)
return true return true
} }
func (localTopic *LocalTopic) AddPartition(localPartition *LocalPartition) {
if localTopic.findPartition(localPartition.Partition) != nil {
return
}
localTopic.Partitions = append(localTopic.Partitions, localPartition)
}
func (localTopic *LocalTopic) closePartitionPublishers(unixTsNs int64) bool { func (localTopic *LocalTopic) closePartitionPublishers(unixTsNs int64) bool {
var wg sync.WaitGroup var wg sync.WaitGroup

Loading…
Cancel
Save