You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

216 lines
4.9 KiB

  1. package filer
  2. import (
  3. "fmt"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  8. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  9. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  10. )
  // ReaderCache tracks in-memory chunk downloaders (SingleChunkCacher), keyed
  // by chunk file id, in front of a chunk_cache.ChunkCache.
  type ReaderCache struct {
  	// chunkCache is consulted by ReadChunkAt. Downloaders created with
  	// shouldCache hand their fetched data to its SetChunk.
  	chunkCache chunk_cache.ChunkCache
  	// lookupFileIdFn resolves a chunk file id to the URL strings the chunk is
  	// fetched from. It may be nil: MaybeCache then does nothing, and
  	// ReadChunkAt consults chunkCache before creating a downloader.
  	lookupFileIdFn wdclient.LookupFileIdFunctionType
  	// Mutex guards downloaders.
  	sync.Mutex
  	// downloaders maps a chunk file id to the downloader holding that chunk.
  	downloaders map[string]*SingleChunkCacher
  	// limit is the downloader count at which MaybeCache stops prefetching and
  	// ReadChunkAt tries to evict one downloader before adding another.
  	limit int
  }
  // SingleChunkCacher downloads a single chunk into memory (startCaching) and
  // serves reads from that buffer (readChunkAt).
  type SingleChunkCacher struct {
  	// Mutex is held by startCaching for the whole fetch, and by readChunkAt
  	// and destroy while they touch data and err.
  	sync.Mutex
  	parent      *ReaderCache // owner; supplies lookupFileIdFn and chunkCache
  	chunkFileId string       // file id of the chunk to fetch
  	data        []byte       // buffer from mem.Allocate; nil until allocated, after a failed fetch, or after destroy
  	err         error        // lookup/fetch error recorded by startCaching
  	cipherKey   []byte       // passed through to retriedFetchChunkData
  	isGzipped   bool         // passed through to retriedFetchChunkData
  	chunkSize   int          // size of the buffer allocated for data
  	shouldCache bool         // after a successful fetch, hand data to parent.chunkCache.SetChunk
  	// wg counts in-progress startCaching/readChunkAt calls; destroy waits on it.
  	wg sync.WaitGroup
  	// cacheStartedCh: startCaching sends one value on it once it holds the lock.
  	cacheStartedCh chan struct{}
  	// completedTimeNew is a UnixNano timestamp stored atomically by
  	// startCaching; zero means nothing has been recorded yet.
  	// ReaderCache.ReadChunkAt only evicts downloaders whose value is non-zero.
  	// NOTE(review): accessed with sync/atomic 64-bit operations; on 32-bit
  	// platforms that requires 64-bit alignment of this field — confirm layout.
  	completedTimeNew int64
  }
  32. func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
  33. return &ReaderCache{
  34. limit: limit,
  35. chunkCache: chunkCache,
  36. lookupFileIdFn: lookupFileIdFn,
  37. downloaders: make(map[string]*SingleChunkCacher),
  38. }
  39. }
  40. func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
  41. if rc.lookupFileIdFn == nil {
  42. return
  43. }
  44. rc.Lock()
  45. defer rc.Unlock()
  46. if len(rc.downloaders) >= rc.limit {
  47. return
  48. }
  49. for _, chunkView := range chunkViews {
  50. if _, found := rc.downloaders[chunkView.FileId]; found {
  51. continue
  52. }
  53. if len(rc.downloaders) >= rc.limit {
  54. // abort when slots are filled
  55. return
  56. }
  57. // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset)
  58. // cache this chunk if not yet
  59. cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
  60. go cacher.startCaching()
  61. <-cacher.cacheStartedCh
  62. rc.downloaders[chunkView.FileId] = cacher
  63. }
  64. return
  65. }
  // ReadChunkAt reads the chunk fileId, starting at offset, into buffer.
  //
  // Sources are tried in this order:
  //  1. An existing downloader for fileId, if it yields data without error.
  //  2. chunkCache, when shouldCache is set or no lookup function is configured.
  //  3. A new downloader that fetches the chunk.
  //
  // Before creating a downloader while at or above rc.limit, it evicts the
  // downloader with the oldest non-zero completion time. When no lookup
  // function is configured, a chunk-cache miss is reported as an error. No
  // downloader is created in that case, because one could not locate the chunk.
  func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
  	rc.Lock()

  	if cacher, found := rc.downloaders[fileId]; found {
  		if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
  			rc.Unlock()
  			return n, err
  		}
  	}

  	if shouldCache || rc.lookupFileIdFn == nil {
  		n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
  		if n > 0 {
  			rc.Unlock()
  			return n, err
  		}
  		if rc.lookupFileIdFn == nil {
  			// A downloader would call the nil lookup function from its own
  			// goroutine and panic, so report the miss to the caller instead.
  			rc.Unlock()
  			if err != nil {
  				return 0, fmt.Errorf("read chunk %s from chunk cache: %w", fileId, err)
  			}
  			return 0, fmt.Errorf("chunk %s not in chunk cache and no lookup function configured", fileId)
  		}
  	}

  	// clean up old downloaders
  	if len(rc.downloaders) >= rc.limit {
  		oldestFid, oldestTime := "", time.Now().UnixNano()
  		for fid, downloader := range rc.downloaders {
  			completedTime := atomic.LoadInt64(&downloader.completedTimeNew)
  			if completedTime > 0 && completedTime < oldestTime {
  				oldestFid, oldestTime = fid, completedTime
  			}
  		}
  		if oldestFid != "" {
  			oldDownloader := rc.downloaders[oldestFid]
  			delete(rc.downloaders, oldestFid)
  			oldDownloader.destroy()
  		}
  	}

  	// glog.V(4).Infof("cache1 %s", fileId)
  	cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
  	go cacher.startCaching()
  	// After this receive the download goroutine holds the cacher's lock, so the
  	// readChunkAt below blocks until the fetch has finished.
  	<-cacher.cacheStartedCh
  	rc.downloaders[fileId] = cacher
  	rc.Unlock()

  	return cacher.readChunkAt(buffer, offset)
  }
  104. func (rc *ReaderCache) UnCache(fileId string) {
  105. rc.Lock()
  106. defer rc.Unlock()
  107. // glog.V(4).Infof("uncache %s", fileId)
  108. if downloader, found := rc.downloaders[fileId]; found {
  109. downloader.destroy()
  110. delete(rc.downloaders, fileId)
  111. }
  112. }
  113. func (rc *ReaderCache) destroy() {
  114. rc.Lock()
  115. defer rc.Unlock()
  116. for _, downloader := range rc.downloaders {
  117. downloader.destroy()
  118. }
  119. }
  120. func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
  121. return &SingleChunkCacher{
  122. parent: parent,
  123. chunkFileId: fileId,
  124. cipherKey: cipherKey,
  125. isGzipped: isGzipped,
  126. chunkSize: chunkSize,
  127. shouldCache: shouldCache,
  128. cacheStartedCh: make(chan struct{}),
  129. }
  130. }
  // startCaching fetches the chunk into s.data and is meant to run in its own
  // goroutine. It takes s's lock before signalling cacheStartedCh and holds it
  // until the fetch ends, so readChunkAt calls made after that signal block
  // until data or err is ready. On failure s.err is set and s.data is nil. On
  // success, when shouldCache is set, the data is handed to the parent's chunk
  // cache.
  //
  // completedTimeNew is stamped on every exit path, failures included.
  // ReaderCache.ReadChunkAt only evicts downloaders with a non-zero timestamp,
  // so this lets it evict failed downloaders too. Otherwise they would hold a
  // slot until the same file id is read again.
  func (s *SingleChunkCacher) startCaching() {
  	s.wg.Add(1)
  	defer s.wg.Done()
  	s.Lock()
  	defer s.Unlock()
  	// A closure, so time.Now() is evaluated at exit rather than now.
  	defer func() {
  		atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
  	}()

  	s.cacheStartedCh <- struct{}{} // means this has been started

  	// ReaderCache permits a nil lookup function. Calling it would panic in this
  	// goroutine and crash the process, so record an error instead.
  	if s.parent.lookupFileIdFn == nil {
  		s.err = fmt.Errorf("operation LookupFileId %s failed: no lookup function configured", s.chunkFileId)
  		return
  	}

  	urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
  	if err != nil {
  		s.err = fmt.Errorf("operation LookupFileId %s failed, err: %w", s.chunkFileId, err)
  		return
  	}

  	s.data = mem.Allocate(s.chunkSize)
  	_, s.err = retriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
  	if s.err != nil {
  		mem.Free(s.data)
  		s.data = nil
  		return
  	}

  	if s.shouldCache {
  		s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
  	}
  }
  155. func (s *SingleChunkCacher) destroy() {
  156. // wait for all reads to finish before destroying the data
  157. s.wg.Wait()
  158. s.Lock()
  159. defer s.Unlock()
  160. if s.data != nil {
  161. mem.Free(s.data)
  162. s.data = nil
  163. close(s.cacheStartedCh)
  164. }
  165. }
  166. func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
  167. s.wg.Add(1)
  168. defer s.wg.Done()
  169. s.Lock()
  170. defer s.Unlock()
  171. if s.err != nil {
  172. return 0, s.err
  173. }
  174. if len(s.data) == 0 {
  175. return 0, nil
  176. }
  177. return copy(buf, s.data[offset:]), nil
  178. }