From 37d1ee562d73dfa8f130fc8b9c0bf88fd3337c16 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 23 May 2024 09:21:48 -0700 Subject: [PATCH] refactor --- weed/mq/sub_coordinator/consumer_group.go | 19 -------------- .../consumer_group_instance.go | 26 +++++++++++++++++++ 2 files changed, 26 insertions(+), 19 deletions(-) create mode 100644 weed/mq/sub_coordinator/consumer_group_instance.go diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go index 4f4265e5e..3b8f90fa5 100644 --- a/weed/mq/sub_coordinator/consumer_group.go +++ b/weed/mq/sub_coordinator/consumer_group.go @@ -1,7 +1,6 @@ package sub_coordinator import ( - "fmt" cmap "github.com/orcaman/concurrent-map/v2" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" @@ -10,18 +9,6 @@ import ( "time" ) -type ConsumerGroupInstance struct { - InstanceId string - // the consumer group instance may not have an active partition - Partitions []*topic.Partition - ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse - MaxPartitionCount int32 -} - -func (i ConsumerGroupInstance) AckUnAssignment(assignment *mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) { - fmt.Printf("ack unassignment %v\n", assignment) -} - type ConsumerGroup struct { topic topic.Topic // map a consumer group instance id to a consumer group instance @@ -42,12 +29,6 @@ func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.PubBalancer, fil } } -func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance { - return &ConsumerGroupInstance{ - InstanceId: instanceId, - ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1), - } -} func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic, maxPartitionCount, rebalanceSeconds int32) { cg.onConsumerGroupInstanceChange(true, "add consumer instance "+consumerGroupInstance, maxPartitionCount, rebalanceSeconds) } diff --git a/weed/mq/sub_coordinator/consumer_group_instance.go b/weed/mq/sub_coordinator/consumer_group_instance.go new file mode 100644 index 000000000..3fac28358 --- /dev/null +++ b/weed/mq/sub_coordinator/consumer_group_instance.go @@ -0,0 +1,26 @@ +package sub_coordinator + +import ( + "fmt" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +type ConsumerGroupInstance struct { + InstanceId string + // the consumer group instance may not have an active partition + Partitions []*topic.Partition + ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse + MaxPartitionCount int32 +} + +func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance { + return &ConsumerGroupInstance{ + InstanceId: instanceId, + ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1), + } +} + +func (i ConsumerGroupInstance) AckUnAssignment(assignment *mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) { + fmt.Printf("ack unassignment %v\n", assignment) +}