diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index 6820f8696..d4d6d3358 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" "sync" + "time" "google.golang.org/grpc" @@ -17,7 +18,7 @@ import ( ) var ( - connectionPool = make(map[string]*sync.Pool) + connectionPool = make(map[string]*util.ResourcePool) connectionPoolLock sync.Mutex ) @@ -53,41 +54,49 @@ func WithVolumeServerTcpConnection(volumeServer string, fn func(conn net.Conn) e return err } - conn := getConnection(tcpAddress) + conn, err := getConnection(tcpAddress) + if err != nil { + return err + } defer releaseConnection(conn, tcpAddress) err = fn(conn) return err } -func getConnection(tcpAddress string) net.Conn { +func getConnection(tcpAddress string) (net.Conn, error) { connectionPoolLock.Lock() defer connectionPoolLock.Unlock() pool, found := connectionPool[tcpAddress] if !found { println("creating pool for", tcpAddress) - pool = &sync.Pool{New: func() interface{} { - raddr, err := net.ResolveTCPAddr("tcp", tcpAddress) - if err != nil { - glog.Fatal(err) - } + + raddr, err := net.ResolveTCPAddr("tcp", tcpAddress) + if err != nil { + glog.Fatal(err) + } + + pool = util.NewResourcePool(16, func() (interface{}, error) { conn, err := net.DialTCP("tcp", nil, raddr) if err != nil { - glog.Errorf("failed to connect to %s: %v", tcpAddress, err) - return conn + return conn, err } conn.SetKeepAlive(true) conn.SetNoDelay(true) println("connected", tcpAddress, "=>", conn.LocalAddr().String()) - return conn - }} + return conn, nil + }) connectionPool[tcpAddress] = pool } - conn := pool.Get().(net.Conn) + + connObj, err := pool.Get(time.Minute) + if err != nil { + return nil, err + } // println("get connection", tcpAddress, "=>", conn.LocalAddr().String()) - return conn + return connObj.(net.Conn), nil } func releaseConnection(conn net.Conn, tcpAddress string) { @@ -99,7 +108,7 @@ func releaseConnection(conn net.Conn, tcpAddress string) { println("can not return connection", tcpAddress, "=>", conn.LocalAddr().String()) return } - pool.Put(conn) + pool.Release(conn) // println("returned connection", tcpAddress, "=>", conn.LocalAddr().String()) } diff --git a/weed/util/pool.go b/weed/util/pool.go new file mode 100644 index 000000000..db137ec0b --- /dev/null +++ b/weed/util/pool.go @@ -0,0 +1,98 @@ +package util + +import ( + "errors" + "sync" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +var ( + TimeoutErr = errors.New("timeout") +) + +// A bufferedChan implemented by a buffered channel +type ResourcePool struct { + sync.Mutex + bufferedChan chan interface{} + poolSizeLimit int + inuse int + newFn func() (interface{}, error) +} + +func NewResourcePool(poolSizeLimit int, newFn func() (interface{}, error)) *ResourcePool { + p := &ResourcePool{ + poolSizeLimit: poolSizeLimit, + newFn: newFn, + bufferedChan: make(chan interface{}, poolSizeLimit), + } + return p +} + +func (p *ResourcePool) Size() int { + p.Lock() + defer p.Unlock() + return len(p.bufferedChan) + p.inuse +} + +func (p *ResourcePool) Free() int { + p.Lock() + defer p.Unlock() + return p.poolSizeLimit - p.inuse +} + +func (p *ResourcePool) Get(timeout time.Duration) (interface{}, error) { + d, err := p.get(timeout) + if err != nil { + return nil, err + } + if d == nil && p.newFn != nil { + var err error + d, err = p.newFn() + if err != nil { + return nil, err + } + } + p.Lock() + defer p.Unlock() + p.inuse++ + return d, nil +} + +func (p *ResourcePool) Release(v interface{}) { + + p.Lock() + defer p.Unlock() + if p.inuse == 0 { + glog.V(0).Infof("released too many times?") + return + } + p.bufferedChan <- v + p.inuse-- +} + +func (p *ResourcePool) get(timeout time.Duration) (interface{}, error) { + + select { + case v := <-p.bufferedChan: + return v, nil + default: + } + + if p.Free() > 0 { + d, err := p.newFn() + if err != nil { + return nil, err + } + return d, nil + } + + // wait for an freed item + select { + case v := <-p.bufferedChan: + return v, nil + case <-time.After(timeout): + } + return nil, TimeoutErr +}