From 8e5068fd2fe6e4cc8af23d5a8a50101a657af0c5 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 16 Mar 2024 23:16:33 -0700 Subject: [PATCH] notify --- weed/mq/broker/broker_grpc_sub.go | 14 ++++++-------- weed/mq/topic/local_partition.go | 19 ++++++++++++++++--- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 940af7490..846342596 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -175,7 +175,6 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMes atomic.StoreInt32(&localTopicPartition.FollowerId, followerId) glog.V(0).Infof("FollowInMemoryMessages %s connected on %v %v", clientName, t, partition) - sleepIntervalCount := 0 var counter int64 defer func() { @@ -198,11 +197,12 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMes var prevFlushTsNs int64 _, _, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool { - sleepIntervalCount++ - if sleepIntervalCount > 32 { - sleepIntervalCount = 32 - } - time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond) + // wait for the log buffer to be ready + localTopicPartition.ListenersLock.Lock() + atomic.AddInt64(&localTopicPartition.ListenersWaits, 1) + localTopicPartition.ListenersCond.Wait() + atomic.AddInt64(&localTopicPartition.ListenersWaits, -1) + localTopicPartition.ListenersLock.Unlock() if localTopicPartition.LogBuffer.IsStopping() { newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId) @@ -246,8 +246,6 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMes return true }, func(logEntry *filer_pb.LogEntry) (bool, error) { - // reset the sleep interval count - sleepIntervalCount = 0 // check the follower id newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId) diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 798949736..145b1a450 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -6,11 +6,18 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "sync" "sync/atomic" "time" ) type LocalPartition struct { + ListenersWaits int64 + + // notifying clients + ListenersLock sync.Mutex + ListenersCond *sync.Cond + Partition isLeader bool FollowerBrokers []pb.ServerAddress @@ -24,15 +31,21 @@ type LocalPartition struct { var TIME_FORMAT = "2006-01-02-15-04-05" func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.ServerAddress, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition { - return &LocalPartition{ + lp := &LocalPartition{ Partition: partition, isLeader: isLeader, FollowerBrokers: followerBrokers, - LogBuffer: log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop), - 2*time.Minute, logFlushFn, readFromDiskFn, func() {}), Publishers: NewLocalPartitionPublishers(), Subscribers: NewLocalPartitionSubscribers(), } + lp.ListenersCond = sync.NewCond(&lp.ListenersLock) + lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop), + 2*time.Minute, logFlushFn, readFromDiskFn, func() { + if atomic.LoadInt64(&lp.ListenersWaits) > 0 { + lp.ListenersCond.Broadcast() + } + }) + return lp } func (p *LocalPartition) Publish(message *mq_pb.DataMessage) {