Browse Source

can publish records via agent

mq
chrislu 7 days ago
parent
commit
b5a53314cb
  1. 3
      docker/Makefile
  2. 2
      docker/compose/local-mq-test.yml
  3. 4
      weed/Makefile
  4. 6
      weed/command/mq_agent.go
  5. 5
      weed/mq/agent/agent_grpc_pub_session.go
  6. 4
      weed/mq/agent/agent_grpc_publish.go
  7. 2
      weed/mq/agent/agent_grpc_subscribe.go
  8. 8
      weed/mq/agent/agent_server.go
  9. 5
      weed/mq/client/agent_client/publish_session.go
  10. 5
      weed/mq/client/cmd/weed_pub_kv/publisher_kv.go
  11. 5
      weed/mq/client/cmd/weed_pub_record/publisher_record.go
  12. 8
      weed/mq/client/pub_client/publisher.go
  13. 10
      weed/mq/client/pub_client/scheduler.go

3
docker/Makefile

@ -99,6 +99,9 @@ s3tests: build s3tests_build
brokers: build brokers: build
docker compose -f compose/local-brokers-compose.yml -p seaweedfs up docker compose -f compose/local-brokers-compose.yml -p seaweedfs up
agent: build
docker compose -f compose/local-mq-test.yml -p seaweedfs up
filer_etcd: build filer_etcd: build
docker stack deploy -c compose/swarm-etcd.yml fs docker stack deploy -c compose/swarm-etcd.yml fs

2
docker/compose/local-mq-test.yml

@ -21,7 +21,7 @@ services:
image: chrislusf/seaweedfs:local image: chrislusf/seaweedfs:local
ports: ports:
- 16777:16777 - 16777:16777
command: "mq.agent -broker=mq_broker:17777"
command: "mq.agent -broker=mq_broker:17777 -port=16777"
depends_on: depends_on:
- mq_broker - mq_broker
mq_client: mq_client:

4
weed/Makefile

@ -41,6 +41,10 @@ debug_mq:
go build -gcflags="all=-N -l" go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec ./weed -- -v=4 mq.broker dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec ./weed -- -v=4 mq.broker
debug_mq_agent:
go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec ./weed -- -v=4 mq.agent -broker=localhost:17777
debug_filer_copy: debug_filer_copy:
go build -gcflags="all=-N -l" go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec ./weed -- -v=4 filer.backup -filer=localhost:8888 -filerProxy -timeAgo=10h dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec ./weed -- -v=4 filer.backup -filer=localhost:8888 -filerProxy -timeAgo=10h

6
weed/command/mq_agent.go

