diff --git a/weed/mq/sub_coordinator/market.go b/weed/mq/sub_coordinator/market.go
new file mode 100644
index 000000000..48b9ae649
--- /dev/null
+++ b/weed/mq/sub_coordinator/market.go
@@ -0,0 +1,475 @@
+package sub_coordinator
+
+import (
+	"errors"
+	"sync"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+)
+
+/*
+Market is a data structure that keeps track of the state of the consumer group instances and the partitions.
+
+When rebalancing, the market will try to balance the load of the partitions among the consumer group instances.
+For each loop, the market will:
+* If a consumer group instance has more partitions than the average, it will unassign some partitions.
+* If a consumer group instance has fewer partitions than the average, it will assign some partitions.
+
+Trigger rebalance when:
+* A new consumer group instance is added
+* Some partitions are unassigned from a consumer group instance.
+
+If multiple rebalance requests are received, after a certain period, the market will only process the latest request.
+
+However, if the number of unassigned partitions is increased to exactly the total number of partitions,
+and total partitions are less than or equal to the sum of the max partition count of all consumer group instances,
+the market will process the request immediately.
+This is to ensure a partition can be migrated to another consumer group instance as soon as possible.
+
+Emit these adjustments to the subscriber coordinator:
+* Assign a partition to a consumer group instance
+* Unassign a partition from a consumer group instance
+
+Because the adjustment is sent to the subscriber coordinator, the market will keep track of the inflight adjustments.
+The subscriber coordinator will send back the response to the market when the adjustment is processed.
+If the adjustment is older than a certain time(inflightAdjustmentTTL), it would be considered expired.
+Otherwise, the adjustment is considered inflight, so it would be used when calculating the load.
+
+Later features:
+* A consumer group instance is not keeping up with the load.
+
+Since a coordinator, and thus the market, may be restarted or moved to another node, the market should be able to recover the state from the subscriber coordinator.
+The subscriber coordinator should be able to send the current state of the consumer group instances and the partitions to the market.
+
+*/
+
+// PartitionSlot pairs a partition with the consumer group instance it is
+// currently assigned to, if any.
+type PartitionSlot struct {
+	Partition  topic.Partition
+	AssignedTo *ConsumerGroupInstance // Track the consumer assigned to this partition slot
+}
+
+// Adjustment is a single assign or unassign decision emitted on
+// Market.AdjustmentChan and later acknowledged through ConfirmAdjustment.
+type Adjustment struct {
+	isAssign  bool
+	partition topic.Partition
+	consumer  ConsumerGroupInstanceId
+	ts        time.Time
+}
+
+// Market holds the assignment state and runs the background balancing
+// goroutine. The maps and the adjustment slices are guarded by mu;
+// hasBalanceRequest is only touched by the balancing goroutine.
+// pendingAdjustments holds decisions made under mu until they are sent.
+type Market struct {
+	mu                    sync.Mutex
+	partitions            map[topic.Partition]*PartitionSlot
+	consumerInstances     map[ConsumerGroupInstanceId]*ConsumerGroupInstance
+	AdjustmentChan        chan *Adjustment
+	inflightAdjustments   []*Adjustment
+	pendingAdjustments    []*Adjustment
+	inflightAdjustmentTTL time.Duration
+	lastBalancedTime      time.Time
+	stopChan              chan struct{}
+	stopOnce              sync.Once
+	loopDone              chan struct{}
+	balanceRequestChan    chan struct{}
+	hasBalanceRequest     bool
+}
+
+// NewMarket creates a market tracking the given partitions, all initially
+// unassigned, and starts the background balancing goroutine. Adjustments
+// older than inflightAdjustmentTTL are treated as expired. Call
+// ShutdownMarket to stop the goroutine.
+func NewMarket(partitions []topic.Partition, inflightAdjustmentTTL time.Duration) *Market {
+	partitionMap := make(map[topic.Partition]*PartitionSlot, len(partitions))
+	for _, partition := range partitions {
+		partitionMap[partition] = &PartitionSlot{
+			Partition: partition,
+		}
+	}
+	m := &Market{
+		partitions:            partitionMap,
+		consumerInstances:     make(map[ConsumerGroupInstanceId]*ConsumerGroupInstance),
+		AdjustmentChan:        make(chan *Adjustment, 100),
+		inflightAdjustmentTTL: inflightAdjustmentTTL,
+		lastBalancedTime:      time.Now(),
+		stopChan:              make(chan struct{}),
+		loopDone:              make(chan struct{}),
+		balanceRequestChan:    make(chan struct{}, 1), // buffered: requesters never block while holding mu
+	}
+	go m.loopBalanceLoad()
+
+	return m
+}
+
+// ShutdownMarket stops the balancing goroutine and waits for it to exit. The
+// goroutine is the only sender on AdjustmentChan and closes it on the way out,
+// so the channel is closed by the time this returns and a send can never race
+// with the close. It is safe to call more than once.
+func (m *Market) ShutdownMarket() {
+	m.stopOnce.Do(func() {
+		close(m.stopChan)
+	})
+	<-m.loopDone
+}
+
+// requestBalance asks the balancing goroutine to rebalance on its next tick.
+// It never blocks: when a request is already pending, that request covers
+// this one as well.
+func (m *Market) requestBalance() {
+	select {
+	case m.balanceRequestChan <- struct{}{}:
+	default:
+	}
+}
+
+// AddConsumerInstance registers a consumer group instance and requests a
+// rebalance. It returns an error when consumer is nil or when an instance with
+// the same InstanceId is already registered.
+func (m *Market) AddConsumerInstance(consumer *ConsumerGroupInstance) error {
+	if consumer == nil {
+		return errors.New("consumer instance is nil")
+	}
+
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	if _, exists := m.consumerInstances[consumer.InstanceId]; exists {
+		return errors.New("consumer instance already exists")
+	}
+
+	m.consumerInstances[consumer.InstanceId] = consumer
+	m.requestBalance()
+
+	return nil
+}
+
+// RemoveConsumerInstance unregisters a consumer group instance, frees the
+// partition slots it held, forgets inflight adjustments that target it, and
+// requests a rebalance. Removing an unknown instance is a no-op. The returned
+// error is always nil.
+func (m *Market) RemoveConsumerInstance(consumerId ConsumerGroupInstanceId) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	consumer, exists := m.consumerInstances[consumerId]
+	if !exists {
+		return nil
+	}
+	delete(m.consumerInstances, consumerId)
+
+	for _, partition := range consumer.AssignedPartitions {
+		if partitionSlot, exists := m.partitions[partition]; exists {
+			partitionSlot.AssignedTo = nil
+		}
+	}
+	// Adjustments for a departed consumer would otherwise keep their
+	// partitions from being reassigned until the TTL expires.
+	m.filterInflightAdjustments(func(adjustment *Adjustment) bool {
+		return adjustment.consumer != consumerId
+	})
+	m.requestBalance()
+
+	return nil
+}
+
+// filterInflightAdjustments keeps only the inflight adjustments for which
+// keep returns true. The caller must hold m.mu.
+func (m *Market) filterInflightAdjustments(keep func(*Adjustment) bool) {
+	kept := make([]*Adjustment, 0, len(m.inflightAdjustments))
+	for _, adjustment := range m.inflightAdjustments {
+		if keep(adjustment) {
+			kept = append(kept, adjustment)
+		}
+	}
+	m.inflightAdjustments = kept
+}
+
+// inflightLoadByConsumer returns, per consumer, the net partition count
+// change implied by the inflight adjustments: +1 for each assign and -1 for
+// each unassign. The caller must hold m.mu.
+func (m *Market) inflightLoadByConsumer() map[ConsumerGroupInstanceId]int {
+	loads := make(map[ConsumerGroupInstanceId]int)
+	for _, adjustment := range m.inflightAdjustments {
+		if adjustment.isAssign {
+			loads[adjustment.consumer]++
+		} else {
+			loads[adjustment.consumer]--
+		}
+	}
+	return loads
+}
+
+// inflightPartitions returns the set of partitions that have an inflight
+// adjustment. The caller must hold m.mu.
+func (m *Market) inflightPartitions() map[topic.Partition]bool {
+	inflight := make(map[topic.Partition]bool, len(m.inflightAdjustments))
+	for _, adjustment := range m.inflightAdjustments {
+		inflight[adjustment.partition] = true
+	}
+	return inflight
+}
+
+// recordAdjustment marks the adjustment as inflight and queues it for sending
+// once m.mu has been released. The caller must hold m.mu.
+func (m *Market) recordAdjustment(adjustment *Adjustment) {
+	m.inflightAdjustments = append(m.inflightAdjustments, adjustment)
+	m.pendingAdjustments = append(m.pendingAdjustments, adjustment)
+	m.lastBalancedTime = adjustment.ts
+}
+
+// assignPartitionToConsumer records an assign adjustment giving the partition
+// to the least loaded consumer that is still below its MaxPartitionCount. Load
+// is the confirmed partition count plus the net effect of inflight
+// adjustments. Nothing is recorded when no consumer has spare capacity. The
+// caller must hold m.mu.
+func (m *Market) assignPartitionToConsumer(partition *PartitionSlot) {
+	var bestConsumer *ConsumerGroupInstance
+	var minLoad = int(^uint(0) >> 1) // Max int value
+
+	inflightLoads := m.inflightLoadByConsumer()
+	for _, consumer := range m.consumerInstances {
+		consumerLoad := len(consumer.AssignedPartitions) + inflightLoads[consumer.InstanceId]
+		glog.V(4).Infof("consumer %+v has load %d, max %d, min %d", consumer.InstanceId, consumerLoad, consumer.MaxPartitionCount, minLoad)
+		if consumerLoad < int(consumer.MaxPartitionCount) && consumerLoad < minLoad {
+			bestConsumer = consumer
+			minLoad = consumerLoad
+		}
+	}
+
+	if bestConsumer == nil {
+		glog.V(4).Infof("no consumer has capacity for partition %+v", partition.Partition)
+		return
+	}
+	m.recordAdjustment(&Adjustment{
+		isAssign:  true,
+		partition: partition.Partition,
+		consumer:  bestConsumer.InstanceId,
+		ts:        time.Now(),
+	})
+}
+
+// loopBalanceLoad is the balancing goroutine. Balance requests are coalesced
+// and handled at most once per tick. It owns AdjustmentChan: it is the only
+// sender and closes the channel when it exits.
+func (m *Market) loopBalanceLoad() {
+	defer close(m.loopDone)
+	defer close(m.AdjustmentChan) // runs before loopDone is closed
+	ticker := time.NewTicker(500 * time.Millisecond)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			if !m.hasBalanceRequest {
+				continue
+			}
+			m.hasBalanceRequest = false
+			// Send without holding m.mu, so a receiver that is inside
+			// ConfirmAdjustment cannot deadlock against a full channel.
+			for _, adjustment := range m.balanceOnce() {
+				select {
+				case m.AdjustmentChan <- adjustment:
+				case <-m.stopChan:
+					return
+				}
+			}
+		case <-m.balanceRequestChan:
+			m.hasBalanceRequest = true
+		case <-m.stopChan:
+			return
+		}
+	}
+}
+
+// balanceOnce drops expired inflight adjustments, runs one balancing pass
+// under m.mu, and returns the adjustments that the pass decided on.
+func (m *Market) balanceOnce() []*Adjustment {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	now := time.Now()
+	m.filterInflightAdjustments(func(adjustment *Adjustment) bool {
+		return adjustment.ts.Add(m.inflightAdjustmentTTL).After(now)
+	})
+
+	m.doBalanceLoad()
+
+	pending := m.pendingAdjustments
+	m.pendingAdjustments = nil
+	return pending
+}
+
+// doBalanceLoad will balance the load of the partitions among the consumer group instances.
+// It will try to unassign partitions from the consumer group instances that have more partitions than the average.
+// It will try to assign partitions to the consumer group instances that have fewer partitions than the average.
+// When partitions were unassigned in this pass, assignment is left to a later pass.
+// The caller must hold m.mu.
+func (m *Market) doBalanceLoad() {
+	if len(m.consumerInstances) == 0 {
+		return
+	}
+
+	// find the average load for all consumers
+	averageLoad := m.findAverageLoad()
+
+	// find the consumers with the higher load than average
+	if m.adjustBusyConsumers(averageLoad) {
+		return
+	}
+
+	// find partitions with no consumer assigned
+	m.adjustUnassignedPartitions()
+}
+
+// findAverageLoad returns the mean load per consumer, counting confirmed
+// partitions plus the net effect of inflight adjustments. The caller must
+// hold m.mu and ensure at least one consumer is registered.
+func (m *Market) findAverageLoad() (averageLoad float32) {
+	var totalLoad int
+	for _, consumer := range m.consumerInstances {
+		totalLoad += len(consumer.AssignedPartitions)
+	}
+	for _, adjustment := range m.inflightAdjustments {
+		if adjustment.isAssign {
+			totalLoad++
+		} else {
+			totalLoad--
+		}
+	}
+	averageLoad = float32(totalLoad) / float32(len(m.consumerInstances))
+	return
+}
+
+// adjustBusyConsumers records unassign adjustments for consumers whose load
+// exceeds averageLoad and reports whether any were recorded. Only confirmed
+// partitions without an inflight adjustment are candidates, so the number of
+// unassignments may be smaller than the excess. The caller must hold m.mu.
+func (m *Market) adjustBusyConsumers(averageLoad float32) (hasAdjustments bool) {
+	inflightLoads := m.inflightLoadByConsumer()
+	inflight := m.inflightPartitions()
+	for _, consumer := range m.consumerInstances {
+		consumerLoad := len(consumer.AssignedPartitions) + inflightLoads[consumer.InstanceId]
+		delta := int(float32(consumerLoad) - averageLoad)
+		if delta <= 0 {
+			continue
+		}
+		adjustTime := time.Now()
+		// Range over the confirmed partitions rather than indexing by delta:
+		// inflight assigns count towards the load but are not in the slice yet.
+		for _, partition := range consumer.AssignedPartitions {
+			if delta == 0 {
+				break
+			}
+			if inflight[partition] {
+				continue
+			}
+			m.recordAdjustment(&Adjustment{
+				isAssign:  false,
+				partition: partition,
+				consumer:  consumer.InstanceId,
+				ts:        adjustTime,
+			})
+			inflight[partition] = true
+			delta--
+			hasAdjustments = true
+		}
+	}
+	return
+}
+
+// adjustUnassignedPartitions tries to assign every partition that has neither
+// a consumer nor an inflight adjustment. The caller must hold m.mu.
+func (m *Market) adjustUnassignedPartitions() {
+	inflight := m.inflightPartitions()
+	for _, partitionSlot := range m.partitions {
+		if partitionSlot.AssignedTo != nil || inflight[partitionSlot.Partition] {
+			continue
+		}
+		glog.V(4).Infof("assigning partition %+v to consumer", partitionSlot.Partition)
+		m.assignPartitionToConsumer(partitionSlot)
+	}
+}
+
+// ConfirmAdjustment applies an adjustment that the subscriber coordinator has
+// processed: the adjustment stops counting as inflight and the partition slot
+// and the consumer's partition list are updated. A nil adjustment is ignored.
+func (m *Market) ConfirmAdjustment(adjustment *Adjustment) {
+	if adjustment == nil {
+		return
+	}
+
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	// Once applied, the change is visible in AssignedPartitions; leaving it
+	// inflight as well would count it twice until the TTL expires.
+	m.filterInflightAdjustments(func(inflight *Adjustment) bool {
+		return inflight.isAssign != adjustment.isAssign ||
+			inflight.partition != adjustment.partition ||
+			inflight.consumer != adjustment.consumer
+	})
+
+	if adjustment.isAssign {
+		m.confirmAssignPartition(adjustment.partition, adjustment.consumer)
+	} else {
+		m.unassignPartitionSlot(adjustment.partition)
+	}
+}
+
+// unassignPartitionSlot detaches the partition from the consumer it is
+// assigned to and requests a rebalance so the partition can be handed to
+// another consumer. The caller must hold m.mu.
+func (m *Market) unassignPartitionSlot(partition topic.Partition) {
+	partitionSlot, exists := m.partitions[partition]
+	if !exists {
+		glog.V(0).Infof("partition %+v slot is not tracked", partition)
+		return
+	}
+
+	if partitionSlot.AssignedTo == nil {
+		glog.V(0).Infof("partition %+v slot is not assigned to any consumer", partition)
+		return
+	}
+
+	consumer := partitionSlot.AssignedTo
+	for i, p := range consumer.AssignedPartitions {
+		if p == partition {
+			consumer.AssignedPartitions = append(consumer.AssignedPartitions[:i], consumer.AssignedPartitions[i+1:]...)
+			partitionSlot.AssignedTo = nil
+			m.requestBalance()
+			return
+		}
+	}
+
+	glog.V(0).Infof("partition %+v slot not found in assigned consumer", partition)
+}
+
+// confirmAssignPartition attaches the partition to the consumer. It logs and
+// does nothing when the partition or the consumer is not tracked, or when the
+// partition already has a consumer. The caller must hold m.mu.
+func (m *Market) confirmAssignPartition(partition topic.Partition, consumerInstanceId ConsumerGroupInstanceId) {
+	partitionSlot, exists := m.partitions[partition]
+	if !exists {
+		glog.V(0).Infof("partition %+v slot is not tracked", partition)
+		return
+	}
+
+	if partitionSlot.AssignedTo != nil {
+		glog.V(0).Infof("partition %+v slot is already assigned to %+v", partition, partitionSlot.AssignedTo.InstanceId)
+		return
+	}
+
+	consumerInstance, exists := m.consumerInstances[consumerInstanceId]
+	if !exists {
+		glog.V(0).Infof("consumer %+v is not tracked", consumerInstanceId)
+		return
+	}
+
+	partitionSlot.AssignedTo = consumerInstance
+	consumerInstance.AssignedPartitions = append(consumerInstance.AssignedPartitions, partition)
+}
diff --git a/weed/mq/sub_coordinator/market_test.go b/weed/mq/sub_coordinator/market_test.go
new file mode 100644
index 000000000..150a88a8d
--- /dev/null
+++ b/weed/mq/sub_coordinator/market_test.go
@@ -0,0 +1,113 @@
+package sub_coordinator
+
+import (
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+	"github.com/stretchr/testify/assert"
+)
+
+var partitions = []topic.Partition{
+	{
+		RangeStart: 0,
+		RangeStop:  1,
+		RingSize:   3,
+		UnixTimeNs: 0,
+	},
+	{
+		RangeStart: 1,
+		RangeStop:  2,
+		RingSize:   3,
+		UnixTimeNs: 0,
+	},
+	{
+		RangeStart: 2,
+		RangeStop:  3,
+		RingSize:   3,
+		UnixTimeNs: 0,
+	},
+}
+
+// TestAddConsumerInstance checks that a single consumer is only given as many
+// partitions as its MaxPartitionCount allows and that duplicates are rejected.
+func TestAddConsumerInstance(t *testing.T) {
+	market := NewMarket(partitions, 10*time.Second)
+
+	consumer := &ConsumerGroupInstance{
+		InstanceId:        "first",
+		MaxPartitionCount: 2,
+	}
+	assert.NoError(t, market.AddConsumerInstance(consumer))
+	assert.Error(t, market.AddConsumerInstance(consumer))
+
+	time.Sleep(1 * time.Second) // Allow time for background rebalancing
+	market.ShutdownMarket()
+
+	assignCount := 0
+	for adjustment := range market.AdjustmentChan {
+		t.Logf("%+v", adjustment)
+		assert.True(t, adjustment.isAssign)
+		assignCount++
+	}
+	assert.Equal(t, 2, assignCount)
+}
+
+// TestMultipleConsumerInstances checks that three partitions are spread over
+// three consumers, one partition each.
+func TestMultipleConsumerInstances(t *testing.T) {
+	market := NewMarket(partitions, 10*time.Second)
+
+	for _, instanceId := range []ConsumerGroupInstanceId{"first", "second", "third"} {
+		assert.NoError(t, market.AddConsumerInstance(&ConsumerGroupInstance{
+			InstanceId:        instanceId,
+			MaxPartitionCount: 2,
+		}))
+	}
+
+	time.Sleep(1 * time.Second) // Allow time for background rebalancing
+	market.ShutdownMarket()
+
+	assignedTo := make(map[ConsumerGroupInstanceId]int)
+	for adjustment := range market.AdjustmentChan {
+		t.Logf("%+v", adjustment)
+		assert.True(t, adjustment.isAssign)
+		assignedTo[adjustment.consumer]++
+	}
+	assert.Equal(t, map[ConsumerGroupInstanceId]int{"first": 1, "second": 1, "third": 1}, assignedTo)
+}
+
+// TestConfirmAdjustment confirms every adjustment as it arrives and removes a
+// consumer midway, so that its partition is handed to one of the others.
+func TestConfirmAdjustment(t *testing.T) {
+	market := NewMarket(partitions, 1*time.Second)
+
+	for _, instanceId := range []ConsumerGroupInstanceId{"first", "second", "third"} {
+		assert.NoError(t, market.AddConsumerInstance(&ConsumerGroupInstance{
+			InstanceId:        instanceId,
+			MaxPartitionCount: 2,
+		}))
+	}
+
+	go func() {
+		time.Sleep(5 * time.Second) // Allow time for background rebalancing
+		market.ShutdownMarket()
+	}()
+	go func() {
+		time.Sleep(2 * time.Second)
+		assert.NoError(t, market.RemoveConsumerInstance("third"))
+	}()
+
+	for adjustment := range market.AdjustmentChan {
+		t.Logf("%+v", adjustment)
+		market.ConfirmAdjustment(adjustment)
+	}
+
+	market.mu.Lock()
+	defer market.mu.Unlock()
+	for _, slot := range market.partitions {
+		if assert.NotNil(t, slot.AssignedTo) {
+			assert.NotEqual(t, ConsumerGroupInstanceId("third"), slot.AssignedTo.InstanceId)
+		}
+	}
+}