You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
|
|
package agent
import ( "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" "google.golang.org/grpc" "sync" )
type SessionId int64 type SessionEntry[T any] struct { entry T lastActiveTsNs int64 }
type MessageQueueAgentOptions struct { SeedBrokers []pb.ServerAddress }
type MessageQueueAgent struct { mq_agent_pb.UnimplementedSeaweedMessagingAgentServer option *MessageQueueAgentOptions brokers []pb.ServerAddress grpcDialOption grpc.DialOption publishers map[SessionId]*SessionEntry[*pub_client.TopicPublisher] publishersLock sync.RWMutex subscribers map[SessionId]*SessionEntry[*sub_client.TopicSubscriber] subscribersLock sync.RWMutex }
func NewMessageQueueAgent(option *MessageQueueAgentOptions, grpcDialOption grpc.DialOption) *MessageQueueAgent {
// check masters to list all brokers
return &MessageQueueAgent{ option: option, brokers: []pb.ServerAddress{}, grpcDialOption: grpcDialOption, publishers: make(map[SessionId]*SessionEntry[*pub_client.TopicPublisher]), subscribers: make(map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]), } }
func (a *MessageQueueAgent) brokersList() []string { var brokers []string for _, broker := range a.brokers { brokers = append(brokers, broker.String()) } return brokers }
|