@ -26,12 +26,12 @@ type MessageQueueAgentOptions struct {
func init() { func init() {
cmdMqAgent.Run = runMqAgent // break init cycle cmdMqAgent.Run = runMqAgent // break init cycle
mqAgentOptions.brokersString = cmdMqAgent.Flag.String("broker", "localhost:17777", "comma-separated message queue brokers") mqAgentOptions.brokersString = cmdMqAgent.Flag.String("broker", "localhost:17777", "comma-separated message queue brokers")
mqAgentOptions.ip = cmdMqAgent.Flag.String("ip", "localhost", "message queue agent host address")
mqAgentOptions.ip = cmdMqAgent.Flag.String("ip", "", "message queue agent host address")
mqAgentOptions.port = cmdMqAgent.Flag.Int("port", 16777, "message queue agent gRPC server port") mqAgentOptions.port = cmdMqAgent.Flag.Int("port", 16777, "message queue agent gRPC server port")
} }
var cmdMqAgent = &Command{ var cmdMqAgent = &Command{
UsageLine: "mq.agent [-port=6377] [-master=<ip:port>]",
UsageLine: "mq.agent [-port=16777] [-master=<ip:port>]",
Short: "<WIP> start a message queue agent", Short: "<WIP> start a message queue agent",
Long: `start a message queue agent Long: `start a message queue agent
@ -64,7 +64,7 @@ func (mqAgentOpt *MessageQueueAgentOptions) startQueueAgent() bool {
if err != nil { if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", *mqAgentOpt.port, err) glog.Fatalf("failed to listen on grpc port %d: %v", *mqAgentOpt.port, err)
} }
glog.Info("Start Seaweed Message Queue Agent on ", grpcL.Addr().String())
glog.Infof("Start Seaweed Message Queue Agent on %s:%d", *mqAgentOpt.ip, *mqAgentOpt.port)
grpcS := pb.NewGrpcServer() grpcS := pb.NewGrpcServer()
mq_agent_pb.RegisterSeaweedMessagingAgentServer(grpcS, agentServer) mq_agent_pb.RegisterSeaweedMessagingAgentServer(grpcS, agentServer)
reflection.Register(grpcS) reflection.Register(grpcS)

5
weed/mq/agent/agent_grpc_pub_session.go

@ -13,7 +13,7 @@ import (
func (a *MessageQueueAgent) StartPublishSession(ctx context.Context, req *mq_agent_pb.StartPublishSessionRequest) (*mq_agent_pb.StartPublishSessionResponse, error) { func (a *MessageQueueAgent) StartPublishSession(ctx context.Context, req *mq_agent_pb.StartPublishSessionRequest) (*mq_agent_pb.StartPublishSessionResponse, error) {
sessionId := rand.Int64() sessionId := rand.Int64()
topicPublisher := pub_client.NewTopicPublisher(
topicPublisher, err := pub_client.NewTopicPublisher(
&pub_client.PublisherConfiguration{ &pub_client.PublisherConfiguration{
Topic: topic.NewTopic(req.Topic.Namespace, req.Topic.Name), Topic: topic.NewTopic(req.Topic.Namespace, req.Topic.Name),
PartitionCount: req.PartitionCount, PartitionCount: req.PartitionCount,
@ -21,6 +21,9 @@ func (a *MessageQueueAgent) StartPublishSession(ctx context.Context, req *mq_age
PublisherName: req.PublisherName, PublisherName: req.PublisherName,
RecordType: req.RecordType, RecordType: req.RecordType,
}) })
if err != nil {
return nil, err
}
a.publishersLock.Lock() a.publishersLock.Lock()
// remove inactive publishers to avoid memory leak // remove inactive publishers to avoid memory leak

4
weed/mq/agent/agent_grpc_publish.go

@ -6,7 +6,7 @@ import (
"time" "time"
) )
func (a *MessageQueueAgent) PublishRecordRequest(stream mq_agent_pb.SeaweedMessagingAgent_PublishRecordServer) error {
func (a *MessageQueueAgent) PublishRecord(stream mq_agent_pb.SeaweedMessagingAgent_PublishRecordServer) error {
m, err := stream.Recv() m, err := stream.Recv()
if err != nil { if err != nil {
return err return err
@ -29,7 +29,7 @@ func (a *MessageQueueAgent) PublishRecordRequest(stream mq_agent_pb.SeaweedMessa
} }
for { for {
m, err := stream.Recv()
m, err = stream.Recv()
if err != nil { if err != nil {
return err return err
} }

2
weed/mq/agent/agent_grpc_subscribe.go

@ -11,7 +11,7 @@ import (
"time" "time"
) )
func (a *MessageQueueAgent) SubscribeRecordRequest(stream mq_agent_pb.SeaweedMessagingAgent_SubscribeRecordServer) error {
func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingAgent_SubscribeRecordServer) error {
// the first message is the subscribe request // the first message is the subscribe request
// it should only contain the session id // it should only contain the session id
m, err := stream.Recv() m, err := stream.Recv()

8
weed/mq/agent/agent_server.go

@ -32,11 +32,15 @@ type MessageQueueAgent struct {
func NewMessageQueueAgent(option *MessageQueueAgentOptions, grpcDialOption grpc.DialOption) *MessageQueueAgent { func NewMessageQueueAgent(option *MessageQueueAgentOptions, grpcDialOption grpc.DialOption) *MessageQueueAgent {
// check masters to list all brokers
// initialize brokers which may change later
var brokers []pb.ServerAddress
for _, broker := range option.SeedBrokers {
brokers = append(brokers, broker)
}
return &MessageQueueAgent{ return &MessageQueueAgent{
option: option, option: option,
brokers: []pb.ServerAddress{},
brokers: brokers,
grpcDialOption: grpcDialOption, grpcDialOption: grpcDialOption,
publishers: make(map[SessionId]*SessionEntry[*pub_client.TopicPublisher]), publishers: make(map[SessionId]*SessionEntry[*pub_client.TopicPublisher]),
subscribers: make(map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]), subscribers: make(map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]),

5
weed/mq/client/agent_client/publish_session.go

@ -2,13 +2,12 @@ package agent_client
import ( import (
"context" "context"
"crypto/tls"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/schema" "github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
) )
type PublishSession struct { type PublishSession struct {
@ -22,7 +21,7 @@ type PublishSession struct {
func NewPublishSession(agentAddress string, topicSchema *schema.Schema, partitionCount int, publisherName string) (*PublishSession, error) { func NewPublishSession(agentAddress string, topicSchema *schema.Schema, partitionCount int, publisherName string) (*PublishSession, error) {
// call local agent grpc server to create a new session // call local agent grpc server to create a new session
clientConn, err := grpc.NewClient(agentAddress, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
clientConn, err := grpc.NewClient(agentAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil { if err != nil {
return nil, fmt.Errorf("dial agent server %s: %v", agentAddress, err) return nil, fmt.Errorf("dial agent server %s: %v", agentAddress, err)
} }

5
weed/mq/client/cmd/weed_pub_kv/publisher_kv.go

@ -54,7 +54,10 @@ func main() {
Brokers: strings.Split(*seedBrokers, ","), Brokers: strings.Split(*seedBrokers, ","),
PublisherName: *clientName, PublisherName: *clientName,
} }
publisher := pub_client.NewTopicPublisher(config)
publisher, err := pub_client.NewTopicPublisher(config)
if err != nil {
log.Fatalf("Failed to create publisher: %v", err)
}
startTime := time.Now() startTime := time.Now()

5
weed/mq/client/cmd/weed_pub_record/publisher_record.go

@ -114,7 +114,10 @@ func main() {
PublisherName: *clientName, PublisherName: *clientName,
RecordType: recordType, RecordType: recordType,
} }
publisher := pub_client.NewTopicPublisher(config)
publisher, err := pub_client.NewTopicPublisher(config)
if err != nil {
log.Fatalf("Failed to create publisher: %v", err)
}
startTime := time.Now() startTime := time.Now()

8
weed/mq/client/pub_client/publisher.go

@ -34,8 +34,8 @@ type TopicPublisher struct {
jobs []*EachPartitionPublishJob jobs []*EachPartitionPublishJob
} }
func NewTopicPublisher(config *PublisherConfiguration) *TopicPublisher {
tp := &TopicPublisher{
func NewTopicPublisher(config *PublisherConfiguration) (tp *TopicPublisher, err error) {
tp = &TopicPublisher{
partition2Buffer: interval.NewSearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage]](func(a, b int32) int { partition2Buffer: interval.NewSearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage]](func(a, b int32) int {
return int(a - b) return int(a - b)
}), }),
@ -46,7 +46,7 @@ func NewTopicPublisher(config *PublisherConfiguration) *TopicPublisher {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(1) wg.Add(1)
go func() { go func() {
if err := tp.startSchedulerThread(&wg); err != nil {
if err = tp.startSchedulerThread(&wg); err != nil {
log.Println(err) log.Println(err)
return return
} }
@ -54,7 +54,7 @@ func NewTopicPublisher(config *PublisherConfiguration) *TopicPublisher {
wg.Wait() wg.Wait()
return tp
return
} }
func (p *TopicPublisher) Shutdown() error { func (p *TopicPublisher) Shutdown() error {

10
weed/mq/client/pub_client/scheduler.go

@ -7,7 +7,9 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/buffered_queue" "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
"google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"log" "log"
"sort" "sort"
@ -33,6 +35,7 @@ type EachPartitionPublishJob struct {
func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error { func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
if err := p.doConfigureTopic(); err != nil { if err := p.doConfigureTopic(); err != nil {
wg.Done()
return fmt.Errorf("configure topic %s: %v", p.config.Topic, err) return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
} }
@ -111,6 +114,7 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.
go func(job *EachPartitionPublishJob) { go func(job *EachPartitionPublishJob) {
defer job.wg.Done() defer job.wg.Done()
if err := p.doPublishToPartition(job); err != nil { if err := p.doPublishToPartition(job); err != nil {
log.Printf("publish to %s partition %v: %v", p.config.Topic, job.Partition, err)
errChan <- EachPartitionError{assignment, err, generation} errChan <- EachPartitionError{assignment, err, generation}
} }
}(job) }(job)
@ -126,7 +130,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition) log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
grpcConnection, err := pb.GrpcDial(context.Background(), job.LeaderBroker, true, p.grpcDialOption)
grpcConnection, err := grpc.NewClient(job.LeaderBroker, grpc.WithTransportCredentials(insecure.NewCredentials()), p.grpcDialOption)
if err != nil { if err != nil {
return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err) return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err)
} }
@ -225,7 +229,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
func (p *TopicPublisher) doConfigureTopic() (err error) { func (p *TopicPublisher) doConfigureTopic() (err error) {
if len(p.config.Brokers) == 0 { if len(p.config.Brokers) == 0 {
return fmt.Errorf("no bootstrap brokers")
return fmt.Errorf("topic configuring found no bootstrap brokers")
} }
var lastErr error var lastErr error
for _, brokerAddress := range p.config.Brokers { for _, brokerAddress := range p.config.Brokers {
@ -256,7 +260,7 @@ func (p *TopicPublisher) doConfigureTopic() (err error) {
func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) { func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
if len(p.config.Brokers) == 0 { if len(p.config.Brokers) == 0 {
return nil, fmt.Errorf("no bootstrap brokers")
return nil, fmt.Errorf("lookup found no bootstrap brokers")
} }
var lastErr error var lastErr error
for _, brokerAddress := range p.config.Brokers { for _, brokerAddress := range p.config.Brokers {

Loading…
Cancel
Save