From 6e5075e14eba59ffdfd7640cb664b2c4da017221 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 14 May 2024 08:50:17 -0700 Subject: [PATCH] move read write topic config into filer client accessor --- weed/mq/broker/broker_grpc_configure.go | 4 +- weed/mq/broker/broker_grpc_lookup.go | 2 +- weed/mq/broker/broker_server.go | 8 +++ .../mq/broker/broker_topic_conf_read_write.go | 52 +-------------- weed/mq/sub_coordinator/consumer_group.go | 4 +- weed/mq/sub_coordinator/coordinator.go | 3 +- .../sub_coordinator/filer_client_accessor.go | 65 +++++++++++++++++++ 7 files changed, 83 insertions(+), 55 deletions(-) create mode 100644 weed/mq/sub_coordinator/filer_client_accessor.go diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index 40ac8df23..b177786f5 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -38,7 +38,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. t := topic.FromPbTopic(request.Topic) var readErr, assignErr error - resp, readErr = b.readTopicConfFromFiler(t) + resp, readErr = b.fca.ReadTopicConfFromFiler(t) if readErr != nil { glog.V(0).Infof("read topic %s conf: %v", request.Topic, readErr) } @@ -68,7 +68,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. resp.RecordType = request.RecordType // save the topic configuration on filer - if err := b.saveTopicConfToFiler(request.Topic, resp); err != nil { + if err := b.fca.SaveTopicConfToFiler(request.Topic, resp); err != nil { return nil, fmt.Errorf("configure topic: %v", err) } diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index 14c1f37da..da2c64dfc 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -26,7 +26,7 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq ret := &mq_pb.LookupTopicBrokersResponse{} conf := &mq_pb.ConfigureTopicResponse{} ret.Topic = request.Topic - if conf, err = b.readTopicConfFromFiler(t); err != nil { + if conf, err = b.fca.ReadTopicConfFromFiler(t); err != nil { glog.V(0).Infof("lookup topic %s conf: %v", request.Topic, err) } else { err = b.ensureTopicActiveAssignments(t, conf) diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 9c321744b..510c03558 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -47,6 +47,7 @@ type MessageQueueBroker struct { lockAsBalancer *cluster.LiveLock Coordinator *sub_coordinator.Coordinator accessLock sync.Mutex + fca *sub_coordinator.FilerClientAccessor } func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { @@ -63,6 +64,13 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial Balancer: pub_broker_balancer, Coordinator: coordinator, } + fca := &sub_coordinator.FilerClientAccessor{ + GetFilerFn: mqBroker.GetFiler, + GrpcDialOption: grpcDialOption, + } + mqBroker.fca = fca + coordinator.FilerClientAccessor = fca + mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate) pub_broker_balancer.OnPartitionChange = mqBroker.Coordinator.OnPartitionChange pub_broker_balancer.OnAddBroker = mqBroker.Coordinator.OnSubAddBroker diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index cddd6cf1c..211473dad 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -1,64 +1,16 @@ package broker import ( - "bytes" "fmt" - "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" - "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - jsonpb "google.golang.org/protobuf/encoding/protojson" ) -func (b *MessageQueueBroker) saveTopicConfToFiler(t *mq_pb.Topic, conf *mq_pb.ConfigureTopicResponse) error { - - glog.V(0).Infof("save conf for topic %v to filer", t) - - // save the topic configuration on filer - topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) - if err := b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - var buf bytes.Buffer - filer.ProtoToText(&buf, conf) - return filer.SaveInsideFiler(client, topicDir, "topic.conf", buf.Bytes()) - }); err != nil { - return fmt.Errorf("save topic to %s: %v", topicDir, err) - } - return nil -} - -// readTopicConfFromFiler reads the topic configuration from filer -// this should only be run in broker leader, to ensure correct active broker list. -func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.ConfigureTopicResponse, err error) { - - glog.V(0).Infof("load conf for topic %v from filer", t) - - topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) - if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf") - if err == filer_pb.ErrNotFound { - return err - } - if err != nil { - return fmt.Errorf("read topic.conf of %v: %v", t, err) - } - // parse into filer conf object - conf = &mq_pb.ConfigureTopicResponse{} - if err = jsonpb.Unmarshal(data, conf); err != nil { - return fmt.Errorf("unmarshal topic %v conf: %v", t, err) - } - return nil - }); err != nil { - return nil, err - } - - return conf, nil -} - func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error) { // get or generate a local partition - conf, readConfErr := b.readTopicConfFromFiler(t) + conf, readConfErr := b.fca.ReadTopicConfFromFiler(t) if readConfErr != nil { glog.Errorf("topic %v not found: %v", t, readConfErr) return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr) @@ -103,7 +55,7 @@ func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *m hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, 1, conf.BrokerPartitionAssignments) if hasChanges { glog.V(0).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments) - if err = b.saveTopicConfToFiler(t.ToPbTopic(), conf); err != nil { + if err = b.fca.SaveTopicConfToFiler(t.ToPbTopic(), conf); err != nil { return err } } diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go index 020f459b6..be87b4105 100644 --- a/weed/mq/sub_coordinator/consumer_group.go +++ b/weed/mq/sub_coordinator/consumer_group.go @@ -23,14 +23,16 @@ type ConsumerGroup struct { mapping *PartitionConsumerMapping reBalanceTimer *time.Timer pubBalancer *pub_balancer.Balancer + filerClientAccessor *FilerClientAccessor } -func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.Balancer) *ConsumerGroup { +func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.Balancer, filerClientAccessor *FilerClientAccessor) *ConsumerGroup { return &ConsumerGroup{ topic: topic.FromPbTopic(t), ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](), mapping: NewPartitionConsumerMapping(pub_balancer.MaxPartitionCount), pubBalancer: pubBalancer, + filerClientAccessor: filerClientAccessor, } } diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go index 62c44fd48..a27391da6 100644 --- a/weed/mq/sub_coordinator/coordinator.go +++ b/weed/mq/sub_coordinator/coordinator.go @@ -19,6 +19,7 @@ type Coordinator struct { // map topic name to consumer groups TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups] balancer *pub_balancer.Balancer + FilerClientAccessor *FilerClientAccessor } func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator { @@ -55,7 +56,7 @@ func (c *Coordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinato tcg := c.GetTopicConsumerGroups(initMessage.Topic, true) cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup) if cg == nil { - cg = NewConsumerGroup(initMessage.Topic, c.balancer) + cg = NewConsumerGroup(initMessage.Topic, c.balancer, c.FilerClientAccessor) if !tcg.ConsumerGroups.SetIfAbsent(initMessage.ConsumerGroup, cg) { cg, _ = tcg.ConsumerGroups.Get(initMessage.ConsumerGroup) } diff --git a/weed/mq/sub_coordinator/filer_client_accessor.go b/weed/mq/sub_coordinator/filer_client_accessor.go new file mode 100644 index 000000000..85bb5e29d --- /dev/null +++ b/weed/mq/sub_coordinator/filer_client_accessor.go @@ -0,0 +1,65 @@ +package sub_coordinator + +import ( + "bytes" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "google.golang.org/grpc" + jsonpb "google.golang.org/protobuf/encoding/protojson" +) + +type FilerClientAccessor struct { + GetFiler func() pb.ServerAddress + GetGrpcDialOption func()grpc.DialOption +} + +func (fca *FilerClientAccessor) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + return pb.WithFilerClient(streamingMode, 0, fca.GetFiler(), fca.GetGrpcDialOption(), fn) +} + +func (fca *FilerClientAccessor) SaveTopicConfToFiler(t *mq_pb.Topic, conf *mq_pb.ConfigureTopicResponse) error { + + glog.V(0).Infof("save conf for topic %v to filer", t) + + // save the topic configuration on filer + topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) + if err := fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + var buf bytes.Buffer + filer.ProtoToText(&buf, conf) + return filer.SaveInsideFiler(client, topicDir, "topic.conf", buf.Bytes()) + }); err != nil { + return fmt.Errorf("save topic to %s: %v", topicDir, err) + } + return nil +} + +func (fca *FilerClientAccessor) ReadTopicConfFromFiler(t topic.Topic) (conf *mq_pb.ConfigureTopicResponse, err error) { + + glog.V(0).Infof("load conf for topic %v from filer", t) + + topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) + if err = fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf") + if err == filer_pb.ErrNotFound { + return err + } + if err != nil { + return fmt.Errorf("read topic.conf of %v: %v", t, err) + } + // parse into filer conf object + conf = &mq_pb.ConfigureTopicResponse{} + if err = jsonpb.Unmarshal(data, conf); err != nil { + return fmt.Errorf("unmarshal topic %v conf: %v", t, err) + } + return nil + }); err != nil { + return nil, err + } + + return conf, nil +}