diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index e1f369e5b..bb655c256 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -40,10 +40,6 @@ type WFS struct { pathToHandleIndex map[string]int pathToHandleLock sync.Mutex - // cache grpc connections - grpcClients map[string]*grpc.ClientConn - grpcClientsLock sync.Mutex - stats statsCache } type statsCache struct { @@ -56,7 +52,6 @@ func NewSeaweedFileSystem(option *Option) *WFS { option: option, listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(int64(option.DirListingLimit) + 200).ItemsToPrune(100)), pathToHandleIndex: make(map[string]int), - grpcClients: make(map[string]*grpc.ClientConn), } } @@ -66,27 +61,11 @@ func (wfs *WFS) Root() (fs.Node, error) { func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - wfs.grpcClientsLock.Lock() - - existingConnection, found := wfs.grpcClients[wfs.option.FilerGrpcAddress] - if found { - wfs.grpcClientsLock.Unlock() - client := filer_pb.NewSeaweedFilerClient(existingConnection) + return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - } - - grpcConnection, err := util.GrpcDial(wfs.option.FilerGrpcAddress) - if err != nil { - wfs.grpcClientsLock.Unlock() - return fmt.Errorf("fail to dial %s: %v", wfs.option.FilerGrpcAddress, err) - } - - wfs.grpcClients[wfs.option.FilerGrpcAddress] = grpcConnection - wfs.grpcClientsLock.Unlock() - - client := filer_pb.NewSeaweedFilerClient(grpcConnection) + }, wfs.option.FilerGrpcAddress) - return fn(client) } func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) { diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index b1d6a633e..300f78b58 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -25,27 +25,11 @@ func WithVolumeServerClient(volumeServer string, fn func(volume_server_pb.Volume return err } - grpcClientsLock.Lock() - - existingConnection, found := grpcClients[grpcAddress] - if found { - grpcClientsLock.Unlock() - client := volume_server_pb.NewVolumeServerClient(existingConnection) + return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + client := volume_server_pb.NewVolumeServerClient(grpcConnection) return fn(client) - } - - grpcConnection, err := util.GrpcDial(grpcAddress) - if err != nil { - grpcClientsLock.Unlock() - return fmt.Errorf("fail to dial %s: %v", grpcAddress, err) - } - - grpcClients[grpcAddress] = grpcConnection - grpcClientsLock.Unlock() - - client := volume_server_pb.NewVolumeServerClient(grpcConnection) + }, grpcAddress) - return fn(client) } func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err error) { @@ -60,25 +44,9 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err func withMasterServerClient(masterServer string, fn func(masterClient master_pb.SeaweedClient) error) error { - grpcClientsLock.Lock() - - existingConnection, found := grpcClients[masterServer] - if found { - grpcClientsLock.Unlock() - client := master_pb.NewSeaweedClient(existingConnection) + return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) - } - - grpcConnection, err := util.GrpcDial(masterServer) - if err != nil { - grpcClientsLock.Unlock() - return fmt.Errorf("fail to dial %s: %v", masterServer, err) - } - - grpcClients[masterServer] = grpcConnection - grpcClientsLock.Unlock() - - client := master_pb.NewSeaweedClient(grpcConnection) + }, masterServer) - return fn(client) } diff --git a/weed/util/grpc_client_server.go b/weed/util/grpc_client_server.go index 8dbb4c0cd..18d5c02c9 100644 --- a/weed/util/grpc_client_server.go +++ b/weed/util/grpc_client_server.go @@ -1,12 +1,20 @@ package util import ( + "fmt" + "sync" "time" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" ) +var ( + // cache grpc connections + grpcClients = make(map[string]*grpc.ClientConn) + grpcClientsLock sync.Mutex +) + func NewGrpcServer() *grpc.Server { return grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{ Time: 10 * time.Second, // wait time before ping if no activity @@ -26,3 +34,32 @@ func GrpcDial(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) return grpc.Dial(address, opts...) } + +func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error { + + grpcClientsLock.Lock() + + existingConnection, found := grpcClients[address] + if found { + grpcClientsLock.Unlock() + return fn(existingConnection) + } + + grpcConnection, err := GrpcDial(address, opts...) + if err != nil { + grpcClientsLock.Unlock() + return fmt.Errorf("fail to dial %s: %v", address, err) + } + + grpcClients[address] = grpcConnection + grpcClientsLock.Unlock() + + err = fn(grpcConnection) + if err != nil { + grpcClientsLock.Lock() + delete(grpcClients, address) + grpcClientsLock.Unlock() + } + + return err +}