diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 371697529..bff0ec17a 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -125,7 +125,7 @@ func (p *LocalPartition) closeSubscribers() { func (p *LocalPartition) WaitUntilNoPublishers() { for { - if p.Publishers.IsEmpty() { + if p.Publishers.Size() == 0 { return } time.Sleep(113 * time.Millisecond) @@ -183,7 +183,7 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { - if p.Publishers.IsEmpty() { + if p.Publishers.Size() == 0 { if p.followerStream != nil { // send close to the follower if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ @@ -196,10 +196,11 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { glog.V(4).Infof("closing grpcConnection to follower") p.followerGrpcConnection.Close() p.followerStream = nil + p.follower = "" } } - if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() { + if p.Publishers.Size() == 0 && p.Subscribers.Size() == 0 { p.LogBuffer.ShutdownLogBuffer() hasShutdown = true } diff --git a/weed/mq/topic/local_partition_publishers.go b/weed/mq/topic/local_partition_publishers.go index c12f66336..e3c4e3ca6 100644 --- a/weed/mq/topic/local_partition_publishers.go +++ b/weed/mq/topic/local_partition_publishers.go @@ -44,13 +44,6 @@ func (p *LocalPartitionPublishers) SignalShutdown() { } } -func (p *LocalPartitionPublishers) IsEmpty() bool { - p.publishersLock.RLock() - defer p.publishersLock.RUnlock() - - return len(p.publishers) == 0 -} - func (p *LocalPartitionPublishers) Size() int { p.publishersLock.RLock() defer p.publishersLock.RUnlock() diff --git a/weed/mq/topic/local_partition_subscribers.go b/weed/mq/topic/local_partition_subscribers.go index d3b989d72..24341ce7e 100644 --- a/weed/mq/topic/local_partition_subscribers.go +++ b/weed/mq/topic/local_partition_subscribers.go @@ -48,13 +48,6 @@ func (p *LocalPartitionSubscribers) SignalShutdown() { } } -func (p *LocalPartitionSubscribers) IsEmpty() bool { - p.SubscribersLock.RLock() - defer p.SubscribersLock.RUnlock() - - return len(p.Subscribers) == 0 -} - func (p *LocalPartitionSubscribers) Size() int { p.SubscribersLock.RLock() defer p.SubscribersLock.RUnlock()