From 841fafd0a81e4d85d6156ff55d8e9d02d6efe7c3 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 27 Jan 2024 23:43:22 -0800 Subject: [PATCH] publish to input buffer currently, the input buffer may not exist when start to publish --- weed/mq/broker/broker_grpc_lookup.go | 1 - .../mq/broker/broker_topic_conf_read_write.go | 5 +- weed/mq/client/cmd/weed_pub/publisher.go | 14 +-- weed/mq/client/pub_client/lookup.go | 86 ------------------- weed/mq/client/pub_client/publish.go | 32 ++----- weed/mq/client/pub_client/publisher.go | 25 +++--- weed/mq/client/pub_client/scheduler.go | 74 +++++++++++++++- 7 files changed, 103 insertions(+), 134 deletions(-) delete mode 100644 weed/mq/client/pub_client/lookup.go diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index 4ba1a0f75..6f0485c0c 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -32,7 +32,6 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq ret.Topic = request.Topic if conf, err = b.readTopicConfFromFiler(t); err != nil { glog.V(0).Infof("lookup topic %s conf: %v", request.Topic, err) - ret.BrokerPartitionAssignments = conf.BrokerPartitionAssignments } else { err = b.ensureTopicActiveAssignments(t, conf) } diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index 0eeefbdf0..c5d8e3b78 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -37,8 +37,11 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf") + if err == filer_pb.ErrNotFound { + return err + } if err != nil { - return fmt.Errorf("read topic %v partition %v conf: %v", t, err) + return fmt.Errorf("read topic.conf of %v: %v", t, err) } // parse into filer conf object conf = &mq_pb.ConfigureTopicResponse{} diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go index 59469e66b..3ac037973 100644 --- a/weed/mq/client/cmd/weed_pub/publisher.go +++ b/weed/mq/client/cmd/weed_pub/publisher.go @@ -30,7 +30,7 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) { fmt.Println(err) break } - // println("Published", string(key), string(value)) + println("Published", string(key), string(value)) } elapsed := time.Since(startTime) log.Printf("Publisher %d finished in %s", id, elapsed) @@ -43,11 +43,13 @@ func main() { CreateTopicPartitionCount: int32(*partitionCount), } publisher := pub_client.NewTopicPublisher(*namespace, *topic, config) - brokers := strings.Split(*seedBrokers, ",") - if err := publisher.Connect(brokers); err != nil { - fmt.Println(err) - return - } + go func() { + brokers := strings.Split(*seedBrokers, ",") + if err := publisher.StartSchedulerThread(brokers); err != nil { + fmt.Println(err) + return + } + }() startTime := time.Now() diff --git a/weed/mq/client/pub_client/lookup.go b/weed/mq/client/pub_client/lookup.go deleted file mode 100644 index ce65bbc92..000000000 --- a/weed/mq/client/pub_client/lookup.go +++ /dev/null @@ -1,86 +0,0 @@ -package pub_client - -import ( - "context" - "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" -) - -func (p *TopicPublisher) doLookupAndConnect(brokerAddress string) error { - if p.config.CreateTopic { - err := pb.WithBrokerGrpcClient(true, - brokerAddress, - p.grpcDialOption, - func(client mq_pb.SeaweedMessagingClient) error { - _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ - Topic: &mq_pb.Topic{ - Namespace: p.namespace, - Name: p.topic, - }, - PartitionCount: p.config.CreateTopicPartitionCount, - }) - return err - }) - if err != nil { - return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err) - } - } - - err := pb.WithBrokerGrpcClient(true, - brokerAddress, - p.grpcDialOption, - func(client mq_pb.SeaweedMessagingClient) error { - lookupResp, err := client.LookupTopicBrokers(context.Background(), - &mq_pb.LookupTopicBrokersRequest{ - Topic: &mq_pb.Topic{ - Namespace: p.namespace, - Name: p.topic, - }, - }) - glog.V(0).Infof("lookup1 topic %s/%s: %v", p.namespace, p.topic, lookupResp) - if p.config.CreateTopic && err != nil { - _, err = client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ - Topic: &mq_pb.Topic{ - Namespace: p.namespace, - Name: p.topic, - }, - PartitionCount: p.config.CreateTopicPartitionCount, - }) - if err != nil { - return err - } - lookupResp, err = client.LookupTopicBrokers(context.Background(), - &mq_pb.LookupTopicBrokersRequest{ - Topic: &mq_pb.Topic{ - Namespace: p.namespace, - Name: p.topic, - }, - }) - glog.V(0).Infof("lookup2 topic %s/%s: %v", p.namespace, p.topic, lookupResp) - } - if err != nil { - return err - } - - for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments { - glog.V(0).Infof("topic %s/%s partition %v leader %s followers %v", p.namespace, p.topic, brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker, brokerPartitionAssignment.FollowerBrokers) - // partition => publishClient - publishClient, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker) - if err != nil { - return err - } - p.partition2Broker.Insert( - brokerPartitionAssignment.Partition.RangeStart, - brokerPartitionAssignment.Partition.RangeStop, - publishClient) - } - return nil - }) - - if err != nil { - return fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err) - } - return nil -} diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go index 4b0dfade9..3b9817e74 100644 --- a/weed/mq/client/pub_client/publish.go +++ b/weed/mq/client/pub_client/publish.go @@ -7,35 +7,19 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) + func (p *TopicPublisher) Publish(key, value []byte) error { hashKey := util.HashToInt32(key) % pub_balancer.MaxPartitionCount if hashKey < 0 { hashKey = -hashKey } - publishClient, found := p.partition2Broker.Floor(hashKey+1, hashKey+1) + inputBuffer, found := p.partition2Buffer.Floor(hashKey+1, hashKey+1) if !found { - return fmt.Errorf("no broker found for key %d", hashKey) - } - p.Lock() - defer p.Unlock() - // dead lock here - //google.golang.org/grpc/internal/transport.(*writeQuota).get(flowcontrol.go:59) - //google.golang.org/grpc/internal/transport.(*http2Client).Write(http2_client.go:1047) - //google.golang.org/grpc.(*csAttempt).sendMsg(stream.go:1040) - //google.golang.org/grpc.(*clientStream).SendMsg.func2(stream.go:892) - //google.golang.org/grpc.(*clientStream).withRetry(stream.go:752) - //google.golang.org/grpc.(*clientStream).SendMsg(stream.go:894) - //github.com/seaweedfs/seaweedfs/weed/pb/mq_pb.(*seaweedMessagingPublishClient).Send(mq_grpc.pb.go:141) - //github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client.(*TopicPublisher).Publish(publish.go:19) - if err := publishClient.Send(&mq_pb.PublishMessageRequest{ - Message: &mq_pb.PublishMessageRequest_Data{ - Data: &mq_pb.DataMessage{ - Key: key, - Value: value, - }, - }, - }); err != nil { - return fmt.Errorf("send publish request: %v", err) + return fmt.Errorf("no input buffer found for key %d", hashKey) } - return nil + + return inputBuffer.Enqueue(&mq_pb.DataMessage{ + Key: key, + Value: value, + }) } diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go index 7dd3ab4d1..be29efa1c 100644 --- a/weed/mq/client/pub_client/publisher.go +++ b/weed/mq/client/pub_client/publisher.go @@ -1,10 +1,10 @@ package pub_client import ( - "fmt" "github.com/rdleal/intervalst/interval" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "sync" @@ -25,6 +25,7 @@ type TopicPublisher struct { namespace string topic string partition2Broker *interval.SearchTree[*PublishClient, int32] + partition2Buffer *interval.SearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage], int32] grpcDialOption grpc.DialOption sync.Mutex // protects grpc config *PublisherConfiguration @@ -38,25 +39,14 @@ func NewTopicPublisher(namespace, topic string, config *PublisherConfiguration) partition2Broker: interval.NewSearchTree[*PublishClient](func(a, b int32) int { return int(a - b) }), + partition2Buffer: interval.NewSearchTree[*buffered_queue.BufferedQueue[*mq_pb.DataMessage]](func(a, b int32) int { + return int(a - b) + }), grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), config: config, } } -func (p *TopicPublisher) Connect(bootstrapBrokers []string) (err error) { - if len(bootstrapBrokers) == 0 { - return nil - } - for _, b := range bootstrapBrokers { - err = p.doLookupAndConnect(b) - if err == nil { - return nil - } - fmt.Printf("failed to connect to %s: %v\n\n", b, err) - } - return err -} - func (p *TopicPublisher) Shutdown() error { if clients, found := p.partition2Broker.AllIntersections(0, pub_balancer.MaxPartitionCount); found { @@ -64,6 +54,11 @@ func (p *TopicPublisher) Shutdown() error { client.CloseSend() } } + if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found { + for _, inputBuffer := range inputBuffers { + inputBuffer.CloseInput() + } + } time.Sleep(1100 * time.Millisecond) return nil diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index 226bc7272..e617af09f 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -6,6 +6,10 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "log" "sort" "sync" "time" @@ -22,13 +26,16 @@ type EachPartitionPublishJob struct { stopChan chan bool wg sync.WaitGroup generation int + inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage] } func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string) error { if err := p.doEnsureConfigureTopic(bootstrapBrokers); err != nil { - return err + return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err) } + log.Printf("start scheduler thread for topic %s/%s", p.namespace, p.topic) + generation := 0 var errChan chan EachPartitionError for { @@ -92,6 +99,7 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb. BrokerPartitionAssignment: assignment, stopChan: make(chan bool, 1), generation: generation, + inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024, true), } job.wg.Add(1) go func(job *EachPartitionPublishJob) { @@ -101,12 +109,76 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb. } }(job) jobs = append(jobs, job) + // TODO assuming this is not re-configured so the partitions are fixed. + // better just re-use the existing job + p.partition2Buffer.Insert(assignment.Partition.RangeStart, assignment.Partition.RangeStop, job.inputQueue) } p.jobs = jobs } func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error { + log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition) + + grpcConnection, err := pb.GrpcDial(context.Background(), job.LeaderBroker, true, p.grpcDialOption) + if err != nil { + return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err) + } + brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) + stream, err := brokerClient.PublishMessage(context.Background()) + if err != nil { + return fmt.Errorf("create publish client: %v", err) + } + publishClient := &PublishClient{ + SeaweedMessaging_PublishMessageClient: stream, + Broker: job.LeaderBroker, + } + if err = publishClient.Send(&mq_pb.PublishMessageRequest{ + Message: &mq_pb.PublishMessageRequest_Init{ + Init: &mq_pb.PublishMessageRequest_InitMessage{ + Topic: &mq_pb.Topic{ + Namespace: p.namespace, + Name: p.topic, + }, + Partition: job.Partition, + AckInterval: 128, + }, + }, + }); err != nil { + return fmt.Errorf("send init message: %v", err) + } + resp, err := stream.Recv() + if err != nil { + return fmt.Errorf("recv init response: %v", err) + } + if resp.Error != "" { + return fmt.Errorf("init response error: %v", resp.Error) + } + + go func() { + for { + _, err := publishClient.Recv() + if err != nil { + e, ok := status.FromError(err) + if ok && e.Code() == codes.Unknown && e.Message() == "EOF" { + return + } + publishClient.Err = err + fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err) + return + } + } + }() + + for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() { + if err := publishClient.Send(&mq_pb.PublishMessageRequest{ + Message: &mq_pb.PublishMessageRequest_Data{ + Data: data, + }, + }); err != nil { + return fmt.Errorf("send publish data: %v", err) + } + } return nil }