diff --git a/weed/messaging/broker/topic_manager.go b/weed/messaging/broker/topic_manager.go index 7cc8601eb..e9f8903e8 100644 --- a/weed/messaging/broker/topic_manager.go +++ b/weed/messaging/broker/topic_manager.go @@ -25,7 +25,7 @@ func (tp *TopicPartition) String() string { return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition) } -type TopicCursor struct { +type TopicControl struct { sync.Mutex cond *sync.Cond subscriberCount int @@ -36,18 +36,18 @@ type TopicCursor struct { type TopicManager struct { sync.Mutex - topicCursors map[TopicPartition]*TopicCursor + topicCursors map[TopicPartition]*TopicControl broker *MessageBroker } func NewTopicManager(messageBroker *MessageBroker) *TopicManager { return &TopicManager{ - topicCursors: make(map[TopicPartition]*TopicCursor), + topicCursors: make(map[TopicPartition]*TopicControl), broker: messageBroker, } } -func (tm *TopicManager) buildLogBuffer(tl *TopicCursor, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer { +func (tm *TopicManager) buildLogBuffer(tc *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer { flushFn := func(startTime, stopTime time.Time, buf []byte) { @@ -69,29 +69,29 @@ func (tm *TopicManager) buildLogBuffer(tl *TopicCursor, tp TopicPartition, topic } } logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() { - tl.subscriptions.NotifyAll() + tc.subscriptions.NotifyAll() }) return logBuffer } -func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicCursor { +func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl { tm.Lock() defer tm.Unlock() - lock, found := tm.topicCursors[partition] + tc, found := tm.topicCursors[partition] if !found { - lock = &TopicCursor{} - tm.topicCursors[partition] = lock - lock.subscriptions = NewTopicPartitionSubscriptions() - lock.logBuffer = tm.buildLogBuffer(lock, partition, topicConfig) + tc = &TopicControl{} + tm.topicCursors[partition] = tc + tc.subscriptions = NewTopicPartitionSubscriptions() + tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig) } if isPublisher { - lock.publisherCount++ + tc.publisherCount++ } else { - lock.subscriberCount++ + tc.subscriberCount++ } - return lock + return tc } func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) {