diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 1253420ac..6e8d05a33 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" - "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/peer" "io" @@ -63,51 +62,11 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis return stream.Send(response) } - // connect to follower brokers - if localTopicPartition.FollowerStream == nil && len(initMessage.FollowerBrokers) > 0 { - follower := initMessage.FollowerBrokers[0] - ctx := context.Background() - localTopicPartition.FollowerGrpcConnection, err = pb.GrpcDial(ctx, follower, true, b.grpcDialOption) - if err != nil { - response.Error = fmt.Sprintf("fail to dial %s: %v", follower, err) - glog.Errorf("fail to dial %s: %v", follower, err) - return stream.Send(response) - } - followerClient := mq_pb.NewSeaweedMessagingClient(localTopicPartition.FollowerGrpcConnection) - localTopicPartition.FollowerStream, err = followerClient.PublishFollowMe(ctx) - if err != nil { - response.Error = fmt.Sprintf("fail to create publish client: %v", err) - glog.Errorf("fail to create publish client: %v", err) - return stream.Send(response) - } - if err = localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{ - Message: &mq_pb.PublishFollowMeRequest_Init{ - Init: &mq_pb.PublishFollowMeRequest_InitMessage{ - Topic: initMessage.Topic, - Partition: initMessage.Partition, - }, - }, - }); err != nil { - return err - } - - // start receiving ack from follower - go func() { - defer func() { - println("stop receiving ack from follower") - }() - - for { - ack, err := localTopicPartition.FollowerStream.Recv() - if err != nil { - glog.Errorf("Error receiving follower ack: %v", err) - return - } - atomic.StoreInt64(&localTopicPartition.AckTsNs, ack.AckTsNs) - println("recv ack", ack.AckTsNs) - } - }() + if followerErr := localTopicPartition.MaybeConnectToFollowers(initMessage, b.grpcDialOption); followerErr != nil { + response.Error = followerErr.Error() + glog.Errorf("MaybeConnectToFollowers: %v", followerErr) + return stream.Send(response) } var receivedSequence, acknowledgedSequence int64 diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index a6562fd5c..3c902a503 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -1,6 +1,7 @@ package topic import ( + "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -128,6 +129,55 @@ func (p *LocalPartition) WaitUntilNoPublishers() { } } +func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) { + if p.FollowerStream != nil { + return nil + } + if len(initMessage.FollowerBrokers) == 0 { + return nil + } + + follower := initMessage.FollowerBrokers[0] + ctx := context.Background() + p.FollowerGrpcConnection, err = pb.GrpcDial(ctx, follower, true, grpcDialOption) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", follower, err) + } + followerClient := mq_pb.NewSeaweedMessagingClient(p.FollowerGrpcConnection) + p.FollowerStream, err = followerClient.PublishFollowMe(ctx) + if err != nil { + return fmt.Errorf("fail to create publish client: %v", err) + } + if err = p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{ + Message: &mq_pb.PublishFollowMeRequest_Init{ + Init: &mq_pb.PublishFollowMeRequest_InitMessage{ + Topic: initMessage.Topic, + Partition: initMessage.Partition, + }, + }, + }); err != nil { + return err + } + + // start receiving ack from follower + go func() { + defer func() { + println("stop receiving ack from follower") + }() + + for { + ack, err := p.FollowerStream.Recv() + if err != nil { + glog.Errorf("Error receiving follower ack: %v", err) + return + } + atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs) + println("recv ack", ack.AckTsNs) + } + }() + return nil +} + func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { if p.MaybeShutdownLocalPartition() { if p.FollowerStream != nil {