diff --git a/weed/mq/client/pub_client/connect.go b/weed/mq/client/pub_client/connect.go deleted file mode 100644 index 045c9593c..000000000 --- a/weed/mq/client/pub_client/connect.go +++ /dev/null @@ -1,71 +0,0 @@ -package pub_client - -import ( - "context" - "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "log" -) - -// broker => publish client -// send init message -// save the publishing client -func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, err error) { - log.Printf("connecting to %v for topic partition %+v", brokerAddress, partition) - - grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption) - if err != nil { - return publishClient, fmt.Errorf("dial broker %s: %v", brokerAddress, err) - } - brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) - stream, err := brokerClient.PublishMessage(context.Background()) - if err != nil { - return publishClient, fmt.Errorf("create publish client: %v", err) - } - publishClient = &PublishClient{ - SeaweedMessaging_PublishMessageClient: stream, - Broker: brokerAddress, - } - if err = publishClient.Send(&mq_pb.PublishMessageRequest{ - Message: &mq_pb.PublishMessageRequest_Init{ - Init: &mq_pb.PublishMessageRequest_InitMessage{ - Topic: p.config.Topic.ToPbTopic(), - Partition: &mq_pb.Partition{ - RingSize: partition.RingSize, - RangeStart: partition.RangeStart, - RangeStop: partition.RangeStop, - UnixTimeNs: partition.UnixTimeNs, - }, - AckInterval: 128, - }, - }, - }); err != nil { - return publishClient, fmt.Errorf("send init message: %v", err) - } - resp, err := stream.Recv() - if err != nil { - return publishClient, fmt.Errorf("recv init response: %v", err) - } - if resp.Error != "" { - return publishClient, fmt.Errorf("init response error: %v", resp.Error) - } - - go func() { - for { - _, err := publishClient.Recv() - if err != nil { - e, ok := status.FromError(err) - if ok && e.Code() == codes.Unknown && e.Message() == "EOF" { - return - } - publishClient.Err = err - fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err) - return - } - } - }() - return publishClient, nil -}