diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index ae821bfdd..a2f7c7a3d 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -78,7 +78,7 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo go func(partition *mq_pb.Partition, broker string) { defer wg.Done() defer func() { <-semaphore }() - glog.V(0).Infof("subscriber %s/%s/%s assigned partition %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition) + glog.V(0).Infof("subscriber %s/%s/%s assigned partition %+v at %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker) sub.onEachPartition(partition, broker) }(assigned.Partition, assigned.Broker) } @@ -87,5 +87,4 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo } func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) { - glog.V(0).Infof("subscriber %s/%s/%s processing partition %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition) } diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go index 9bd020ad3..dad93dfe5 100644 --- a/weed/mq/sub_coordinator/consumer_group.go +++ b/weed/mq/sub_coordinator/consumer_group.go @@ -40,19 +40,19 @@ func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance { } } func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) { - cg.onConsumerGroupInstanceChange() + cg.onConsumerGroupInstanceChange("add consumer instance "+ consumerGroupInstance) } func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) { - cg.onConsumerGroupInstanceChange() + cg.onConsumerGroupInstanceChange("remove consumer instance "+ consumerGroupInstance) } -func (cg *ConsumerGroup) onConsumerGroupInstanceChange(){ +func (cg *ConsumerGroup) onConsumerGroupInstanceChange(reason string){ if cg.reBalanceTimer != nil { cg.reBalanceTimer.Stop() cg.reBalanceTimer = nil } cg.reBalanceTimer = time.AfterFunc(5*time.Second, func() { - cg.RebalanceConsumberGroupInstances() + cg.RebalanceConsumberGroupInstances(reason) cg.reBalanceTimer = nil }) } @@ -61,11 +61,11 @@ func (cg *ConsumerGroup) OnPartitionListChange() { cg.reBalanceTimer.Stop() cg.reBalanceTimer = nil } - cg.RebalanceConsumberGroupInstances() + cg.RebalanceConsumberGroupInstances("partition list change") } -func (cg *ConsumerGroup) RebalanceConsumberGroupInstances() { - println("rebalance...") +func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(reason string) { + println("rebalance due to", reason, "...") now := time.Now().UnixNano() @@ -75,10 +75,6 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances() { glog.V(0).Infof("topic %s not found in balancer", cg.topic.String()) return } - partitions := make([]*topic.Partition, 0) - for _, partitionSlot := range partitionSlotToBrokerList.PartitionSlots { - partitions = append(partitions, topic.NewPartition(partitionSlot.RangeStart, partitionSlot.RangeStop, partitionSlotToBrokerList.RingSize, now)) - } // collect current consumer group instance ids consumerInstanceIds := make([]string, 0) @@ -86,7 +82,7 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances() { consumerInstanceIds = append(consumerInstanceIds, consumerGroupInstance.InstanceId) } - cg.mapping.BalanceToConsumerInstanceIds(partitions, consumerInstanceIds) + cg.mapping.BalanceToConsumerInstanceIds(partitionSlotToBrokerList, consumerInstanceIds) // convert cg.mapping currentMapping to map of consumer group instance id to partition slots consumerInstanceToPartitionSlots := make(map[string][]*PartitionSlotToConsumerInstance) @@ -110,6 +106,7 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances() { RingSize: partitionSlotToBrokerList.RingSize, UnixTimeNs: now, }, + Broker: partitionSlot.Broker, } } response := &mq_pb.SubscriberToSubCoordinatorResponse{ diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping.go b/weed/mq/sub_coordinator/partition_consumer_mapping.go index b7e5b12c6..ae2bf1c17 100644 --- a/weed/mq/sub_coordinator/partition_consumer_mapping.go +++ b/weed/mq/sub_coordinator/partition_consumer_mapping.go @@ -2,7 +2,7 @@ package sub_coordinator import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "time" ) @@ -23,19 +23,19 @@ func NewPartitionConsumerMapping(ringSize int32) *PartitionConsumerMapping { // 2. allow one consumer instance to be down unexpectedly // without affecting the processing power utilization -func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitions []*topic.Partition, consumerInstanceIds []string) { - if len(partitions) == 0 || len(consumerInstanceIds) == 0 { +func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, consumerInstanceIds []string) { + if len(partitionSlotToBrokerList.PartitionSlots) == 0 || len(consumerInstanceIds) == 0 { return } newVersion := time.Now().UnixNano() - newMapping := NewPartitionSlotToConsumerInstanceList(partitions[0].RingSize, newVersion) + newMapping := NewPartitionSlotToConsumerInstanceList(partitionSlotToBrokerList.RingSize, newVersion) var prevMapping *PartitionSlotToConsumerInstanceList if len(pcm.prevMappings) > 0 { prevMapping = pcm.prevMappings[len(pcm.prevMappings)-1] } else { prevMapping = nil } - newMapping.PartitionSlots = doBalanceSticky(partitions, consumerInstanceIds, prevMapping) + newMapping.PartitionSlots = doBalanceSticky(partitionSlotToBrokerList.PartitionSlots, consumerInstanceIds, prevMapping) if pcm.currentMapping != nil { pcm.prevMappings = append(pcm.prevMappings, pcm.currentMapping) if len(pcm.prevMappings) > 10 { @@ -45,7 +45,7 @@ func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitions []* pcm.currentMapping = newMapping } -func doBalanceSticky(partitions []*topic.Partition, consumerInstanceIds []string, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) { +func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerInstanceIds []string, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) { // collect previous consumer instance ids prevConsumerInstanceIds := make(map[string]struct{}) if prevMapping != nil { @@ -79,7 +79,14 @@ func doBalanceSticky(partitions []*topic.Partition, consumerInstanceIds []string } // make a copy of old mapping, skipping the deleted consumer instances - newPartitionSlots := ToPartitionSlots(partitions) + newPartitionSlots := make([]*PartitionSlotToConsumerInstance, 0, len(partitions)) + for _, partition := range partitions { + newPartitionSlots = append(newPartitionSlots, &PartitionSlotToConsumerInstance{ + RangeStart: partition.RangeStart, + RangeStop: partition.RangeStop, + Broker: partition.AssignedBroker, + }) + } for _, newPartitionSlot := range newPartitionSlots { key := fmt.Sprintf("%d-%d", newPartitionSlot.RangeStart, newPartitionSlot.RangeStop) if prevPartitionSlot, ok := prevPartitionSlotMap[key]; ok { diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping_test.go b/weed/mq/sub_coordinator/partition_consumer_mapping_test.go index 1d3050ef4..9a9abe011 100644 --- a/weed/mq/sub_coordinator/partition_consumer_mapping_test.go +++ b/weed/mq/sub_coordinator/partition_consumer_mapping_test.go @@ -1,14 +1,14 @@ package sub_coordinator import ( - "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "reflect" "testing" ) func Test_doBalanceSticky(t *testing.T) { type args struct { - partitions []*topic.Partition + partitions []*pub_balancer.PartitionSlotToBroker consumerInstanceIds []string prevMapping *PartitionSlotToConsumerInstanceList } @@ -20,7 +20,7 @@ func Test_doBalanceSticky(t *testing.T) { { name: "1 consumer instance, 1 partition", args: args{ - partitions: []*topic.Partition{ + partitions: []*pub_balancer.PartitionSlotToBroker{ { RangeStart: 0, RangeStop: 100, @@ -40,7 +40,7 @@ func Test_doBalanceSticky(t *testing.T) { { name: "2 consumer instances, 1 partition", args: args{ - partitions: []*topic.Partition{ + partitions: []*pub_balancer.PartitionSlotToBroker{ { RangeStart: 0, RangeStop: 100, @@ -60,7 +60,7 @@ func Test_doBalanceSticky(t *testing.T) { { name: "1 consumer instance, 2 partitions", args: args{ - partitions: []*topic.Partition{ + partitions: []*pub_balancer.PartitionSlotToBroker{ { RangeStart: 0, RangeStop: 50, @@ -89,7 +89,7 @@ func Test_doBalanceSticky(t *testing.T) { { name: "2 consumer instances, 2 partitions", args: args{ - partitions: []*topic.Partition{ + partitions: []*pub_balancer.PartitionSlotToBroker{ { RangeStart: 0, RangeStop: 50, @@ -118,7 +118,7 @@ func Test_doBalanceSticky(t *testing.T) { { name: "2 consumer instances, 2 partitions, 1 deleted consumer instance", args: args{ - partitions: []*topic.Partition{ + partitions: []*pub_balancer.PartitionSlotToBroker{ { RangeStart: 0, RangeStop: 50, @@ -160,7 +160,7 @@ func Test_doBalanceSticky(t *testing.T) { { name: "2 consumer instances, 2 partitions, 1 new consumer instance", args: args{ - partitions: []*topic.Partition{ + partitions: []*pub_balancer.PartitionSlotToBroker{ { RangeStart: 0, RangeStop: 50, @@ -202,7 +202,7 @@ func Test_doBalanceSticky(t *testing.T) { { name: "2 consumer instances, 2 partitions, 1 new partition", args: args{ - partitions: []*topic.Partition{ + partitions: []*pub_balancer.PartitionSlotToBroker{ { RangeStart: 0, RangeStop: 50, @@ -253,7 +253,7 @@ func Test_doBalanceSticky(t *testing.T) { { name: "2 consumer instances, 2 partitions, 1 new partition, 1 new consumer instance", args: args{ - partitions: []*topic.Partition{ + partitions: []*pub_balancer.PartitionSlotToBroker{ { RangeStart: 0, RangeStop: 50, diff --git a/weed/mq/sub_coordinator/partition_list.go b/weed/mq/sub_coordinator/partition_list.go index 1c3123bfc..b559007b5 100644 --- a/weed/mq/sub_coordinator/partition_list.go +++ b/weed/mq/sub_coordinator/partition_list.go @@ -5,6 +5,7 @@ import "github.com/seaweedfs/seaweedfs/weed/mq/topic" type PartitionSlotToConsumerInstance struct { RangeStart int32 RangeStop int32 + Broker string AssignedInstanceId string } @@ -21,16 +22,6 @@ func NewPartitionSlotToConsumerInstanceList(ringSize int32, version int64) *Part } } -func ToPartitionSlots(partitions []*topic.Partition) (partitionSlots []*PartitionSlotToConsumerInstance) { - for _, partition := range partitions { - partitionSlots = append(partitionSlots, &PartitionSlotToConsumerInstance{ - RangeStart: partition.RangeStart, - RangeStop: partition.RangeStop, - }) - } - return -} - func ToPartitions(ringSize int32, slots []*PartitionSlotToConsumerInstance, unixTimeNs int64) []*topic.Partition { partitions := make([]*topic.Partition, 0, len(slots)) for _, slot := range slots {