Browse Source

correct cache: fix racing condition

pull/302/head
chrislusf 9 years ago
parent
commit
adcfaa5735
  1. 9
      go/storage/needle_byte_cache.go
  2. 7
      go/storage/needle_read_write.go

9
go/storage/needle_byte_cache.go

@ -45,21 +45,24 @@ func (block *Block) increaseReference() {
// get bytes from the LRU cache of []byte first, then from the bytes pool // get bytes from the LRU cache of []byte first, then from the bytes pool
// when []byte in LRU cache is evicted, it will be put back to the bytes pool // when []byte in LRU cache is evicted, it will be put back to the bytes pool
func getBytesForFileBlock(r *os.File, offset int64, readSize int) (block *Block, isNew bool) {
func getBytesForFileBlock(r *os.File, offset int64, readSize int) (dataSlice []byte, block *Block, err error) {
// check cache, return if found // check cache, return if found
cacheKey := fmt.Sprintf("%d:%d:%d", r.Fd(), offset>>3, readSize) cacheKey := fmt.Sprintf("%d:%d:%d", r.Fd(), offset>>3, readSize)
if obj, found := bytesCache.Get(cacheKey); found { if obj, found := bytesCache.Get(cacheKey); found {
block = obj.(*Block) block = obj.(*Block)
block.increaseReference() block.increaseReference()
return block, false
dataSlice = block.Bytes[0:readSize]
return dataSlice, block, nil
} }
// get the []byte from pool // get the []byte from pool
b := bytesPool.Get(readSize) b := bytesPool.Get(readSize)
// refCount = 2, one by the bytesCache, one by the actual needle object // refCount = 2, one by the bytesCache, one by the actual needle object
block = &Block{Bytes: b, refCount: 2} block = &Block{Bytes: b, refCount: 2}
dataSlice = block.Bytes[0:readSize]
_, err = r.ReadAt(dataSlice, offset)
bytesCache.Add(cacheKey, block) bytesCache.Add(cacheKey, block)
return block, true
return dataSlice, block, err
} }
func (n *Needle) ReleaseMemory() { func (n *Needle) ReleaseMemory() {

7
go/storage/needle_read_write.go

@ -139,12 +139,7 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) { func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) {
padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize) padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize)
readSize := NeedleHeaderSize + size + NeedleChecksumSize + padding readSize := NeedleHeaderSize + size + NeedleChecksumSize + padding
block, isNew := getBytesForFileBlock(r, offset, int(readSize))
dataSlice = block.Bytes[0:int(readSize)]
if isNew {
_, err = r.ReadAt(dataSlice, offset)
}
return dataSlice, block, err
return getBytesForFileBlock(r, offset, int(readSize))
} }
func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) { func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) {

Loading…
Cancel
Save