You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

102 lines
2.6 KiB

5 years ago
5 years ago
  1. package msgclient
  2. import (
  3. "context"
  4. "io"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  7. "google.golang.org/grpc"
  8. )
  9. type Subscriber struct {
  10. subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient
  11. subscriberId string
  12. }
  13. /*
  14. func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) {
  15. // read topic configuration
  16. topicConfiguration := &messaging_pb.TopicConfiguration{
  17. PartitionCount: 4,
  18. }
  19. subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
  20. for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
  21. client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i), startTime)
  22. if err != nil {
  23. return nil, err
  24. }
  25. subscriberClients[i] = client
  26. }
  27. return &Subscriber{
  28. subscriberClients: subscriberClients,
  29. subscriberId: subscriberId,
  30. }, nil
  31. }
  32. func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) {
  33. stream, err := setupSubscriberClient(subscriberId, namespace, topic, partition, startTime)
  34. if err != nil {
  35. return stream, err
  36. }
  37. if newBroker != nil {
  38. }
  39. return stream, nil
  40. }
  41. */
  42. func setupSubscriberClient(grpcConnection *grpc.ClientConn, subscriberId string, namespace string, topic string, partition int32, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) {
  43. stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(context.Background())
  44. if err != nil {
  45. return
  46. }
  47. // send init message
  48. err = stream.Send(&messaging_pb.SubscriberMessage{
  49. Init: &messaging_pb.SubscriberMessage_InitMessage{
  50. Namespace: namespace,
  51. Topic: topic,
  52. Partition: partition,
  53. StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
  54. TimestampNs: startTime.UnixNano(),
  55. SubscriberId: subscriberId,
  56. },
  57. })
  58. if err != nil {
  59. return
  60. }
  61. // process init response
  62. _, err = stream.Recv()
  63. if err != nil {
  64. return
  65. }
  66. return stream, nil
  67. }
  68. func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error {
  69. for {
  70. resp, listenErr := s.subscriberClients[partition].Recv()
  71. if listenErr == io.EOF {
  72. return nil
  73. }
  74. if listenErr != nil {
  75. println(listenErr.Error())
  76. return listenErr
  77. }
  78. processFn(resp.Data)
  79. }
  80. }
  81. // Subscribe starts goroutines to process the messages
  82. func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
  83. for i := 0; i < len(s.subscriberClients); i++ {
  84. go s.doSubscribe(i, processFn)
  85. }
  86. }