You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

81 lines
2.8 KiB

  1. package topic_allocation
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  4. "github.com/seaweedfs/seaweedfs/weed/pb"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. "modernc.org/mathutil"
  7. )
  8. const (
  9. DefaultBrokerCount = 4
  10. )
  11. // AllocateBrokersForTopicPartitions allocate brokers for a topic's all partitions
  12. func AllocateBrokersForTopicPartitions(t topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment, candidateBrokers []pb.ServerAddress) (assignment *mq_pb.TopicPartitionsAssignment, err error) {
  13. // create a previous assignment if not exists
  14. if prevAssignment == nil || len(prevAssignment.BrokerPartitions) == 0 {
  15. prevAssignment = &mq_pb.TopicPartitionsAssignment{
  16. PartitionCount: topic.PartitionCount,
  17. }
  18. partitionCountForEachBroker := topic.PartitionCount / DefaultBrokerCount
  19. for i := 0; i < DefaultBrokerCount; i++ {
  20. prevAssignment.BrokerPartitions = append(prevAssignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{
  21. PartitionStart: int32(i * partitionCountForEachBroker),
  22. PartitionStop: mathutil.MaxInt32(int32((i+1)*partitionCountForEachBroker), topic.PartitionCount),
  23. })
  24. }
  25. }
  26. // create a new assignment
  27. assignment = &mq_pb.TopicPartitionsAssignment{
  28. PartitionCount: prevAssignment.PartitionCount,
  29. }
  30. // allocate partitions for each partition range
  31. for _, brokerPartition := range prevAssignment.BrokerPartitions {
  32. // allocate partitions for each partition range
  33. leader, followers, err := allocateBrokersForOneTopicPartition(t, brokerPartition, candidateBrokers)
  34. if err != nil {
  35. return nil, err
  36. }
  37. followerBrokers := make([]string, len(followers))
  38. for i, follower := range followers {
  39. followerBrokers[i] = string(follower)
  40. }
  41. assignment.BrokerPartitions = append(assignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{
  42. PartitionStart: brokerPartition.PartitionStart,
  43. PartitionStop: brokerPartition.PartitionStop,
  44. LeaderBroker: string(leader),
  45. FollowerBrokers: followerBrokers,
  46. })
  47. }
  48. return
  49. }
  50. func allocateBrokersForOneTopicPartition(t topic.Topic, brokerPartition *mq_pb.BrokerPartitionsAssignment, candidateBrokers []pb.ServerAddress) (leader pb.ServerAddress, followers []pb.ServerAddress, err error) {
  51. // allocate leader
  52. leader, err = allocateLeaderForOneTopicPartition(t, brokerPartition, candidateBrokers)
  53. if err != nil {
  54. return
  55. }
  56. // allocate followers
  57. followers, err = allocateFollowersForOneTopicPartition(t, brokerPartition, candidateBrokers)
  58. if err != nil {
  59. return
  60. }
  61. return
  62. }
  63. func allocateFollowersForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (followers []pb.ServerAddress, err error) {
  64. return
  65. }
  66. func allocateLeaderForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (leader pb.ServerAddress, err error) {
  67. return
  68. }