diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 8b9970f20..a9f861710 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -182,26 +182,26 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa } func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { - if !p.Publishers.IsEmpty() { - return - } - if !p.Subscribers.IsEmpty() { - return - } - p.LogBuffer.ShutdownLogBuffer() - if p.followerStream != nil { - // send close to the follower - if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ - Message: &mq_pb.PublishFollowMeRequest_Close{ - Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, - }, - }); followErr != nil { - glog.Errorf("Error closing follower stream: %v", followErr) + if p.Publishers.IsEmpty() { + if p.followerStream != nil { + // send close to the follower + if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ + Message: &mq_pb.PublishFollowMeRequest_Close{ + Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, + }, + }); followErr != nil { + glog.Errorf("Error closing follower stream: %v", followErr) + } + glog.V(4).Infof("closing grpcConnection to follower") + p.followerGrpcConnection.Close() + p.followerStream = nil } - glog.V(4).Infof("closing grpcConnection to follower") - p.followerGrpcConnection.Close() - p.followerStream = nil + } + + if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() { + p.LogBuffer.ShutdownLogBuffer() + hasShutdown = true } return