diff --git a/go/storage/needle.go b/go/storage/needle.go index 612a89fed..8ab76c0f3 100644 --- a/go/storage/needle.go +++ b/go/storage/needle.go @@ -13,7 +13,6 @@ import ( "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/images" "github.com/chrislusf/seaweedfs/go/operation" - "github.com/chrislusf/seaweedfs/go/util" ) const ( @@ -23,14 +22,6 @@ const ( MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8 ) -var ( - BYTESPOOL *util.BytesPool -) - -func init() { - BYTESPOOL = util.NewBytesPool() -} - /* * A Needle means a uploaded and stored file. * Needle file size is limited to 4GB for now. @@ -53,7 +44,7 @@ type Needle struct { Checksum CRC `comment:"CRC32 to check integrity"` Padding []byte `comment:"Aligned to 8 bytes"` - rawBytes []byte // underlying supporing []byte, fetched and released into a pool + rawBlock *Block // underlying supporing []byte, fetched and released into a pool } func (n *Needle) String() (str string) { diff --git a/go/storage/needle_byte_cache.go b/go/storage/needle_byte_cache.go new file mode 100644 index 000000000..c7781917e --- /dev/null +++ b/go/storage/needle_byte_cache.go @@ -0,0 +1,70 @@ +package storage + +import ( + "fmt" + "os" + "sync/atomic" + + "github.com/hashicorp/golang-lru" + + "github.com/chrislusf/seaweedfs/go/util" +) + +var ( + bytesCache *lru.Cache + bytesPool *util.BytesPool +) + +/* +There are one level of caching, and one level of pooling. + +In pooling, all []byte are fetched and returned to the pool bytesPool. + +In caching, the string~[]byte mapping is cached, to +*/ +func init() { + bytesPool = util.NewBytesPool() + bytesCache, _ = lru.NewWithEvict(1, func(key interface{}, value interface{}) { + value.(*Block).decreaseReference() + }) +} + +type Block struct { + Bytes []byte + refCount int32 +} + +func (block *Block) decreaseReference() { + if atomic.AddInt32(&block.refCount, -1) == 0 { + bytesPool.Put(block.Bytes) + } +} +func (block *Block) increaseReference() { + atomic.AddInt32(&block.refCount, 1) +} + +// get bytes from the LRU cache of []byte first, then from the bytes pool +// when []byte in LRU cache is evicted, it will be put back to the bytes pool +func getBytesForFileBlock(r *os.File, offset int64, readSize int) (block *Block, isNew bool) { + // check cache, return if found + cacheKey := fmt.Sprintf("%d:%d:%d", r.Fd(), offset>>3, readSize) + if obj, found := bytesCache.Get(cacheKey); found { + block = obj.(*Block) + block.increaseReference() + return block, false + } + + // get the []byte from pool + b := bytesPool.Get(readSize) + // refCount = 2, one by the bytesCache, one by the actual needle object + block = &Block{Bytes: b, refCount: 2} + bytesCache.Add(cacheKey, block) + return block, true +} + +func (n *Needle) ReleaseMemory() { + n.rawBlock.decreaseReference() +} +func ReleaseBytes(b []byte) { + bytesPool.Put(b) +} diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go index 8d051dea3..e2be256be 100644 --- a/go/storage/needle_read_write.go +++ b/go/storage/needle_read_write.go @@ -136,33 +136,20 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { return 0, fmt.Errorf("Unsupported Version! (%d)", version) } -func ReleaseBytes(b []byte) { - // println("Releasing", len(b)) - BYTESPOOL.Put(b) -} - -func BorrwoBytes(size int) []byte { - ret := BYTESPOOL.Get(size) - // println("Reading", len(ret)) - return ret -} - -func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice, rawBytes []byte, err error) { +func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) { padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize) readSize := NeedleHeaderSize + size + NeedleChecksumSize + padding - rawBytes = BorrwoBytes(int(readSize)) - dataSlice = rawBytes[0:int(readSize)] - _, err = r.ReadAt(dataSlice, offset) - return -} - -func (n *Needle) ReleaseMemory() { - ReleaseBytes(n.rawBytes) + block, isNew := getBytesForFileBlock(r, offset, int(readSize)) + dataSlice = block.Bytes[0:int(readSize)] + if isNew { + _, err = r.ReadAt(dataSlice, offset) + } + return dataSlice, block, err } func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) { - bytes, rawBytes, err := ReadNeedleBlob(r, offset, size) - n.rawBytes = rawBytes + bytes, block, err := ReadNeedleBlob(r, offset, size) + n.rawBlock = block if err != nil { return err } diff --git a/go/weed/weed_server/volume_server_handlers_sync.go b/go/weed/weed_server/volume_server_handlers_sync.go index cb24f7cd6..c52c93bd2 100644 --- a/go/weed/weed_server/volume_server_handlers_sync.go +++ b/go/weed/weed_server/volume_server_handlers_sync.go @@ -50,8 +50,8 @@ func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *ht } offset := uint32(util.ParseUint64(r.FormValue("offset"), 0)) size := uint32(util.ParseUint64(r.FormValue("size"), 0)) - content, rawBytes, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size) - defer storage.ReleaseBytes(rawBytes) + content, block, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size) + defer storage.ReleaseBytes(block.Bytes) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return