From b5a53314cb4814e9b536a96e962f82d669643cc0 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 24 Feb 2025 22:51:03 -0800 Subject: [PATCH] can publish records via agent --- docker/Makefile | 3 +++ docker/compose/local-mq-test.yml | 2 +- weed/Makefile | 4 ++++ weed/command/mq_agent.go | 6 +++--- weed/mq/agent/agent_grpc_pub_session.go | 5 ++++- weed/mq/agent/agent_grpc_publish.go | 4 ++-- weed/mq/agent/agent_grpc_subscribe.go | 2 +- weed/mq/agent/agent_server.go | 8 ++++++-- weed/mq/client/agent_client/publish_session.go | 5 ++--- weed/mq/client/cmd/weed_pub_kv/publisher_kv.go | 5 ++++- weed/mq/client/cmd/weed_pub_record/publisher_record.go | 5 ++++- weed/mq/client/pub_client/publisher.go | 8 ++++---- weed/mq/client/pub_client/scheduler.go | 10 +++++++--- 13 files changed, 45 insertions(+), 22 deletions(-) diff --git a/docker/Makefile b/docker/Makefile index 61ec70917..0f6b0b8b1 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -99,6 +99,9 @@ s3tests: build s3tests_build brokers: build docker compose -f compose/local-brokers-compose.yml -p seaweedfs up +agent: build + docker compose -f compose/local-mq-test.yml -p seaweedfs up + filer_etcd: build docker stack deploy -c compose/swarm-etcd.yml fs diff --git a/docker/compose/local-mq-test.yml b/docker/compose/local-mq-test.yml index 4149e38f1..fef68cac8 100644 --- a/docker/compose/local-mq-test.yml +++ b/docker/compose/local-mq-test.yml @@ -21,7 +21,7 @@ services: image: chrislusf/seaweedfs:local ports: - 16777:16777 - command: "mq.agent -broker=mq_broker:17777" + command: "mq.agent -broker=mq_broker:17777 -port=16777" depends_on: - mq_broker mq_client: diff --git a/weed/Makefile b/weed/Makefile index 7d1a0d2ed..0241d8dea 100644 --- a/weed/Makefile +++ b/weed/Makefile @@ -41,6 +41,10 @@ debug_mq: go build -gcflags="all=-N -l" dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec ./weed -- -v=4 mq.broker +debug_mq_agent: + go build -gcflags="all=-N -l" + dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec ./weed -- -v=4 mq.agent -broker=localhost:17777 + debug_filer_copy: go build -gcflags="all=-N -l" dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec ./weed -- -v=4 filer.backup -filer=localhost:8888 -filerProxy -timeAgo=10h diff --git a/weed/command/mq_agent.go b/weed/command/mq_agent.go index 2bbd898bc..4a59dcf33 100644 --- a/weed/command/mq_agent.go +++ b/weed/command/mq_agent.go @@ -26,12 +26,12 @@ type MessageQueueAgentOptions struct { func init() { cmdMqAgent.Run = runMqAgent // break init cycle mqAgentOptions.brokersString = cmdMqAgent.Flag.String("broker", "localhost:17777", "comma-separated message queue brokers") - mqAgentOptions.ip = cmdMqAgent.Flag.String("ip", "localhost", "message queue agent host address") + mqAgentOptions.ip = cmdMqAgent.Flag.String("ip", "", "message queue agent host address") mqAgentOptions.port = cmdMqAgent.Flag.Int("port", 16777, "message queue agent gRPC server port") } var cmdMqAgent = &Command{ - UsageLine: "mq.agent [-port=6377] [-master=]", + UsageLine: "mq.agent [-port=16777] [-master=]", Short: " start a message queue agent", Long: `start a message queue agent @@ -64,7 +64,7 @@ func (mqAgentOpt *MessageQueueAgentOptions) startQueueAgent() bool { if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", *mqAgentOpt.port, err) } - glog.Info("Start Seaweed Message Queue Agent on ", grpcL.Addr().String()) + glog.Infof("Start Seaweed Message Queue Agent on %s:%d", *mqAgentOpt.ip, *mqAgentOpt.port) grpcS := pb.NewGrpcServer() mq_agent_pb.RegisterSeaweedMessagingAgentServer(grpcS, agentServer) reflection.Register(grpcS) diff --git a/weed/mq/agent/agent_grpc_pub_session.go b/weed/mq/agent/agent_grpc_pub_session.go index d5c6d0813..9c7497609 100644 --- a/weed/mq/agent/agent_grpc_pub_session.go +++ b/weed/mq/agent/agent_grpc_pub_session.go @@ -13,7 +13,7 @@ import ( func (a *MessageQueueAgent) StartPublishSession(ctx context.Context, req *mq_agent_pb.StartPublishSessionRequest) (*mq_agent_pb.StartPublishSessionResponse, error) { sessionId := rand.Int64() - topicPublisher := pub_client.NewTopicPublisher( + topicPublisher, err := pub_client.NewTopicPublisher( &pub_client.PublisherConfiguration{ Topic: topic.NewTopic(req.Topic.Namespace, req.Topic.Name), PartitionCount: req.PartitionCount, @@ -21,6 +21,9 @@ func (a *MessageQueueAgent) StartPublishSession(ctx context.Context, req *mq_age PublisherName: req.PublisherName, RecordType: req.RecordType, }) + if err != nil { + return nil, err + } a.publishersLock.Lock() // remove inactive publishers to avoid memory leak diff --git a/weed/mq/agent/agent_grpc_publish.go b/weed/mq/agent/agent_grpc_publish.go index 485e1d24e..dc3b5ee35 100644 --- a/weed/mq/agent/agent_grpc_publish.go +++ b/weed/mq/agent/agent_grpc_publish.go @@ -6,7 +6,7 @@ import ( "time" ) -func (a *MessageQueueAgent) PublishRecordRequest(stream mq_agent_pb.SeaweedMessagingAgent_PublishRecordServer) error { +func (a *MessageQueueAgent) PublishRecord(stream mq_agent_pb.SeaweedMessagingAgent_PublishRecordServer) error { m, err := stream.Recv() if err != nil { return err @@ -29,7 +29,7 @@ func (a *MessageQueueAgent) PublishRecordRequest(stream mq_agent_pb.SeaweedMessa } for { - m, err := stream.Recv() + m, err = stream.Recv() if err != nil { return err } diff --git a/weed/mq/agent/agent_grpc_subscribe.go b/weed/mq/agent/agent_grpc_subscribe.go index feb5bd47c..5706a3774 100644 --- a/weed/mq/agent/agent_grpc_subscribe.go +++ b/weed/mq/agent/agent_grpc_subscribe.go @@ -11,7 +11,7 @@ import ( "time" ) -func (a *MessageQueueAgent) SubscribeRecordRequest(stream mq_agent_pb.SeaweedMessagingAgent_SubscribeRecordServer) error { +func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingAgent_SubscribeRecordServer) error { // the first message is the subscribe request // it should only contain the session id m, err := stream.Recv() diff --git a/weed/mq/agent/agent_server.go b/weed/mq/agent/agent_server.go index 6fc61bbdb..f98c62d97 100644 --- a/weed/mq/agent/agent_server.go +++ b/weed/mq/agent/agent_server.go @@ -32,11 +32,15 @@ type MessageQueueAgent struct { func NewMessageQueueAgent(option *MessageQueueAgentOptions, grpcDialOption grpc.DialOption) *MessageQueueAgent { - // check masters to list all brokers + // initialize brokers which may change later + var brokers []pb.ServerAddress + for _, broker := range option.SeedBrokers { + brokers = append(brokers, broker) + } return &MessageQueueAgent{ option: option, - brokers: []pb.ServerAddress{}, + brokers: brokers, grpcDialOption: grpcDialOption, publishers: make(map[SessionId]*SessionEntry[*pub_client.TopicPublisher]), subscribers: make(map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]), diff --git a/weed/mq/client/agent_client/publish_session.go b/weed/mq/client/agent_client/publish_session.go index a26ded32e..9337c06ab 100644 --- a/weed/mq/client/agent_client/publish_session.go +++ b/weed/mq/client/agent_client/publish_session.go @@ -2,13 +2,12 @@ package agent_client import ( "context" - "crypto/tls" "fmt" "github.com/seaweedfs/seaweedfs/weed/mq/schema" "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "google.golang.org/grpc" - "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" ) type PublishSession struct { @@ -22,7 +21,7 @@ type PublishSession struct { func NewPublishSession(agentAddress string, topicSchema *schema.Schema, partitionCount int, publisherName string) (*PublishSession, error) { // call local agent grpc server to create a new session - clientConn, err := grpc.NewClient(agentAddress, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))) + clientConn, err := grpc.NewClient(agentAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, fmt.Errorf("dial agent server %s: %v", agentAddress, err) } diff --git a/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go b/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go index b4d07ae02..801374244 100644 --- a/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go +++ b/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go @@ -54,7 +54,10 @@ func main() { Brokers: strings.Split(*seedBrokers, ","), PublisherName: *clientName, } - publisher := pub_client.NewTopicPublisher(config) + publisher, err := pub_client.NewTopicPublisher(config) + if err != nil { + log.Fatalf("Failed to create publisher: %v", err) + } startTime := time.Now() diff --git a/weed/mq/client/cmd/weed_pub_record/publisher_record.go b/weed/mq/client/cmd/weed_pub_record/publisher_record.go index 9b28200bc..1efc2ea33 100644 --- a/weed/mq/client/cmd/weed_pub_record/publisher_record.go +++ b/weed/mq/client/cmd/weed_pub_record/publisher_record.go @@ -114,7 +114,10 @@ func main() { PublisherName: *clientName, RecordType: recordType, } - publisher := pub_client.NewTopicPublisher(config) + publisher, err := pub_client.NewTopicPublisher(config) + if err != nil { + log.Fatalf("Failed to create publisher: %v", err) + } startTime := time.Now() diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go index 04c2ebe08..f95d00602 100644 --- a/weed/mq/client/pub_client/publisher.go +++ b/weed/mq/client/pub_client/publisher.go @@ -34,8 +34,8 @@ type TopicPublisher struct { jobs []*EachPartitionPublishJob } -func NewTopicPublisher(config *PublisherConfiguration) *TopicPublisher { - tp := &TopicPublisher{ +func NewTopicPublisher(config *PublisherConfiguration) (tp *TopicPublisher, err error) { + tp = &TopicPublisher{ partition2Buffer: interval.NewSearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage]](func(a, b int32) int { return int(a - b) }), @@ -46,7 +46,7 @@ func NewTopicPublisher(config *PublisherConfiguration) *TopicPublisher { wg := sync.WaitGroup{} wg.Add(1) go func() { - if err := tp.startSchedulerThread(&wg); err != nil { + if err = tp.startSchedulerThread(&wg); err != nil { log.Println(err) return } @@ -54,7 +54,7 @@ func NewTopicPublisher(config *PublisherConfiguration) *TopicPublisher { wg.Wait() - return tp + return } func (p *TopicPublisher) Shutdown() error { diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index df2270b2c..a768fa7f8 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -7,7 +7,9 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue" + "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" "log" "sort" @@ -33,6 +35,7 @@ type EachPartitionPublishJob struct { func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error { if err := p.doConfigureTopic(); err != nil { + wg.Done() return fmt.Errorf("configure topic %s: %v", p.config.Topic, err) } @@ -111,6 +114,7 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb. go func(job *EachPartitionPublishJob) { defer job.wg.Done() if err := p.doPublishToPartition(job); err != nil { + log.Printf("publish to %s partition %v: %v", p.config.Topic, job.Partition, err) errChan <- EachPartitionError{assignment, err, generation} } }(job) @@ -126,7 +130,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition) - grpcConnection, err := pb.GrpcDial(context.Background(), job.LeaderBroker, true, p.grpcDialOption) + grpcConnection, err := grpc.NewClient(job.LeaderBroker, grpc.WithTransportCredentials(insecure.NewCredentials()), p.grpcDialOption) if err != nil { return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err) } @@ -225,7 +229,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro func (p *TopicPublisher) doConfigureTopic() (err error) { if len(p.config.Brokers) == 0 { - return fmt.Errorf("no bootstrap brokers") + return fmt.Errorf("topic configuring found no bootstrap brokers") } var lastErr error for _, brokerAddress := range p.config.Brokers { @@ -256,7 +260,7 @@ func (p *TopicPublisher) doConfigureTopic() (err error) { func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) { if len(p.config.Brokers) == 0 { - return nil, fmt.Errorf("no bootstrap brokers") + return nil, fmt.Errorf("lookup found no bootstrap brokers") } var lastErr error for _, brokerAddress := range p.config.Brokers {