|
@ -179,28 +179,28 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { |
|
|
func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { |
|
|
if p.canShutdownLocalPartition() { |
|
|
|
|
|
if p.FollowerStream != nil { |
|
|
|
|
|
// send close to the follower
|
|
|
|
|
|
if followErr := p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{ |
|
|
|
|
|
Message: &mq_pb.PublishFollowMeRequest_Close{ |
|
|
|
|
|
Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, |
|
|
|
|
|
}, |
|
|
|
|
|
}); followErr != nil { |
|
|
|
|
|
glog.Errorf("Error closing follower stream: %v", followErr) |
|
|
|
|
|
} |
|
|
|
|
|
println("closing grpcConnection to follower") |
|
|
|
|
|
p.FollowerGrpcConnection.Close() |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
if !p.Publishers.IsEmpty() { |
|
|
|
|
|
return |
|
|
} |
|
|
} |
|
|
return |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
if !p.Subscribers.IsEmpty() { |
|
|
|
|
|
return |
|
|
|
|
|
} |
|
|
|
|
|
p.LogBuffer.ShutdownLogBuffer() |
|
|
|
|
|
|
|
|
func (p *LocalPartition) canShutdownLocalPartition() (hasShutdown bool) { |
|
|
|
|
|
if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() { |
|
|
|
|
|
p.LogBuffer.ShutdownLogBuffer() |
|
|
|
|
|
hasShutdown = true |
|
|
|
|
|
|
|
|
if p.FollowerStream != nil { |
|
|
|
|
|
// send close to the follower
|
|
|
|
|
|
if followErr := p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{ |
|
|
|
|
|
Message: &mq_pb.PublishFollowMeRequest_Close{ |
|
|
|
|
|
Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, |
|
|
|
|
|
}, |
|
|
|
|
|
}); followErr != nil { |
|
|
|
|
|
glog.Errorf("Error closing follower stream: %v", followErr) |
|
|
|
|
|
} |
|
|
|
|
|
glog.V(4).Infof("closing grpcConnection to follower") |
|
|
|
|
|
p.FollowerGrpcConnection.Close() |
|
|
|
|
|
p.FollowerStream = nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|