Browse Source

refactor

pull/5890/head
chrislu 9 months ago
parent
commit
ac63f2b5a1
  1. 6
      weed/mq/sub_coordinator/consumer_group.go
  2. 28
      weed/mq/sub_coordinator/partition_consumer_mapping.go
  3. 90
      weed/mq/sub_coordinator/partition_consumer_mapping_test.go

6
weed/mq/sub_coordinator/consumer_group.go

@ -84,12 +84,12 @@ func (cg *ConsumerGroup) BalanceConsumerGroupInstances(knownPartitionSlotToBroke
} }
// collect current consumer group instance ids // collect current consumer group instance ids
var consumerInstanceIds []string
var consumerInstances []*ConsumerGroupInstance
for _, consumerGroupInstance := range cg.ConsumerGroupInstances.Items() { for _, consumerGroupInstance := range cg.ConsumerGroupInstances.Items() {
consumerInstanceIds = append(consumerInstanceIds, consumerGroupInstance.InstanceId)
consumerInstances = append(consumerInstances, consumerGroupInstance)
} }
cg.mapping.BalanceToConsumerInstanceIds(partitionSlotToBrokerList, consumerInstanceIds)
cg.mapping.BalanceToConsumerInstances(partitionSlotToBrokerList, consumerInstances)
// convert cg.mapping currentMapping to map of consumer group instance id to partition slots // convert cg.mapping currentMapping to map of consumer group instance id to partition slots
consumerInstanceToPartitionSlots := make(map[string][]*PartitionSlotToConsumerInstance) consumerInstanceToPartitionSlots := make(map[string][]*PartitionSlotToConsumerInstance)

28
weed/mq/sub_coordinator/partition_consumer_mapping.go

