You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

129 lines
3.4 KiB

6 years ago
6 years ago
6 years ago
5 years ago
5 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package util
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "google.golang.org/grpc"
  11. "google.golang.org/grpc/keepalive"
  12. )
  13. var (
  14. // cache grpc connections
  15. grpcClients = make(map[string]*grpc.ClientConn)
  16. grpcClientsLock sync.Mutex
  17. )
  18. func init() {
  19. http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 1024
  20. }
  21. func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {
  22. var options []grpc.ServerOption
  23. options = append(options, grpc.KeepaliveParams(keepalive.ServerParameters{
  24. Time: 10 * time.Second, // wait time before ping if no activity
  25. Timeout: 20 * time.Second, // ping timeout
  26. }), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
  27. MinTime: 60 * time.Second, // min time a client should wait before sending a ping
  28. PermitWithoutStream: true,
  29. }))
  30. for _, opt := range opts {
  31. if opt != nil {
  32. options = append(options, opt)
  33. }
  34. }
  35. return grpc.NewServer(options...)
  36. }
  37. func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
  38. // opts = append(opts, grpc.WithBlock())
  39. // opts = append(opts, grpc.WithTimeout(time.Duration(5*time.Second)))
  40. var options []grpc.DialOption
  41. options = append(options,
  42. // grpc.WithInsecure(),
  43. grpc.WithKeepaliveParams(keepalive.ClientParameters{
  44. Time: 30 * time.Second, // client ping server if no activity for this long
  45. Timeout: 20 * time.Second,
  46. PermitWithoutStream: true,
  47. }))
  48. for _, opt := range opts {
  49. if opt != nil {
  50. options = append(options, opt)
  51. }
  52. }
  53. return grpc.DialContext(ctx, address, options...)
  54. }
  55. func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
  56. grpcClientsLock.Lock()
  57. existingConnection, found := grpcClients[address]
  58. if found {
  59. grpcClientsLock.Unlock()
  60. err := fn(existingConnection)
  61. if err != nil {
  62. grpcClientsLock.Lock()
  63. delete(grpcClients, address)
  64. grpcClientsLock.Unlock()
  65. existingConnection.Close()
  66. }
  67. return err
  68. }
  69. grpcConnection, err := GrpcDial(context.Background(), address, opts...)
  70. if err != nil {
  71. grpcClientsLock.Unlock()
  72. return fmt.Errorf("fail to dial %s: %v", address, err)
  73. }
  74. grpcClients[address] = grpcConnection
  75. grpcClientsLock.Unlock()
  76. err = fn(grpcConnection)
  77. if err != nil {
  78. grpcClientsLock.Lock()
  79. delete(grpcClients, address)
  80. grpcClientsLock.Unlock()
  81. grpcConnection.Close()
  82. }
  83. return err
  84. }
  85. func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) {
  86. colonIndex := strings.LastIndex(server, ":")
  87. if colonIndex < 0 {
  88. return "", fmt.Errorf("server should have hostname:port format: %v", server)
  89. }
  90. port, parseErr := strconv.ParseUint(server[colonIndex+1:], 10, 64)
  91. if parseErr != nil {
  92. return "", fmt.Errorf("server port parse error: %v", parseErr)
  93. }
  94. grpcPort := int(port) + 10000
  95. return fmt.Sprintf("%s:%d", server[:colonIndex], grpcPort), nil
  96. }
  97. func ServerToGrpcAddress(server string) (serverGrpcAddress string) {
  98. hostnameAndPort := strings.Split(server, ":")
  99. if len(hostnameAndPort) != 2 {
  100. return fmt.Sprintf("unexpected server address: %s", server)
  101. }
  102. port, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
  103. if parseErr != nil {
  104. return fmt.Sprintf("failed to parse port for %s:%s", hostnameAndPort[0], hostnameAndPort[1])
  105. }
  106. grpcPort := int(port) + 10000
  107. return fmt.Sprintf("%s:%d", hostnameAndPort[0], grpcPort)
  108. }