You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

70 lines
1.9 KiB

1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
  1. package agent_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/schema"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  9. "google.golang.org/grpc"
  10. )
  11. type PublishSession struct {
  12. schema *schema.Schema
  13. partitionCount int
  14. publisherName string
  15. stream grpc.BidiStreamingClient[mq_agent_pb.PublishRecordRequest, mq_agent_pb.PublishRecordResponse]
  16. sessionId int64
  17. }
  18. func NewPublishSession(agentAddress string, topicSchema *schema.Schema, partitionCount int, publisherName string) (*PublishSession, error) {
  19. // call local agent grpc server to create a new session
  20. clientConn, err := pb.GrpcDial(context.Background(), agentAddress, true, grpc.WithInsecure())
  21. if err != nil {
  22. return nil, fmt.Errorf("dial agent server %s: %v", agentAddress, err)
  23. }
  24. agentClient := mq_agent_pb.NewSeaweedMessagingAgentClient(clientConn)
  25. resp, err := agentClient.StartPublishSession(context.Background(), &mq_agent_pb.StartPublishSessionRequest{
  26. Topic: &schema_pb.Topic{
  27. Namespace: topicSchema.Namespace,
  28. Name: topicSchema.Name,
  29. },
  30. PartitionCount: int32(partitionCount),
  31. RecordType: topicSchema.RecordType,
  32. PublisherName: publisherName,
  33. })
  34. if err != nil {
  35. return nil, err
  36. }
  37. if resp.Error != "" {
  38. return nil, fmt.Errorf("start publish session: %v", resp.Error)
  39. }
  40. stream, err := agentClient.PublishRecord(context.Background())
  41. if err != nil {
  42. return nil, fmt.Errorf("publish record: %v", err)
  43. }
  44. return &PublishSession{
  45. schema: topicSchema,
  46. partitionCount: partitionCount,
  47. publisherName: publisherName,
  48. stream: stream,
  49. sessionId: resp.SessionId,
  50. }, nil
  51. }
  52. func (a *PublishSession) CloseSession() error {
  53. if a.schema == nil {
  54. return nil
  55. }
  56. err := a.stream.CloseSend()
  57. if err != nil {
  58. return fmt.Errorf("close send: %v", err)
  59. }
  60. a.schema = nil
  61. return err
  62. }