You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

50 lines
1.7 KiB

1 year ago
1 year ago
1 year ago
  1. package broker
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  5. )
  6. // FindTopicBrokers returns the brokers that are serving the topic
  7. //
  8. // 1. lock the topic
  9. //
  10. // 2. find the topic partitions on the filer
  11. // 2.1 if the topic is not found, return error
  12. // 2.2 if the request is_for_publish, create the topic
  13. // 2.2.1 if the request is_for_subscribe, return error not found
  14. // 2.2.2 if the request is_for_publish, create the topic
  15. // 2.2 if the topic is found, return the brokers
  16. //
  17. // 3. unlock the topic
  18. func (broker *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (*mq_pb.LookupTopicBrokersResponse, error) {
  19. ret := &mq_pb.LookupTopicBrokersResponse{}
  20. // TODO lock the topic
  21. // find the topic partitions on the filer
  22. // if the topic is not found
  23. // if the request is_for_publish
  24. // create the topic
  25. // if the request is_for_subscribe
  26. // return error not found
  27. // t := topic.FromPbTopic(request.Topic)
  28. ret.Topic = request.Topic
  29. ret.BrokerPartitionAssignments = []*mq_pb.BrokerPartitionAssignment{
  30. {
  31. LeaderBroker: "localhost:17777",
  32. FollowerBrokers: []string{"localhost:17777"},
  33. Partition: &mq_pb.Partition{
  34. RingSize: MaxPartitionCount,
  35. RangeStart: 0,
  36. RangeStop: MaxPartitionCount,
  37. },
  38. },
  39. }
  40. return ret, nil
  41. }
  42. // CheckTopicPartitionsStatus check the topic partitions on the broker
  43. func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context, request *mq_pb.CheckTopicPartitionsStatusRequest) (*mq_pb.CheckTopicPartitionsStatusResponse, error) {
  44. ret := &mq_pb.CheckTopicPartitionsStatusResponse{}
  45. return ret, nil
  46. }