You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

113 lines
3.0 KiB

5 years ago
5 years ago
  1. package msgclient
  2. import (
  3. "context"
  4. "io"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/messaging/broker"
  7. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  8. "google.golang.org/grpc"
  9. )
  10. type Subscriber struct {
  11. subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient
  12. subscriberCancels []context.CancelFunc
  13. subscriberId string
  14. }
  15. func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) {
  16. // read topic configuration
  17. topicConfiguration := &messaging_pb.TopicConfiguration{
  18. PartitionCount: 4,
  19. }
  20. subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
  21. subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount)
  22. for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
  23. if partitionId>=0 && i != partitionId {
  24. continue
  25. }
  26. tp := broker.TopicPartition{
  27. Namespace: namespace,
  28. Topic: topic,
  29. Partition: int32(i),
  30. }
  31. grpcClientConn, err := mc.findBroker(tp)
  32. if err != nil {
  33. return nil, err
  34. }
  35. ctx, cancel := context.WithCancel(context.Background())
  36. client, err := setupSubscriberClient(ctx, grpcClientConn, tp, subscriberId, startTime)
  37. if err != nil {
  38. return nil, err
  39. }
  40. subscriberClients[i] = client
  41. subscriberCancels[i] = cancel
  42. }
  43. return &Subscriber{
  44. subscriberClients: subscriberClients,
  45. subscriberCancels: subscriberCancels,
  46. subscriberId: subscriberId,
  47. }, nil
  48. }
  49. func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) {
  50. stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx)
  51. if err != nil {
  52. return
  53. }
  54. // send init message
  55. err = stream.Send(&messaging_pb.SubscriberMessage{
  56. Init: &messaging_pb.SubscriberMessage_InitMessage{
  57. Namespace: tp.Namespace,
  58. Topic: tp.Topic,
  59. Partition: tp.Partition,
  60. StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
  61. TimestampNs: startTime.UnixNano(),
  62. SubscriberId: subscriberId,
  63. },
  64. })
  65. if err != nil {
  66. return
  67. }
  68. return stream, nil
  69. }
  70. func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error {
  71. for {
  72. resp, listenErr := subscriberClient.Recv()
  73. if listenErr == io.EOF {
  74. return nil
  75. }
  76. if listenErr != nil {
  77. println(listenErr.Error())
  78. return listenErr
  79. }
  80. if resp.Data == nil {
  81. // this could be heartbeat from broker
  82. continue
  83. }
  84. processFn(resp.Data)
  85. }
  86. }
  87. // Subscribe starts goroutines to process the messages
  88. func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
  89. for i := 0; i < len(s.subscriberClients); i++ {
  90. if s.subscriberClients[i] != nil {
  91. go doSubscribe(s.subscriberClients[i], processFn)
  92. }
  93. }
  94. }
  95. func (s *Subscriber) Shutdown() {
  96. for i := 0; i < len(s.subscriberClients); i++ {
  97. if s.subscriberCancels[i] != nil {
  98. s.subscriberCancels[i]()
  99. }
  100. }
  101. }