|
|
@ -4,10 +4,10 @@ import ( |
|
|
|
"context" |
|
|
|
"fmt" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" |
|
|
|
"google.golang.org/grpc" |
|
|
|
"google.golang.org/grpc/credentials/insecure" |
|
|
|
) |
|
|
|
|
|
|
|
type SubscribeOption struct { |
|
|
@ -22,12 +22,11 @@ type SubscribeOption struct { |
|
|
|
type SubscribeSession struct { |
|
|
|
Option *SubscribeOption |
|
|
|
stream grpc.BidiStreamingClient[mq_agent_pb.SubscribeRecordRequest, mq_agent_pb.SubscribeRecordResponse] |
|
|
|
sessionId int64 |
|
|
|
} |
|
|
|
|
|
|
|
func NewSubscribeSession(agentAddress string, option *SubscribeOption) (*SubscribeSession, error) { |
|
|
|
// call local agent grpc server to create a new session
|
|
|
|
clientConn, err := pb.GrpcDial(context.Background(), agentAddress, true, grpc.WithInsecure()) |
|
|
|
clientConn, err := grpc.NewClient(agentAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("dial agent server %s: %v", agentAddress, err) |
|
|
|
} |
|
|
@ -55,10 +54,15 @@ func NewSubscribeSession(agentAddress string, option *SubscribeOption) (*Subscri |
|
|
|
return nil, fmt.Errorf("subscribe record: %v", err) |
|
|
|
} |
|
|
|
|
|
|
|
if err = stream.Send(&mq_agent_pb.SubscribeRecordRequest{ |
|
|
|
SessionId: resp.SessionId, |
|
|
|
}); err != nil { |
|
|
|
return nil, fmt.Errorf("send session id: %v", err) |
|
|
|
} |
|
|
|
|
|
|
|
return &SubscribeSession{ |
|
|
|
Option: option, |
|
|
|
stream: stream, |
|
|
|
sessionId: resp.SessionId, |
|
|
|
}, nil |
|
|
|
} |
|
|
|
|
|
|
|