You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

43 lines
1.2 KiB

  1. package agent
import (
	"context"
	"errors"
	"math/rand/v2"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
)
  10. func (a *MessageQueueAgent) StartPublishSession(ctx context.Context, req *mq_agent_pb.StartPublishSessionRequest) (*mq_agent_pb.StartPublishSessionResponse, error) {
  11. sessionId := rand.Int64()
  12. topicPublisher := pub_client.NewTopicPublisher(
  13. &pub_client.PublisherConfiguration{
  14. Topic: topic.NewTopic(req.Topic.Namespace, req.Topic.Name),
  15. PartitionCount: req.PartitionCount,
  16. Brokers: a.brokersList(),
  17. PublisherName: req.PublisherName,
  18. RecordType: req.RecordType,
  19. })
  20. a.publishersLock.Lock()
  21. // remove inactive publishers to avoid memory leak
  22. for k, entry := range a.publishers {
  23. if entry.lastActiveTsNs == 0 {
  24. // this is an active session
  25. continue
  26. }
  27. if time.Unix(0, entry.lastActiveTsNs).Add(10 * time.Hour).Before(time.Now()) {
  28. delete(a.publishers, k)
  29. }
  30. }
  31. a.publishers[SessionId(sessionId)] = &PublisherEntry{
  32. publisher: topicPublisher,
  33. }
  34. a.publishersLock.Unlock()
  35. return &mq_agent_pb.StartPublishSessionResponse{
  36. SessionId: sessionId,
  37. }, nil
  38. }