@ -56,25 +56,26 @@ public class FilerGrpcClient {
channelPool = new ArrayList < > ( CHANNEL_POOL_SIZE ) ;
channelPool = new ArrayList < > ( CHANNEL_POOL_SIZE ) ;
for ( int i = 0 ; i < CHANNEL_POOL_SIZE ; i + + ) {
for ( int i = 0 ; i < CHANNEL_POOL_SIZE ; i + + ) {
ManagedChannel channel ;
if ( sslContext = = null ) {
/ / Use NettyChannelBuilder with optimized HTTP / 2 settings
channel = NettyChannelBuilder . forAddress ( host , grpcPort )
. usePlaintext ( )
. maxInboundMessageSize ( 1024 * 1024 * 1024 )
. maxInboundMetadataSize ( 1024 * 1024 )
. flowControlWindow ( 16 * 1024 * 1024 )
. initialFlowControlWindow ( 16 * 1024 * 1024 )
. maxHeaderListSize ( 16 * 1024 * 1024 )
. keepAliveTime ( 30 , TimeUnit . SECONDS )
. keepAliveTimeout ( 10 , TimeUnit . SECONDS )
. keepAliveWithoutCalls ( true )
. withOption ( io . grpc . netty . shaded . io . netty . channel . ChannelOption . SO_RCVBUF , 16 * 1024 * 1024 )
. withOption ( io . grpc . netty . shaded . io . netty . channel . ChannelOption . SO_SNDBUF , 16 * 1024 * 1024 )
. build ( ) ;
} else if ( cn . isEmpty ( ) ) {
/ / For TLS , we still need NettyChannelBuilder
channel = NettyChannelBuilder . forAddress ( host , grpcPort )
channelPool . add ( createChannelBuilder ( host , grpcPort , sslContext , cn ) . build ( ) ) ;
}
/ / Get filer configuration using first channel
FilerProto . GetFilerConfigurationResponse filerConfigurationResponse = SeaweedFilerGrpc
. newBlockingStub ( channelPool . get ( 0 ) ) . getFilerConfiguration (
FilerProto . GetFilerConfigurationRequest . newBuilder ( ) . build ( ) ) ;
cipher = filerConfigurationResponse . getCipher ( ) ;
collection = filerConfigurationResponse . getCollection ( ) ;
replication = filerConfigurationResponse . getReplication ( ) ;
randomClientId = new Random ( ) . nextInt ( ) ;
}
/ * *
* Creates a NettyChannelBuilder with common gRPC configuration .
* Supports plaintext and TLS modes with optional authority override .
* /
private NettyChannelBuilder createChannelBuilder ( String host , int grpcPort , SslContext sslContext , String cn ) {
NettyChannelBuilder builder = NettyChannelBuilder . forAddress ( host , grpcPort )
. maxInboundMessageSize ( 1024 * 1024 * 1024 )
. maxInboundMessageSize ( 1024 * 1024 * 1024 )
. maxInboundMetadataSize ( 1024 * 1024 )
. maxInboundMetadataSize ( 1024 * 1024 )
. flowControlWindow ( 16 * 1024 * 1024 )
. flowControlWindow ( 16 * 1024 * 1024 )
@ -83,41 +84,18 @@ public class FilerGrpcClient {
. keepAliveTime ( 30 , TimeUnit . SECONDS )
. keepAliveTime ( 30 , TimeUnit . SECONDS )
. keepAliveTimeout ( 10 , TimeUnit . SECONDS )
. keepAliveTimeout ( 10 , TimeUnit . SECONDS )
. keepAliveWithoutCalls ( true )
. keepAliveWithoutCalls ( true )
. negotiationType ( NegotiationType . TLS )
. sslContext ( sslContext )
. withOption ( io . grpc . netty . shaded . io . netty . channel . ChannelOption . SO_RCVBUF , 16 * 1024 * 1024 )
. withOption ( io . grpc . netty . shaded . io . netty . channel . ChannelOption . SO_RCVBUF , 16 * 1024 * 1024 )
. withOption ( io . grpc . netty . shaded . io . netty . channel . ChannelOption . SO_SNDBUF , 16 * 1024 * 1024 )
. build ( ) ;
. withOption ( io . grpc . netty . shaded . io . netty . channel . ChannelOption . SO_SNDBUF , 16 * 1024 * 1024 ) ;
if ( sslContext = = null ) {
builder . usePlaintext ( ) ;
} else {
} else {
/ / For TLS with custom authority
channel = NettyChannelBuilder . forAddress ( host , grpcPort )
. maxInboundMessageSize ( 1024 * 1024 * 1024 )
. maxInboundMetadataSize ( 1024 * 1024 )
. flowControlWindow ( 16 * 1024 * 1024 )
. initialFlowControlWindow ( 16 * 1024 * 1024 )
. maxHeaderListSize ( 16 * 1024 * 1024 )
. keepAliveTime ( 30 , TimeUnit . SECONDS )
. keepAliveTimeout ( 10 , TimeUnit . SECONDS )
. keepAliveWithoutCalls ( true )
. negotiationType ( NegotiationType . TLS )
. overrideAuthority ( cn )
. sslContext ( sslContext )
. withOption ( io . grpc . netty . shaded . io . netty . channel . ChannelOption . SO_RCVBUF , 16 * 1024 * 1024 )
. withOption ( io . grpc . netty . shaded . io . netty . channel . ChannelOption . SO_SNDBUF , 16 * 1024 * 1024 )
. build ( ) ;
builder . negotiationType ( NegotiationType . TLS ) . sslContext ( sslContext ) ;
if ( ! cn . isEmpty ( ) ) {
builder . overrideAuthority ( cn ) ;
}
}
channelPool . add ( channel ) ;
}
}
/ / Get filer configuration using first channel
FilerProto . GetFilerConfigurationResponse filerConfigurationResponse = SeaweedFilerGrpc
. newBlockingStub ( channelPool . get ( 0 ) ) . getFilerConfiguration (
FilerProto . GetFilerConfigurationRequest . newBuilder ( ) . build ( ) ) ;
cipher = filerConfigurationResponse . getCipher ( ) ;
collection = filerConfigurationResponse . getCollection ( ) ;
replication = filerConfigurationResponse . getReplication ( ) ;
randomClientId = new Random ( ) . nextInt ( ) ;
return builder ;
}
}
/ / Get a channel from the pool using round - robin
/ / Get a channel from the pool using round - robin