From dd805e789d32584ec806e0b0d3e940ccc4aa4d30 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 24 Feb 2025 00:15:04 -0800 Subject: [PATCH] pub messages --- .gitignore | 1 + docker/Makefile | 2 ++ docker/compose/local-mq-test.yml | 11 ++++++++--- weed/command/command.go | 1 + weed/mq/client/agent_client/publish_session.go | 4 +++- 5 files changed, 15 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 89f3b8a0c..bee3563b4 100644 --- a/.gitignore +++ b/.gitignore @@ -99,3 +99,4 @@ docker/weed_pub_kv docker/weed_pub_record docker/weed_sub_kv docker/weed_sub_record +docker/agent_sub_record diff --git a/docker/Makefile b/docker/Makefile index b4a61fb01..61ec70917 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -13,6 +13,8 @@ binary: cd ../weed/mq/client/cmd/weed_pub_record && CGO_ENABLED=$(cgo) GOOS=linux go build && mv weed_pub_record ../../../../../docker/ cd ../weed/mq/client/cmd/weed_sub_kv && CGO_ENABLED=$(cgo) GOOS=linux go build && mv weed_sub_kv ../../../../../docker/ cd ../weed/mq/client/cmd/weed_sub_record && CGO_ENABLED=$(cgo) GOOS=linux go build && mv weed_sub_record ../../../../../docker/ + cd ../weed/mq/client/cmd/agent_pub_record && CGO_ENABLED=$(cgo) GOOS=linux go build && mv agent_pub_record ../../../../../docker/ + cd ../weed/mq/client/cmd/agent_sub_record && CGO_ENABLED=$(cgo) GOOS=linux go build && mv agent_sub_record ../../../../../docker/ binary_race: options = -race binary_race: cgo = 1 diff --git a/docker/compose/local-mq-test.yml b/docker/compose/local-mq-test.yml index d4b071039..4149e38f1 100644 --- a/docker/compose/local-mq-test.yml +++ b/docker/compose/local-mq-test.yml @@ -1,5 +1,3 @@ -version: '3.9' - services: server: image: chrislusf/seaweedfs:local @@ -19,9 +17,16 @@ services: depends_on: server: condition: service_healthy + mq_agent: + image: chrislusf/seaweedfs:local + ports: + - 16777:16777 + command: "mq.agent -broker=mq_broker:17777" + depends_on: + - mq_broker mq_client: image: chrislusf/seaweedfs:local # run a custom command instead of entrypoint command: "ls -al" depends_on: - - mq_broker + - mq_agent diff --git a/weed/command/command.go b/weed/command/command.go index 9fdf057e7..33cdb12d1 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -32,6 +32,7 @@ var Commands = []*Command{ cmdMaster, cmdMasterFollower, cmdMount, + cmdMqAgent, cmdMqBroker, cmdS3, cmdScaffold, diff --git a/weed/mq/client/agent_client/publish_session.go b/weed/mq/client/agent_client/publish_session.go index 45d46f553..6d5c7bdf8 100644 --- a/weed/mq/client/agent_client/publish_session.go +++ b/weed/mq/client/agent_client/publish_session.go @@ -2,12 +2,14 @@ package agent_client import ( "context" + "crypto/tls" "fmt" "github.com/seaweedfs/seaweedfs/weed/mq/schema" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" ) type PublishSession struct { @@ -21,7 +23,7 @@ type PublishSession struct { func NewPublishSession(agentAddress string, topicSchema *schema.Schema, partitionCount int, publisherName string) (*PublishSession, error) { // call local agent grpc server to create a new session - clientConn, err := pb.GrpcDial(context.Background(), agentAddress, true, grpc.WithInsecure()) + clientConn, err := pb.GrpcDial(context.Background(), agentAddress, true, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))) if err != nil { return nil, fmt.Errorf("dial agent server %s: %v", agentAddress, err) }