|
@ -67,7 +67,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis |
|
|
// connect to follower brokers
|
|
|
// connect to follower brokers
|
|
|
if localTopicPartition.FollowerStream == nil && len(initMessage.FollowerBrokers) > 0 { |
|
|
if localTopicPartition.FollowerStream == nil && len(initMessage.FollowerBrokers) > 0 { |
|
|
follower := initMessage.FollowerBrokers[0] |
|
|
follower := initMessage.FollowerBrokers[0] |
|
|
ctx := stream.Context() |
|
|
|
|
|
|
|
|
ctx := context.Background() |
|
|
localTopicPartition.FollowerGrpcConnection, err = pb.GrpcDial(ctx, follower, true, b.grpcDialOption) |
|
|
localTopicPartition.FollowerGrpcConnection, err = pb.GrpcDial(ctx, follower, true, b.grpcDialOption) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
response.Error = fmt.Sprintf("fail to dial %s: %v", follower, err) |
|
|
response.Error = fmt.Sprintf("fail to dial %s: %v", follower, err) |
|
|