You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

56 lines
1.6 KiB

  1. package agent
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
  7. "google.golang.org/grpc"
  8. "sync"
  9. )
  10. type SessionId int64
  11. type SessionEntry[T any] struct {
  12. entry T
  13. lastActiveTsNs int64
  14. }
  15. type MessageQueueAgentOptions struct {
  16. SeedBrokers []pb.ServerAddress
  17. }
  18. type MessageQueueAgent struct {
  19. mq_agent_pb.UnimplementedSeaweedMessagingAgentServer
  20. option *MessageQueueAgentOptions
  21. brokers []pb.ServerAddress
  22. grpcDialOption grpc.DialOption
  23. publishers map[SessionId]*SessionEntry[*pub_client.TopicPublisher]
  24. publishersLock sync.RWMutex
  25. subscribers map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]
  26. subscribersLock sync.RWMutex
  27. }
  28. func NewMessageQueueAgent(option *MessageQueueAgentOptions, grpcDialOption grpc.DialOption) *MessageQueueAgent {
  29. // initialize brokers which may change later
  30. var brokers []pb.ServerAddress
  31. for _, broker := range option.SeedBrokers {
  32. brokers = append(brokers, broker)
  33. }
  34. return &MessageQueueAgent{
  35. option: option,
  36. brokers: brokers,
  37. grpcDialOption: grpcDialOption,
  38. publishers: make(map[SessionId]*SessionEntry[*pub_client.TopicPublisher]),
  39. subscribers: make(map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]),
  40. }
  41. }
  42. func (a *MessageQueueAgent) brokersList() []string {
  43. var brokers []string
  44. for _, broker := range a.brokers {
  45. brokers = append(brokers, broker.String())
  46. }
  47. return brokers
  48. }