|
@ -25,7 +25,7 @@ func (tp *TopicPartition) String() string { |
|
|
return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition) |
|
|
return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
type TopicLock struct { |
|
|
|
|
|
|
|
|
type TopicControl struct { |
|
|
sync.Mutex |
|
|
sync.Mutex |
|
|
cond *sync.Cond |
|
|
cond *sync.Cond |
|
|
subscriberCount int |
|
|
subscriberCount int |
|
@ -33,20 +33,20 @@ type TopicLock struct { |
|
|
logBuffer *log_buffer.LogBuffer |
|
|
logBuffer *log_buffer.LogBuffer |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
type TopicLocks struct { |
|
|
|
|
|
|
|
|
type TopicManager struct { |
|
|
sync.Mutex |
|
|
sync.Mutex |
|
|
locks map[TopicPartition]*TopicLock |
|
|
|
|
|
broker *MessageBroker |
|
|
|
|
|
|
|
|
topicControls map[TopicPartition]*TopicControl |
|
|
|
|
|
broker *MessageBroker |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func NewTopicLocks(messageBroker *MessageBroker) *TopicLocks { |
|
|
|
|
|
return &TopicLocks{ |
|
|
|
|
|
locks: make(map[TopicPartition]*TopicLock), |
|
|
|
|
|
broker: messageBroker, |
|
|
|
|
|
|
|
|
func NewTopicManager(messageBroker *MessageBroker) *TopicManager { |
|
|
|
|
|
return &TopicManager{ |
|
|
|
|
|
topicControls: make(map[TopicPartition]*TopicControl), |
|
|
|
|
|
broker: messageBroker, |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer { |
|
|
|
|
|
|
|
|
func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer { |
|
|
|
|
|
|
|
|
flushFn := func(startTime, stopTime time.Time, buf []byte) { |
|
|
flushFn := func(startTime, stopTime time.Time, buf []byte) { |
|
|
|
|
|
|
|
@ -63,7 +63,7 @@ func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicC |
|
|
tp.Partition, |
|
|
tp.Partition, |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
if err := locks.broker.appendToFile(targetFile, topicConfig, buf); err != nil { |
|
|
|
|
|
|
|
|
if err := tm.broker.appendToFile(targetFile, topicConfig, buf); err != nil { |
|
|
glog.V(0).Infof("log write failed %s: %v", targetFile, err) |
|
|
glog.V(0).Infof("log write failed %s: %v", targetFile, err) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@ -74,16 +74,16 @@ func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicC |
|
|
return logBuffer |
|
|
return logBuffer |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (tl *TopicLocks) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicLock { |
|
|
|
|
|
tl.Lock() |
|
|
|
|
|
defer tl.Unlock() |
|
|
|
|
|
|
|
|
func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl { |
|
|
|
|
|
tm.Lock() |
|
|
|
|
|
defer tm.Unlock() |
|
|
|
|
|
|
|
|
lock, found := tl.locks[partition] |
|
|
|
|
|
|
|
|
lock, found := tm.topicControls[partition] |
|
|
if !found { |
|
|
if !found { |
|
|
lock = &TopicLock{} |
|
|
|
|
|
|
|
|
lock = &TopicControl{} |
|
|
lock.cond = sync.NewCond(&lock.Mutex) |
|
|
lock.cond = sync.NewCond(&lock.Mutex) |
|
|
tl.locks[partition] = lock |
|
|
|
|
|
lock.logBuffer = tl.buildLogBuffer(lock, partition, topicConfig) |
|
|
|
|
|
|
|
|
tm.topicControls[partition] = lock |
|
|
|
|
|
lock.logBuffer = tm.buildLogBuffer(lock, partition, topicConfig) |
|
|
} |
|
|
} |
|
|
if isPublisher { |
|
|
if isPublisher { |
|
|
lock.publisherCount++ |
|
|
lock.publisherCount++ |
|
@ -93,11 +93,11 @@ func (tl *TopicLocks) RequestLock(partition TopicPartition, topicConfig *messagi |
|
|
return lock |
|
|
return lock |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (tl *TopicLocks) ReleaseLock(partition TopicPartition, isPublisher bool) { |
|
|
|
|
|
tl.Lock() |
|
|
|
|
|
defer tl.Unlock() |
|
|
|
|
|
|
|
|
func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) { |
|
|
|
|
|
tm.Lock() |
|
|
|
|
|
defer tm.Unlock() |
|
|
|
|
|
|
|
|
lock, found := tl.locks[partition] |
|
|
|
|
|
|
|
|
lock, found := tm.topicControls[partition] |
|
|
if !found { |
|
|
if !found { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
@ -107,16 +107,16 @@ func (tl *TopicLocks) ReleaseLock(partition TopicPartition, isPublisher bool) { |
|
|
lock.subscriberCount-- |
|
|
lock.subscriberCount-- |
|
|
} |
|
|
} |
|
|
if lock.subscriberCount <= 0 && lock.publisherCount <= 0 { |
|
|
if lock.subscriberCount <= 0 && lock.publisherCount <= 0 { |
|
|
delete(tl.locks, partition) |
|
|
|
|
|
|
|
|
delete(tm.topicControls, partition) |
|
|
lock.logBuffer.Shutdown() |
|
|
lock.logBuffer.Shutdown() |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (tl *TopicLocks) ListTopicPartitions() (tps []TopicPartition) { |
|
|
|
|
|
tl.Lock() |
|
|
|
|
|
defer tl.Unlock() |
|
|
|
|
|
|
|
|
func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) { |
|
|
|
|
|
tm.Lock() |
|
|
|
|
|
defer tm.Unlock() |
|
|
|
|
|
|
|
|
for k := range tl.locks { |
|
|
|
|
|
|
|
|
for k := range tm.topicControls { |
|
|
tps = append(tps, k) |
|
|
tps = append(tps, k) |
|
|
} |
|
|
} |
|
|
return |
|
|
return |