You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

100 lines
2.6 KiB

6 years ago
6 years ago
6 years ago
6 years ago
  1. package util
  2. import (
  3. "fmt"
  4. "strconv"
  5. "strings"
  6. "sync"
  7. "time"
  8. "google.golang.org/grpc"
  9. "google.golang.org/grpc/keepalive"
  10. )
  11. var (
  12. // cache grpc connections
  13. grpcClients = make(map[string]*grpc.ClientConn)
  14. grpcClientsLock sync.Mutex
  15. )
  16. func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {
  17. var options []grpc.ServerOption
  18. options = append(options, grpc.KeepaliveParams(keepalive.ServerParameters{
  19. Time: 10 * time.Second, // wait time before ping if no activity
  20. Timeout: 20 * time.Second, // ping timeout
  21. }), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
  22. MinTime: 60 * time.Second, // min time a client should wait before sending a ping
  23. }))
  24. for _, opt := range opts {
  25. if opt != nil {
  26. options = append(options, opt)
  27. }
  28. }
  29. return grpc.NewServer(options...)
  30. }
  31. func GrpcDial(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
  32. // opts = append(opts, grpc.WithBlock())
  33. // opts = append(opts, grpc.WithTimeout(time.Duration(5*time.Second)))
  34. var options []grpc.DialOption
  35. options = append(options,
  36. // grpc.WithInsecure(),
  37. grpc.WithKeepaliveParams(keepalive.ClientParameters{
  38. Time: 30 * time.Second, // client ping server if no activity for this long
  39. Timeout: 20 * time.Second,
  40. }))
  41. for _, opt := range opts {
  42. if opt != nil {
  43. options = append(options, opt)
  44. }
  45. }
  46. return grpc.Dial(address, options...)
  47. }
  48. func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
  49. grpcClientsLock.Lock()
  50. existingConnection, found := grpcClients[address]
  51. if found {
  52. grpcClientsLock.Unlock()
  53. return fn(existingConnection)
  54. }
  55. grpcConnection, err := GrpcDial(address, opts...)
  56. if err != nil {
  57. grpcClientsLock.Unlock()
  58. return fmt.Errorf("fail to dial %s: %v", address, err)
  59. }
  60. grpcClients[address] = grpcConnection
  61. grpcClientsLock.Unlock()
  62. err = fn(grpcConnection)
  63. if err != nil {
  64. grpcClientsLock.Lock()
  65. delete(grpcClients, address)
  66. grpcClientsLock.Unlock()
  67. }
  68. return err
  69. }
  70. func ParseServerToGrpcAddress(server string, optionalGrpcPort int) (serverGrpcAddress string, err error) {
  71. hostnameAndPort := strings.Split(server, ":")
  72. if len(hostnameAndPort) != 2 {
  73. return "", fmt.Errorf("The server should have hostname:port format: %v", hostnameAndPort)
  74. }
  75. filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
  76. if parseErr != nil {
  77. return "", fmt.Errorf("The server port parse error: %v", parseErr)
  78. }
  79. filerGrpcPort := int(filerPort) + 10000
  80. if optionalGrpcPort != 0 {
  81. filerGrpcPort = optionalGrpcPort
  82. }
  83. return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil
  84. }