159 lines
4.4 KiB

  1. package net2
  2. import (
  3. "net"
  4. "strings"
  5. "time"
  6. rp "github.com/chrislusf/seaweedfs/weed/wdclient/resource_pool"
  7. )
  8. const defaultDialTimeout = 1 * time.Second
  9. func defaultDialFunc(network string, address string) (net.Conn, error) {
  10. return net.DialTimeout(network, address, defaultDialTimeout)
  11. }
  12. func parseResourceLocation(resourceLocation string) (
  13. network string,
  14. address string) {
  15. idx := strings.Index(resourceLocation, " ")
  16. if idx >= 0 {
  17. return resourceLocation[:idx], resourceLocation[idx+1:]
  18. }
  19. return "", resourceLocation
  20. }
// connectionPoolImpl is a thin wrapper around the underlying resource pool.
type connectionPoolImpl struct {
	// options is the configuration supplied at construction; Get also passes
	// it to every ManagedConn it creates.
	options ConnectionOptions
	// pool is the underlying resource pool that connections are obtained from.
	pool rp.ResourcePool
}
  26. // This returns a connection pool where all connections are connected
  27. // to the same (network, address)
  28. func newBaseConnectionPool(
  29. options ConnectionOptions,
  30. createPool func(rp.Options) rp.ResourcePool) ConnectionPool {
  31. dial := options.Dial
  32. if dial == nil {
  33. dial = defaultDialFunc
  34. }
  35. openFunc := func(loc string) (interface{}, error) {
  36. network, address := parseResourceLocation(loc)
  37. return dial(network, address)
  38. }
  39. closeFunc := func(handle interface{}) error {
  40. return handle.(net.Conn).Close()
  41. }
  42. poolOptions := rp.Options{
  43. MaxActiveHandles: options.MaxActiveConnections,
  44. MaxIdleHandles: options.MaxIdleConnections,
  45. MaxIdleTime: options.MaxIdleTime,
  46. OpenMaxConcurrency: options.DialMaxConcurrency,
  47. Open: openFunc,
  48. Close: closeFunc,
  49. NowFunc: options.NowFunc,
  50. }
  51. return &connectionPoolImpl{
  52. options: options,
  53. pool: createPool(poolOptions),
  54. }
  55. }
  56. // This returns a connection pool where all connections are connected
  57. // to the same (network, address)
  58. func NewSimpleConnectionPool(options ConnectionOptions) ConnectionPool {
  59. return newBaseConnectionPool(options, rp.NewSimpleResourcePool)
  60. }
  61. // This returns a connection pool that manages multiple (network, address)
  62. // entries. The connections to each (network, address) entry acts
  63. // independently. For example ("tcp", "localhost:11211") could act as memcache
  64. // shard 0 and ("tcp", "localhost:11212") could act as memcache shard 1.
  65. func NewMultiConnectionPool(options ConnectionOptions) ConnectionPool {
  66. return newBaseConnectionPool(
  67. options,
  68. func(poolOptions rp.Options) rp.ResourcePool {
  69. return rp.NewMultiResourcePool(poolOptions, nil)
  70. })
  71. }
  72. // See ConnectionPool for documentation.
  73. func (p *connectionPoolImpl) NumActive() int32 {
  74. return p.pool.NumActive()
  75. }
  76. // See ConnectionPool for documentation.
  77. func (p *connectionPoolImpl) ActiveHighWaterMark() int32 {
  78. return p.pool.ActiveHighWaterMark()
  79. }
  80. // This returns the number of alive idle connections. This method is not part
  81. // of ConnectionPool's API. It is used only for testing.
  82. func (p *connectionPoolImpl) NumIdle() int {
  83. return p.pool.NumIdle()
  84. }
  85. // BaseConnectionPool can only register a single (network, address) entry.
  86. // Register should be call before any Get calls.
  87. func (p *connectionPoolImpl) Register(network string, address string) error {
  88. return p.pool.Register(network + " " + address)
  89. }
// Unregister is a no-op that always returns nil: network and address are not
// used and the underlying resource pool is not touched.
//
// NOTE(review): as a consequence, an entry added through Register stays
// registered in the underlying pool -- confirm this is intended for pools
// created by NewMultiConnectionPool.
func (p *connectionPoolImpl) Unregister(network string, address string) error {
	return nil
}
  94. func (p *connectionPoolImpl) ListRegistered() []NetworkAddress {
  95. result := make([]NetworkAddress, 0, 1)
  96. for _, location := range p.pool.ListRegistered() {
  97. network, address := parseResourceLocation(location)
  98. result = append(
  99. result,
  100. NetworkAddress{
  101. Network: network,
  102. Address: address,
  103. })
  104. }
  105. return result
  106. }
  107. // This gets an active connection from the connection pool. Note that network
  108. // and address arguments are ignored (The connections with point to the
  109. // network/address provided by the first Register call).
  110. func (p *connectionPoolImpl) Get(
  111. network string,
  112. address string) (ManagedConn, error) {
  113. handle, err := p.pool.Get(network + " " + address)
  114. if err != nil {
  115. return nil, err
  116. }
  117. return NewManagedConn(network, address, handle, p, p.options), nil
  118. }
  119. // See ConnectionPool for documentation.
  120. func (p *connectionPoolImpl) Release(conn ManagedConn) error {
  121. return conn.ReleaseConnection()
  122. }
  123. // See ConnectionPool for documentation.
  124. func (p *connectionPoolImpl) Discard(conn ManagedConn) error {
  125. return conn.DiscardConnection()
  126. }
  127. // See ConnectionPool for documentation.
  128. func (p *connectionPoolImpl) EnterLameDuckMode() {
  129. p.pool.EnterLameDuckMode()
  130. }