From 96b326a304062093f1437c9c9800e06984034aee Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 26 May 2024 14:09:51 -0700 Subject: [PATCH] use type ConsumerGroupInstanceId --- weed/mq/sub_coordinator/consumer_group.go | 3 +-- weed/mq/sub_coordinator/consumer_group_instance.go | 13 +++++++------ .../sub_coordinator/partition_consumer_mapping.go | 8 ++++---- weed/mq/sub_coordinator/partition_list.go | 2 +- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go index 3b8f90fa5..51dea5572 100644 --- a/weed/mq/sub_coordinator/consumer_group.go +++ b/weed/mq/sub_coordinator/consumer_group.go @@ -106,7 +106,7 @@ func (cg *ConsumerGroup) BalanceConsumerGroupInstances(knownPartitionSlotToBroke cg.mapping.BalanceToConsumerInstances(partitionSlotToBrokerList, consumerInstances) // convert cg.mapping currentMapping to map of consumer group instance id to partition slots - consumerInstanceToPartitionSlots := make(map[string][]*PartitionSlotToConsumerInstance) + consumerInstanceToPartitionSlots := make(map[ConsumerGroupInstanceId][]*PartitionSlotToConsumerInstance) for _, partitionSlot := range cg.mapping.currentMapping.PartitionSlots { consumerInstanceToPartitionSlots[partitionSlot.AssignedInstanceId] = append(consumerInstanceToPartitionSlots[partitionSlot.AssignedInstanceId], partitionSlot) } @@ -117,7 +117,6 @@ func (cg *ConsumerGroup) BalanceConsumerGroupInstances(knownPartitionSlotToBroke if !found { partitionSlots = make([]*PartitionSlotToConsumerInstance, 0) } - consumerGroupInstance.Partitions = ToPartitions(partitionSlotToBrokerList.RingSize, partitionSlots) for _, partitionSlot := range partitionSlots { consumerGroupInstance.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{ Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{ diff --git a/weed/mq/sub_coordinator/consumer_group_instance.go b/weed/mq/sub_coordinator/consumer_group_instance.go index 3fac28358..d688ccc19 100644 --- a/weed/mq/sub_coordinator/consumer_group_instance.go +++ b/weed/mq/sub_coordinator/consumer_group_instance.go @@ -6,17 +6,18 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) +type ConsumerGroupInstanceId string + type ConsumerGroupInstance struct { - InstanceId string - // the consumer group instance may not have an active partition - Partitions []*topic.Partition - ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse - MaxPartitionCount int32 + InstanceId ConsumerGroupInstanceId + AssignedPartitions []topic.Partition + ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse + MaxPartitionCount int32 } func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance { return &ConsumerGroupInstance{ - InstanceId: instanceId, + InstanceId: ConsumerGroupInstanceId(instanceId), ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1), } } diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping.go b/weed/mq/sub_coordinator/partition_consumer_mapping.go index 23e0fb00f..5d1cf158a 100644 --- a/weed/mq/sub_coordinator/partition_consumer_mapping.go +++ b/weed/mq/sub_coordinator/partition_consumer_mapping.go @@ -47,7 +47,7 @@ func (pcm *PartitionConsumerMapping) BalanceToConsumerInstances(partitionSlotToB func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerInstances []*ConsumerGroupInstance, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) { // collect previous consumer instance ids - prevConsumerInstanceIds := make(map[string]struct{}) + prevConsumerInstanceIds := make(map[ConsumerGroupInstanceId]struct{}) if prevMapping != nil { for _, prevPartitionSlot := range prevMapping.PartitionSlots { if prevPartitionSlot.AssignedInstanceId != "" { @@ -56,13 +56,13 @@ func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerI } } // collect current consumer instance ids - currConsumerInstanceIds := make(map[string]struct{}) + currConsumerInstanceIds := make(map[ConsumerGroupInstanceId]struct{}) for _, consumerInstance := range consumerInstances { currConsumerInstanceIds[consumerInstance.InstanceId] = struct{}{} } // check deleted consumer instances - deletedConsumerInstanceIds := make(map[string]struct{}) + deletedConsumerInstanceIds := make(map[ConsumerGroupInstanceId]struct{}) for consumerInstanceId := range prevConsumerInstanceIds { if _, ok := currConsumerInstanceIds[consumerInstanceId]; !ok { deletedConsumerInstanceIds[consumerInstanceId] = struct{}{} @@ -100,7 +100,7 @@ func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerI // for all consumer instances, count the average number of partitions // that are assigned to them - consumerInstancePartitionCount := make(map[string]int) + consumerInstancePartitionCount := make(map[ConsumerGroupInstanceId]int) for _, newPartitionSlot := range newPartitionSlots { if newPartitionSlot.AssignedInstanceId != "" { consumerInstancePartitionCount[newPartitionSlot.AssignedInstanceId]++ diff --git a/weed/mq/sub_coordinator/partition_list.go b/weed/mq/sub_coordinator/partition_list.go index 5dccdffbf..384c1b875 100644 --- a/weed/mq/sub_coordinator/partition_list.go +++ b/weed/mq/sub_coordinator/partition_list.go @@ -7,7 +7,7 @@ type PartitionSlotToConsumerInstance struct { RangeStop int32 UnixTimeNs int64 Broker string - AssignedInstanceId string + AssignedInstanceId ConsumerGroupInstanceId FollowerBroker string }