From 107e8a56ea0fee9eff996177003b32a0179d7651 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 24 Jan 2020 01:40:51 -0800 Subject: [PATCH] retry context canceled request --- weed/filesys/wfs.go | 24 +++++++++++++++++++----- weed/util/grpc_client_server.go | 9 ++++++++- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 4cfab811b..bc78a0dbe 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "os" + "strings" "sync" "time" @@ -47,8 +48,8 @@ type WFS struct { listDirectoryEntriesCache *ccache.Cache // contains all open handles, protected by handlesLock - handlesLock sync.Mutex - handles []*FileHandle + handlesLock sync.Mutex + handles []*FileHandle pathToHandleIndex map[filer2.FullPath]int bufPool sync.Pool @@ -89,11 +90,24 @@ func (wfs *WFS) Root() (fs.Node, error) { func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + err := util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) + if err == nil { + return nil + } + if strings.Contains(err.Error(), "context canceled") { + time.Sleep(1337 * time.Millisecond) + glog.V(2).Infoln("retry context canceled request...") + return util.WithCachedGrpcClient(context.Background(), func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) + } + return err + } func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) { @@ -116,7 +130,7 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand wfs.handles[i] = fileHandle fileHandle.handle = uint64(i) wfs.pathToHandleIndex[fullpath] = i - glog.V(4).Infof( "%s reuse fh %d", fullpath,fileHandle.handle) + glog.V(4).Infof("%s reuse fh %d", fullpath, fileHandle.handle) return } } @@ -124,7 +138,7 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand wfs.handles = append(wfs.handles, fileHandle) fileHandle.handle = uint64(len(wfs.handles) - 1) wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle) - glog.V(4).Infof( "%s new fh %d", fullpath,fileHandle.handle) + glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle) return } diff --git a/weed/util/grpc_client_server.go b/weed/util/grpc_client_server.go index 31497ad35..63519d97a 100644 --- a/weed/util/grpc_client_server.go +++ b/weed/util/grpc_client_server.go @@ -64,7 +64,14 @@ func WithCachedGrpcClient(ctx context.Context, fn func(*grpc.ClientConn) error, existingConnection, found := grpcClients[address] if found { grpcClientsLock.Unlock() - return fn(existingConnection) + err := fn(existingConnection) + if err != nil { + grpcClientsLock.Lock() + delete(grpcClients, address) + grpcClientsLock.Unlock() + existingConnection.Close() + } + return err } grpcConnection, err := GrpcDial(ctx, address, opts...)