Browse Source

notify

mq-subscribe
chrislu 10 months ago
parent
commit
8e5068fd2f
  1. 14
      weed/mq/broker/broker_grpc_sub.go
  2. 19
      weed/mq/topic/local_partition.go

14
weed/mq/broker/broker_grpc_sub.go

@ -175,7 +175,6 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMes
atomic.StoreInt32(&localTopicPartition.FollowerId, followerId)
glog.V(0).Infof("FollowInMemoryMessages %s connected on %v %v", clientName, t, partition)
sleepIntervalCount := 0
var counter int64
defer func() {
@ -198,11 +197,12 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMes
var prevFlushTsNs int64
_, _, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool {
sleepIntervalCount++
if sleepIntervalCount > 32 {
sleepIntervalCount = 32
}
time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
// wait for the log buffer to be ready
localTopicPartition.ListenersLock.Lock()
atomic.AddInt64(&localTopicPartition.ListenersWaits, 1)
localTopicPartition.ListenersCond.Wait()
atomic.AddInt64(&localTopicPartition.ListenersWaits, -1)
localTopicPartition.ListenersLock.Unlock()
if localTopicPartition.LogBuffer.IsStopping() {
newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
@ -246,8 +246,6 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMes
return true
}, func(logEntry *filer_pb.LogEntry) (bool, error) {
// reset the sleep interval count
sleepIntervalCount = 0
// check the follower id
newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)

19
weed/mq/topic/local_partition.go

@ -6,11 +6,18 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"sync"
"sync/atomic"
"time"
)
type LocalPartition struct {
ListenersWaits int64
// notifying clients
ListenersLock sync.Mutex
ListenersCond *sync.Cond
Partition
isLeader bool
FollowerBrokers []pb.ServerAddress
@ -24,15 +31,21 @@ type LocalPartition struct {
var TIME_FORMAT = "2006-01-02-15-04-05"
func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.ServerAddress, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
return &LocalPartition{
lp := &LocalPartition{
Partition: partition,
isLeader: isLeader,
FollowerBrokers: followerBrokers,
LogBuffer: log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
2*time.Minute, logFlushFn, readFromDiskFn, func() {}),
Publishers: NewLocalPartitionPublishers(),
Subscribers: NewLocalPartitionSubscribers(),
}
lp.ListenersCond = sync.NewCond(&lp.ListenersLock)
lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
2*time.Minute, logFlushFn, readFromDiskFn, func() {
if atomic.LoadInt64(&lp.ListenersWaits) > 0 {
lp.ListenersCond.Broadcast()
}
})
return lp
}
func (p *LocalPartition) Publish(message *mq_pb.DataMessage) {

Loading…
Cancel
Save