From 7598922b416948a970e17b5ee99f43bd8143d7e5 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 29 Feb 2024 14:51:06 -0800 Subject: [PATCH] assign followers --- .../mq/broker/broker_topic_conf_read_write.go | 6 +- weed/mq/pub_balancer/allocate.go | 101 +++++++++++--- weed/mq/pub_balancer/allocate_test.go | 132 ++++++++++++++++++ 3 files changed, 214 insertions(+), 25 deletions(-) diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index dbd0d97c7..4bcb62931 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -88,9 +88,9 @@ func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) { // also fix assignee broker if invalid - addedAssignments, updatedAssignments := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, conf.BrokerPartitionAssignments) - if len(addedAssignments) > 0 || len(updatedAssignments) > 0 { - glog.V(0).Infof("topic %v partition assignments added: %v updated: %v", t, addedAssignments, updatedAssignments) + hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, 1, conf.BrokerPartitionAssignments) + if hasChanges { + glog.V(0).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments) if err = b.saveTopicConfToFiler(t.ToPbTopic(), conf); err != nil { return err } diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go index 520f6bcf2..bd1ccb6dd 100644 --- a/weed/mq/pub_balancer/allocate.go +++ b/weed/mq/pub_balancer/allocate.go @@ -38,7 +38,7 @@ func AllocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], p return } -// for now: randomly pick brokers +// randomly pick n brokers, which may contain duplicates // TODO pick brokers based on the broker stats func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32) []string { candidates := make([]string, 0, brokers.Count()) @@ -47,39 +47,96 @@ func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32) } pickedBrokers := make([]string, 0, count) for i := int32(0); i < count; i++ { - p := rand.Int() % len(candidates) - if p < 0 { - p = -p - } + p := rand.Intn(len(candidates)) pickedBrokers = append(pickedBrokers, candidates[p]) } return pickedBrokers } -func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], assignments []*mq_pb.BrokerPartitionAssignment) (addedAssignments, updatedAssignments []*mq_pb.BrokerPartitionAssignment) { - for _, assignment := range assignments { - if assignment.LeaderBroker == "" { - addedAssignments = append(addedAssignments, assignment) +// reservoir sampling select N brokers from the active brokers, with exclusion of the excluded brokers +func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string, excludedBrokers []string) []string { + // convert the excluded brokers to a map + excludedBrokerMap := make(map[string]bool) + for _, broker := range excludedBrokers { + excludedBrokerMap[broker] = true + } + if excludedLeadBroker != "" { + excludedBrokerMap[excludedLeadBroker] = true + } + + pickedBrokers := make([]string, 0, count) + for i, broker := range brokers { + if _, found := excludedBrokerMap[broker]; found { continue } - if _, found := activeBrokers.Get(assignment.LeaderBroker); !found { - updatedAssignments = append(updatedAssignments, assignment) - continue + if len(pickedBrokers) < count { + pickedBrokers = append(pickedBrokers, broker) + } else { + j := rand.Intn(i + 1) + if j < count { + pickedBrokers[j] = broker + } } } - // pick the brokers with the least number of partitions - if len(addedAssignments) > 0 { - pickedBrokers := pickBrokers(activeBrokers, int32(len(addedAssignments))) - for i, assignment := range addedAssignments { - assignment.LeaderBroker = pickedBrokers[i] - } + // shuffle the picked brokers + count = len(pickedBrokers) + for i := 0; i < count; i++ { + j := rand.Intn(count) + pickedBrokers[i], pickedBrokers[j] = pickedBrokers[j], pickedBrokers[i] + } + + return pickedBrokers +} + +func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], followerCount int, assignments []*mq_pb.BrokerPartitionAssignment) (hasChanges bool) { + candidates := make([]string, 0, activeBrokers.Count()) + for brokerStatsItem := range activeBrokers.IterBuffered() { + candidates = append(candidates, brokerStatsItem.Key) } - if len(updatedAssignments) == 0 { - pickedBrokers := pickBrokers(activeBrokers, int32(len(updatedAssignments))) - for i, assignment := range updatedAssignments { - assignment.LeaderBroker = pickedBrokers[i] + + for _, assignment := range assignments { + // count how many brokers are needed + count := 0 + if assignment.LeaderBroker == "" { + count++ + } else if _, found := activeBrokers.Get(assignment.LeaderBroker); !found { + assignment.LeaderBroker = "" + count++ + } + for i:=0; i= len(assignment.FollowerBrokers) { + count++ + continue + } + if assignment.FollowerBrokers[i] == "" { + count++ + } else if _, found := activeBrokers.Get(assignment.FollowerBrokers[i]); !found { + assignment.FollowerBrokers[i] = "" + count++ + } } + + if count > 0 { + pickedBrokers := pickBrokersExcluded(candidates, count, assignment.LeaderBroker, assignment.FollowerBrokers) + i := 0 + if assignment.LeaderBroker == "" { + assignment.LeaderBroker = pickedBrokers[i] + i++ + } + j := 0 + for ; j 0 } return diff --git a/weed/mq/pub_balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go index 3a1598fa0..2f298d3e5 100644 --- a/weed/mq/pub_balancer/allocate_test.go +++ b/weed/mq/pub_balancer/allocate_test.go @@ -1,6 +1,7 @@ package pub_balancer import ( + "fmt" cmap "github.com/orcaman/concurrent-map/v2" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/stretchr/testify/assert" @@ -65,3 +66,134 @@ func testThem(t *testing.T, tests []struct { }) } } + +func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { + type args struct { + activeBrokers cmap.ConcurrentMap[string, *BrokerStats] + followerCount int + assignments []*mq_pb.BrokerPartitionAssignment + } + activeBrokers := cmap.New[*BrokerStats]() + activeBrokers.SetIfAbsent("localhost:1", &BrokerStats{}) + activeBrokers.SetIfAbsent("localhost:2", &BrokerStats{}) + activeBrokers.SetIfAbsent("localhost:3", &BrokerStats{}) + activeBrokers.SetIfAbsent("localhost:4", &BrokerStats{}) + activeBrokers.SetIfAbsent("localhost:5", &BrokerStats{}) + activeBrokers.SetIfAbsent("localhost:6", &BrokerStats{}) + tests := []struct { + name string + args args + hasChanges bool + }{ + { + name: "test empty leader", + args: args{ + activeBrokers: activeBrokers, + followerCount: 1, + assignments: []*mq_pb.BrokerPartitionAssignment{ + { + LeaderBroker: "", + Partition: &mq_pb.Partition{}, + FollowerBrokers: []string{ + "localhost:2", + }, + }, + }, + }, + hasChanges: true, + }, + { + name: "test empty follower", + args: args{ + activeBrokers: activeBrokers, + followerCount: 1, + assignments: []*mq_pb.BrokerPartitionAssignment{ + { + LeaderBroker: "localhost:1", + Partition: &mq_pb.Partition{}, + FollowerBrokers: []string{ + "", + }, + }, + }, + }, + hasChanges: true, + }, + { + name: "test dead follower", + args: args{ + activeBrokers: activeBrokers, + followerCount: 1, + assignments: []*mq_pb.BrokerPartitionAssignment{ + { + LeaderBroker: "localhost:1", + Partition: &mq_pb.Partition{}, + FollowerBrokers: []string{ + "localhost:200", + }, + }, + }, + }, + hasChanges: true, + }, + { + name: "test dead leader and follower", + args: args{ + activeBrokers: activeBrokers, + followerCount: 1, + assignments: []*mq_pb.BrokerPartitionAssignment{ + { + LeaderBroker: "localhost:100", + Partition: &mq_pb.Partition{}, + FollowerBrokers: []string{ + "localhost:200", + }, + }, + }, + }, + hasChanges: true, + }, + { + name: "test missing two followers", + args: args{ + activeBrokers: activeBrokers, + followerCount: 3, + assignments: []*mq_pb.BrokerPartitionAssignment{ + { + LeaderBroker: "localhost:1", + Partition: &mq_pb.Partition{}, + FollowerBrokers: []string{ + "localhost:2", + }, + }, + }, + }, + hasChanges: true, + }, + { + name: "test missing some followers", + args: args{ + activeBrokers: activeBrokers, + followerCount: 10, + assignments: []*mq_pb.BrokerPartitionAssignment{ + { + LeaderBroker: "localhost:1", + Partition: &mq_pb.Partition{}, + FollowerBrokers: []string{ + "localhost:2", + }, + }, + }, + }, + hasChanges: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fmt.Printf("%v before %v\n", tt.name, tt.args.assignments) + hasChanges := EnsureAssignmentsToActiveBrokers(tt.args.activeBrokers, tt.args.followerCount, tt.args.assignments) + assert.Equalf(t, tt.hasChanges, hasChanges, "EnsureAssignmentsToActiveBrokers(%v, %v, %v)", tt.args.activeBrokers, tt.args.followerCount, tt.args.assignments) + fmt.Printf("%v after %v\n", tt.name, tt.args.assignments) + }) + } +}