Browse Source

Connection pooling, HTTP/2 tuning, keepalive

pull/7526/head
chrislu 1 week ago
parent
commit
a428aeb5d7
  1. 116
      other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java

116
other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java

@ -1,17 +1,22 @@
package seaweedfs.client;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.shaded.io.grpc.netty.NegotiationType;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class FilerGrpcClient {
@ -29,10 +34,12 @@ public class FilerGrpcClient {
public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2;
public final Map<String, FilerProto.Locations> vidLocations = new HashMap<>();
protected int randomClientId;
private final ManagedChannel channel;
private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub;
private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub;
// Connection pool to handle concurrent requests
private static final int CHANNEL_POOL_SIZE = 4;
private final List<ManagedChannel> channelPool;
private final AtomicInteger channelIndex = new AtomicInteger(0);
private boolean cipher = false;
private String collection = "";
private String replication = "";
@ -45,26 +52,68 @@ public class FilerGrpcClient {
public FilerGrpcClient(String host, int port, int grpcPort, String cn, SslContext sslContext) {
this(sslContext == null ?
ManagedChannelBuilder.forAddress(host, grpcPort)
.usePlaintext()
.maxInboundMessageSize(1024 * 1024 * 1024) :
cn.isEmpty() ?
NettyChannelBuilder.forAddress(host, grpcPort)
.maxInboundMessageSize(1024 * 1024 * 1024)
.negotiationType(NegotiationType.TLS)
.sslContext(sslContext) :
NettyChannelBuilder.forAddress(host, grpcPort)
.maxInboundMessageSize(1024 * 1024 * 1024)
.negotiationType(NegotiationType.TLS)
.overrideAuthority(cn) //will not check hostname of the filer server
.sslContext(sslContext)
);
filerAddress = SeaweedUtil.joinHostPort(host, port);
FilerProto.GetFilerConfigurationResponse filerConfigurationResponse =
this.getBlockingStub().getFilerConfiguration(
// Create a pool of channels for better concurrency handling
channelPool = new ArrayList<>(CHANNEL_POOL_SIZE);
for (int i = 0; i < CHANNEL_POOL_SIZE; i++) {
ManagedChannel channel;
if (sslContext == null) {
// Use NettyChannelBuilder with optimized HTTP/2 settings
channel = NettyChannelBuilder.forAddress(host, grpcPort)
.usePlaintext()
.maxInboundMessageSize(1024 * 1024 * 1024)
.maxInboundMetadataSize(1024 * 1024)
.flowControlWindow(16 * 1024 * 1024)
.initialFlowControlWindow(16 * 1024 * 1024)
.maxHeaderListSize(16 * 1024 * 1024)
.keepAliveTime(30, TimeUnit.SECONDS)
.keepAliveTimeout(10, TimeUnit.SECONDS)
.keepAliveWithoutCalls(true)
.withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_RCVBUF, 16 * 1024 * 1024)
.withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_SNDBUF, 16 * 1024 * 1024)
.build();
} else if (cn.isEmpty()) {
// For TLS, we still need NettyChannelBuilder
channel = NettyChannelBuilder.forAddress(host, grpcPort)
.maxInboundMessageSize(1024 * 1024 * 1024)
.maxInboundMetadataSize(1024 * 1024)
.flowControlWindow(16 * 1024 * 1024)
.initialFlowControlWindow(16 * 1024 * 1024)
.maxHeaderListSize(16 * 1024 * 1024)
.keepAliveTime(30, TimeUnit.SECONDS)
.keepAliveTimeout(10, TimeUnit.SECONDS)
.keepAliveWithoutCalls(true)
.negotiationType(NegotiationType.TLS)
.sslContext(sslContext)
.withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_RCVBUF, 16 * 1024 * 1024)
.withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_SNDBUF, 16 * 1024 * 1024)
.build();
} else {
// For TLS with custom authority
channel = NettyChannelBuilder.forAddress(host, grpcPort)
.maxInboundMessageSize(1024 * 1024 * 1024)
.maxInboundMetadataSize(1024 * 1024)
.flowControlWindow(16 * 1024 * 1024)
.initialFlowControlWindow(16 * 1024 * 1024)
.maxHeaderListSize(16 * 1024 * 1024)
.keepAliveTime(30, TimeUnit.SECONDS)
.keepAliveTimeout(10, TimeUnit.SECONDS)
.keepAliveWithoutCalls(true)
.negotiationType(NegotiationType.TLS)
.overrideAuthority(cn)
.sslContext(sslContext)
.withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_RCVBUF, 16 * 1024 * 1024)
.withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_SNDBUF, 16 * 1024 * 1024)
.build();
}
channelPool.add(channel);
}
// Get filer configuration using first channel
FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = SeaweedFilerGrpc
.newBlockingStub(channelPool.get(0)).getFilerConfiguration(
FilerProto.GetFilerConfigurationRequest.newBuilder().build());
cipher = filerConfigurationResponse.getCipher();
collection = filerConfigurationResponse.getCollection();
@ -73,11 +122,10 @@ public class FilerGrpcClient {
}
private FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
channel = channelBuilder.build();
blockingStub = SeaweedFilerGrpc.newBlockingStub(channel);
asyncStub = SeaweedFilerGrpc.newStub(channel);
futureStub = SeaweedFilerGrpc.newFutureStub(channel);
// Get a channel from the pool using round-robin
private ManagedChannel getChannel() {
int index = Math.abs(channelIndex.getAndIncrement() % CHANNEL_POOL_SIZE);
return channelPool.get(index);
}
public boolean isCipher() {
@ -93,19 +141,25 @@ public class FilerGrpcClient {
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
for (ManagedChannel channel : channelPool) {
channel.shutdown();
}
for (ManagedChannel channel : channelPool) {
channel.awaitTermination(5, TimeUnit.SECONDS);
}
}
public SeaweedFilerGrpc.SeaweedFilerBlockingStub getBlockingStub() {
return blockingStub;
// Return a new stub using a channel from the pool (round-robin)
return SeaweedFilerGrpc.newBlockingStub(getChannel());
}
public SeaweedFilerGrpc.SeaweedFilerStub getAsyncStub() {
return asyncStub;
return SeaweedFilerGrpc.newStub(getChannel());
}
public SeaweedFilerGrpc.SeaweedFilerFutureStub getFutureStub() {
return futureStub;
return SeaweedFilerGrpc.newFutureStub(getChannel());
}
public void setAccessVolumeServerDirectly() {

Loading…
Cancel
Save