|
|
@ -190,6 +190,10 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa |
|
|
|
func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { |
|
|
|
|
|
|
|
if p.Publishers.Size() == 0 && p.Subscribers.Size() == 0 { |
|
|
|
p.LogBuffer.ShutdownLogBuffer() |
|
|
|
for !p.LogBuffer.IsAllFlushed() { |
|
|
|
time.Sleep(113 * time.Millisecond) |
|
|
|
} |
|
|
|
if p.followerStream != nil { |
|
|
|
// send close to the follower
|
|
|
|
if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ |
|
|
@ -205,7 +209,6 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { |
|
|
|
p.follower = "" |
|
|
|
} |
|
|
|
|
|
|
|
p.LogBuffer.ShutdownLogBuffer() |
|
|
|
hasShutdown = true |
|
|
|
} |
|
|
|
|
|
|
@ -219,3 +222,18 @@ func (p *LocalPartition) Shutdown() { |
|
|
|
p.LogBuffer.ShutdownLogBuffer() |
|
|
|
glog.V(0).Infof("local partition %v shutting down", p.Partition) |
|
|
|
} |
|
|
|
|
|
|
|
func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) { |
|
|
|
if p.followerStream != nil { |
|
|
|
if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ |
|
|
|
Message: &mq_pb.PublishFollowMeRequest_Flush{ |
|
|
|
Flush: &mq_pb.PublishFollowMeRequest_FlushMessage{ |
|
|
|
TsNs: flushTsNs, |
|
|
|
}, |
|
|
|
}, |
|
|
|
}); followErr != nil { |
|
|
|
glog.Errorf("send follower %s flush message: %v", p.follower, followErr) |
|
|
|
} |
|
|
|
println("notifying", p.follower, "flushed at", flushTsNs) |
|
|
|
} |
|
|
|
} |