diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index eb6946e81..e7cbb6441 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -48,7 +48,6 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs Partition: in.Init.Partition, } lock := broker.topicManager.RequestLock(tp, topicConfig, false) - subscription := lock.subscriptions.AddSubscription(subscriberId) defer broker.topicManager.ReleaseLock(tp, false) lastReadTime := time.Now() @@ -103,7 +102,9 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs } err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { - subscription.Wait() + lock.Mutex.Lock() + lock.cond.Wait() + lock.Mutex.Unlock() return true }, eachLogEntryFn) diff --git a/weed/messaging/broker/subscription.go b/weed/messaging/broker/subscription.go deleted file mode 100644 index f74b0c546..000000000 --- a/weed/messaging/broker/subscription.go +++ /dev/null @@ -1,82 +0,0 @@ -package broker - -import ( - "sync" -) - -type TopicPartitionSubscription struct { - sync.Mutex - name string - lastReadTsNs int64 - cond *sync.Cond -} - -func NewTopicPartitionSubscription(name string) *TopicPartitionSubscription { - t := &TopicPartitionSubscription{ - name: name, - } - t.cond = sync.NewCond(t) - return t -} - -func (s *TopicPartitionSubscription) Wait() { - s.Mutex.Lock() - s.cond.Wait() - s.Mutex.Unlock() -} - -func (s *TopicPartitionSubscription) NotifyOne() { - // notify one waiting goroutine - s.cond.Signal() -} - -type TopicPartitionSubscriptions struct { - sync.Mutex - cond *sync.Cond - subscriptions map[string]*TopicPartitionSubscription - subscriptionsLock sync.RWMutex -} - -func NewTopicPartitionSubscriptions() *TopicPartitionSubscriptions { - m := &TopicPartitionSubscriptions{ - subscriptions: make(map[string]*TopicPartitionSubscription), - } - m.cond = sync.NewCond(m) - return m -} - -func (m *TopicPartitionSubscriptions) AddSubscription(subscription string) *TopicPartitionSubscription { - m.subscriptionsLock.Lock() - defer m.subscriptionsLock.Unlock() - - if s, found := m.subscriptions[subscription]; found { - return s - } - - s := NewTopicPartitionSubscription(subscription) - m.subscriptions[subscription] = s - - return s - -} - -func (m *TopicPartitionSubscriptions) NotifyAll() { - - m.subscriptionsLock.RLock() - defer m.subscriptionsLock.RUnlock() - - for name, tps := range m.subscriptions { - println("notifying", name) - tps.NotifyOne() - } - -} - -func (m *TopicPartitionSubscriptions) Wait() { - m.Mutex.Lock() - m.cond.Wait() - for _, tps := range m.subscriptions { - tps.NotifyOne() - } - m.Mutex.Unlock() -} diff --git a/weed/messaging/broker/topic_manager.go b/weed/messaging/broker/topic_manager.go index e9f8903e8..b563fffa1 100644 --- a/weed/messaging/broker/topic_manager.go +++ b/weed/messaging/broker/topic_manager.go @@ -31,23 +31,22 @@ type TopicControl struct { subscriberCount int publisherCount int logBuffer *log_buffer.LogBuffer - subscriptions *TopicPartitionSubscriptions } type TopicManager struct { sync.Mutex - topicCursors map[TopicPartition]*TopicControl - broker *MessageBroker + topicControls map[TopicPartition]*TopicControl + broker *MessageBroker } func NewTopicManager(messageBroker *MessageBroker) *TopicManager { return &TopicManager{ - topicCursors: make(map[TopicPartition]*TopicControl), - broker: messageBroker, + topicControls: make(map[TopicPartition]*TopicControl), + broker: messageBroker, } } -func (tm *TopicManager) buildLogBuffer(tc *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer { +func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer { flushFn := func(startTime, stopTime time.Time, buf []byte) { @@ -69,7 +68,7 @@ func (tm *TopicManager) buildLogBuffer(tc *TopicControl, tp TopicPartition, topi } } logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() { - tc.subscriptions.NotifyAll() + tl.cond.Broadcast() }) return logBuffer @@ -79,11 +78,11 @@ func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messa tm.Lock() defer tm.Unlock() - tc, found := tm.topicCursors[partition] + tc, found := tm.topicControls[partition] if !found { tc = &TopicControl{} - tm.topicCursors[partition] = tc - tc.subscriptions = NewTopicPartitionSubscriptions() + tc.cond = sync.NewCond(&tc.Mutex) + tm.topicControls[partition] = tc tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig) } if isPublisher { @@ -98,7 +97,7 @@ func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) tm.Lock() defer tm.Unlock() - lock, found := tm.topicCursors[partition] + lock, found := tm.topicControls[partition] if !found { return } @@ -108,7 +107,7 @@ func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) lock.subscriberCount-- } if lock.subscriberCount <= 0 && lock.publisherCount <= 0 { - delete(tm.topicCursors, partition) + delete(tm.topicControls, partition) lock.logBuffer.Shutdown() } } @@ -117,7 +116,7 @@ func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) { tm.Lock() defer tm.Unlock() - for k := range tm.topicCursors { + for k := range tm.topicControls { tps = append(tps, k) } return