diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index dcc621c4c..9ba67f250 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -1,7 +1,6 @@ package broker import ( - "bytes" "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -50,13 +49,8 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. ret.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.Balancer.Brokers, request.PartitionCount) // save the topic configuration on filer - topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, request.Topic.Namespace, request.Topic.Name) - if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - var buf bytes.Buffer - filer.ProtoToText(&buf, ret) - return filer.SaveInsideFiler(client, topicDir, "topic.conf", buf.Bytes()) - }); err != nil { - return nil, fmt.Errorf("create topic %s: %v", topicDir, err) + if err := b.saveTopicConfToFiler(request.Topic, ret); err != nil { + return nil, fmt.Errorf("configure topic: %v", err) } b.Balancer.OnPartitionChange(request.Topic, ret.BrokerPartitionAssignments) diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 5686f8dd9..f37629b81 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -3,13 +3,10 @@ package broker import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" - "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/peer" - jsonpb "google.golang.org/protobuf/encoding/protojson" "math/rand" "net" "sync/atomic" @@ -180,28 +177,6 @@ func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p t return localTopicPartition, nil } -func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.ConfigureTopicResponse, err error) { - - glog.V(0).Infof("load conf for topic %v from filer", t) - - topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) - if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf") - if err != nil { - return fmt.Errorf("read topic %v partition %v conf: %v", t, err) - } - // parse into filer conf object - conf = &mq_pb.ConfigureTopicResponse{} - if err = jsonpb.Unmarshal(data, conf); err != nil { - return fmt.Errorf("unmarshal topic %v conf: %v", t, err) - } - return nil - }); err != nil { - return nil, err - } - return conf, err -} - // duplicated from master_grpc_server.go func findClientAddress(ctx context.Context) string { // fmt.Printf("FromContext %+v\n", ctx) diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go new file mode 100644 index 000000000..3294aa5cb --- /dev/null +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -0,0 +1,50 @@ +package broker + +import ( + "bytes" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + jsonpb "google.golang.org/protobuf/encoding/protojson" +) + +func (b *MessageQueueBroker) saveTopicConfToFiler(t *mq_pb.Topic, conf *mq_pb.ConfigureTopicResponse) error { + + glog.V(0).Infof("save conf for topic %v to filer", t) + + // save the topic configuration on filer + topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) + if err := b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + var buf bytes.Buffer + filer.ProtoToText(&buf, conf) + return filer.SaveInsideFiler(client, topicDir, "topic.conf", buf.Bytes()) + }); err != nil { + return fmt.Errorf("save topic to %s: %v", topicDir, err) + } + return nil +} + +func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.ConfigureTopicResponse, err error) { + + glog.V(0).Infof("load conf for topic %v from filer", t) + + topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) + if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf") + if err != nil { + return fmt.Errorf("read topic %v partition %v conf: %v", t, err) + } + // parse into filer conf object + conf = &mq_pb.ConfigureTopicResponse{} + if err = jsonpb.Unmarshal(data, conf); err != nil { + return fmt.Errorf("unmarshal topic %v conf: %v", t, err) + } + return nil + }); err != nil { + return nil, err + } + return conf, err +}