|
|
@ -36,13 +36,13 @@ type TopicCursor struct { |
|
|
|
|
|
|
|
type TopicManager struct { |
|
|
|
sync.Mutex |
|
|
|
topicControls map[TopicPartition]*TopicCursor |
|
|
|
topicCursors map[TopicPartition]*TopicCursor |
|
|
|
broker *MessageBroker |
|
|
|
} |
|
|
|
|
|
|
|
func NewTopicManager(messageBroker *MessageBroker) *TopicManager { |
|
|
|
return &TopicManager{ |
|
|
|
topicControls: make(map[TopicPartition]*TopicCursor), |
|
|
|
topicCursors: make(map[TopicPartition]*TopicCursor), |
|
|
|
broker: messageBroker, |
|
|
|
} |
|
|
|
} |
|
|
@ -79,10 +79,10 @@ func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messa |
|
|
|
tm.Lock() |
|
|
|
defer tm.Unlock() |
|
|
|
|
|
|
|
lock, found := tm.topicControls[partition] |
|
|
|
lock, found := tm.topicCursors[partition] |
|
|
|
if !found { |
|
|
|
lock = &TopicCursor{} |
|
|
|
tm.topicControls[partition] = lock |
|
|
|
tm.topicCursors[partition] = lock |
|
|
|
lock.subscriptions = NewTopicPartitionSubscriptions() |
|
|
|
lock.logBuffer = tm.buildLogBuffer(lock, partition, topicConfig) |
|
|
|
} |
|
|
@ -98,7 +98,7 @@ func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) |
|
|
|
tm.Lock() |
|
|
|
defer tm.Unlock() |
|
|
|
|
|
|
|
lock, found := tm.topicControls[partition] |
|
|
|
lock, found := tm.topicCursors[partition] |
|
|
|
if !found { |
|
|
|
return |
|
|
|
} |
|
|
@ -108,7 +108,7 @@ func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) |
|
|
|
lock.subscriberCount-- |
|
|
|
} |
|
|
|
if lock.subscriberCount <= 0 && lock.publisherCount <= 0 { |
|
|
|
delete(tm.topicControls, partition) |
|
|
|
delete(tm.topicCursors, partition) |
|
|
|
lock.logBuffer.Shutdown() |
|
|
|
} |
|
|
|
} |
|
|
@ -117,7 +117,7 @@ func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) { |
|
|
|
tm.Lock() |
|
|
|
defer tm.Unlock() |
|
|
|
|
|
|
|
for k := range tm.topicControls { |
|
|
|
for k := range tm.topicCursors { |
|
|
|
tps = append(tps, k) |
|
|
|
} |
|
|
|
return |
|
|
|