You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

72 lines
2.7 KiB

  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. "google.golang.org/grpc/codes"
  10. "google.golang.org/grpc/status"
  11. )
  12. // ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer
  13. func (broker *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.ConfigureTopicRequest) (resp *mq_pb.ConfigureTopicResponse, err error) {
  14. if broker.currentBalancer == "" {
  15. return nil, status.Errorf(codes.Unavailable, "no balancer")
  16. }
  17. if !broker.lockAsBalancer.IsLocked() {
  18. proxyErr := broker.withBrokerClient(false, broker.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
  19. resp, err = client.ConfigureTopic(ctx, request)
  20. return nil
  21. })
  22. if proxyErr != nil {
  23. return nil, proxyErr
  24. }
  25. return resp, err
  26. }
  27. ret := &mq_pb.ConfigureTopicResponse{}
  28. ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount)
  29. for _, bpa := range ret.BrokerPartitionAssignments {
  30. // fmt.Printf("create topic %s on %s\n", request.Topic, bpa.LeaderBroker)
  31. if doCreateErr := broker.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error {
  32. _, doCreateErr := client.DoConfigureTopic(ctx, &mq_pb.DoConfigureTopicRequest{
  33. Topic: request.Topic,
  34. Partition: bpa.Partition,
  35. })
  36. if doCreateErr != nil {
  37. return fmt.Errorf("do create topic %s on %s: %v", request.Topic, bpa.LeaderBroker, doCreateErr)
  38. }
  39. brokerStats, found := broker.Balancer.Brokers.Get(bpa.LeaderBroker)
  40. if !found {
  41. brokerStats = balancer.NewBrokerStats()
  42. if !broker.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
  43. brokerStats, _ = broker.Balancer.Brokers.Get(bpa.LeaderBroker)
  44. }
  45. }
  46. brokerStats.RegisterAssignment(request.Topic, bpa.Partition)
  47. return nil
  48. }); doCreateErr != nil {
  49. return nil, doCreateErr
  50. }
  51. }
  52. // TODO revert if some error happens in the middle of the assignments
  53. return ret, err
  54. }
  55. func (broker *MessageQueueBroker) DoConfigureTopic(ctx context.Context, req *mq_pb.DoConfigureTopicRequest) (resp *mq_pb.DoConfigureTopicResponse, err error) {
  56. ret := &mq_pb.DoConfigureTopicResponse{}
  57. t, p := topic.FromPbTopic(req.Topic), topic.FromPbPartition(req.Partition)
  58. localTopicPartition := broker.localTopicManager.GetTopicPartition(t, p)
  59. if localTopicPartition == nil {
  60. localTopicPartition = topic.NewLocalPartition(t, p, true, nil)
  61. broker.localTopicManager.AddTopicPartition(t, localTopicPartition)
  62. }
  63. return ret, err
  64. }