You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

99 lines
3.7 KiB

12 months ago
12 months ago
10 months ago
10 months ago
10 months ago
10 months ago
12 months ago
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
  7. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  8. "github.com/seaweedfs/seaweedfs/weed/pb"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  10. "sync"
  11. )
  12. // AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
  13. func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
  14. ret := &mq_pb.AssignTopicPartitionsResponse{}
  15. // drain existing topic partition subscriptions
  16. for _, assignment := range request.BrokerPartitionAssignments {
  17. t := topic.FromPbTopic(request.Topic)
  18. partition := topic.FromPbPartition(assignment.Partition)
  19. b.accessLock.Lock()
  20. if request.IsDraining {
  21. // TODO drain existing topic partition subscriptions
  22. b.localTopicManager.RemoveLocalPartition(t, partition)
  23. } else {
  24. var localPartition *topic.LocalPartition
  25. if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil {
  26. localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
  27. b.localTopicManager.AddLocalPartition(t, localPartition)
  28. }
  29. }
  30. b.accessLock.Unlock()
  31. }
  32. // if is leader, notify the followers to drain existing topic partition subscriptions
  33. if request.IsLeader {
  34. for _, brokerPartition := range request.BrokerPartitionAssignments {
  35. for _, follower := range brokerPartition.FollowerBrokers {
  36. err := pb.WithBrokerGrpcClient(false, follower, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  37. _, err := client.AssignTopicPartitions(context.Background(), request)
  38. return err
  39. })
  40. if err != nil {
  41. return ret, err
  42. }
  43. }
  44. }
  45. }
  46. glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments)
  47. return ret, nil
  48. }
  49. // called by broker leader to drain existing partitions.
  50. // new/updated partitions will be detected by broker from the filer
  51. func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context, t *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment, isAdd bool) error {
  52. // notify the brokers to create the topic partitions in parallel
  53. var wg sync.WaitGroup
  54. for _, bpa := range assignments {
  55. wg.Add(1)
  56. go func(bpa *mq_pb.BrokerPartitionAssignment) {
  57. defer wg.Done()
  58. if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error {
  59. _, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{
  60. Topic: t,
  61. BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{
  62. {
  63. Partition: bpa.Partition,
  64. },
  65. },
  66. IsLeader: true,
  67. IsDraining: !isAdd,
  68. })
  69. if doCreateErr != nil {
  70. if !isAdd {
  71. return fmt.Errorf("drain topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr)
  72. } else {
  73. return fmt.Errorf("create topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr)
  74. }
  75. }
  76. brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker)
  77. if !found {
  78. brokerStats = pub_balancer.NewBrokerStats()
  79. if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
  80. brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker)
  81. }
  82. }
  83. brokerStats.RegisterAssignment(t, bpa.Partition, isAdd)
  84. return nil
  85. }); doCreateErr != nil {
  86. glog.Errorf("create topic %s partition %+v on %s: %v", t, bpa.Partition, bpa.LeaderBroker, doCreateErr)
  87. }
  88. }(bpa)
  89. }
  90. wg.Wait()
  91. return nil
  92. }