|
|
@ -182,26 +182,26 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa |
|
|
|
} |
|
|
|
|
|
|
|
func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { |
|
|
|
if !p.Publishers.IsEmpty() { |
|
|
|
return |
|
|
|
} |
|
|
|
if !p.Subscribers.IsEmpty() { |
|
|
|
return |
|
|
|
} |
|
|
|
p.LogBuffer.ShutdownLogBuffer() |
|
|
|
|
|
|
|
if p.followerStream != nil { |
|
|
|
// send close to the follower
|
|
|
|
if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ |
|
|
|
Message: &mq_pb.PublishFollowMeRequest_Close{ |
|
|
|
Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, |
|
|
|
}, |
|
|
|
}); followErr != nil { |
|
|
|
glog.Errorf("Error closing follower stream: %v", followErr) |
|
|
|
if p.Publishers.IsEmpty() { |
|
|
|
if p.followerStream != nil { |
|
|
|
// send close to the follower
|
|
|
|
if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ |
|
|
|
Message: &mq_pb.PublishFollowMeRequest_Close{ |
|
|
|
Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, |
|
|
|
}, |
|
|
|
}); followErr != nil { |
|
|
|
glog.Errorf("Error closing follower stream: %v", followErr) |
|
|
|
} |
|
|
|
glog.V(4).Infof("closing grpcConnection to follower") |
|
|
|
p.followerGrpcConnection.Close() |
|
|
|
p.followerStream = nil |
|
|
|
} |
|
|
|
glog.V(4).Infof("closing grpcConnection to follower") |
|
|
|
p.followerGrpcConnection.Close() |
|
|
|
p.followerStream = nil |
|
|
|
} |
|
|
|
|
|
|
|
if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() { |
|
|
|
p.LogBuffer.ShutdownLogBuffer() |
|
|
|
hasShutdown = true |
|
|
|
} |
|
|
|
|
|
|
|
return |
|
|
|