diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 44977d186..d4e2b2ad4 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -1,17 +1,22 @@ package seaweedfs.client; +import io.grpc.Deadline; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.netty.shaded.io.grpc.netty.NegotiationType; import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; +import io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Settings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; public class FilerGrpcClient { @@ -29,10 +34,12 @@ public class FilerGrpcClient { public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2; public final Map vidLocations = new HashMap<>(); protected int randomClientId; - private final ManagedChannel channel; - private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; - private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; - private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub; + + // Connection pool to handle concurrent requests + private static final int CHANNEL_POOL_SIZE = 4; + private final List channelPool; + private final AtomicInteger channelIndex = new AtomicInteger(0); + private boolean cipher = false; private String collection = ""; private String replication = ""; @@ -45,26 +52,68 @@ public class FilerGrpcClient { public FilerGrpcClient(String host, int port, int grpcPort, String cn, SslContext sslContext) { - this(sslContext == null ? - ManagedChannelBuilder.forAddress(host, grpcPort) - .usePlaintext() - .maxInboundMessageSize(1024 * 1024 * 1024) : - cn.isEmpty() ? - NettyChannelBuilder.forAddress(host, grpcPort) - .maxInboundMessageSize(1024 * 1024 * 1024) - .negotiationType(NegotiationType.TLS) - .sslContext(sslContext) : - NettyChannelBuilder.forAddress(host, grpcPort) - .maxInboundMessageSize(1024 * 1024 * 1024) - .negotiationType(NegotiationType.TLS) - .overrideAuthority(cn) //will not check hostname of the filer server - .sslContext(sslContext) - ); - filerAddress = SeaweedUtil.joinHostPort(host, port); - FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = - this.getBlockingStub().getFilerConfiguration( + // Create a pool of channels for better concurrency handling + channelPool = new ArrayList<>(CHANNEL_POOL_SIZE); + + for (int i = 0; i < CHANNEL_POOL_SIZE; i++) { + ManagedChannel channel; + if (sslContext == null) { + // Use NettyChannelBuilder with optimized HTTP/2 settings + channel = NettyChannelBuilder.forAddress(host, grpcPort) + .usePlaintext() + .maxInboundMessageSize(1024 * 1024 * 1024) + .maxInboundMetadataSize(1024 * 1024) + .flowControlWindow(16 * 1024 * 1024) + .initialFlowControlWindow(16 * 1024 * 1024) + .maxHeaderListSize(16 * 1024 * 1024) + .keepAliveTime(30, TimeUnit.SECONDS) + .keepAliveTimeout(10, TimeUnit.SECONDS) + .keepAliveWithoutCalls(true) + .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_RCVBUF, 16 * 1024 * 1024) + .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_SNDBUF, 16 * 1024 * 1024) + .build(); + } else if (cn.isEmpty()) { + // For TLS, we still need NettyChannelBuilder + channel = NettyChannelBuilder.forAddress(host, grpcPort) + .maxInboundMessageSize(1024 * 1024 * 1024) + .maxInboundMetadataSize(1024 * 1024) + .flowControlWindow(16 * 1024 * 1024) + .initialFlowControlWindow(16 * 1024 * 1024) + .maxHeaderListSize(16 * 1024 * 1024) + .keepAliveTime(30, TimeUnit.SECONDS) + .keepAliveTimeout(10, TimeUnit.SECONDS) + .keepAliveWithoutCalls(true) + .negotiationType(NegotiationType.TLS) + .sslContext(sslContext) + .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_RCVBUF, 16 * 1024 * 1024) + .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_SNDBUF, 16 * 1024 * 1024) + .build(); + } else { + // For TLS with custom authority + channel = NettyChannelBuilder.forAddress(host, grpcPort) + .maxInboundMessageSize(1024 * 1024 * 1024) + .maxInboundMetadataSize(1024 * 1024) + .flowControlWindow(16 * 1024 * 1024) + .initialFlowControlWindow(16 * 1024 * 1024) + .maxHeaderListSize(16 * 1024 * 1024) + .keepAliveTime(30, TimeUnit.SECONDS) + .keepAliveTimeout(10, TimeUnit.SECONDS) + .keepAliveWithoutCalls(true) + .negotiationType(NegotiationType.TLS) + .overrideAuthority(cn) + .sslContext(sslContext) + .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_RCVBUF, 16 * 1024 * 1024) + .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_SNDBUF, 16 * 1024 * 1024) + .build(); + } + channelPool.add(channel); + } + + // Get filer configuration using first channel + FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = SeaweedFilerGrpc + .newBlockingStub(channelPool.get(0)).getFilerConfiguration( FilerProto.GetFilerConfigurationRequest.newBuilder().build()); cipher = filerConfigurationResponse.getCipher(); collection = filerConfigurationResponse.getCollection(); @@ -73,11 +122,10 @@ public class FilerGrpcClient { } - private FilerGrpcClient(ManagedChannelBuilder channelBuilder) { - channel = channelBuilder.build(); - blockingStub = SeaweedFilerGrpc.newBlockingStub(channel); - asyncStub = SeaweedFilerGrpc.newStub(channel); - futureStub = SeaweedFilerGrpc.newFutureStub(channel); + // Get a channel from the pool using round-robin + private ManagedChannel getChannel() { + int index = Math.abs(channelIndex.getAndIncrement() % CHANNEL_POOL_SIZE); + return channelPool.get(index); } public boolean isCipher() { @@ -93,19 +141,25 @@ public class FilerGrpcClient { } public void shutdown() throws InterruptedException { - channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + for (ManagedChannel channel : channelPool) { + channel.shutdown(); + } + for (ManagedChannel channel : channelPool) { + channel.awaitTermination(5, TimeUnit.SECONDS); + } } public SeaweedFilerGrpc.SeaweedFilerBlockingStub getBlockingStub() { - return blockingStub; + // Return a new stub using a channel from the pool (round-robin) + return SeaweedFilerGrpc.newBlockingStub(getChannel()); } public SeaweedFilerGrpc.SeaweedFilerStub getAsyncStub() { - return asyncStub; + return SeaweedFilerGrpc.newStub(getChannel()); } public SeaweedFilerGrpc.SeaweedFilerFutureStub getFutureStub() { - return futureStub; + return SeaweedFilerGrpc.newFutureStub(getChannel()); } public void setAccessVolumeServerDirectly() {