You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

130 lines
3.7 KiB

6 years ago
6 years ago
6 years ago
  1. package operation
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "google.golang.org/grpc"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. )
  // connectionPool maps a volume server TCP address to the sync.Pool holding
  // connections for that address. Entries are added lazily by getConnection.
  var connectionPool = map[string]*sync.Pool{}

  // connectionPoolLock guards reads and writes of the connectionPool map.
  var connectionPoolLock sync.Mutex
  19. func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(context.Context, volume_server_pb.VolumeServerClient) error) error {
  20. ctx := context.Background()
  21. grpcAddress, err := toVolumeServerGrpcAddress(volumeServer)
  22. if err != nil {
  23. return err
  24. }
  25. return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
  26. client := volume_server_pb.NewVolumeServerClient(grpcConnection)
  27. return fn(ctx2, client)
  28. }, grpcAddress, grpcDialOption)
  29. }
  // toVolumeServerGrpcAddress derives a volume server's gRPC address from its
  // "host:port" address by adding 10000 to the port. The host part is everything
  // before the last ":" and is copied through unchanged.
  //
  // It returns an error when volumeServer contains no ":" or when the text after
  // the last ":" is not a decimal integer. The parse error is wrapped, so the
  // underlying *strconv.NumError stays reachable through errors.As. The failure
  // is reported only through the returned error, not logged here as well.
  func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err error) {
  	sepIndex := strings.LastIndex(volumeServer, ":")
  	if sepIndex < 0 {
  		// Without this guard an all-digit input such as "8080" would parse as a
  		// port and then panic when slicing volumeServer[0:-1].
  		return "", fmt.Errorf("parse volume server address %q: missing port separator", volumeServer)
  	}
  	port, err := strconv.Atoi(volumeServer[sepIndex+1:])
  	if err != nil {
  		return "", fmt.Errorf("parse volume server address %q: %w", volumeServer, err)
  	}
  	return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil
  }
  39. func WithVolumeServerTcpConnection(volumeServer string, fn func(conn net.Conn) error) error {
  40. tcpAddress, err := toVolumeServerTcpAddress(volumeServer)
  41. if err != nil {
  42. return err
  43. }
  44. conn := getConnection(tcpAddress)
  45. defer releaseConnection(conn, tcpAddress)
  46. err = fn(conn)
  47. return err
  48. }
  49. func getConnection(tcpAddress string) net.Conn {
  50. connectionPoolLock.Lock()
  51. defer connectionPoolLock.Unlock()
  52. pool, found := connectionPool[tcpAddress]
  53. if !found {
  54. println("creating pool for", tcpAddress)
  55. pool = &sync.Pool{New: func() interface{} {
  56. raddr, err := net.ResolveTCPAddr("tcp", tcpAddress)
  57. if err != nil {
  58. glog.Fatal(err)
  59. }
  60. conn, err := net.DialTCP("tcp", nil, raddr)
  61. if err != nil {
  62. glog.Errorf("failed to connect to %s: %v", tcpAddress, err)
  63. return conn
  64. }
  65. conn.SetKeepAlive(true)
  66. conn.SetNoDelay(true)
  67. println("connected", tcpAddress, "=>", conn.LocalAddr().String())
  68. return conn
  69. }}
  70. connectionPool[tcpAddress] = pool
  71. }
  72. conn := pool.Get().(net.Conn)
  73. // println("get connection", tcpAddress, "=>", conn.LocalAddr().String())
  74. return conn
  75. }
  76. func releaseConnection(conn net.Conn, tcpAddress string) {
  77. connectionPoolLock.Lock()
  78. defer connectionPoolLock.Unlock()
  79. pool, found := connectionPool[tcpAddress]
  80. if !found {
  81. println("can not return connection", tcpAddress, "=>", conn.LocalAddr().String())
  82. return
  83. }
  84. pool.Put(conn)
  85. // println("returned connection", tcpAddress, "=>", conn.LocalAddr().String())
  86. }
  // toVolumeServerTcpAddress derives a volume server's raw TCP address from its
  // "host:port" address by adding 20000 to the port. The host part is everything
  // before the last ":" and is copied through unchanged.
  //
  // It returns an error when volumeServer contains no ":" or when the text after
  // the last ":" is not a decimal integer. The parse error is wrapped, so the
  // underlying *strconv.NumError stays reachable through errors.As. The failure
  // is reported only through the returned error, not logged here as well.
  func toVolumeServerTcpAddress(volumeServer string) (tcpAddress string, err error) {
  	sepIndex := strings.LastIndex(volumeServer, ":")
  	if sepIndex < 0 {
  		// Without this guard an all-digit input such as "8080" would parse as a
  		// port and then panic when slicing volumeServer[0:-1].
  		return "", fmt.Errorf("parse volume server address %q: missing port separator", volumeServer)
  	}
  	port, err := strconv.Atoi(volumeServer[sepIndex+1:])
  	if err != nil {
  		return "", fmt.Errorf("parse volume server address %q: %w", volumeServer, err)
  	}
  	return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+20000), nil
  }
  96. func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(ctx2 context.Context, masterClient master_pb.SeaweedClient) error) error {
  97. ctx := context.Background()
  98. masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer)
  99. if parseErr != nil {
  100. return fmt.Errorf("failed to parse master grpc %v: %v", masterServer, parseErr)
  101. }
  102. return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
  103. client := master_pb.NewSeaweedClient(grpcConnection)
  104. return fn(ctx2, client)
  105. }, masterGrpcAddress, grpcDialOption)
  106. }