From 279cb2b85e923e3012bd8a68d21dad1d28a07588 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 16 May 2024 20:28:19 -0700 Subject: [PATCH] consumer acks received messages --- weed/mq/broker/broker_grpc_sub.go | 30 ++++++++++++--- .../broker_topic_partition_read_write.go | 3 +- .../sub_client/connect_to_sub_coordinator.go | 38 +++++++++++++------ weed/mq/client/sub_client/subscribe.go | 7 +++- 4 files changed, 57 insertions(+), 21 deletions(-) diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 02488b2b0..f2214b9df 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -8,20 +8,24 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "io" "time" ) -func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) (err error) { +func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error { - ctx := stream.Context() - clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId) - - initMessage := req.GetInit() - if initMessage == nil { + req, err := stream.Recv() + if err != nil { + return err + } + if req.GetInit() == nil { glog.Errorf("missing init message") return fmt.Errorf("missing init message") } + ctx := stream.Context() + clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId) + t := topic.FromPbTopic(req.GetInit().Topic) partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition()) @@ -52,6 +56,20 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest startPosition = getRequestPosition(req.GetInit().GetPartitionOffset()) } + go func() { + for { + ack, err := stream.Recv() + if err != nil { + if err == io.EOF { + break + } + glog.V(0).Infof("topic %v partition %v subscriber %s error: %v", t, partition, clientName, err) + break + } + println(clientName, "ack =>", ack.GetAck().Sequence) + } + }() + return localTopicPartition.Subscribe(clientName, startPosition, func() bool { if !isConnected { return false diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go index 50470f879..2442709d6 100644 --- a/weed/mq/broker/broker_topic_partition_read_write.go +++ b/weed/mq/broker/broker_topic_partition_read_write.go @@ -49,8 +49,7 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs) } - println("flushing at", logBuffer.LastFlushTsNs, "to", targetFile, "size", len(buf)) - + glog.V(0).Infof("flushing at %s to %s size %d", logBuffer.LastFlushTsNs, targetFile, len(buf)) } } diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index 328968c89..e62ce5265 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -88,7 +88,13 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment) error { // connect to the partition broker return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { - subscribeClient, err := client.SubscribeMessage(context.Background(), &mq_pb.SubscribeMessageRequest{ + + subscribeClient, err := client.SubscribeMessage(context.Background()) + if err != nil { + return fmt.Errorf("create subscribe client: %v", err) + } + + if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{ Message: &mq_pb.SubscribeMessageRequest_Init{ Init: &mq_pb.SubscribeMessageRequest_InitMessage{ ConsumerGroup: sub.SubscriberConfig.ConsumerGroup, @@ -103,25 +109,32 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig FollowerBrokers: assigned.FollowerBrokers, }, }, - }) - - if err != nil { - return fmt.Errorf("create subscribe client: %v", err) + });err != nil { + glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err) } - glog.V(0).Infof("subscriber %s/%s connected to partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) + glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker) if sub.OnCompletionFunc != nil { defer sub.OnCompletionFunc() } + + partitionOffsetChan:= make(chan int64, 1024) + defer func() { - subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ - Message: &mq_pb.SubscribeMessageRequest_Ack{ - Ack: &mq_pb.SubscribeMessageRequest_AckMessage{ - Sequence: 0, + close(partitionOffsetChan) + }() + + go func() { + for ack := range partitionOffsetChan { + subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Ack{ + Ack: &mq_pb.SubscribeMessageRequest_AckMessage{ + Sequence: ack, + }, }, - }, - }) + }) + } subscribeClient.CloseSend() }() @@ -142,6 +155,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig return fmt.Errorf("process error: %v", processErr) } sub.alreadyProcessedTsNs = m.Data.TsNs + partitionOffsetChan <- m.Data.TsNs if !shouldContinue { return nil } diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go index b788faeb5..950d2214c 100644 --- a/weed/mq/client/sub_client/subscribe.go +++ b/weed/mq/client/sub_client/subscribe.go @@ -73,15 +73,20 @@ func (sub *TopicSubscriber) waitUntilNoOverlappingPartitionInFlight(topicPartiti for foundOverlapping { sub.activeProcessorsLock.Lock() foundOverlapping = false + var overlappedPartition topic.Partition for partition, _ := range sub.activeProcessors { if partition.Overlaps(topicPartition) { + if partition.Equals(topicPartition) { + continue + } foundOverlapping = true + overlappedPartition = partition break } } sub.activeProcessorsLock.Unlock() if foundOverlapping { - glog.V(0).Infof("subscriber %s/%s waiting for partition %+v to complete", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, topicPartition) + glog.V(0).Infof("subscriber %s new partition %v waiting for partition %+v to complete", sub.ContentConfig.Topic, topicPartition, overlappedPartition) time.Sleep(1 * time.Second) } }