Browse Source

final attempt

on par with 1K sized object, but no so good with large ones

the default http flow control is better than current implementation.
tcp_read
Chris Lu 5 years ago
parent
commit
1477eead01
  1. 31
      weed/operation/grpc_client.go
  2. 98
      weed/util/pool.go

31
weed/operation/grpc_client.go

@ -7,6 +7,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -17,7 +18,7 @@ import (
) )
var ( var (
connectionPool = make(map[string]*sync.Pool)
connectionPool = make(map[string]*util.ResourcePool)
connectionPoolLock sync.Mutex connectionPoolLock sync.Mutex
) )
@ -53,41 +54,49 @@ func WithVolumeServerTcpConnection(volumeServer string, fn func(conn net.Conn) e
return err return err
} }
conn := getConnection(tcpAddress)
conn, err := getConnection(tcpAddress)
if err != nil {
return err
}
defer releaseConnection(conn, tcpAddress) defer releaseConnection(conn, tcpAddress)
err = fn(conn) err = fn(conn)
return err return err
} }
func getConnection(tcpAddress string) net.Conn {
func getConnection(tcpAddress string) (net.Conn, error) {
connectionPoolLock.Lock() connectionPoolLock.Lock()
defer connectionPoolLock.Unlock() defer connectionPoolLock.Unlock()
pool, found := connectionPool[tcpAddress] pool, found := connectionPool[tcpAddress]
if !found { if !found {
println("creating pool for", tcpAddress) println("creating pool for", tcpAddress)
pool = &sync.Pool{New: func() interface{} {
raddr, err := net.ResolveTCPAddr("tcp", tcpAddress) raddr, err := net.ResolveTCPAddr("tcp", tcpAddress)
if err != nil { if err != nil {
glog.Fatal(err) glog.Fatal(err)
} }
pool = util.NewResourcePool(16, func() (interface{}, error) {
conn, err := net.DialTCP("tcp", nil, raddr) conn, err := net.DialTCP("tcp", nil, raddr)
if err != nil { if err != nil {
glog.Errorf("failed to connect to %s: %v", tcpAddress, err)
return conn
return conn, err
} }
conn.SetKeepAlive(true) conn.SetKeepAlive(true)
conn.SetNoDelay(true) conn.SetNoDelay(true)
println("connected", tcpAddress, "=>", conn.LocalAddr().String()) println("connected", tcpAddress, "=>", conn.LocalAddr().String())
return conn
}}
return conn, nil
})
connectionPool[tcpAddress] = pool connectionPool[tcpAddress] = pool
} }
conn := pool.Get().(net.Conn)
connObj, err := pool.Get(time.Minute)
if err != nil {
return nil, err
}
// println("get connection", tcpAddress, "=>", conn.LocalAddr().String()) // println("get connection", tcpAddress, "=>", conn.LocalAddr().String())
return conn
return connObj.(net.Conn), nil
} }
func releaseConnection(conn net.Conn, tcpAddress string) { func releaseConnection(conn net.Conn, tcpAddress string) {
@ -99,7 +108,7 @@ func releaseConnection(conn net.Conn, tcpAddress string) {
println("can not return connection", tcpAddress, "=>", conn.LocalAddr().String()) println("can not return connection", tcpAddress, "=>", conn.LocalAddr().String())
return return
} }
pool.Put(conn)
pool.Release(conn)
// println("returned connection", tcpAddress, "=>", conn.LocalAddr().String()) // println("returned connection", tcpAddress, "=>", conn.LocalAddr().String())
} }

98
weed/util/pool.go

@ -0,0 +1,98 @@
package util
import (
"errors"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
)
var (
TimeoutErr = errors.New("timeout")
)
// A bufferedChan implemented by a buffered channel
type ResourcePool struct {
sync.Mutex
bufferedChan chan interface{}
poolSizeLimit int
inuse int
newFn func() (interface{}, error)
}
func NewResourcePool(poolSizeLimit int, newFn func() (interface{}, error)) *ResourcePool {
p := &ResourcePool{
poolSizeLimit: poolSizeLimit,
newFn: newFn,
bufferedChan: make(chan interface{}, poolSizeLimit),
}
return p
}
func (p *ResourcePool) Size() int {
p.Lock()
defer p.Unlock()
return len(p.bufferedChan) + p.inuse
}
func (p *ResourcePool) Free() int {
p.Lock()
defer p.Unlock()
return p.poolSizeLimit - p.inuse
}
func (p *ResourcePool) Get(timeout time.Duration) (interface{}, error) {
d, err := p.get(timeout)
if err != nil {
return nil, err
}
if d == nil && p.newFn != nil {
var err error
d, err = p.newFn()
if err != nil {
return nil, err
}
}
p.Lock()
defer p.Unlock()
p.inuse++
return d, nil
}
func (p *ResourcePool) Release(v interface{}) {
p.Lock()
defer p.Unlock()
if p.inuse == 0 {
glog.V(0).Infof("released too many times?")
return
}
p.bufferedChan <- v
p.inuse--
}
func (p *ResourcePool) get(timeout time.Duration) (interface{}, error) {
select {
case v := <-p.bufferedChan:
return v, nil
default:
}
if p.Free() > 0 {
d, err := p.newFn()
if err != nil {
return nil, err
}
return d, nil
}
// wait for an freed item
select {
case v := <-p.bufferedChan:
return v, nil
case <-time.After(timeout):
}
return nil, TimeoutErr
}
Loading…
Cancel
Save