|
@ -24,7 +24,6 @@ type PublishClient struct { |
|
|
type TopicPublisher struct { |
|
|
type TopicPublisher struct { |
|
|
namespace string |
|
|
namespace string |
|
|
topic string |
|
|
topic string |
|
|
partition2Broker *interval.SearchTree[*PublishClient, int32] |
|
|
|
|
|
partition2Buffer *interval.SearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage], int32] |
|
|
partition2Buffer *interval.SearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage], int32] |
|
|
grpcDialOption grpc.DialOption |
|
|
grpcDialOption grpc.DialOption |
|
|
sync.Mutex // protects grpc
|
|
|
sync.Mutex // protects grpc
|
|
@ -36,9 +35,6 @@ func NewTopicPublisher(namespace, topic string, config *PublisherConfiguration) |
|
|
return &TopicPublisher{ |
|
|
return &TopicPublisher{ |
|
|
namespace: namespace, |
|
|
namespace: namespace, |
|
|
topic: topic, |
|
|
topic: topic, |
|
|
partition2Broker: interval.NewSearchTree[*PublishClient](func(a, b int32) int { |
|
|
|
|
|
return int(a - b) |
|
|
|
|
|
}), |
|
|
|
|
|
partition2Buffer: interval.NewSearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage]](func(a, b int32) int { |
|
|
partition2Buffer: interval.NewSearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage]](func(a, b int32) int { |
|
|
return int(a - b) |
|
|
return int(a - b) |
|
|
}), |
|
|
}), |
|
@ -49,11 +45,6 @@ func NewTopicPublisher(namespace, topic string, config *PublisherConfiguration) |
|
|
|
|
|
|
|
|
func (p *TopicPublisher) Shutdown() error { |
|
|
func (p *TopicPublisher) Shutdown() error { |
|
|
|
|
|
|
|
|
if clients, found := p.partition2Broker.AllIntersections(0, pub_balancer.MaxPartitionCount); found { |
|
|
|
|
|
for _, client := range clients { |
|
|
|
|
|
client.CloseSend() |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found { |
|
|
if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found { |
|
|
for _, inputBuffer := range inputBuffers { |
|
|
for _, inputBuffer := range inputBuffers { |
|
|
inputBuffer.CloseInput() |
|
|
inputBuffer.CloseInput() |
|
|