chrislu
1 year ago
21 changed files with 290 additions and 199 deletions
-
12.github/workflows/container_dev.yml
-
12.github/workflows/container_latest.yml
-
10.github/workflows/container_release1.yml
-
10.github/workflows/container_release2.yml
-
10.github/workflows/container_release3.yml
-
10.github/workflows/container_release4.yml
-
10.github/workflows/container_release5.yml
-
39go.mod
-
73go.sum
-
2weed/cluster/lock_manager/lock_manager.go
-
7weed/mount/dirty_pages_chunked.go
-
20weed/mq/balancer/allocate.go
-
62weed/mq/balancer/allocate_test.go
-
59weed/mq/balancer/balancer.go
-
43weed/mq/balancer/lookup.go
-
4weed/mq/broker/broker_grpc_admin.go
-
81weed/mq/topic_allocation/allocation.go
-
7weed/server/filer_server.go
-
12weed/server/filer_server_handlers.go
-
2weed/server/filer_server_handlers_write.go
-
4weed/storage/store_ec_delete.go
@ -0,0 +1,20 @@ |
|||
package balancer |
|||
|
|||
import ( |
|||
cmap "github.com/orcaman/concurrent-map/v2" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|||
) |
|||
|
|||
func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int) (assignments []*mq_pb.BrokerPartitionAssignment) { |
|||
return []*mq_pb.BrokerPartitionAssignment{ |
|||
{ |
|||
LeaderBroker: "localhost:17777", |
|||
FollowerBrokers: []string{"localhost:17777"}, |
|||
Partition: &mq_pb.Partition{ |
|||
RingSize: MaxPartitionCount, |
|||
RangeStart: 0, |
|||
RangeStop: MaxPartitionCount, |
|||
}, |
|||
}, |
|||
} |
|||
} |
@ -0,0 +1,62 @@ |
|||
package balancer |
|||
|
|||
import ( |
|||
cmap "github.com/orcaman/concurrent-map/v2" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|||
"reflect" |
|||
"testing" |
|||
) |
|||
|
|||
func Test_allocateOneBroker(t *testing.T) { |
|||
brokers := cmap.New[*BrokerStats]() |
|||
brokers.SetIfAbsent("localhost:17777", &BrokerStats{ |
|||
TopicPartitionCount: 0, |
|||
ConsumerCount: 0, |
|||
CpuUsagePercent: 0, |
|||
}) |
|||
|
|||
tests := []struct { |
|||
name string |
|||
args args |
|||
wantAssignments []*mq_pb.BrokerPartitionAssignment |
|||
}{ |
|||
{ |
|||
name: "test only one broker", |
|||
args: args{ |
|||
brokers: brokers, |
|||
partitionCount: 6, |
|||
}, |
|||
wantAssignments: []*mq_pb.BrokerPartitionAssignment{ |
|||
{ |
|||
LeaderBroker: "localhost:17777", |
|||
FollowerBrokers: []string{"localhost:17777"}, |
|||
Partition: &mq_pb.Partition{ |
|||
RingSize: MaxPartitionCount, |
|||
RangeStart: 0, |
|||
RangeStop: MaxPartitionCount, |
|||
}, |
|||
}, |
|||
}, |
|||
}, |
|||
} |
|||
testThem(t, tests) |
|||
} |
|||
|
|||
type args struct { |
|||
brokers cmap.ConcurrentMap[string, *BrokerStats] |
|||
partitionCount int |
|||
} |
|||
|
|||
func testThem(t *testing.T, tests []struct { |
|||
name string |
|||
args args |
|||
wantAssignments []*mq_pb.BrokerPartitionAssignment |
|||
}) { |
|||
for _, tt := range tests { |
|||
t.Run(tt.name, func(t *testing.T) { |
|||
if gotAssignments := allocateTopicPartitions(tt.args.brokers, tt.args.partitionCount); !reflect.DeepEqual(gotAssignments, tt.wantAssignments) { |
|||
t.Errorf("allocateTopicPartitions() = %v, want %v", gotAssignments, tt.wantAssignments) |
|||
} |
|||
}) |
|||
} |
|||
} |
@ -0,0 +1,43 @@ |
|||
package balancer |
|||
|
|||
import ( |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|||
) |
|||
|
|||
func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) (assignments []*mq_pb.BrokerPartitionAssignment, err error) { |
|||
// find existing topic partition assignments
|
|||
for brokerStatsItem := range b.Brokers.IterBuffered() { |
|||
broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val |
|||
for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() { |
|||
topicPartitionStat := topicPartitionStatsItem.Val |
|||
if topicPartitionStat.TopicPartition.Namespace == topic.Namespace && |
|||
topicPartitionStat.TopicPartition.Topic == topic.Name { |
|||
assignment := &mq_pb.BrokerPartitionAssignment{ |
|||
Partition: &mq_pb.Partition{ |
|||
RingSize: MaxPartitionCount, |
|||
RangeStart: topicPartitionStat.RangeStart, |
|||
RangeStop: topicPartitionStat.RangeStop, |
|||
}, |
|||
} |
|||
if topicPartitionStat.IsLeader { |
|||
assignment.LeaderBroker = broker |
|||
} else { |
|||
assignment.FollowerBrokers = append(assignment.FollowerBrokers, broker) |
|||
} |
|||
assignments = append(assignments, assignment) |
|||
} |
|||
} |
|||
} |
|||
if len(assignments) > 0 { |
|||
return assignments, nil |
|||
} |
|||
|
|||
// find the topic partitions on the filer
|
|||
// if the topic is not found
|
|||
// if the request is_for_publish
|
|||
// create the topic
|
|||
// if the request is_for_subscribe
|
|||
// return error not found
|
|||
// t := topic.FromPbTopic(request.Topic)
|
|||
return allocateTopicPartitions(b.Brokers, 6), nil |
|||
} |
@ -1,81 +0,0 @@ |
|||
package topic_allocation |
|||
|
|||
import ( |
|||
"github.com/seaweedfs/seaweedfs/weed/mq/topic" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|||
"modernc.org/mathutil" |
|||
) |
|||
|
|||
const ( |
|||
DefaultBrokerCount = 4 |
|||
) |
|||
|
|||
// AllocateBrokersForTopicPartitions allocate brokers for a topic's all partitions
|
|||
func AllocateBrokersForTopicPartitions(t topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment, candidateBrokers []pb.ServerAddress) (assignment *mq_pb.TopicPartitionsAssignment, err error) { |
|||
// create a previous assignment if not exists
|
|||
if prevAssignment == nil || len(prevAssignment.BrokerPartitions) == 0 { |
|||
prevAssignment = &mq_pb.TopicPartitionsAssignment{ |
|||
PartitionCount: topic.PartitionCount, |
|||
} |
|||
partitionCountForEachBroker := topic.PartitionCount / DefaultBrokerCount |
|||
for i := 0; i < DefaultBrokerCount; i++ { |
|||
prevAssignment.BrokerPartitions = append(prevAssignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{ |
|||
PartitionStart: int32(i * partitionCountForEachBroker), |
|||
PartitionStop: mathutil.MaxInt32(int32((i+1)*partitionCountForEachBroker), topic.PartitionCount), |
|||
}) |
|||
} |
|||
} |
|||
|
|||
// create a new assignment
|
|||
assignment = &mq_pb.TopicPartitionsAssignment{ |
|||
PartitionCount: prevAssignment.PartitionCount, |
|||
} |
|||
|
|||
// allocate partitions for each partition range
|
|||
for _, brokerPartition := range prevAssignment.BrokerPartitions { |
|||
// allocate partitions for each partition range
|
|||
leader, followers, err := allocateBrokersForOneTopicPartition(t, brokerPartition, candidateBrokers) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
followerBrokers := make([]string, len(followers)) |
|||
for i, follower := range followers { |
|||
followerBrokers[i] = string(follower) |
|||
} |
|||
|
|||
assignment.BrokerPartitions = append(assignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{ |
|||
PartitionStart: brokerPartition.PartitionStart, |
|||
PartitionStop: brokerPartition.PartitionStop, |
|||
LeaderBroker: string(leader), |
|||
FollowerBrokers: followerBrokers, |
|||
}) |
|||
} |
|||
|
|||
return |
|||
} |
|||
|
|||
func allocateBrokersForOneTopicPartition(t topic.Topic, brokerPartition *mq_pb.BrokerPartitionsAssignment, candidateBrokers []pb.ServerAddress) (leader pb.ServerAddress, followers []pb.ServerAddress, err error) { |
|||
// allocate leader
|
|||
leader, err = allocateLeaderForOneTopicPartition(t, brokerPartition, candidateBrokers) |
|||
if err != nil { |
|||
return |
|||
} |
|||
|
|||
// allocate followers
|
|||
followers, err = allocateFollowersForOneTopicPartition(t, brokerPartition, candidateBrokers) |
|||
if err != nil { |
|||
return |
|||
} |
|||
|
|||
return |
|||
} |
|||
|
|||
func allocateFollowersForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (followers []pb.ServerAddress, err error) { |
|||
return |
|||
} |
|||
|
|||
func allocateLeaderForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (leader pb.ServerAddress, err error) { |
|||
return |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue