You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

71 lines
2.0 KiB

1 week ago
1 week ago
  1. package agent_client
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/schema"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/credentials"
  11. )
  12. type PublishSession struct {
  13. schema *schema.Schema
  14. partitionCount int
  15. publisherName string
  16. stream grpc.BidiStreamingClient[mq_agent_pb.PublishRecordRequest, mq_agent_pb.PublishRecordResponse]
  17. sessionId int64
  18. }
  19. func NewPublishSession(agentAddress string, topicSchema *schema.Schema, partitionCount int, publisherName string) (*PublishSession, error) {
  20. // call local agent grpc server to create a new session
  21. clientConn, err := grpc.NewClient(agentAddress, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
  22. if err != nil {
  23. return nil, fmt.Errorf("dial agent server %s: %v", agentAddress, err)
  24. }
  25. agentClient := mq_agent_pb.NewSeaweedMessagingAgentClient(clientConn)
  26. resp, err := agentClient.StartPublishSession(context.Background(), &mq_agent_pb.StartPublishSessionRequest{
  27. Topic: &schema_pb.Topic{
  28. Namespace: topicSchema.Namespace,
  29. Name: topicSchema.Name,
  30. },
  31. PartitionCount: int32(partitionCount),
  32. RecordType: topicSchema.RecordType,
  33. PublisherName: publisherName,
  34. })
  35. if err != nil {
  36. return nil, err
  37. }
  38. if resp.Error != "" {
  39. return nil, fmt.Errorf("start publish session: %v", resp.Error)
  40. }
  41. stream, err := agentClient.PublishRecord(context.Background())
  42. if err != nil {
  43. return nil, fmt.Errorf("publish record: %v", err)
  44. }
  45. return &PublishSession{
  46. schema: topicSchema,
  47. partitionCount: partitionCount,
  48. publisherName: publisherName,
  49. stream: stream,
  50. sessionId: resp.SessionId,
  51. }, nil
  52. }
  53. func (a *PublishSession) CloseSession() error {
  54. if a.schema == nil {
  55. return nil
  56. }
  57. err := a.stream.CloseSend()
  58. if err != nil {
  59. return fmt.Errorf("close send: %v", err)
  60. }
  61. a.schema = nil
  62. return err
  63. }