|
@ -125,7 +125,7 @@ func (p *LocalPartition) closeSubscribers() { |
|
|
|
|
|
|
|
|
func (p *LocalPartition) WaitUntilNoPublishers() { |
|
|
func (p *LocalPartition) WaitUntilNoPublishers() { |
|
|
for { |
|
|
for { |
|
|
if p.Publishers.IsEmpty() { |
|
|
|
|
|
|
|
|
if p.Publishers.Size() == 0 { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
time.Sleep(113 * time.Millisecond) |
|
|
time.Sleep(113 * time.Millisecond) |
|
@ -183,7 +183,7 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa |
|
|
|
|
|
|
|
|
func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { |
|
|
func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { |
|
|
|
|
|
|
|
|
if p.Publishers.IsEmpty() { |
|
|
|
|
|
|
|
|
if p.Publishers.Size() == 0 { |
|
|
if p.followerStream != nil { |
|
|
if p.followerStream != nil { |
|
|
// send close to the follower
|
|
|
// send close to the follower
|
|
|
if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ |
|
|
if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ |
|
@ -196,10 +196,11 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { |
|
|
glog.V(4).Infof("closing grpcConnection to follower") |
|
|
glog.V(4).Infof("closing grpcConnection to follower") |
|
|
p.followerGrpcConnection.Close() |
|
|
p.followerGrpcConnection.Close() |
|
|
p.followerStream = nil |
|
|
p.followerStream = nil |
|
|
|
|
|
p.follower = "" |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() { |
|
|
|
|
|
|
|
|
if p.Publishers.Size() == 0 && p.Subscribers.Size() == 0 { |
|
|
p.LogBuffer.ShutdownLogBuffer() |
|
|
p.LogBuffer.ShutdownLogBuffer() |
|
|
hasShutdown = true |
|
|
hasShutdown = true |
|
|
} |
|
|
} |
|
|