You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

209 lines
4.8 KiB

  1. package filer
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  7. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  8. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  9. )
  10. type ReaderCache struct {
  11. chunkCache chunk_cache.ChunkCache
  12. lookupFileIdFn wdclient.LookupFileIdFunctionType
  13. sync.Mutex
  14. downloaders map[string]*SingleChunkCacher
  15. limit int
  16. }
  17. type SingleChunkCacher struct {
  18. sync.Mutex
  19. parent *ReaderCache
  20. chunkFileId string
  21. data []byte
  22. err error
  23. cipherKey []byte
  24. isGzipped bool
  25. chunkSize int
  26. shouldCache bool
  27. wg sync.WaitGroup
  28. cacheStartedCh chan struct{}
  29. completedTime time.Time
  30. }
  31. func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
  32. return &ReaderCache{
  33. limit: limit,
  34. chunkCache: chunkCache,
  35. lookupFileIdFn: lookupFileIdFn,
  36. downloaders: make(map[string]*SingleChunkCacher),
  37. }
  38. }
  39. func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
  40. if rc.lookupFileIdFn == nil {
  41. return
  42. }
  43. rc.Lock()
  44. defer rc.Unlock()
  45. for _, chunkView := range chunkViews {
  46. if _, found := rc.downloaders[chunkView.FileId]; found {
  47. continue
  48. }
  49. if len(rc.downloaders) >= rc.limit {
  50. // if still no slots, return
  51. return
  52. }
  53. // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset)
  54. // cache this chunk if not yet
  55. cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
  56. go cacher.startCaching()
  57. <-cacher.cacheStartedCh
  58. rc.downloaders[chunkView.FileId] = cacher
  59. }
  60. return
  61. }
  62. func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
  63. rc.Lock()
  64. defer rc.Unlock()
  65. if cacher, found := rc.downloaders[fileId]; found {
  66. if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
  67. return n, err
  68. }
  69. }
  70. if shouldCache || rc.lookupFileIdFn == nil {
  71. n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
  72. if n > 0 {
  73. return n, err
  74. }
  75. }
  76. // clean up old downloaders
  77. if len(rc.downloaders) >= rc.limit {
  78. oldestFid, oldestTime := "", time.Now()
  79. for fid, downloader := range rc.downloaders {
  80. if !downloader.completedTime.IsZero() {
  81. if downloader.completedTime.Before(oldestTime) {
  82. oldestFid, oldestTime = fid, downloader.completedTime
  83. }
  84. }
  85. }
  86. if oldestFid != "" {
  87. oldDownloader := rc.downloaders[oldestFid]
  88. delete(rc.downloaders, oldestFid)
  89. oldDownloader.destroy()
  90. }
  91. }
  92. // glog.V(4).Infof("cache1 %s", fileId)
  93. cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
  94. go cacher.startCaching()
  95. <-cacher.cacheStartedCh
  96. rc.downloaders[fileId] = cacher
  97. return cacher.readChunkAt(buffer, offset)
  98. }
  99. func (rc *ReaderCache) UnCache(fileId string) {
  100. rc.Lock()
  101. defer rc.Unlock()
  102. // glog.V(4).Infof("uncache %s", fileId)
  103. if downloader, found := rc.downloaders[fileId]; found {
  104. downloader.destroy()
  105. delete(rc.downloaders, fileId)
  106. }
  107. }
  108. func (rc *ReaderCache) destroy() {
  109. rc.Lock()
  110. defer rc.Unlock()
  111. for _, downloader := range rc.downloaders {
  112. downloader.destroy()
  113. }
  114. }
  115. func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
  116. return &SingleChunkCacher{
  117. parent: parent,
  118. chunkFileId: fileId,
  119. cipherKey: cipherKey,
  120. isGzipped: isGzipped,
  121. chunkSize: chunkSize,
  122. shouldCache: shouldCache,
  123. cacheStartedCh: make(chan struct{}),
  124. }
  125. }
  126. func (s *SingleChunkCacher) startCaching() {
  127. s.wg.Add(1)
  128. defer s.wg.Done()
  129. s.Lock()
  130. defer s.Unlock()
  131. s.cacheStartedCh <- struct{}{} // means this has been started
  132. urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
  133. if err != nil {
  134. s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
  135. return
  136. }
  137. s.data = mem.Allocate(s.chunkSize)
  138. _, s.err = retriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
  139. if s.err != nil {
  140. mem.Free(s.data)
  141. s.data = nil
  142. return
  143. }
  144. if s.shouldCache {
  145. s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
  146. }
  147. s.completedTime = time.Now()
  148. return
  149. }
  150. func (s *SingleChunkCacher) destroy() {
  151. // wait for all reads to finish before destroying the data
  152. s.wg.Wait()
  153. s.Lock()
  154. defer s.Unlock()
  155. if s.data != nil {
  156. mem.Free(s.data)
  157. s.data = nil
  158. }
  159. close(s.cacheStartedCh)
  160. }
  161. func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
  162. s.wg.Add(1)
  163. defer s.wg.Done()
  164. s.Lock()
  165. defer s.Unlock()
  166. if s.err != nil {
  167. return 0, s.err
  168. }
  169. if len(s.data) == 0 {
  170. return 0, nil
  171. }
  172. return copy(buf, s.data[offset:]), nil
  173. }