You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

117 lines
3.1 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package util
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "sync"
  8. "time"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/keepalive"
  11. )
  12. var (
  13. // cache grpc connections
  14. grpcClients = make(map[string]*grpc.ClientConn)
  15. grpcClientsLock sync.Mutex
  16. )
  17. func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {
  18. var options []grpc.ServerOption
  19. options = append(options, grpc.KeepaliveParams(keepalive.ServerParameters{
  20. Time: 10 * time.Second, // wait time before ping if no activity
  21. Timeout: 20 * time.Second, // ping timeout
  22. }), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
  23. MinTime: 60 * time.Second, // min time a client should wait before sending a ping
  24. }))
  25. for _, opt := range opts {
  26. if opt != nil {
  27. options = append(options, opt)
  28. }
  29. }
  30. return grpc.NewServer(options...)
  31. }
  32. func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
  33. // opts = append(opts, grpc.WithBlock())
  34. // opts = append(opts, grpc.WithTimeout(time.Duration(5*time.Second)))
  35. var options []grpc.DialOption
  36. options = append(options,
  37. // grpc.WithInsecure(),
  38. grpc.WithKeepaliveParams(keepalive.ClientParameters{
  39. Time: 30 * time.Second, // client ping server if no activity for this long
  40. Timeout: 20 * time.Second,
  41. }))
  42. for _, opt := range opts {
  43. if opt != nil {
  44. options = append(options, opt)
  45. }
  46. }
  47. return grpc.DialContext(ctx, address, options...)
  48. }
  49. func WithCachedGrpcClient(ctx context.Context, fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
  50. grpcClientsLock.Lock()
  51. existingConnection, found := grpcClients[address]
  52. if found {
  53. grpcClientsLock.Unlock()
  54. return fn(existingConnection)
  55. }
  56. grpcConnection, err := GrpcDial(ctx, address, opts...)
  57. if err != nil {
  58. grpcClientsLock.Unlock()
  59. return fmt.Errorf("fail to dial %s: %v", address, err)
  60. }
  61. grpcClients[address] = grpcConnection
  62. grpcClientsLock.Unlock()
  63. err = fn(grpcConnection)
  64. if err != nil {
  65. grpcClientsLock.Lock()
  66. delete(grpcClients, address)
  67. grpcClientsLock.Unlock()
  68. }
  69. return err
  70. }
  71. func ParseServerToGrpcAddress(server string, optionalGrpcPort int) (serverGrpcAddress string, err error) {
  72. hostnameAndPort := strings.Split(server, ":")
  73. if len(hostnameAndPort) != 2 {
  74. return "", fmt.Errorf("server should have hostname:port format: %v", hostnameAndPort)
  75. }
  76. port, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
  77. if parseErr != nil {
  78. return "", fmt.Errorf("server port parse error: %v", parseErr)
  79. }
  80. grpcPort := int(port) + 10000
  81. if optionalGrpcPort != 0 {
  82. grpcPort = optionalGrpcPort
  83. }
  84. return fmt.Sprintf("%s:%d", hostnameAndPort[0], grpcPort), nil
  85. }
  86. func ServerToGrpcAddress(server string, defaultGrpcPort int) (serverGrpcAddress string) {
  87. hostnameAndPort := strings.Split(server, ":")
  88. if len(hostnameAndPort) != 2 {
  89. return fmt.Sprintf("%s:%d", server, defaultGrpcPort)
  90. }
  91. port, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
  92. if parseErr != nil {
  93. return fmt.Sprintf("%s:%d", hostnameAndPort[0], defaultGrpcPort)
  94. }
  95. grpcPort := int(port) + 10000
  96. return fmt.Sprintf("%s:%d", hostnameAndPort[0], grpcPort)
  97. }