From c616081ab9d802b8b87ee9cbf2c7142b3bc5f4b2 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 17 Jan 2024 22:29:59 -0800 Subject: [PATCH] refactor --- weed/mq/broker/broker_grpc_pub.go | 38 +++++++++++++++++++------------ 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index e8238a5f7..3b585f6f6 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -149,26 +149,17 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis } func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) { + self := b.option.BrokerAddress() + glog.V(0).Infof("broker %s load topic %v partition %v", self, t, p) + // load local topic partition from configuration on filer if not found var conf *mq_pb.ConfigureTopicResponse - topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) - if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf") - if err != nil { - return fmt.Errorf("read topic %v partition %v conf: %v", t, p, err) - } - // parse into filer conf object - conf = &mq_pb.ConfigureTopicResponse{} - if err = jsonpb.Unmarshal(data, conf); err != nil { - return fmt.Errorf("unmarshal topic %v partition %v conf: %v", t, p, err) - } - return nil - }); err != nil { + conf, err = b.readTopicConfFromFiler(t, p) + if err != nil { return nil, err } // create local topic partition - self := b.option.BrokerAddress() var hasCreated bool for _, assignment := range conf.BrokerPartitionAssignments { if assignment.LeaderBroker == string(self) && p.Equals(topic.FromPbPartition(assignment.Partition)) { @@ -186,6 +177,25 @@ func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p t return localTopicPartition, nil } +func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic, p topic.Partition) (conf *mq_pb.ConfigureTopicResponse, err error) { + topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) + if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf") + if err != nil { + return fmt.Errorf("read topic %v partition %v conf: %v", t, p, err) + } + // parse into filer conf object + conf = &mq_pb.ConfigureTopicResponse{} + if err = jsonpb.Unmarshal(data, conf); err != nil { + return fmt.Errorf("unmarshal topic %v partition %v conf: %v", t, p, err) + } + return nil + }); err != nil { + return nil, err + } + return conf, err +} + // duplicated from master_grpc_server.go func findClientAddress(ctx context.Context) string { // fmt.Printf("FromContext %+v\n", ctx)