Browse Source

pub messages

mq
chrislu 1 week ago
parent
commit
dd805e789d
  1. 1
      .gitignore
  2. 2
      docker/Makefile
  3. 11
      docker/compose/local-mq-test.yml
  4. 1
      weed/command/command.go
  5. 4
      weed/mq/client/agent_client/publish_session.go

1
.gitignore

@ -99,3 +99,4 @@ docker/weed_pub_kv
docker/weed_pub_record
docker/weed_sub_kv
docker/weed_sub_record
docker/agent_sub_record

2
docker/Makefile

@ -13,6 +13,8 @@ binary:
cd ../weed/mq/client/cmd/weed_pub_record && CGO_ENABLED=$(cgo) GOOS=linux go build && mv weed_pub_record ../../../../../docker/
cd ../weed/mq/client/cmd/weed_sub_kv && CGO_ENABLED=$(cgo) GOOS=linux go build && mv weed_sub_kv ../../../../../docker/
cd ../weed/mq/client/cmd/weed_sub_record && CGO_ENABLED=$(cgo) GOOS=linux go build && mv weed_sub_record ../../../../../docker/
cd ../weed/mq/client/cmd/agent_pub_record && CGO_ENABLED=$(cgo) GOOS=linux go build && mv agent_pub_record ../../../../../docker/
cd ../weed/mq/client/cmd/agent_sub_record && CGO_ENABLED=$(cgo) GOOS=linux go build && mv agent_sub_record ../../../../../docker/
binary_race: options = -race
binary_race: cgo = 1

11
docker/compose/local-mq-test.yml

@ -1,5 +1,3 @@
version: '3.9'
services:
server:
image: chrislusf/seaweedfs:local
@ -19,9 +17,16 @@ services:
depends_on:
server:
condition: service_healthy
mq_agent:
image: chrislusf/seaweedfs:local
ports:
- 16777:16777
command: "mq.agent -broker=mq_broker:17777"
depends_on:
- mq_broker
mq_client:
image: chrislusf/seaweedfs:local
# run a custom command instead of entrypoint
command: "ls -al"
depends_on:
- mq_broker
- mq_agent

1
weed/command/command.go

@ -32,6 +32,7 @@ var Commands = []*Command{
cmdMaster,
cmdMasterFollower,
cmdMount,
cmdMqAgent,
cmdMqBroker,
cmdS3,
cmdScaffold,

4
weed/mq/client/agent_client/publish_session.go

@ -2,12 +2,14 @@ package agent_client
import (
"context"
"crypto/tls"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
type PublishSession struct {
@ -21,7 +23,7 @@ type PublishSession struct {
func NewPublishSession(agentAddress string, topicSchema *schema.Schema, partitionCount int, publisherName string) (*PublishSession, error) {
// call local agent grpc server to create a new session
clientConn, err := pb.GrpcDial(context.Background(), agentAddress, true, grpc.WithInsecure())
clientConn, err := pb.GrpcDial(context.Background(), agentAddress, true, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
if err != nil {
return nil, fmt.Errorf("dial agent server %s: %v", agentAddress, err)
}

Loading…
Cancel
Save