From 3d3fa43542dabab77a2c1ade868fcd8dc0e68cd9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 22 May 2021 17:46:53 -0700 Subject: [PATCH] filer: re-create grpc connections if having transport error fix https://github.com/chrislusf/seaweedfs/issues/2070 --- weed/pb/grpc_client_server.go | 34 ++++++++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index cdac0ba99..edb60e4fa 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/chrislusf/seaweedfs/weed/glog" + "math/rand" "net/http" "strconv" "strings" @@ -24,10 +25,15 @@ const ( var ( // cache grpc connections - grpcClients = make(map[string]*grpc.ClientConn) + grpcClients = make(map[string]*versionedGrpcClient) grpcClientsLock sync.Mutex ) +type versionedGrpcClient struct { + *grpc.ClientConn + version int +} + func init() { http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 1024 http.DefaultTransport.(*http.Transport).MaxIdleConns = 1024 @@ -79,7 +85,7 @@ func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*gr return grpc.DialContext(ctx, address, options...) } -func getOrCreateConnection(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { +func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedGrpcClient, error) { grpcClientsLock.Lock() defer grpcClientsLock.Unlock() @@ -94,18 +100,34 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*grpc.Clien return nil, fmt.Errorf("fail to dial %s: %v", address, err) } - grpcClients[address] = grpcConnection + vgc := &versionedGrpcClient{ + grpcConnection, + rand.Int(), + } + grpcClients[address] = vgc - return grpcConnection, nil + return vgc, nil } func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error { - grpcConnection, err := getOrCreateConnection(address, opts...) + vgc, err := getOrCreateConnection(address, opts...) if err != nil { return fmt.Errorf("getOrCreateConnection %s: %v", address, err) } - return fn(grpcConnection) + executionErr := fn(vgc.ClientConn) + if executionErr != nil && strings.Contains(executionErr.Error(), "transport") { + grpcClientsLock.Lock() + if t, ok := grpcClients[address]; ok { + if t.version == vgc.version { + vgc.Close() + delete(grpcClients, address) + } + } + grpcClientsLock.Unlock() + } + + return executionErr } func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) {