From dab54543329c95361ab1b502c747caece5548c37 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 27 Mar 2024 23:10:24 -0700 Subject: [PATCH] publish and send to follower --- weed/mq/broker/broker_grpc_pub.go | 17 ++--------------- weed/mq/topic/local_partition.go | 25 +++++++++++++++++++++---- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 6e8d05a33..86e689b5a 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -143,21 +143,8 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis } // send to the local partition - localTopicPartition.Publish(dataMessage) - receivedSequence = dataMessage.TsNs - - // maybe send to the follower - if localTopicPartition.FollowerStream != nil { - println("recv", string(dataMessage.Key), dataMessage.TsNs) - if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{ - Message: &mq_pb.PublishFollowMeRequest_Data{ - Data: dataMessage, - }, - }); followErr != nil { - return followErr - } - } else { - atomic.StoreInt64(&localTopicPartition.AckTsNs, receivedSequence) + if err = localTopicPartition.Publish(dataMessage); err != nil { + return fmt.Errorf("topic %v partition %v publish error: %v", initMessage.Topic, initMessage.Partition, err) } } diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 9bedc5a15..3e5963855 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -32,6 +32,7 @@ type LocalPartition struct { FollowerStream mq_pb.SeaweedMessaging_PublishFollowMeClient FollowerGrpcConnection *grpc.ClientConn + follower string } var TIME_FORMAT = "2006-01-02-15-04-05" @@ -54,8 +55,24 @@ func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb. return lp } -func (p *LocalPartition) Publish(message *mq_pb.DataMessage) { +func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error { p.LogBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano()) + + // maybe send to the follower + if p.FollowerStream != nil { + println("recv", string(message.Key), message.TsNs) + if followErr := p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{ + Message: &mq_pb.PublishFollowMeRequest_Data{ + Data: message, + }, + }); followErr != nil { + return fmt.Errorf("send to follower %s: %v", p.follower, followErr) + } + } else { + atomic.StoreInt64(&p.AckTsNs, message.TsNs) + } + + return nil } func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.MessagePosition, @@ -137,11 +154,11 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa return nil } - follower := initMessage.FollowerBrokers[0] + p.follower = initMessage.FollowerBrokers[0] ctx := context.Background() - p.FollowerGrpcConnection, err = pb.GrpcDial(ctx, follower, true, grpcDialOption) + p.FollowerGrpcConnection, err = pb.GrpcDial(ctx, p.follower, true, grpcDialOption) if err != nil { - return fmt.Errorf("fail to dial %s: %v", follower, err) + return fmt.Errorf("fail to dial %s: %v", p.follower, err) } followerClient := mq_pb.NewSeaweedMessagingClient(p.FollowerGrpcConnection) p.FollowerStream, err = followerClient.PublishFollowMe(ctx)