You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

251 lines
7.1 KiB

5 years ago
6 years ago
4 years ago
6 years ago
5 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
5 years ago
6 years ago
5 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
5 years ago
6 years ago
5 years ago
6 years ago
6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
5 years ago
5 years ago
  1. package pb
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "math/rand"
  7. "net/http"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "google.golang.org/grpc"
  13. "google.golang.org/grpc/keepalive"
  14. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  15. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  16. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  17. )
  18. const (
  19. Max_Message_Size = 1 << 30 // 1 GB
  20. )
  21. var (
  22. // cache grpc connections
  23. grpcClients = make(map[string]*versionedGrpcClient)
  24. grpcClientsLock sync.Mutex
  25. )
// versionedGrpcClient pairs a cached gRPC connection with a version tag, so
// code that used a connection can later check whether the cache still holds
// that same connection before evicting it.
type versionedGrpcClient struct {
	*grpc.ClientConn
	version int // tag chosen when the connection is created
}
  30. func init() {
  31. http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 1024
  32. http.DefaultTransport.(*http.Transport).MaxIdleConns = 1024
  33. }
  34. func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {
  35. var options []grpc.ServerOption
  36. options = append(options,
  37. grpc.KeepaliveParams(keepalive.ServerParameters{
  38. Time: 10 * time.Second, // wait time before ping if no activity
  39. Timeout: 20 * time.Second, // ping timeout
  40. MaxConnectionAge: 10 * time.Hour,
  41. }),
  42. grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
  43. MinTime: 60 * time.Second, // min time a client should wait before sending a ping
  44. PermitWithoutStream: false,
  45. }),
  46. grpc.MaxRecvMsgSize(Max_Message_Size),
  47. grpc.MaxSendMsgSize(Max_Message_Size),
  48. )
  49. for _, opt := range opts {
  50. if opt != nil {
  51. options = append(options, opt)
  52. }
  53. }
  54. return grpc.NewServer(options...)
  55. }
  56. func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
  57. // opts = append(opts, grpc.WithBlock())
  58. // opts = append(opts, grpc.WithTimeout(time.Duration(5*time.Second)))
  59. var options []grpc.DialOption
  60. options = append(options,
  61. // grpc.WithInsecure(),
  62. grpc.WithDefaultCallOptions(
  63. grpc.MaxCallSendMsgSize(Max_Message_Size),
  64. grpc.MaxCallRecvMsgSize(Max_Message_Size),
  65. ),
  66. grpc.WithKeepaliveParams(keepalive.ClientParameters{
  67. Time: 30 * time.Second, // client ping server if no activity for this long
  68. Timeout: 20 * time.Second,
  69. PermitWithoutStream: false,
  70. }))
  71. for _, opt := range opts {
  72. if opt != nil {
  73. options = append(options, opt)
  74. }
  75. }
  76. return grpc.DialContext(ctx, address, options...)
  77. }
  78. func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedGrpcClient, error) {
  79. grpcClientsLock.Lock()
  80. defer grpcClientsLock.Unlock()
  81. existingConnection, found := grpcClients[address]
  82. if found {
  83. return existingConnection, nil
  84. }
  85. grpcConnection, err := GrpcDial(context.Background(), address, opts...)
  86. if err != nil {
  87. return nil, fmt.Errorf("fail to dial %s: %v", address, err)
  88. }
  89. vgc := &versionedGrpcClient{
  90. grpcConnection,
  91. rand.Int(),
  92. }
  93. grpcClients[address] = vgc
  94. return vgc, nil
  95. }
  96. func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
  97. vgc, err := getOrCreateConnection(address, opts...)
  98. if err != nil {
  99. return fmt.Errorf("getOrCreateConnection %s: %v", address, err)
  100. }
  101. executionErr := fn(vgc.ClientConn)
  102. if executionErr != nil && strings.Contains(executionErr.Error(), "transport") {
  103. grpcClientsLock.Lock()
  104. if t, ok := grpcClients[address]; ok {
  105. if t.version == vgc.version {
  106. vgc.Close()
  107. delete(grpcClients, address)
  108. }
  109. }
  110. grpcClientsLock.Unlock()
  111. }
  112. return executionErr
  113. }
  114. func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) {
  115. return ParseServerAddress(server, 10000)
  116. }
  117. func ParseServersToGrpcAddresses(servers []string) (serverGrpcAddresses []string, err error) {
  118. for _, server := range servers {
  119. if serverGrpcAddress, parseErr := ParseServerToGrpcAddress(server); parseErr == nil {
  120. serverGrpcAddresses = append(serverGrpcAddresses, serverGrpcAddress)
  121. } else {
  122. return nil, parseErr
  123. }
  124. }
  125. return
  126. }
  127. func ParseServerAddress(server string, deltaPort int) (newServerAddress string, err error) {
  128. host, port, parseErr := hostAndPort(server)
  129. if parseErr != nil {
  130. return "", fmt.Errorf("server port parse error: %v", parseErr)
  131. }
  132. newPort := int(port) + deltaPort
  133. return fmt.Sprintf("%s:%d", host, newPort), nil
  134. }
  135. func hostAndPort(address string) (host string, port uint64, err error) {
  136. colonIndex := strings.LastIndex(address, ":")
  137. if colonIndex < 0 {
  138. return "", 0, fmt.Errorf("server should have hostname:port format: %v", address)
  139. }
  140. port, err = strconv.ParseUint(address[colonIndex+1:], 10, 64)
  141. if err != nil {
  142. return "", 0, fmt.Errorf("server port parse error: %v", err)
  143. }
  144. return address[:colonIndex], port, err
  145. }
  146. func ServerToGrpcAddress(server string) (serverGrpcAddress string) {
  147. host, port, parseErr := hostAndPort(server)
  148. if parseErr != nil {
  149. glog.Fatalf("server address %s parse error: %v", server, parseErr)
  150. }
  151. grpcPort := int(port) + 10000
  152. return fmt.Sprintf("%s:%d", host, grpcPort)
  153. }
  154. func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) {
  155. host, grpcPort, parseErr := hostAndPort(grpcAddress)
  156. if parseErr != nil {
  157. glog.Fatalf("server grpc address %s parse error: %v", grpcAddress, parseErr)
  158. }
  159. port := int(grpcPort) - 10000
  160. return fmt.Sprintf("%s:%d", host, port)
  161. }
  162. func WithMasterClient(master string, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error {
  163. masterGrpcAddress, parseErr := ParseServerToGrpcAddress(master)
  164. if parseErr != nil {
  165. return fmt.Errorf("failed to parse master grpc %v: %v", master, parseErr)
  166. }
  167. return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  168. client := master_pb.NewSeaweedClient(grpcConnection)
  169. return fn(client)
  170. }, masterGrpcAddress, grpcDialOption)
  171. }
  172. func WithBrokerGrpcClient(brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client messaging_pb.SeaweedMessagingClient) error) error {
  173. return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  174. client := messaging_pb.NewSeaweedMessagingClient(grpcConnection)
  175. return fn(client)
  176. }, brokerGrpcAddress, grpcDialOption)
  177. }
  178. func WithFilerClient(filer string, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
  179. filerGrpcAddress, parseErr := ParseServerToGrpcAddress(filer)
  180. if parseErr != nil {
  181. return fmt.Errorf("failed to parse filer grpc %v: %v", filer, parseErr)
  182. }
  183. return WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, fn)
  184. }
  185. func WithGrpcFilerClient(filerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
  186. return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  187. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  188. return fn(client)
  189. }, filerGrpcAddress, grpcDialOption)
  190. }
  191. func WithOneOfGrpcFilerClients(filerGrpcAddresses []string, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) {
  192. for _, filerGrpcAddress := range filerGrpcAddresses {
  193. err = WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  194. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  195. return fn(client)
  196. }, filerGrpcAddress, grpcDialOption)
  197. if err == nil {
  198. return nil
  199. }
  200. }
  201. return err
  202. }