You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

69 lines
1.9 KiB

  1. package agent_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/schema"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
  8. "google.golang.org/grpc"
  9. )
  10. type AgentSession struct {
  11. schema *schema.Schema
  12. partitionCount int
  13. publisherName string
  14. stream grpc.BidiStreamingClient[mq_agent_pb.PublishRecordRequest, mq_agent_pb.PublishRecordResponse]
  15. sessionId int64
  16. }
  17. func NewAgentSession(address string, topicSchema *schema.Schema, partitionCount int, publisherName string) (*AgentSession, error) {
  18. // call local agent grpc server to create a new session
  19. clientConn, err := pb.GrpcDial(context.Background(), address, true, grpc.WithInsecure())
  20. if err != nil {
  21. return nil, fmt.Errorf("dial agent server %s: %v", address, err)
  22. }
  23. agentClient := mq_agent_pb.NewSeaweedMessagingAgentClient(clientConn)
  24. resp, err := agentClient.StartPublishSession(context.Background(), &mq_agent_pb.StartPublishSessionRequest{
  25. Topic: &mq_agent_pb.Topic{
  26. Namespace: topicSchema.Namespace,
  27. Name: topicSchema.Name,
  28. },
  29. PartitionCount: int32(partitionCount),
  30. RecordType: topicSchema.RecordType,
  31. PublisherName: publisherName,
  32. })
  33. if err != nil {
  34. return nil, err
  35. }
  36. if resp.Error != "" {
  37. return nil, fmt.Errorf("start publish session: %v", resp.Error)
  38. }
  39. stream, err := agentClient.PublishRecord(context.Background())
  40. if err != nil {
  41. return nil, fmt.Errorf("publish record: %v", err)
  42. }
  43. return &AgentSession{
  44. schema: topicSchema,
  45. partitionCount: partitionCount,
  46. publisherName: publisherName,
  47. stream: stream,
  48. sessionId: resp.SessionId,
  49. }, nil
  50. }
  51. func (a *AgentSession) CloseSession() error {
  52. if a.schema == nil {
  53. return nil
  54. }
  55. err := a.stream.CloseSend()
  56. if err != nil {
  57. return fmt.Errorf("close send: %v", err)
  58. }
  59. a.schema = nil
  60. return err
  61. }