chrislu
12 months ago
1 changed files with 0 additions and 71 deletions
@ -1,71 +0,0 @@ |
|||
package pub_client |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|||
"google.golang.org/grpc/codes" |
|||
"google.golang.org/grpc/status" |
|||
"log" |
|||
) |
|||
|
|||
// broker => publish client
|
|||
// send init message
|
|||
// save the publishing client
|
|||
func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, err error) { |
|||
log.Printf("connecting to %v for topic partition %+v", brokerAddress, partition) |
|||
|
|||
grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption) |
|||
if err != nil { |
|||
return publishClient, fmt.Errorf("dial broker %s: %v", brokerAddress, err) |
|||
} |
|||
brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) |
|||
stream, err := brokerClient.PublishMessage(context.Background()) |
|||
if err != nil { |
|||
return publishClient, fmt.Errorf("create publish client: %v", err) |
|||
} |
|||
publishClient = &PublishClient{ |
|||
SeaweedMessaging_PublishMessageClient: stream, |
|||
Broker: brokerAddress, |
|||
} |
|||
if err = publishClient.Send(&mq_pb.PublishMessageRequest{ |
|||
Message: &mq_pb.PublishMessageRequest_Init{ |
|||
Init: &mq_pb.PublishMessageRequest_InitMessage{ |
|||
Topic: p.config.Topic.ToPbTopic(), |
|||
Partition: &mq_pb.Partition{ |
|||
RingSize: partition.RingSize, |
|||
RangeStart: partition.RangeStart, |
|||
RangeStop: partition.RangeStop, |
|||
UnixTimeNs: partition.UnixTimeNs, |
|||
}, |
|||
AckInterval: 128, |
|||
}, |
|||
}, |
|||
}); err != nil { |
|||
return publishClient, fmt.Errorf("send init message: %v", err) |
|||
} |
|||
resp, err := stream.Recv() |
|||
if err != nil { |
|||
return publishClient, fmt.Errorf("recv init response: %v", err) |
|||
} |
|||
if resp.Error != "" { |
|||
return publishClient, fmt.Errorf("init response error: %v", resp.Error) |
|||
} |
|||
|
|||
go func() { |
|||
for { |
|||
_, err := publishClient.Recv() |
|||
if err != nil { |
|||
e, ok := status.FromError(err) |
|||
if ok && e.Code() == codes.Unknown && e.Message() == "EOF" { |
|||
return |
|||
} |
|||
publishClient.Err = err |
|||
fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err) |
|||
return |
|||
} |
|||
} |
|||
}() |
|||
return publishClient, nil |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue