Browse Source

pub/sub broker only check local assigned partitions

pull/5637/head
chrislu 1 year ago
parent
commit
c77d35313e
  1. 16
      weed/mq/broker/broker_grpc_pub.go
  2. 34
      weed/mq/broker/broker_grpc_sub.go

16
weed/mq/broker/broker_grpc_pub.go

@ -52,10 +52,10 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
var p topic.Partition var p topic.Partition
if initMessage != nil { if initMessage != nil {
t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
localTopicPartition, err = b.loadLocalTopicPartition(t, p)
if err != nil {
response.Error = fmt.Sprintf("topic %v partition %v not setup: %v", initMessage.Topic, initMessage.Partition, err)
glog.Errorf("topic %v partition %v not setup: %v", initMessage.Topic, initMessage.Partition, err)
localTopicPartition = b.localTopicManager.GetTopicPartition(t, p)
if localTopicPartition == nil {
response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
return stream.Send(response) return stream.Send(response)
} }
ackInterval = int(initMessage.AckInterval) ackInterval = int(initMessage.AckInterval)
@ -141,14 +141,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
return nil return nil
} }
func (b *MessageQueueBroker) loadLocalTopicPartition(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) {
localTopicPartition = b.localTopicManager.GetTopicPartition(t, p)
if localTopicPartition == nil {
localTopicPartition, err = b.loadLocalTopicPartitionFromFiler(t, p)
}
return localTopicPartition, err
}
func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) { func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) {
self := b.option.BrokerAddress() self := b.option.BrokerAddress()

34
weed/mq/broker/broker_grpc_sub.go

@ -24,31 +24,21 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
var localTopicPartition *topic.LocalPartition var localTopicPartition *topic.LocalPartition
localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition) localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
for localTopicPartition == nil { for localTopicPartition == nil {
localTopicPartition, err = b.loadLocalTopicPartitionFromFiler(t, partition)
// if not created, return error
if err != nil {
stream.Send(&mq_pb.SubscribeMessageResponse{
Message: &mq_pb.SubscribeMessageResponse_Ctrl{
Ctrl: &mq_pb.SubscribeMessageResponse_CtrlMessage{
Error: fmt.Sprintf("topic %v partition %v not setup: %v", t, partition, err),
},
},
})
time.Sleep(337 * time.Millisecond)
// Check if the client has disconnected by monitoring the context
select {
case <-ctx.Done():
err := ctx.Err()
if err == context.Canceled {
// Client disconnected
return nil
}
glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
time.Sleep(337 * time.Millisecond)
// Check if the client has disconnected by monitoring the context
select {
case <-ctx.Done():
err := ctx.Err()
if err == context.Canceled {
// Client disconnected
return nil return nil
default:
// Continue processing the request
} }
glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
return nil
default:
// Continue processing the request
} }
localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
} }
localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber()) localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())

Loading…
Cancel
Save