@ -23,8 +23,8 @@ func NewPartitionConsumerMapping(ringSize int32) *PartitionConsumerMapping {
// 2. allow one consumer instance to be down unexpectedly // 2. allow one consumer instance to be down unexpectedly
// without affecting the processing power utilization // without affecting the processing power utilization
func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, consumerInstanceIds []string) {
if len(partitionSlotToBrokerList.PartitionSlots) == 0 || len(consumerInstanceIds) == 0 {
func (pcm *PartitionConsumerMapping) BalanceToConsumerInstances(partitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, consumerInstances []*ConsumerGroupInstance) {
if len(partitionSlotToBrokerList.PartitionSlots) == 0 || len(consumerInstances) == 0 {
return return
} }
newVersion := time.Now().UnixNano() newVersion := time.Now().UnixNano()
@ -35,7 +35,7 @@ func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitionSlotT
} else { } else {
prevMapping = nil prevMapping = nil
} }
newMapping.PartitionSlots = doBalanceSticky(partitionSlotToBrokerList.PartitionSlots, consumerInstanceIds, prevMapping)
newMapping.PartitionSlots = doBalanceSticky(partitionSlotToBrokerList.PartitionSlots, consumerInstances, prevMapping)
if pcm.currentMapping != nil { if pcm.currentMapping != nil {
pcm.prevMappings = append(pcm.prevMappings, pcm.currentMapping) pcm.prevMappings = append(pcm.prevMappings, pcm.currentMapping)
if len(pcm.prevMappings) > 10 { if len(pcm.prevMappings) > 10 {
@ -45,7 +45,7 @@ func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitionSlotT
pcm.currentMapping = newMapping pcm.currentMapping = newMapping
} }
func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerInstanceIds []string, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) {
func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerInstances []*ConsumerGroupInstance, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) {
// collect previous consumer instance ids // collect previous consumer instance ids
prevConsumerInstanceIds := make(map[string]struct{}) prevConsumerInstanceIds := make(map[string]struct{})
if prevMapping != nil { if prevMapping != nil {
@ -57,8 +57,8 @@ func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerI
} }
// collect current consumer instance ids // collect current consumer instance ids
currConsumerInstanceIds := make(map[string]struct{}) currConsumerInstanceIds := make(map[string]struct{})
for _, consumerInstanceId := range consumerInstanceIds {
currConsumerInstanceIds[consumerInstanceId] = struct{}{}
for _, consumerInstance := range consumerInstances {
currConsumerInstanceIds[consumerInstance.InstanceId] = struct{}{}
} }
// check deleted consumer instances // check deleted consumer instances
@ -106,25 +106,25 @@ func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerI
} }
} }
// average number of partitions that are assigned to each consumer instance // average number of partitions that are assigned to each consumer instance
averageConsumerInstanceLoad := float32(len(partitions)) / float32(len(consumerInstanceIds))
averageConsumerInstanceLoad := float32(len(partitions)) / float32(len(consumerInstances))
// assign unassigned partition slots to consumer instances that is underloaded // assign unassigned partition slots to consumer instances that is underloaded
consumerInstanceIdsIndex := 0 consumerInstanceIdsIndex := 0
for _, newPartitionSlot := range newPartitionSlots { for _, newPartitionSlot := range newPartitionSlots {
if newPartitionSlot.AssignedInstanceId == "" { if newPartitionSlot.AssignedInstanceId == "" {
for avoidDeadLoop := len(consumerInstanceIds); avoidDeadLoop > 0; avoidDeadLoop-- {
consumerInstanceId := consumerInstanceIds[consumerInstanceIdsIndex]
if float32(consumerInstancePartitionCount[consumerInstanceId]) < averageConsumerInstanceLoad {
newPartitionSlot.AssignedInstanceId = consumerInstanceId
consumerInstancePartitionCount[consumerInstanceId]++
for avoidDeadLoop := len(consumerInstances); avoidDeadLoop > 0; avoidDeadLoop-- {
consumerInstance := consumerInstances[consumerInstanceIdsIndex]
if float32(consumerInstancePartitionCount[consumerInstance.InstanceId]) < averageConsumerInstanceLoad {
newPartitionSlot.AssignedInstanceId = consumerInstance.InstanceId
consumerInstancePartitionCount[consumerInstance.InstanceId]++
consumerInstanceIdsIndex++ consumerInstanceIdsIndex++
if consumerInstanceIdsIndex >= len(consumerInstanceIds) {
if consumerInstanceIdsIndex >= len(consumerInstances) {
consumerInstanceIdsIndex = 0 consumerInstanceIdsIndex = 0
} }
break break
} else { } else {
consumerInstanceIdsIndex++ consumerInstanceIdsIndex++
if consumerInstanceIdsIndex >= len(consumerInstanceIds) {
if consumerInstanceIdsIndex >= len(consumerInstances) {
consumerInstanceIdsIndex = 0 consumerInstanceIdsIndex = 0
} }
} }

90
weed/mq/sub_coordinator/partition_consumer_mapping_test.go

@ -9,7 +9,7 @@ import (
func Test_doBalanceSticky(t *testing.T) { func Test_doBalanceSticky(t *testing.T) {
type args struct { type args struct {
partitions []*pub_balancer.PartitionSlotToBroker partitions []*pub_balancer.PartitionSlotToBroker
consumerInstanceIds []string
consumerInstanceIds []*ConsumerGroupInstance
prevMapping *PartitionSlotToConsumerInstanceList prevMapping *PartitionSlotToConsumerInstanceList
} }
tests := []struct { tests := []struct {
@ -26,7 +26,12 @@ func Test_doBalanceSticky(t *testing.T) {
RangeStop: 100, RangeStop: 100,
}, },
}, },
consumerInstanceIds: []string{"consumer-instance-1"},
consumerInstanceIds: []*ConsumerGroupInstance{
{
InstanceId: "consumer-instance-1",
MaxPartitionCount: 1,
},
},
prevMapping: nil, prevMapping: nil,
}, },
wantPartitionSlots: []*PartitionSlotToConsumerInstance{ wantPartitionSlots: []*PartitionSlotToConsumerInstance{
@ -46,7 +51,16 @@ func Test_doBalanceSticky(t *testing.T) {
RangeStop: 100, RangeStop: 100,
}, },
}, },
consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"},
consumerInstanceIds: []*ConsumerGroupInstance{
{
InstanceId: "consumer-instance-1",
MaxPartitionCount: 1,
},
{
InstanceId: "consumer-instance-2",
MaxPartitionCount: 1,
},
},
prevMapping: nil, prevMapping: nil,
}, },
wantPartitionSlots: []*PartitionSlotToConsumerInstance{ wantPartitionSlots: []*PartitionSlotToConsumerInstance{
@ -70,7 +84,12 @@ func Test_doBalanceSticky(t *testing.T) {
RangeStop: 100, RangeStop: 100,
}, },
}, },
consumerInstanceIds: []string{"consumer-instance-1"},
consumerInstanceIds: []*ConsumerGroupInstance{
{
InstanceId: "consumer-instance-1",
MaxPartitionCount: 1,
},
},
prevMapping: nil, prevMapping: nil,
}, },
wantPartitionSlots: []*PartitionSlotToConsumerInstance{ wantPartitionSlots: []*PartitionSlotToConsumerInstance{
@ -99,7 +118,16 @@ func Test_doBalanceSticky(t *testing.T) {
RangeStop: 100, RangeStop: 100,
}, },
}, },
consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"},
consumerInstanceIds: []*ConsumerGroupInstance{
{
InstanceId: "consumer-instance-1",
MaxPartitionCount: 1,
},
{
InstanceId: "consumer-instance-2",
MaxPartitionCount: 1,
},
},
prevMapping: nil, prevMapping: nil,
}, },
wantPartitionSlots: []*PartitionSlotToConsumerInstance{ wantPartitionSlots: []*PartitionSlotToConsumerInstance{
@ -128,7 +156,16 @@ func Test_doBalanceSticky(t *testing.T) {
RangeStop: 100, RangeStop: 100,
}, },
}, },
consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"},
consumerInstanceIds: []*ConsumerGroupInstance{
{
InstanceId: "consumer-instance-1",
MaxPartitionCount: 1,
},
{
InstanceId: "consumer-instance-2",
MaxPartitionCount: 1,
},
},
prevMapping: &PartitionSlotToConsumerInstanceList{ prevMapping: &PartitionSlotToConsumerInstanceList{
PartitionSlots: []*PartitionSlotToConsumerInstance{ PartitionSlots: []*PartitionSlotToConsumerInstance{
{ {
@ -170,7 +207,20 @@ func Test_doBalanceSticky(t *testing.T) {
RangeStop: 100, RangeStop: 100,
}, },
}, },
consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2", "consumer-instance-3"},
consumerInstanceIds: []*ConsumerGroupInstance{
{
InstanceId: "consumer-instance-1",
MaxPartitionCount: 1,
},
{
InstanceId: "consumer-instance-2",
MaxPartitionCount: 1,
},
{
InstanceId: "consumer-instance-3",
MaxPartitionCount: 1,
},
},
prevMapping: &PartitionSlotToConsumerInstanceList{ prevMapping: &PartitionSlotToConsumerInstanceList{
PartitionSlots: []*PartitionSlotToConsumerInstance{ PartitionSlots: []*PartitionSlotToConsumerInstance{
{ {
@ -216,7 +266,16 @@ func Test_doBalanceSticky(t *testing.T) {
RangeStop: 150, RangeStop: 150,
}, },
}, },
consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"},
consumerInstanceIds: []*ConsumerGroupInstance{
{
InstanceId: "consumer-instance-1",
MaxPartitionCount: 1,
},
{
InstanceId: "consumer-instance-2",
MaxPartitionCount: 1,
},
},
prevMapping: &PartitionSlotToConsumerInstanceList{ prevMapping: &PartitionSlotToConsumerInstanceList{
PartitionSlots: []*PartitionSlotToConsumerInstance{ PartitionSlots: []*PartitionSlotToConsumerInstance{
{ {
@ -267,7 +326,20 @@ func Test_doBalanceSticky(t *testing.T) {
RangeStop: 150, RangeStop: 150,
}, },
}, },
consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2", "consumer-instance-3"},
consumerInstanceIds: []*ConsumerGroupInstance{
{
InstanceId: "consumer-instance-1",
MaxPartitionCount: 1,
},
{
InstanceId: "consumer-instance-2",
MaxPartitionCount: 1,
},
{
InstanceId: "consumer-instance-3",
MaxPartitionCount: 1,
},
},
prevMapping: &PartitionSlotToConsumerInstanceList{ prevMapping: &PartitionSlotToConsumerInstanceList{
PartitionSlots: []*PartitionSlotToConsumerInstance{ PartitionSlots: []*PartitionSlotToConsumerInstance{
{ {

Loading…
Cancel
Save