You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

71 lines
2.1 KiB

7 days ago
7 days ago
1 week ago
  1. package agent_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  8. "google.golang.org/grpc"
  9. "google.golang.org/grpc/credentials/insecure"
  10. )
  11. type SubscribeOption struct {
  12. ConsumerGroup string
  13. ConsumerGroupInstanceId string
  14. Topic topic.Topic
  15. OffsetType schema_pb.OffsetType
  16. OffsetTsNs int64
  17. Filter string
  18. MaxSubscribedPartitions int32
  19. SlidingWindowSize int32
  20. }
  21. type SubscribeSession struct {
  22. Option *SubscribeOption
  23. stream grpc.BidiStreamingClient[mq_agent_pb.SubscribeRecordRequest, mq_agent_pb.SubscribeRecordResponse]
  24. }
  25. func NewSubscribeSession(agentAddress string, option *SubscribeOption) (*SubscribeSession, error) {
  26. // call local agent grpc server to create a new session
  27. clientConn, err := grpc.NewClient(agentAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
  28. if err != nil {
  29. return nil, fmt.Errorf("dial agent server %s: %v", agentAddress, err)
  30. }
  31. agentClient := mq_agent_pb.NewSeaweedMessagingAgentClient(clientConn)
  32. initRequest := &mq_agent_pb.SubscribeRecordRequest_InitSubscribeRecordRequest{
  33. ConsumerGroup: option.ConsumerGroup,
  34. ConsumerGroupInstanceId: option.ConsumerGroupInstanceId,
  35. Topic: &schema_pb.Topic{
  36. Namespace: option.Topic.Namespace,
  37. Name: option.Topic.Name,
  38. },
  39. OffsetType: option.OffsetType,
  40. OffsetTsNs: option.OffsetTsNs,
  41. MaxSubscribedPartitions: option.MaxSubscribedPartitions,
  42. Filter: option.Filter,
  43. SlidingWindowSize: option.SlidingWindowSize,
  44. }
  45. stream, err := agentClient.SubscribeRecord(context.Background())
  46. if err != nil {
  47. return nil, fmt.Errorf("subscribe record: %v", err)
  48. }
  49. if err = stream.Send(&mq_agent_pb.SubscribeRecordRequest{
  50. Init: initRequest,
  51. }); err != nil {
  52. return nil, fmt.Errorf("send session id: %v", err)
  53. }
  54. return &SubscribeSession{
  55. Option: option,
  56. stream: stream,
  57. }, nil
  58. }
  59. func (s *SubscribeSession) CloseSession() error {
  60. err := s.stream.CloseSend()
  61. return err
  62. }