You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

43 lines
1.5 KiB

  1. package broker
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. "google.golang.org/grpc/codes"
  7. "google.golang.org/grpc/status"
  8. )
  9. // CreateTopic Runs on any broker, but proxied to the balancer if not the balancer
  10. func (broker *MessageQueueBroker) CreateTopic(ctx context.Context, request *mq_pb.CreateTopicRequest) (resp *mq_pb.CreateTopicResponse, err error) {
  11. if broker.currentBalancer == "" {
  12. return nil, status.Errorf(codes.Unavailable, "no balancer")
  13. }
  14. if !broker.lockAsBalancer.IsLocked() {
  15. proxyErr := broker.withBrokerClient(false, broker.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
  16. resp, err = client.CreateTopic(ctx, request)
  17. return nil
  18. })
  19. if proxyErr != nil {
  20. return nil, proxyErr
  21. }
  22. return resp, err
  23. }
  24. ret := &mq_pb.CreateTopicResponse{}
  25. ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount)
  26. return ret, err
  27. }
  28. func (broker *MessageQueueBroker) DoCreateTopic(ctx context.Context, req *mq_pb.DoCreateTopicRequest) (resp *mq_pb.DoCreateTopicResponse, err error) {
  29. ret := &mq_pb.DoCreateTopicResponse{}
  30. t, p := topic.FromPbTopic(req.Topic), topic.FromPbPartition(req.Partition)
  31. localTopicPartition := broker.localTopicManager.GetTopicPartition(t, p)
  32. if localTopicPartition == nil {
  33. localTopicPartition = topic.NewLocalPartition(t, p, true, nil)
  34. broker.localTopicManager.AddTopicPartition(t, localTopicPartition)
  35. }
  36. return ret, err
  37. }