You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

100 lines
2.6 KiB

5 years ago
5 years ago
  1. package msgclient
  2. import (
  3. "context"
  4. "io"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/messaging/broker"
  7. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  8. "google.golang.org/grpc"
  9. )
  10. type Subscriber struct {
  11. subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient
  12. subscriberId string
  13. }
  14. func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) {
  15. // read topic configuration
  16. topicConfiguration := &messaging_pb.TopicConfiguration{
  17. PartitionCount: 4,
  18. }
  19. subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
  20. for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
  21. tp := broker.TopicPartition{
  22. Namespace: namespace,
  23. Topic: topic,
  24. Partition: int32(i),
  25. }
  26. grpcClientConn, err := mc.findBroker(tp)
  27. if err != nil {
  28. return nil, err
  29. }
  30. client, err := setupSubscriberClient(grpcClientConn, tp, subscriberId, startTime)
  31. if err != nil {
  32. return nil, err
  33. }
  34. subscriberClients[i] = client
  35. }
  36. return &Subscriber{
  37. subscriberClients: subscriberClients,
  38. subscriberId: subscriberId,
  39. }, nil
  40. }
  41. func setupSubscriberClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) {
  42. stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(context.Background())
  43. if err != nil {
  44. return
  45. }
  46. // send init message
  47. err = stream.Send(&messaging_pb.SubscriberMessage{
  48. Init: &messaging_pb.SubscriberMessage_InitMessage{
  49. Namespace: tp.Namespace,
  50. Topic: tp.Topic,
  51. Partition: tp.Partition,
  52. StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
  53. TimestampNs: startTime.UnixNano(),
  54. SubscriberId: subscriberId,
  55. },
  56. })
  57. if err != nil {
  58. return
  59. }
  60. // process init response
  61. _, err = stream.Recv()
  62. if err != nil {
  63. return
  64. }
  65. return stream, nil
  66. }
  67. func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error {
  68. for {
  69. resp, listenErr := s.subscriberClients[partition].Recv()
  70. if listenErr == io.EOF {
  71. return nil
  72. }
  73. if listenErr != nil {
  74. println(listenErr.Error())
  75. return listenErr
  76. }
  77. if resp.Data == nil {
  78. // this could be heartbeat from broker
  79. continue
  80. }
  81. processFn(resp.Data)
  82. }
  83. }
  84. // Subscribe starts goroutines to process the messages
  85. func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
  86. for i := 0; i < len(s.subscriberClients); i++ {
  87. go s.doSubscribe(i, processFn)
  88. }
  89. }