Browse Source

change visibility

mq-subscribe
chrislu 10 months ago
parent
commit
7463058299
  1. 28
      weed/mq/topic/local_partition.go

28
weed/mq/topic/local_partition.go

@ -30,8 +30,8 @@ type LocalPartition struct {
Subscribers *LocalPartitionSubscribers Subscribers *LocalPartitionSubscribers
FollowerId int32 FollowerId int32
FollowerStream mq_pb.SeaweedMessaging_PublishFollowMeClient
FollowerGrpcConnection *grpc.ClientConn
followerStream mq_pb.SeaweedMessaging_PublishFollowMeClient
followerGrpcConnection *grpc.ClientConn
follower string follower string
} }
@ -59,9 +59,9 @@ func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
p.LogBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano()) p.LogBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano())
// maybe send to the follower // maybe send to the follower
if p.FollowerStream != nil {
if p.followerStream != nil {
println("recv", string(message.Key), message.TsNs) println("recv", string(message.Key), message.TsNs)
if followErr := p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Data{ Message: &mq_pb.PublishFollowMeRequest_Data{
Data: message, Data: message,
}, },
@ -147,7 +147,7 @@ func (p *LocalPartition) WaitUntilNoPublishers() {
} }
func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) { func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) {
if p.FollowerStream != nil {
if p.followerStream != nil {
return nil return nil
} }
if len(initMessage.FollowerBrokers) == 0 { if len(initMessage.FollowerBrokers) == 0 {
@ -156,16 +156,16 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
p.follower = initMessage.FollowerBrokers[0] p.follower = initMessage.FollowerBrokers[0]
ctx := context.Background() ctx := context.Background()
p.FollowerGrpcConnection, err = pb.GrpcDial(ctx, p.follower, true, grpcDialOption)
p.followerGrpcConnection, err = pb.GrpcDial(ctx, p.follower, true, grpcDialOption)
if err != nil { if err != nil {
return fmt.Errorf("fail to dial %s: %v", p.follower, err) return fmt.Errorf("fail to dial %s: %v", p.follower, err)
} }
followerClient := mq_pb.NewSeaweedMessagingClient(p.FollowerGrpcConnection)
p.FollowerStream, err = followerClient.PublishFollowMe(ctx)
followerClient := mq_pb.NewSeaweedMessagingClient(p.followerGrpcConnection)
p.followerStream, err = followerClient.PublishFollowMe(ctx)
if err != nil { if err != nil {
return fmt.Errorf("fail to create publish client: %v", err) return fmt.Errorf("fail to create publish client: %v", err)
} }
if err = p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
if err = p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Init{ Message: &mq_pb.PublishFollowMeRequest_Init{
Init: &mq_pb.PublishFollowMeRequest_InitMessage{ Init: &mq_pb.PublishFollowMeRequest_InitMessage{
Topic: initMessage.Topic, Topic: initMessage.Topic,
@ -183,7 +183,7 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
}() }()
for { for {
ack, err := p.FollowerStream.Recv()
ack, err := p.followerStream.Recv()
if err != nil { if err != nil {
glog.Errorf("Error receiving follower ack: %v", err) glog.Errorf("Error receiving follower ack: %v", err)
return return
@ -204,9 +204,9 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
} }
p.LogBuffer.ShutdownLogBuffer() p.LogBuffer.ShutdownLogBuffer()
if p.FollowerStream != nil {
if p.followerStream != nil {
// send close to the follower // send close to the follower
if followErr := p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Close{ Message: &mq_pb.PublishFollowMeRequest_Close{
Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
}, },
@ -214,8 +214,8 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
glog.Errorf("Error closing follower stream: %v", followErr) glog.Errorf("Error closing follower stream: %v", followErr)
} }
glog.V(4).Infof("closing grpcConnection to follower") glog.V(4).Infof("closing grpcConnection to follower")
p.FollowerGrpcConnection.Close()
p.FollowerStream = nil
p.followerGrpcConnection.Close()
p.followerStream = nil
} }
return return

Loading…
Cancel
Save