From f07875e8e10cc39e789931c56695a9dda8e884df Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 1 Apr 2024 16:01:26 -0700 Subject: [PATCH] send flush message to follower before shutting down logBuffer --- weed/mq/broker/broker_grpc_pub_follow.go | 4 ++++ .../broker_topic_partition_read_write.go | 10 ++++++++++ weed/mq/topic/local_partition.go | 20 ++++++++++++++++++- weed/util/log_buffer/log_buffer.go | 8 ++++++++ 4 files changed, 41 insertions(+), 1 deletion(-) diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go index 3e7977eba..e5488a13a 100644 --- a/weed/mq/broker/broker_grpc_pub_follow.go +++ b/weed/mq/broker/broker_grpc_pub_follow.go @@ -43,6 +43,10 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi } else if closeMessage := req.GetClose(); closeMessage != nil { glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage) break + } else if flushMessage := req.GetFlush(); flushMessage != nil { + glog.V(0).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flushMessage) + } else { + glog.Errorf("unknown message: %v", req) } } return nil diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go index a058d8da5..50470f879 100644 --- a/weed/mq/broker/broker_topic_partition_read_write.go +++ b/weed/mq/broker/broker_topic_partition_read_write.go @@ -41,6 +41,16 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par } atomic.StoreInt64(&logBuffer.LastFlushTsNs, stopTime.UnixNano()) + + b.accessLock.Lock() + defer b.accessLock.Unlock() + p := topic.FromPbPartition(partition) + if localPartition:=b.localTopicManager.GetLocalPartition(t, p); localPartition!=nil { + localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs) + } + + println("flushing at", logBuffer.LastFlushTsNs, "to", targetFile, "size", len(buf)) + } } diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 895faf596..6e429e5df 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -190,6 +190,10 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { if p.Publishers.Size() == 0 && p.Subscribers.Size() == 0 { + p.LogBuffer.ShutdownLogBuffer() + for !p.LogBuffer.IsAllFlushed() { + time.Sleep(113 * time.Millisecond) + } if p.followerStream != nil { // send close to the follower if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ @@ -205,7 +209,6 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { p.follower = "" } - p.LogBuffer.ShutdownLogBuffer() hasShutdown = true } @@ -219,3 +222,18 @@ func (p *LocalPartition) Shutdown() { p.LogBuffer.ShutdownLogBuffer() glog.V(0).Infof("local partition %v shutting down", p.Partition) } + +func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) { + if p.followerStream != nil { + if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ + Message: &mq_pb.PublishFollowMeRequest_Flush{ + Flush: &mq_pb.PublishFollowMeRequest_FlushMessage{ + TsNs: flushTsNs, + }, + }, + }); followErr != nil { + glog.Errorf("send follower %s flush message: %v", p.follower, followErr) + } + println("notifying", p.follower, "flushed at", flushTsNs) + } +} diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index fa956317e..65d20a757 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -43,6 +43,7 @@ type LogBuffer struct { ReadFromDiskFn LogReadFromDiskFuncType notifyFn func() isStopping *atomic.Bool + isAllFlushed bool flushChan chan *dataToFlush LastTsNs int64 sync.RWMutex @@ -134,6 +135,7 @@ func (logBuffer *LogBuffer) IsStopping() bool { return logBuffer.isStopping.Load() } +// ShutdownLogBuffer flushes the buffer and stops the log buffer func (logBuffer *LogBuffer) ShutdownLogBuffer() { isAlreadyStopped := logBuffer.isStopping.Swap(true) if isAlreadyStopped { @@ -144,6 +146,11 @@ func (logBuffer *LogBuffer) ShutdownLogBuffer() { close(logBuffer.flushChan) } +// IsAllFlushed returns true if all data in the buffer has been flushed, after calling ShutdownLogBuffer(). +func (logBuffer *LogBuffer) IsAllFlushed() bool { + return logBuffer.isAllFlushed +} + func (logBuffer *LogBuffer) loopFlush() { for d := range logBuffer.flushChan { if d != nil { @@ -154,6 +161,7 @@ func (logBuffer *LogBuffer) loopFlush() { logBuffer.lastFlushDataTime = d.stopTime } } + logBuffer.isAllFlushed = true } func (logBuffer *LogBuffer) loopInterval() {