You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

56 lines
2.0 KiB

  1. package broker
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  7. "google.golang.org/grpc/codes"
  8. "google.golang.org/grpc/status"
  9. )
  10. // CreateTopic Runs on any broker, but proxied to the balancer if not the balancer
  11. func (broker *MessageQueueBroker) CreateTopic(ctx context.Context, request *mq_pb.CreateTopicRequest) (resp *mq_pb.CreateTopicResponse, err error) {
  12. if broker.currentBalancer == "" {
  13. return nil, status.Errorf(codes.Unavailable, "no balancer")
  14. }
  15. if !broker.lockAsBalancer.IsLocked() {
  16. proxyErr := broker.withBrokerClient(false, broker.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
  17. resp, err = client.CreateTopic(ctx, request)
  18. return nil
  19. })
  20. if proxyErr != nil {
  21. return nil, proxyErr
  22. }
  23. return resp, err
  24. }
  25. ret := &mq_pb.CreateTopicResponse{}
  26. ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount)
  27. for _, bpa := range ret.BrokerPartitionAssignments {
  28. if doCreateErr := broker.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error {
  29. _, doCreateErr := client.DoCreateTopic(ctx, &mq_pb.DoCreateTopicRequest{
  30. Topic: request.Topic,
  31. Partition: bpa.Partition,
  32. })
  33. return doCreateErr
  34. }); doCreateErr != nil {
  35. return nil, doCreateErr
  36. }
  37. }
  38. return ret, err
  39. }
  40. func (broker *MessageQueueBroker) DoCreateTopic(ctx context.Context, req *mq_pb.DoCreateTopicRequest) (resp *mq_pb.DoCreateTopicResponse, err error) {
  41. ret := &mq_pb.DoCreateTopicResponse{}
  42. t, p := topic.FromPbTopic(req.Topic), topic.FromPbPartition(req.Partition)
  43. localTopicPartition := broker.localTopicManager.GetTopicPartition(t, p)
  44. if localTopicPartition == nil {
  45. localTopicPartition = topic.NewLocalPartition(t, p, true, nil)
  46. broker.localTopicManager.AddTopicPartition(t, localTopicPartition)
  47. }
  48. return ret, err
  49. }