Browse Source

passing broker into the assignments

pull/5637/head
chrislu 1 year ago
parent
commit
2845230329
  1. 3
      weed/mq/client/sub_client/connect_to_sub_coordinator.go
  2. 21
      weed/mq/sub_coordinator/consumer_group.go
  3. 21
      weed/mq/sub_coordinator/partition_consumer_mapping.go
  4. 20
      weed/mq/sub_coordinator/partition_consumer_mapping_test.go
  5. 11
      weed/mq/sub_coordinator/partition_list.go

3
weed/mq/client/sub_client/connect_to_sub_coordinator.go

@ -78,7 +78,7 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo
go func(partition *mq_pb.Partition, broker string) {
defer wg.Done()
defer func() { <-semaphore }()
glog.V(0).Infof("subscriber %s/%s/%s assigned partition %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition)
glog.V(0).Infof("subscriber %s/%s/%s assigned partition %+v at %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker)
sub.onEachPartition(partition, broker)
}(assigned.Partition, assigned.Broker)
}
@ -87,5 +87,4 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo
}
func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) {
glog.V(0).Infof("subscriber %s/%s/%s processing partition %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition)
}

21
weed/mq/sub_coordinator/consumer_group.go

@ -40,19 +40,19 @@ func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance {
}
}
func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) {
cg.onConsumerGroupInstanceChange()
cg.onConsumerGroupInstanceChange("add consumer instance "+ consumerGroupInstance)
}
func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) {
cg.onConsumerGroupInstanceChange()
cg.onConsumerGroupInstanceChange("remove consumer instance "+ consumerGroupInstance)
}
func (cg *ConsumerGroup) onConsumerGroupInstanceChange(){
func (cg *ConsumerGroup) onConsumerGroupInstanceChange(reason string){
if cg.reBalanceTimer != nil {
cg.reBalanceTimer.Stop()
cg.reBalanceTimer = nil
}
cg.reBalanceTimer = time.AfterFunc(5*time.Second, func() {
cg.RebalanceConsumberGroupInstances()
cg.RebalanceConsumberGroupInstances(reason)
cg.reBalanceTimer = nil
})
}
@ -61,11 +61,11 @@ func (cg *ConsumerGroup) OnPartitionListChange() {
cg.reBalanceTimer.Stop()
cg.reBalanceTimer = nil
}
cg.RebalanceConsumberGroupInstances()
cg.RebalanceConsumberGroupInstances("partition list change")
}
func (cg *ConsumerGroup) RebalanceConsumberGroupInstances() {
println("rebalance...")
func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(reason string) {
println("rebalance due to", reason, "...")
now := time.Now().UnixNano()
@ -75,10 +75,6 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances() {
glog.V(0).Infof("topic %s not found in balancer", cg.topic.String())
return
}
partitions := make([]*topic.Partition, 0)
for _, partitionSlot := range partitionSlotToBrokerList.PartitionSlots {
partitions = append(partitions, topic.NewPartition(partitionSlot.RangeStart, partitionSlot.RangeStop, partitionSlotToBrokerList.RingSize, now))
}
// collect current consumer group instance ids
consumerInstanceIds := make([]string, 0)
@ -86,7 +82,7 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances() {
consumerInstanceIds = append(consumerInstanceIds, consumerGroupInstance.InstanceId)
}
cg.mapping.BalanceToConsumerInstanceIds(partitions, consumerInstanceIds)
cg.mapping.BalanceToConsumerInstanceIds(partitionSlotToBrokerList, consumerInstanceIds)
// convert cg.mapping currentMapping to map of consumer group instance id to partition slots
consumerInstanceToPartitionSlots := make(map[string][]*PartitionSlotToConsumerInstance)
@ -110,6 +106,7 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances() {
RingSize: partitionSlotToBrokerList.RingSize,
UnixTimeNs: now,
},
Broker: partitionSlot.Broker,
}
}
response := &mq_pb.SubscriberToSubCoordinatorResponse{

21
weed/mq/sub_coordinator/partition_consumer_mapping.go

@ -2,7 +2,7 @@ package sub_coordinator
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"time"
)
@ -23,19 +23,19 @@ func NewPartitionConsumerMapping(ringSize int32) *PartitionConsumerMapping {
// 2. allow one consumer instance to be down unexpectedly
// without affecting the processing power utilization
func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitions []*topic.Partition, consumerInstanceIds []string) {
if len(partitions) == 0 || len(consumerInstanceIds) == 0 {
func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, consumerInstanceIds []string) {
if len(partitionSlotToBrokerList.PartitionSlots) == 0 || len(consumerInstanceIds) == 0 {
return
}
newVersion := time.Now().UnixNano()
newMapping := NewPartitionSlotToConsumerInstanceList(partitions[0].RingSize, newVersion)
newMapping := NewPartitionSlotToConsumerInstanceList(partitionSlotToBrokerList.RingSize, newVersion)
var prevMapping *PartitionSlotToConsumerInstanceList
if len(pcm.prevMappings) > 0 {
prevMapping = pcm.prevMappings[len(pcm.prevMappings)-1]
} else {
prevMapping = nil
}
newMapping.PartitionSlots = doBalanceSticky(partitions, consumerInstanceIds, prevMapping)
newMapping.PartitionSlots = doBalanceSticky(partitionSlotToBrokerList.PartitionSlots, consumerInstanceIds, prevMapping)
if pcm.currentMapping != nil {
pcm.prevMappings = append(pcm.prevMappings, pcm.currentMapping)
if len(pcm.prevMappings) > 10 {
@ -45,7 +45,7 @@ func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitions []*
pcm.currentMapping = newMapping
}
func doBalanceSticky(partitions []*topic.Partition, consumerInstanceIds []string, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) {
func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerInstanceIds []string, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) {
// collect previous consumer instance ids
prevConsumerInstanceIds := make(map[string]struct{})
if prevMapping != nil {
@ -79,7 +79,14 @@ func doBalanceSticky(partitions []*topic.Partition, consumerInstanceIds []string
}
// make a copy of old mapping, skipping the deleted consumer instances
newPartitionSlots := ToPartitionSlots(partitions)
newPartitionSlots := make([]*PartitionSlotToConsumerInstance, 0, len(partitions))
for _, partition := range partitions {
newPartitionSlots = append(newPartitionSlots, &PartitionSlotToConsumerInstance{
RangeStart: partition.RangeStart,
RangeStop: partition.RangeStop,
Broker: partition.AssignedBroker,
})
}
for _, newPartitionSlot := range newPartitionSlots {
key := fmt.Sprintf("%d-%d", newPartitionSlot.RangeStart, newPartitionSlot.RangeStop)
if prevPartitionSlot, ok := prevPartitionSlotMap[key]; ok {

20
weed/mq/sub_coordinator/partition_consumer_mapping_test.go

@ -1,14 +1,14 @@
package sub_coordinator
import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"reflect"
"testing"
)
func Test_doBalanceSticky(t *testing.T) {
type args struct {
partitions []*topic.Partition
partitions []*pub_balancer.PartitionSlotToBroker
consumerInstanceIds []string
prevMapping *PartitionSlotToConsumerInstanceList
}
@ -20,7 +20,7 @@ func Test_doBalanceSticky(t *testing.T) {
{
name: "1 consumer instance, 1 partition",
args: args{
partitions: []*topic.Partition{
partitions: []*pub_balancer.PartitionSlotToBroker{
{
RangeStart: 0,
RangeStop: 100,
@ -40,7 +40,7 @@ func Test_doBalanceSticky(t *testing.T) {
{
name: "2 consumer instances, 1 partition",
args: args{
partitions: []*topic.Partition{
partitions: []*pub_balancer.PartitionSlotToBroker{
{
RangeStart: 0,
RangeStop: 100,
@ -60,7 +60,7 @@ func Test_doBalanceSticky(t *testing.T) {
{
name: "1 consumer instance, 2 partitions",
args: args{
partitions: []*topic.Partition{
partitions: []*pub_balancer.PartitionSlotToBroker{
{
RangeStart: 0,
RangeStop: 50,
@ -89,7 +89,7 @@ func Test_doBalanceSticky(t *testing.T) {
{
name: "2 consumer instances, 2 partitions",
args: args{
partitions: []*topic.Partition{
partitions: []*pub_balancer.PartitionSlotToBroker{
{
RangeStart: 0,
RangeStop: 50,
@ -118,7 +118,7 @@ func Test_doBalanceSticky(t *testing.T) {
{
name: "2 consumer instances, 2 partitions, 1 deleted consumer instance",
args: args{
partitions: []*topic.Partition{
partitions: []*pub_balancer.PartitionSlotToBroker{
{
RangeStart: 0,
RangeStop: 50,
@ -160,7 +160,7 @@ func Test_doBalanceSticky(t *testing.T) {
{
name: "2 consumer instances, 2 partitions, 1 new consumer instance",
args: args{
partitions: []*topic.Partition{
partitions: []*pub_balancer.PartitionSlotToBroker{
{
RangeStart: 0,
RangeStop: 50,
@ -202,7 +202,7 @@ func Test_doBalanceSticky(t *testing.T) {
{
name: "2 consumer instances, 2 partitions, 1 new partition",
args: args{
partitions: []*topic.Partition{
partitions: []*pub_balancer.PartitionSlotToBroker{
{
RangeStart: 0,
RangeStop: 50,
@ -253,7 +253,7 @@ func Test_doBalanceSticky(t *testing.T) {
{
name: "2 consumer instances, 2 partitions, 1 new partition, 1 new consumer instance",
args: args{
partitions: []*topic.Partition{
partitions: []*pub_balancer.PartitionSlotToBroker{
{
RangeStart: 0,
RangeStop: 50,

11
weed/mq/sub_coordinator/partition_list.go

@ -5,6 +5,7 @@ import "github.com/seaweedfs/seaweedfs/weed/mq/topic"
type PartitionSlotToConsumerInstance struct {
RangeStart int32
RangeStop int32
Broker string
AssignedInstanceId string
}
@ -21,16 +22,6 @@ func NewPartitionSlotToConsumerInstanceList(ringSize int32, version int64) *Part
}
}
func ToPartitionSlots(partitions []*topic.Partition) (partitionSlots []*PartitionSlotToConsumerInstance) {
for _, partition := range partitions {
partitionSlots = append(partitionSlots, &PartitionSlotToConsumerInstance{
RangeStart: partition.RangeStart,
RangeStop: partition.RangeStop,
})
}
return
}
func ToPartitions(ringSize int32, slots []*PartitionSlotToConsumerInstance, unixTimeNs int64) []*topic.Partition {
partitions := make([]*topic.Partition, 0, len(slots))
for _, slot := range slots {

Loading…
Cancel
Save