From 5aa111708d9fdf4708e8aec960699947c1fbacd4 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 26 Dec 2025 10:58:18 -0800 Subject: [PATCH] grpc: reduce client idle pings to avoid ENHANCE_YOUR_CALM (#7885) * grpc: reduce client idle pings to avoid ENHANCE_YOUR_CALM (too_many_pings) * test: use context.WithTimeout and pb constants for keepalive * test(kafka): use separate dial and client contexts in NewDirectBrokerClient * test(kafka): fix client context usage in NewDirectBrokerClient --- .../seaweedfs/client/FilerGrpcClient.java | 2 +- .../loadtest/mock_million_record_test.go | 30 ++++++++++++------- weed/pb/grpc_client_server.go | 8 +++-- 3 files changed, 25 insertions(+), 15 deletions(-) diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index f8334b734..e559431dd 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -87,7 +87,7 @@ public class FilerGrpcClient { .maxHeaderListSize(16 * 1024 * 1024) .keepAliveTime(KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS) .keepAliveTimeout(KEEP_ALIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS) - .keepAliveWithoutCalls(true) + .keepAliveWithoutCalls(false) .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_RCVBUF, 16 * 1024 * 1024) .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_SNDBUF, 16 * 1024 * 1024); diff --git a/test/kafka/loadtest/mock_million_record_test.go b/test/kafka/loadtest/mock_million_record_test.go index ada018cbb..83518fb74 100644 --- a/test/kafka/loadtest/mock_million_record_test.go +++ b/test/kafka/loadtest/mock_million_record_test.go @@ -14,6 +14,8 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" @@ -119,22 +121,28 @@ type PublisherSession struct { } func NewDirectBrokerClient(brokerAddr string) (*DirectBrokerClient, error) { - ctx, cancel := context.WithCancel(context.Background()) - - // Add connection timeout and keepalive settings - conn, err := grpc.DialContext(ctx, brokerAddr, + // Use a short-lived context for dialing so we don't store a canceled + // context in the returned client. The client's operational context + // (used by methods) should be cancellable independently. + dialCtx, dialCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer dialCancel() + + // Add keepalive settings; use exported server constants to keep values in sync. + conn, err := grpc.DialContext(dialCtx, brokerAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithTimeout(30*time.Second), grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: 30 * time.Second, // Increased from 10s to 30s - Timeout: 10 * time.Second, // Increased from 5s to 10s - PermitWithoutStream: false, // Changed to false to reduce pings + Time: pb.GrpcKeepAliveTime, // align with server MinTime + Timeout: pb.GrpcKeepAliveTimeout, // align with server timeout + PermitWithoutStream: false, // reduce pings when idle })) if err != nil { - cancel() return nil, fmt.Errorf("failed to connect to broker: %v", err) } + // Create a long-lived context for the client's lifetime and store it + // in the returned DirectBrokerClient so callers can cancel when done. + clientCtx, clientCancel := context.WithCancel(context.Background()) + client := mq_pb.NewSeaweedMessagingClient(conn) return &DirectBrokerClient{ @@ -142,8 +150,8 @@ func NewDirectBrokerClient(brokerAddr string) (*DirectBrokerClient, error) { conn: conn, client: client, publishers: make(map[string]*PublisherSession), - ctx: ctx, - cancel: cancel, + ctx: clientCtx, + cancel: clientCancel, }, nil } diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index 4a869bb95..6d09b7f6e 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -94,9 +94,11 @@ func GrpcDial(ctx context.Context, address string, waitForReady bool, opts ...gr grpc.WaitForReady(waitForReady), ), grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: GrpcKeepAliveTime, // client ping server if no activity for this long - Timeout: GrpcKeepAliveTimeout, // ping timeout - PermitWithoutStream: true, + Time: GrpcKeepAliveTime, // client ping server if no activity for this long + Timeout: GrpcKeepAliveTimeout, // ping timeout + // Disable pings when there are no active streams to avoid triggering + // server enforcement for too-frequent pings from idle clients. + PermitWithoutStream: false, })) for _, opt := range opts { if opt != nil {