From 4f5c4c33882dac37ff87fadce34b9a2592ea4fa2 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 24 Mar 2024 13:07:51 -0700 Subject: [PATCH] refactor --- weed/mq/broker/broker_grpc_sub.go | 15 --------------- weed/mq/broker/broker_topic_conf_read_write.go | 15 +++++++++++++++ 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index e07c7b67f..6c03ba409 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -97,21 +97,6 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest }) } -func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error) { - // get or generate a local partition - conf, readConfErr := b.readTopicConfFromFiler(t) - if readConfErr != nil { - glog.Errorf("topic %v not found: %v", t, readConfErr) - return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr) - } - localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf) - if getOrGenError != nil { - glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError) - return nil, fmt.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError) - } - return localTopicPartition, nil -} - func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer.MessagePosition) { if offset.StartTsNs != 0 { startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2) diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index abe25a240..987c60243 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -56,6 +56,21 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb. return conf, nil } +func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error) { + // get or generate a local partition + conf, readConfErr := b.readTopicConfFromFiler(t) + if readConfErr != nil { + glog.Errorf("topic %v not found: %v", t, readConfErr) + return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr) + } + localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf) + if getOrGenError != nil { + glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError) + return nil, fmt.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError) + } + return localTopicPartition, nil +} + func (b *MessageQueueBroker) doGetOrGenLocalPartition(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) { b.accessLock.Lock() defer b.accessLock.Unlock()