|
|
@ -106,11 +106,9 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis |
|
|
|
clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition) |
|
|
|
localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher()) |
|
|
|
|
|
|
|
var ackSequence int64 |
|
|
|
defer func() { |
|
|
|
// remove the publisher
|
|
|
|
localTopicPartition.Publishers.RemovePublisher(clientName) |
|
|
|
glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size()) |
|
|
|
if localTopicPartition.MaybeShutdownLocalPartition() { |
|
|
|
b.localTopicManager.RemoveTopicPartition(t, p) |
|
|
|
glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition) |
|
|
|