From 25b2850e7d2f0a3e33568183fcb13c2862cb276b Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 6 Jun 2024 19:44:19 -0700 Subject: [PATCH] refactor out FilerClientAccessor --- .../filer_client_accessor.go | 2 +- weed/mq/broker/broker_server.go | 5 +++-- weed/mq/sub_coordinator/consumer_group.go | 5 +++-- weed/mq/sub_coordinator/sub_coordinator.go | 3 ++- 4 files changed, 9 insertions(+), 6 deletions(-) rename weed/{mq/sub_coordinator => filer_client}/filer_client_accessor.go (98%) diff --git a/weed/mq/sub_coordinator/filer_client_accessor.go b/weed/filer_client/filer_client_accessor.go similarity index 98% rename from weed/mq/sub_coordinator/filer_client_accessor.go rename to weed/filer_client/filer_client_accessor.go index dc50ac128..be70f2b82 100644 --- a/weed/mq/sub_coordinator/filer_client_accessor.go +++ b/weed/filer_client/filer_client_accessor.go @@ -1,4 +1,4 @@ -package sub_coordinator +package filer_client import ( "bytes" diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index f0a1f1254..f002bf31a 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -1,6 +1,7 @@ package broker import ( + "github.com/seaweedfs/seaweedfs/weed/filer_client" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" @@ -47,7 +48,7 @@ type MessageQueueBroker struct { lockAsBalancer *cluster.LiveLock SubCoordinator *sub_coordinator.SubCoordinator accessLock sync.Mutex - fca *sub_coordinator.FilerClientAccessor + fca *filer_client.FilerClientAccessor } func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { @@ -64,7 +65,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial PubBalancer: pubBalancer, SubCoordinator: subCoordinator, } - fca := &sub_coordinator.FilerClientAccessor{ + fca := &filer_client.FilerClientAccessor{ GetFiler: mqBroker.GetFiler, GetGrpcDialOption: mqBroker.GetGrpcDialOption, } diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go index 247434288..80a2d5f80 100644 --- a/weed/mq/sub_coordinator/consumer_group.go +++ b/weed/mq/sub_coordinator/consumer_group.go @@ -3,6 +3,7 @@ package sub_coordinator import ( "fmt" cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/filer_client" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -15,11 +16,11 @@ type ConsumerGroup struct { ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance] Market *Market reBalanceTimer *time.Timer - filerClientAccessor *FilerClientAccessor + filerClientAccessor *filer_client.FilerClientAccessor stopCh chan struct{} } -func NewConsumerGroup(t *mq_pb.Topic, reblanceSeconds int32, filerClientAccessor *FilerClientAccessor) *ConsumerGroup { +func NewConsumerGroup(t *mq_pb.Topic, reblanceSeconds int32, filerClientAccessor *filer_client.FilerClientAccessor) *ConsumerGroup { cg := &ConsumerGroup{ topic: topic.FromPbTopic(t), ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](), diff --git a/weed/mq/sub_coordinator/sub_coordinator.go b/weed/mq/sub_coordinator/sub_coordinator.go index 0042b8e94..47101eee4 100644 --- a/weed/mq/sub_coordinator/sub_coordinator.go +++ b/weed/mq/sub_coordinator/sub_coordinator.go @@ -3,6 +3,7 @@ package sub_coordinator import ( "fmt" cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/filer_client" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) @@ -18,7 +19,7 @@ type TopicConsumerGroups struct { type SubCoordinator struct { // map topic name to consumer groups TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups] - FilerClientAccessor *FilerClientAccessor + FilerClientAccessor *filer_client.FilerClientAccessor } func NewSubCoordinator() *SubCoordinator {