|
@ -96,10 +96,10 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis |
|
|
} |
|
|
} |
|
|
close(respChan) |
|
|
close(respChan) |
|
|
localTopicPartition.Publishers.RemovePublisher(clientName) |
|
|
localTopicPartition.Publishers.RemovePublisher(clientName) |
|
|
|
|
|
glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size()) |
|
|
if localTopicPartition.MaybeShutdownLocalPartition() { |
|
|
if localTopicPartition.MaybeShutdownLocalPartition() { |
|
|
b.localTopicManager.RemoveTopicPartition(t, p) |
|
|
b.localTopicManager.RemoveTopicPartition(t, p) |
|
|
} |
|
|
} |
|
|
glog.V(0).Infof("topic %v partition %v published %d messges.", initMessage.Topic, initMessage.Partition, ackSequence) |
|
|
|
|
|
}() |
|
|
}() |
|
|
go func() { |
|
|
go func() { |
|
|
for resp := range respChan { |
|
|
for resp := range respChan { |
|
|