From 7d87c1d2bbe3c8847f5b5e5a4d8d8376996dcf8b Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 27 Mar 2024 22:48:37 -0700 Subject: [PATCH] refactor --- weed/mq/broker/broker_grpc_pub.go | 12 ------------ weed/mq/topic/local_partition.go | 18 ++++++++++++++++++ 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index f8554ea5b..1253420ac 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -153,18 +153,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis localTopicPartition.Publishers.RemovePublisher(clientName) glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size()) if localTopicPartition.MaybeShutdownLocalPartition() { - if localTopicPartition.FollowerStream != nil { - // send close to the follower - if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{ - Message: &mq_pb.PublishFollowMeRequest_Close{ - Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, - }, - }); followErr != nil { - glog.Errorf("Error closing follower stream: %v", followErr) - } - println("closing grpcConnection to follower") - localTopicPartition.FollowerGrpcConnection.Close() - } b.localTopicManager.RemoveTopicPartition(t, p) glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition) } diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 34c2903f4..a6562fd5c 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -129,6 +129,24 @@ func (p *LocalPartition) WaitUntilNoPublishers() { } func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { + if p.MaybeShutdownLocalPartition() { + if p.FollowerStream != nil { + // send close to the follower + if followErr := p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{ + Message: &mq_pb.PublishFollowMeRequest_Close{ + Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, + }, + }); followErr != nil { + glog.Errorf("Error closing follower stream: %v", followErr) + } + println("closing grpcConnection to follower") + p.FollowerGrpcConnection.Close() + } + } + return +} + +func (p *LocalPartition) canShutdownLocalPartition() (hasShutdown bool) { if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() { p.LogBuffer.ShutdownLogBuffer() hasShutdown = true