package chunk_cache

import (
	"errors"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)

var ErrorOutOfBounds = errors.New("attempt to read out of bounds")
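
// ChunkCache caches recently accessed file chunks, keyed by file id.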
type ChunkCache interface {
	ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error)
	SetChunk(fileId string, data []byte)
	IsInCache(fileId string, lockNeeded bool) (answer bool)
	GetMaxFilePartSizeInCache() (answer uint64)
}

// TieredChunkCache is a global cache for recently accessed file chunks.
type TieredChunkCache struct {
	memCache   *ChunkCacheInMemory
	diskCaches []*OnDiskCacheLayer
	sync.RWMutex
	onDiskCacheSizeLimit0  uint64
	onDiskCacheSizeLimit1  uint64
	onDiskCacheSizeLimit2  uint64
	maxFilePartSizeInCache uint64
}

var _ ChunkCache = &TieredChunkCache{}
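
// NewTieredChunkCache builds a cache with one in-memory layer and three on-disk
// layers. The on-disk budget of diskSizeInUnit*unitSize bytes is split 1/8, 3/8
// and 1/2 across layers holding progressively larger chunks: up to unitSize
// bytes, up to 4*unitSize bytes, and everything above that.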
func NewTieredChunkCache(maxEntries int64, dir string, diskSizeInUnit int64, unitSize int64) *TieredChunkCache {

	c := &TieredChunkCache{
		memCache: NewChunkCacheInMemory(maxEntries),
	}
	c.diskCaches = make([]*OnDiskCacheLayer, 3)
	c.onDiskCacheSizeLimit0 = uint64(unitSize)
	c.onDiskCacheSizeLimit1 = 4 * c.onDiskCacheSizeLimit0
	c.onDiskCacheSizeLimit2 = 2 * c.onDiskCacheSizeLimit1
	c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_2", diskSizeInUnit*unitSize/8, 2)
	c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_3", diskSizeInUnit*unitSize/4+diskSizeInUnit*unitSize/8, 3)
	c.diskCaches[2] = NewOnDiskCacheLayer(dir, "c2_2", diskSizeInUnit*unitSize/2, 2)
	c.maxFilePartSizeInCache = uint64(unitSize*diskSizeInUnit) / 4

	return c
}
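
// GetMaxFilePartSizeInCache returns the largest chunk size worth caching,
// a quarter of the total on-disk budget.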
func (c *TieredChunkCache) GetMaxFilePartSizeInCache() (answer uint64) {
	return c.maxFilePartSizeInCache
}
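
// IsInCache reports whether fileId is present in the memory layer or in any
// on-disk layer. Pass lockNeeded=false only when the caller already holds the lock.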
func (c *TieredChunkCache) IsInCache(fileId string, lockNeeded bool) (answer bool) {
	if c == nil {
		return false
	}

	if lockNeeded {
		c.RLock()
		defer c.RUnlock()
	}

	item := c.memCache.cache.Get(fileId)
	if item != nil {
		glog.V(4).Infof("fileId %s is in memcache", fileId)
		return true
	}

	fid, err := needle.ParseFileIdFromString(fileId)
	if err != nil {
		glog.V(4).Infof("failed to parse file id %s", fileId)
		return false
	}

	for i, diskCacheLayer := range c.diskCaches {
		for k, v := range diskCacheLayer.diskCaches {
			_, ok := v.nm.Get(fid.Key)
			if ok {
				glog.V(4).Infof("fileId %s is in diskCaches[%d].volume[%d]", fileId, i, k)
				return true
			}
		}
	}
	return false
}
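
// ReadChunkAt copies cached bytes for fileId into data, starting at offset.
// It tries the memory layer first, then disk layers 0 and 1 when offset+len(data)
// fits their size limits, and finally disk layer 2. A cache miss returns (0, nil)
// rather than an error.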
func (c *TieredChunkCache) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) {
	if c == nil {
		return 0, nil
	}

	c.RLock()
	defer c.RUnlock()

	minSize := offset + uint64(len(data))
	if minSize <= c.onDiskCacheSizeLimit0 {
		n, err = c.memCache.readChunkAt(data, fileId, offset)
		if err != nil {
			glog.Errorf("failed to read from memcache: %s", err)
		}
		if n == int(len(data)) {
			return n, nil
		}
	}

	fid, err := needle.ParseFileIdFromString(fileId)
	if err != nil {
		glog.Errorf("failed to parse file id %s", fileId)
		return 0, nil
	}

	if minSize <= c.onDiskCacheSizeLimit0 {
		n, err = c.diskCaches[0].readChunkAt(data, fid.Key, offset)
		if n == int(len(data)) {
			return
		}
	}
	if minSize <= c.onDiskCacheSizeLimit1 {
		n, err = c.diskCaches[1].readChunkAt(data, fid.Key, offset)
		if n == int(len(data)) {
			return
		}
	}
	{
		n, err = c.diskCaches[2].readChunkAt(data, fid.Key, offset)
		if n == int(len(data)) {
			return
		}
	}

	return 0, nil
}
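
// SetChunk stores a chunk under fileId, skipping chunks that are already
// cached in any layer.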
func (c *TieredChunkCache) SetChunk(fileId string, data []byte) {
	if c == nil {
		return
	}
	c.Lock()
	defer c.Unlock()

	glog.V(4).Infof("SetChunk %s size %d\n", fileId, len(data))

	if c.IsInCache(fileId, false) {
		glog.V(4).Infof("fileId %s is already in cache", fileId)
		return
	}

	c.doSetChunk(fileId, data)
}
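
// doSetChunk writes the chunk to the in-memory layer (when it fits the smallest
// size limit) and to the on-disk layer selected by its size.
// The caller must hold the write lock.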
func (c *TieredChunkCache) doSetChunk(fileId string, data []byte) {

	if len(data) <= int(c.onDiskCacheSizeLimit0) {
		c.memCache.SetChunk(fileId, data)
	}

	fid, err := needle.ParseFileIdFromString(fileId)
	if err != nil {
		glog.Errorf("failed to parse file id %s", fileId)
		return
	}

	if len(data) <= int(c.onDiskCacheSizeLimit0) {
		c.diskCaches[0].setChunk(fid.Key, data)
	} else if len(data) <= int(c.onDiskCacheSizeLimit1) {
		c.diskCaches[1].setChunk(fid.Key, data)
	} else {
		c.diskCaches[2].setChunk(fid.Key, data)
	}
}
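
// Shutdown shuts down every on-disk cache layer.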
func (c *TieredChunkCache) Shutdown() {
	if c == nil {
		return
	}
	c.Lock()
	defer c.Unlock()
	for _, diskCache := range c.diskCaches {
		diskCache.shutdown()
	}
}
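
// min returns the smaller of two ints.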
func min(x, y int) int {
	if x < y {
		return x
	}
	return y
}