Browse Source

add []byte caching and pooling

fixes https://github.com/chrislusf/seaweedfs/issues/211
pull/302/head
chrislusf 9 years ago
parent
commit
b03e7b26b5
  1. 11
      go/storage/needle.go
  2. 70
      go/storage/needle_byte_cache.go
  3. 27
      go/storage/needle_read_write.go
  4. 4
      go/weed/weed_server/volume_server_handlers_sync.go

11
go/storage/needle.go

@ -13,7 +13,6 @@ import (
"github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/images" "github.com/chrislusf/seaweedfs/go/images"
"github.com/chrislusf/seaweedfs/go/operation" "github.com/chrislusf/seaweedfs/go/operation"
"github.com/chrislusf/seaweedfs/go/util"
) )
const ( const (
@ -23,14 +22,6 @@ const (
MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8 MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8
) )
var (
BYTESPOOL *util.BytesPool
)
func init() {
BYTESPOOL = util.NewBytesPool()
}
/* /*
* A Needle means a uploaded and stored file. * A Needle means a uploaded and stored file.
* Needle file size is limited to 4GB for now. * Needle file size is limited to 4GB for now.
@ -53,7 +44,7 @@ type Needle struct {
Checksum CRC `comment:"CRC32 to check integrity"` Checksum CRC `comment:"CRC32 to check integrity"`
Padding []byte `comment:"Aligned to 8 bytes"` Padding []byte `comment:"Aligned to 8 bytes"`
rawBytes []byte // underlying supporing []byte, fetched and released into a pool
rawBlock *Block // underlying supporing []byte, fetched and released into a pool
} }
func (n *Needle) String() (str string) { func (n *Needle) String() (str string) {

70
go/storage/needle_byte_cache.go

@ -0,0 +1,70 @@
package storage
import (
"fmt"
"os"
"sync/atomic"
"github.com/hashicorp/golang-lru"
"github.com/chrislusf/seaweedfs/go/util"
)
var (
bytesCache *lru.Cache
bytesPool *util.BytesPool
)
/*
There are one level of caching, and one level of pooling.
In pooling, all []byte are fetched and returned to the pool bytesPool.
In caching, the string~[]byte mapping is cached, to
*/
func init() {
bytesPool = util.NewBytesPool()
bytesCache, _ = lru.NewWithEvict(1, func(key interface{}, value interface{}) {
value.(*Block).decreaseReference()
})
}
type Block struct {
Bytes []byte
refCount int32
}
func (block *Block) decreaseReference() {
if atomic.AddInt32(&block.refCount, -1) == 0 {
bytesPool.Put(block.Bytes)
}
}
func (block *Block) increaseReference() {
atomic.AddInt32(&block.refCount, 1)
}
// get bytes from the LRU cache of []byte first, then from the bytes pool
// when []byte in LRU cache is evicted, it will be put back to the bytes pool
func getBytesForFileBlock(r *os.File, offset int64, readSize int) (block *Block, isNew bool) {
// check cache, return if found
cacheKey := fmt.Sprintf("%d:%d:%d", r.Fd(), offset>>3, readSize)
if obj, found := bytesCache.Get(cacheKey); found {
block = obj.(*Block)
block.increaseReference()
return block, false
}
// get the []byte from pool
b := bytesPool.Get(readSize)
// refCount = 2, one by the bytesCache, one by the actual needle object
block = &Block{Bytes: b, refCount: 2}
bytesCache.Add(cacheKey, block)
return block, true
}
func (n *Needle) ReleaseMemory() {
n.rawBlock.decreaseReference()
}
func ReleaseBytes(b []byte) {
bytesPool.Put(b)
}

27
go/storage/needle_read_write.go

@ -136,33 +136,20 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
return 0, fmt.Errorf("Unsupported Version! (%d)", version) return 0, fmt.Errorf("Unsupported Version! (%d)", version)
} }
func ReleaseBytes(b []byte) {
// println("Releasing", len(b))
BYTESPOOL.Put(b)
}
func BorrwoBytes(size int) []byte {
ret := BYTESPOOL.Get(size)
// println("Reading", len(ret))
return ret
}
func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice, rawBytes []byte, err error) {
func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) {
padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize) padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize)
readSize := NeedleHeaderSize + size + NeedleChecksumSize + padding readSize := NeedleHeaderSize + size + NeedleChecksumSize + padding
rawBytes = BorrwoBytes(int(readSize))
dataSlice = rawBytes[0:int(readSize)]
block, isNew := getBytesForFileBlock(r, offset, int(readSize))
dataSlice = block.Bytes[0:int(readSize)]
if isNew {
_, err = r.ReadAt(dataSlice, offset) _, err = r.ReadAt(dataSlice, offset)
return
} }
func (n *Needle) ReleaseMemory() {
ReleaseBytes(n.rawBytes)
return dataSlice, block, err
} }
func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) { func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) {
bytes, rawBytes, err := ReadNeedleBlob(r, offset, size)
n.rawBytes = rawBytes
bytes, block, err := ReadNeedleBlob(r, offset, size)
n.rawBlock = block
if err != nil { if err != nil {
return err return err
} }

4
go/weed/weed_server/volume_server_handlers_sync.go

@ -50,8 +50,8 @@ func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *ht
} }
offset := uint32(util.ParseUint64(r.FormValue("offset"), 0)) offset := uint32(util.ParseUint64(r.FormValue("offset"), 0))
size := uint32(util.ParseUint64(r.FormValue("size"), 0)) size := uint32(util.ParseUint64(r.FormValue("size"), 0))
content, rawBytes, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size)
defer storage.ReleaseBytes(rawBytes)
content, block, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size)
defer storage.ReleaseBytes(block.Bytes)
if err != nil { if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)
return return

Loading…
Cancel
Save