Browse Source

revert to one subscriber one thread

pull/1318/head
Chris Lu 5 years ago
parent
commit
e02a8c67da
  1. 5
      weed/messaging/broker/broker_grpc_server_subscribe.go
  2. 82
      weed/messaging/broker/subscription.go
  3. 21
      weed/messaging/broker/topic_manager.go

5
weed/messaging/broker/broker_grpc_server_subscribe.go

@ -48,7 +48,6 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
Partition: in.Init.Partition, Partition: in.Init.Partition,
} }
lock := broker.topicManager.RequestLock(tp, topicConfig, false) lock := broker.topicManager.RequestLock(tp, topicConfig, false)
subscription := lock.subscriptions.AddSubscription(subscriberId)
defer broker.topicManager.ReleaseLock(tp, false) defer broker.topicManager.ReleaseLock(tp, false)
lastReadTime := time.Now() lastReadTime := time.Now()
@ -103,7 +102,9 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
} }
err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
subscription.Wait()
lock.Mutex.Lock()
lock.cond.Wait()
lock.Mutex.Unlock()
return true return true
}, eachLogEntryFn) }, eachLogEntryFn)

82
weed/messaging/broker/subscription.go

@ -1,82 +0,0 @@
package broker
import (
"sync"
)
type TopicPartitionSubscription struct {
sync.Mutex
name string
lastReadTsNs int64
cond *sync.Cond
}
func NewTopicPartitionSubscription(name string) *TopicPartitionSubscription {
t := &TopicPartitionSubscription{
name: name,
}
t.cond = sync.NewCond(t)
return t
}
func (s *TopicPartitionSubscription) Wait() {
s.Mutex.Lock()
s.cond.Wait()
s.Mutex.Unlock()
}
func (s *TopicPartitionSubscription) NotifyOne() {
// notify one waiting goroutine
s.cond.Signal()
}
type TopicPartitionSubscriptions struct {
sync.Mutex
cond *sync.Cond
subscriptions map[string]*TopicPartitionSubscription
subscriptionsLock sync.RWMutex
}
func NewTopicPartitionSubscriptions() *TopicPartitionSubscriptions {
m := &TopicPartitionSubscriptions{
subscriptions: make(map[string]*TopicPartitionSubscription),
}
m.cond = sync.NewCond(m)
return m
}
func (m *TopicPartitionSubscriptions) AddSubscription(subscription string) *TopicPartitionSubscription {
m.subscriptionsLock.Lock()
defer m.subscriptionsLock.Unlock()
if s, found := m.subscriptions[subscription]; found {
return s
}
s := NewTopicPartitionSubscription(subscription)
m.subscriptions[subscription] = s
return s
}
func (m *TopicPartitionSubscriptions) NotifyAll() {
m.subscriptionsLock.RLock()
defer m.subscriptionsLock.RUnlock()
for name, tps := range m.subscriptions {
println("notifying", name)
tps.NotifyOne()
}
}
func (m *TopicPartitionSubscriptions) Wait() {
m.Mutex.Lock()
m.cond.Wait()
for _, tps := range m.subscriptions {
tps.NotifyOne()
}
m.Mutex.Unlock()
}

21
weed/messaging/broker/topic_manager.go

@ -31,23 +31,22 @@ type TopicControl struct {
subscriberCount int subscriberCount int
publisherCount int publisherCount int
logBuffer *log_buffer.LogBuffer logBuffer *log_buffer.LogBuffer
subscriptions *TopicPartitionSubscriptions
} }
type TopicManager struct { type TopicManager struct {
sync.Mutex sync.Mutex
topicCursors map[TopicPartition]*TopicControl
topicControls map[TopicPartition]*TopicControl
broker *MessageBroker broker *MessageBroker
} }
func NewTopicManager(messageBroker *MessageBroker) *TopicManager { func NewTopicManager(messageBroker *MessageBroker) *TopicManager {
return &TopicManager{ return &TopicManager{
topicCursors: make(map[TopicPartition]*TopicControl),
topicControls: make(map[TopicPartition]*TopicControl),
broker: messageBroker, broker: messageBroker,
} }
} }
func (tm *TopicManager) buildLogBuffer(tc *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
flushFn := func(startTime, stopTime time.Time, buf []byte) { flushFn := func(startTime, stopTime time.Time, buf []byte) {
@ -69,7 +68,7 @@ func (tm *TopicManager) buildLogBuffer(tc *TopicControl, tp TopicPartition, topi
} }
} }
logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() { logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() {
tc.subscriptions.NotifyAll()
tl.cond.Broadcast()
}) })
return logBuffer return logBuffer
@ -79,11 +78,11 @@ func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messa
tm.Lock() tm.Lock()
defer tm.Unlock() defer tm.Unlock()
tc, found := tm.topicCursors[partition]
tc, found := tm.topicControls[partition]
if !found { if !found {
tc = &TopicControl{} tc = &TopicControl{}
tm.topicCursors[partition] = tc
tc.subscriptions = NewTopicPartitionSubscriptions()
tc.cond = sync.NewCond(&tc.Mutex)
tm.topicControls[partition] = tc
tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig) tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig)
} }
if isPublisher { if isPublisher {
@ -98,7 +97,7 @@ func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool)
tm.Lock() tm.Lock()
defer tm.Unlock() defer tm.Unlock()
lock, found := tm.topicCursors[partition]
lock, found := tm.topicControls[partition]
if !found { if !found {
return return
} }
@ -108,7 +107,7 @@ func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool)
lock.subscriberCount-- lock.subscriberCount--
} }
if lock.subscriberCount <= 0 && lock.publisherCount <= 0 { if lock.subscriberCount <= 0 && lock.publisherCount <= 0 {
delete(tm.topicCursors, partition)
delete(tm.topicControls, partition)
lock.logBuffer.Shutdown() lock.logBuffer.Shutdown()
} }
} }
@ -117,7 +116,7 @@ func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) {
tm.Lock() tm.Lock()
defer tm.Unlock() defer tm.Unlock()
for k := range tm.topicCursors {
for k := range tm.topicControls {
tps = append(tps, k) tps = append(tps, k)
} }
return return

Loading…
Cancel
Save