You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

91 lines
2.2 KiB

  1. package storage
  2. import (
  3. "fmt"
  4. "os"
  5. "sync/atomic"
  6. "github.com/hashicorp/golang-lru"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. )
  10. var (
  11. bytesCache *lru.Cache
  12. bytesPool *util.BytesPool
  13. )
/*
There is one level of caching and one level of pooling.
In pooling, all []byte are fetched from and returned to the pool bytesPool.
In caching, the string~[]byte mapping is cached.
*/
// init wires the pool and the cache together: the LRU cache holds up to
// 50 *Block entries, and on eviction the cache drops the reference it
// held, so the underlying []byte returns to bytesPool once no other
// reader still holds the block.
func init() {
	bytesPool = util.NewBytesPool()
	// Error deliberately ignored: lru.NewWithEvict can only fail for a
	// non-positive size, and 50 is a constant here.
	bytesCache, _ = lru.NewWithEvict(50, func(key interface{}, value interface{}) {
		value.(*Block).decreaseReference()
	})
}
// Block pairs a pooled byte slice with an atomic reference count.
// The count tracks how many holders (the LRU cache plus active readers)
// still need Bytes; when it reaches zero the slice is returned to
// bytesPool via decreaseReference.
type Block struct {
	Bytes    []byte
	refCount int32 // manipulated only through sync/atomic
}
  29. func (block *Block) decreaseReference() {
  30. if atomic.AddInt32(&block.refCount, -1) == 0 {
  31. bytesPool.Put(block.Bytes)
  32. }
  33. }
// increaseReference registers one more holder of the block's bytes.
func (block *Block) increaseReference() {
	atomic.AddInt32(&block.refCount, 1)
}
  37. // get bytes from the LRU cache of []byte first, then from the bytes pool
  38. // when []byte in LRU cache is evicted, it will be put back to the bytes pool
  39. func getBytesForFileBlock(r *os.File, offset int64, readSize int) (dataSlice []byte, block *Block, err error) {
  40. //Skip the cache if we are looking for a block that is too big to fit in the cache (defaulting to 10MB)
  41. cacheable := readSize <= (1024*1024*10)
  42. if !cacheable {
  43. glog.V(4).Infoln("Block too big to keep in cache. Size:", readSize)
  44. }
  45. cacheKey := string("")
  46. if cacheable {
  47. // check cache, return if found
  48. cacheKey = fmt.Sprintf("%d:%d:%d", r.Fd(), offset >> 3, readSize)
  49. if obj, found := bytesCache.Get(cacheKey); found {
  50. glog.V(4).Infoln("Found block in cache. Size:", readSize)
  51. block = obj.(*Block)
  52. block.increaseReference()
  53. dataSlice = block.Bytes[0:readSize]
  54. return dataSlice, block, nil
  55. }
  56. }
  57. // get the []byte from pool
  58. b := bytesPool.Get(readSize)
  59. // refCount = 2, one by the bytesCache, one by the actual needle object
  60. refCount := int32(1)
  61. if cacheable {
  62. refCount = 2
  63. }
  64. block = &Block{Bytes: b, refCount: refCount}
  65. dataSlice = block.Bytes[0:readSize]
  66. _, err = r.ReadAt(dataSlice, offset)
  67. if cacheable {
  68. bytesCache.Add(cacheKey, block)
  69. }
  70. return dataSlice, block, err
  71. }
  72. func (n *Needle) ReleaseMemory() {
  73. if n.rawBlock != nil {
  74. n.rawBlock.decreaseReference()
  75. }
  76. }
// ReleaseBytes returns a slice obtained from bytesPool directly back to
// the pool, bypassing the Block reference counting.
func ReleaseBytes(b []byte) {
	bytesPool.Put(b)
}