You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

61 lines
1.8 KiB

  1. package agent
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
  7. "log/slog"
  8. "math/rand/v2"
  9. "time"
  10. )
  11. func (a *MessageQueueAgent) StartPublishSession(ctx context.Context, req *mq_agent_pb.StartPublishSessionRequest) (*mq_agent_pb.StartPublishSessionResponse, error) {
  12. sessionId := rand.Int64()
  13. topicPublisher := pub_client.NewTopicPublisher(
  14. &pub_client.PublisherConfiguration{
  15. Topic: topic.NewTopic(req.Topic.Namespace, req.Topic.Name),
  16. PartitionCount: req.PartitionCount,
  17. Brokers: a.brokersList(),
  18. PublisherName: req.PublisherName,
  19. RecordType: req.RecordType,
  20. })
  21. a.publishersLock.Lock()
  22. // remove inactive publishers to avoid memory leak
  23. for k, entry := range a.publishers {
  24. if entry.lastActiveTsNs == 0 {
  25. // this is an active session
  26. continue
  27. }
  28. if time.Unix(0, entry.lastActiveTsNs).Add(10 * time.Hour).Before(time.Now()) {
  29. delete(a.publishers, k)
  30. }
  31. }
  32. a.publishers[SessionId(sessionId)] = &SessionEntry[*pub_client.TopicPublisher]{
  33. entry: topicPublisher,
  34. }
  35. a.publishersLock.Unlock()
  36. return &mq_agent_pb.StartPublishSessionResponse{
  37. SessionId: sessionId,
  38. }, nil
  39. }
  40. func (a *MessageQueueAgent) ClosePublishSession(ctx context.Context, req *mq_agent_pb.ClosePublishSessionRequest) (*mq_agent_pb.ClosePublishSessionResponse, error) {
  41. var finishErr string
  42. a.publishersLock.Lock()
  43. publisherEntry, found := a.publishers[SessionId(req.SessionId)]
  44. if found {
  45. if err := publisherEntry.entry.FinishPublish(); err != nil {
  46. finishErr = err.Error()
  47. slog.Warn("failed to finish publish", "error", err)
  48. }
  49. delete(a.publishers, SessionId(req.SessionId))
  50. }
  51. a.publishersLock.Unlock()
  52. return &mq_agent_pb.ClosePublishSessionResponse{
  53. Error: finishErr,
  54. }, nil
  55. }