diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 3e5963855..5df2535a6 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -30,8 +30,8 @@ type LocalPartition struct { Subscribers *LocalPartitionSubscribers FollowerId int32 - FollowerStream mq_pb.SeaweedMessaging_PublishFollowMeClient - FollowerGrpcConnection *grpc.ClientConn + followerStream mq_pb.SeaweedMessaging_PublishFollowMeClient + followerGrpcConnection *grpc.ClientConn follower string } @@ -59,9 +59,9 @@ func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error { p.LogBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano()) // maybe send to the follower - if p.FollowerStream != nil { + if p.followerStream != nil { println("recv", string(message.Key), message.TsNs) - if followErr := p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{ + if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ Message: &mq_pb.PublishFollowMeRequest_Data{ Data: message, }, @@ -147,7 +147,7 @@ func (p *LocalPartition) WaitUntilNoPublishers() { } func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) { - if p.FollowerStream != nil { + if p.followerStream != nil { return nil } if len(initMessage.FollowerBrokers) == 0 { @@ -156,16 +156,16 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa p.follower = initMessage.FollowerBrokers[0] ctx := context.Background() - p.FollowerGrpcConnection, err = pb.GrpcDial(ctx, p.follower, true, grpcDialOption) + p.followerGrpcConnection, err = pb.GrpcDial(ctx, p.follower, true, grpcDialOption) if err != nil { return fmt.Errorf("fail to dial %s: %v", p.follower, err) } - followerClient := mq_pb.NewSeaweedMessagingClient(p.FollowerGrpcConnection) - p.FollowerStream, err = followerClient.PublishFollowMe(ctx) + followerClient := mq_pb.NewSeaweedMessagingClient(p.followerGrpcConnection) + p.followerStream, err = followerClient.PublishFollowMe(ctx) if err != nil { return fmt.Errorf("fail to create publish client: %v", err) } - if err = p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{ + if err = p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ Message: &mq_pb.PublishFollowMeRequest_Init{ Init: &mq_pb.PublishFollowMeRequest_InitMessage{ Topic: initMessage.Topic, @@ -183,7 +183,7 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa }() for { - ack, err := p.FollowerStream.Recv() + ack, err := p.followerStream.Recv() if err != nil { glog.Errorf("Error receiving follower ack: %v", err) return @@ -204,9 +204,9 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { } p.LogBuffer.ShutdownLogBuffer() - if p.FollowerStream != nil { + if p.followerStream != nil { // send close to the follower - if followErr := p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{ + if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ Message: &mq_pb.PublishFollowMeRequest_Close{ Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, }, @@ -214,8 +214,8 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { glog.Errorf("Error closing follower stream: %v", followErr) } glog.V(4).Infof("closing grpcConnection to follower") - p.FollowerGrpcConnection.Close() - p.FollowerStream = nil + p.followerGrpcConnection.Close() + p.followerStream = nil } return