Browse Source

refactor

mq-subscribe
chrislu 10 months ago
parent
commit
7d87c1d2bb
  1. 12
      weed/mq/broker/broker_grpc_pub.go
  2. 18
      weed/mq/topic/local_partition.go

12
weed/mq/broker/broker_grpc_pub.go

@ -153,18 +153,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
localTopicPartition.Publishers.RemovePublisher(clientName) localTopicPartition.Publishers.RemovePublisher(clientName)
glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size()) glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size())
if localTopicPartition.MaybeShutdownLocalPartition() { if localTopicPartition.MaybeShutdownLocalPartition() {
if localTopicPartition.FollowerStream != nil {
// send close to the follower
if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Close{
Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
},
}); followErr != nil {
glog.Errorf("Error closing follower stream: %v", followErr)
}
println("closing grpcConnection to follower")
localTopicPartition.FollowerGrpcConnection.Close()
}
b.localTopicManager.RemoveTopicPartition(t, p) b.localTopicManager.RemoveTopicPartition(t, p)
glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition) glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
} }

18
weed/mq/topic/local_partition.go

@ -129,6 +129,24 @@ func (p *LocalPartition) WaitUntilNoPublishers() {
} }
func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
if p.MaybeShutdownLocalPartition() {
if p.FollowerStream != nil {
// send close to the follower
if followErr := p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Close{
Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
},
}); followErr != nil {
glog.Errorf("Error closing follower stream: %v", followErr)
}
println("closing grpcConnection to follower")
p.FollowerGrpcConnection.Close()
}
}
return
}
func (p *LocalPartition) canShutdownLocalPartition() (hasShutdown bool) {
if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() { if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() {
p.LogBuffer.ShutdownLogBuffer() p.LogBuffer.ShutdownLogBuffer()
hasShutdown = true hasShutdown = true

Loading…
Cancel
Save