|
|
@ -25,7 +25,7 @@ func (tp *TopicPartition) String() string { |
|
|
|
return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition) |
|
|
|
} |
|
|
|
|
|
|
|
type TopicCursor struct { |
|
|
|
type TopicControl struct { |
|
|
|
sync.Mutex |
|
|
|
cond *sync.Cond |
|
|
|
subscriberCount int |
|
|
@ -36,18 +36,18 @@ type TopicCursor struct { |
|
|
|
|
|
|
|
type TopicManager struct { |
|
|
|
sync.Mutex |
|
|
|
topicCursors map[TopicPartition]*TopicCursor |
|
|
|
topicCursors map[TopicPartition]*TopicControl |
|
|
|
broker *MessageBroker |
|
|
|
} |
|
|
|
|
|
|
|
func NewTopicManager(messageBroker *MessageBroker) *TopicManager { |
|
|
|
return &TopicManager{ |
|
|
|
topicCursors: make(map[TopicPartition]*TopicCursor), |
|
|
|
topicCursors: make(map[TopicPartition]*TopicControl), |
|
|
|
broker: messageBroker, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (tm *TopicManager) buildLogBuffer(tl *TopicCursor, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer { |
|
|
|
func (tm *TopicManager) buildLogBuffer(tc *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer { |
|
|
|
|
|
|
|
flushFn := func(startTime, stopTime time.Time, buf []byte) { |
|
|
|
|
|
|
@ -69,29 +69,29 @@ func (tm *TopicManager) buildLogBuffer(tl *TopicCursor, tp TopicPartition, topic |
|
|
|
} |
|
|
|
} |
|
|
|
logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() { |
|
|
|
tl.subscriptions.NotifyAll() |
|
|
|
tc.subscriptions.NotifyAll() |
|
|
|
}) |
|
|
|
|
|
|
|
return logBuffer |
|
|
|
} |
|
|
|
|
|
|
|
func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicCursor { |
|
|
|
func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl { |
|
|
|
tm.Lock() |
|
|
|
defer tm.Unlock() |
|
|
|
|
|
|
|
lock, found := tm.topicCursors[partition] |
|
|
|
tc, found := tm.topicCursors[partition] |
|
|
|
if !found { |
|
|
|
lock = &TopicCursor{} |
|
|
|
tm.topicCursors[partition] = lock |
|
|
|
lock.subscriptions = NewTopicPartitionSubscriptions() |
|
|
|
lock.logBuffer = tm.buildLogBuffer(lock, partition, topicConfig) |
|
|
|
tc = &TopicControl{} |
|
|
|
tm.topicCursors[partition] = tc |
|
|
|
tc.subscriptions = NewTopicPartitionSubscriptions() |
|
|
|
tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig) |
|
|
|
} |
|
|
|
if isPublisher { |
|
|
|
lock.publisherCount++ |
|
|
|
tc.publisherCount++ |
|
|
|
} else { |
|
|
|
lock.subscriberCount++ |
|
|
|
tc.subscriberCount++ |
|
|
|
} |
|
|
|
return lock |
|
|
|
return tc |
|
|
|
} |
|
|
|
|
|
|
|
func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) { |
|
|
|