You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

102 lines
2.7 KiB

5 years ago
  1. package msgclient
  2. import (
  3. "context"
  4. "io"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  7. )
  8. type Subscriber struct {
  9. subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient
  10. subscriberId string
  11. }
  12. func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) {
  13. // read topic configuration
  14. topicConfiguration := &messaging_pb.TopicConfiguration{
  15. PartitionCount: 4,
  16. }
  17. subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
  18. for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
  19. client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i), startTime)
  20. if err != nil {
  21. return nil, err
  22. }
  23. subscriberClients[i] = client
  24. }
  25. return &Subscriber{
  26. subscriberClients: subscriberClients,
  27. subscriberId: subscriberId,
  28. }, nil
  29. }
  30. func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) {
  31. stream, newBroker, err := mc.initSubscriberClient(subscriberId, namespace, topic, partition, startTime)
  32. if err != nil {
  33. return client, err
  34. }
  35. if newBroker != nil {
  36. }
  37. return stream, nil
  38. }
  39. func setupSubscriberClient(grpcConnection *grpc.ClientConn, subscriberId string, namespace string, topic string, partition int32, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) {
  40. stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(context.Background())
  41. if err != nil {
  42. return
  43. }
  44. // send init message
  45. err = stream.Send(&messaging_pb.SubscriberMessage{
  46. Init: &messaging_pb.SubscriberMessage_InitMessage{
  47. Namespace: namespace,
  48. Topic: topic,
  49. Partition: partition,
  50. StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
  51. TimestampNs: startTime.UnixNano(),
  52. SubscriberId: subscriberId,
  53. },
  54. })
  55. if err != nil {
  56. return
  57. }
  58. // process init response
  59. initResponse, err := stream.Recv()
  60. if err != nil {
  61. return
  62. }
  63. if initResponse.Redirect != nil {
  64. // TODO follow redirection
  65. }
  66. return stream, nil
  67. }
  68. func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error {
  69. for {
  70. resp, listenErr := s.subscriberClients[partition].Recv()
  71. if listenErr == io.EOF {
  72. return nil
  73. }
  74. if listenErr != nil {
  75. println(listenErr.Error())
  76. return listenErr
  77. }
  78. processFn(resp.Data)
  79. }
  80. }
  81. // Subscribe starts goroutines to process the messages
  82. func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
  83. for i := 0; i < len(s.subscriberClients); i++ {
  84. go s.doSubscribe(i, processFn)
  85. }
  86. }