You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

53 lines
1.6 KiB

  1. package agent
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
  7. "log/slog"
  8. "math/rand/v2"
  9. )
  10. func (a *MessageQueueAgent) StartPublishSession(ctx context.Context, req *mq_agent_pb.StartPublishSessionRequest) (*mq_agent_pb.StartPublishSessionResponse, error) {
  11. sessionId := rand.Int64()
  12. topicPublisher, err := pub_client.NewTopicPublisher(
  13. &pub_client.PublisherConfiguration{
  14. Topic: topic.NewTopic(req.Topic.Namespace, req.Topic.Name),
  15. PartitionCount: req.PartitionCount,
  16. Brokers: a.brokersList(),
  17. PublisherName: req.PublisherName,
  18. RecordType: req.RecordType,
  19. })
  20. if err != nil {
  21. return nil, err
  22. }
  23. a.publishersLock.Lock()
  24. a.publishers[SessionId(sessionId)] = &SessionEntry[*pub_client.TopicPublisher]{
  25. entry: topicPublisher,
  26. }
  27. a.publishersLock.Unlock()
  28. return &mq_agent_pb.StartPublishSessionResponse{
  29. SessionId: sessionId,
  30. }, nil
  31. }
  32. func (a *MessageQueueAgent) ClosePublishSession(ctx context.Context, req *mq_agent_pb.ClosePublishSessionRequest) (*mq_agent_pb.ClosePublishSessionResponse, error) {
  33. var finishErr string
  34. a.publishersLock.Lock()
  35. publisherEntry, found := a.publishers[SessionId(req.SessionId)]
  36. if found {
  37. if err := publisherEntry.entry.FinishPublish(); err != nil {
  38. finishErr = err.Error()
  39. slog.Warn("failed to finish publish", "error", err)
  40. }
  41. delete(a.publishers, SessionId(req.SessionId))
  42. }
  43. a.publishersLock.Unlock()
  44. return &mq_agent_pb.ClosePublishSessionResponse{
  45. Error: finishErr,
  46. }, nil
  47. }