From c77d35313e8376798bb6f0dd63bf748a42d84370 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 22 Jan 2024 00:49:57 -0800 Subject: [PATCH] pub/sub broker only check local assigned partitions --- weed/mq/broker/broker_grpc_pub.go | 16 ++++----------- weed/mq/broker/broker_grpc_sub.go | 34 +++++++++++-------------------- 2 files changed, 16 insertions(+), 34 deletions(-) diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index f37629b81..ad878a88c 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -52,10 +52,10 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis var p topic.Partition if initMessage != nil { t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) - localTopicPartition, err = b.loadLocalTopicPartition(t, p) - if err != nil { - response.Error = fmt.Sprintf("topic %v partition %v not setup: %v", initMessage.Topic, initMessage.Partition, err) - glog.Errorf("topic %v partition %v not setup: %v", initMessage.Topic, initMessage.Partition, err) + localTopicPartition = b.localTopicManager.GetTopicPartition(t, p) + if localTopicPartition == nil { + response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) + glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) return stream.Send(response) } ackInterval = int(initMessage.AckInterval) @@ -141,14 +141,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis return nil } -func (b *MessageQueueBroker) loadLocalTopicPartition(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) { - localTopicPartition = b.localTopicManager.GetTopicPartition(t, p) - if localTopicPartition == nil { - localTopicPartition, err = b.loadLocalTopicPartitionFromFiler(t, p) - } - return localTopicPartition, err -} - func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) { self := b.option.BrokerAddress() diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 2f4af3be9..72101ba86 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -24,31 +24,21 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest var localTopicPartition *topic.LocalPartition localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition) for localTopicPartition == nil { - localTopicPartition, err = b.loadLocalTopicPartitionFromFiler(t, partition) - // if not created, return error - if err != nil { - stream.Send(&mq_pb.SubscribeMessageResponse{ - Message: &mq_pb.SubscribeMessageResponse_Ctrl{ - Ctrl: &mq_pb.SubscribeMessageResponse_CtrlMessage{ - Error: fmt.Sprintf("topic %v partition %v not setup: %v", t, partition, err), - }, - }, - }) - time.Sleep(337 * time.Millisecond) - // Check if the client has disconnected by monitoring the context - select { - case <-ctx.Done(): - err := ctx.Err() - if err == context.Canceled { - // Client disconnected - return nil - } - glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err) + time.Sleep(337 * time.Millisecond) + // Check if the client has disconnected by monitoring the context + select { + case <-ctx.Done(): + err := ctx.Err() + if err == context.Canceled { + // Client disconnected return nil - default: - // Continue processing the request } + glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err) + return nil + default: + // Continue processing the request } + localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition) } localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())