package filer

import (
	"fmt"
	"sync"
	"time"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
	"github.com/chrislusf/seaweedfs/weed/util/mem"
	"github.com/chrislusf/seaweedfs/weed/wdclient"
)
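
// ReaderCache serves chunk reads from a shared chunk cache plus a bounded set
// of in-memory chunk downloaders, so concurrent readers of the same chunk
// share a single download.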
type ReaderCache struct {
	chunkCache     chunk_cache.ChunkCache
	lookupFileIdFn wdclient.LookupFileIdFunctionType
	sync.Mutex     // protects downloaders
	downloaders    map[string]*SingleChunkCacher
	limit          int
}
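
// SingleChunkCacher downloads one chunk and holds its bytes in memory until
// it is evicted or destroyed.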
type SingleChunkCacher struct {
	sync.RWMutex
	parent        *ReaderCache
	chunkFileId   string
	data          []byte
	err           error
	cipherKey     []byte
	isGzipped     bool
	chunkSize     int
	shouldCache   bool
	wg            sync.WaitGroup
	completedTime time.Time
}
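
// newReaderCache creates a ReaderCache that keeps at most limit concurrent
// chunk downloaders.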
func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
	return &ReaderCache{
		limit:          limit,
		chunkCache:     chunkCache,
		lookupFileIdFn: lookupFileIdFn,
		downloaders:    make(map[string]*SingleChunkCacher),
	}
}
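
// MaybeCache starts prefetching a chunk in the background if a downloader
// slot is available. It never blocks on the download itself.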
func (rc *ReaderCache) MaybeCache(fileId string, cipherKey []byte, isGzipped bool, chunkSize int) {
	rc.Lock()
	defer rc.Unlock()
	if _, found := rc.downloaders[fileId]; found {
		return
	}
	if rc.lookupFileIdFn == nil {
		return
	}

	// If all downloader slots are taken, evict the oldest completed one.
	glog.V(0).Infof("downloader2 %d", len(rc.downloaders))
	if len(rc.downloaders) >= rc.limit {
		oldestFid, oldestTime := "", time.Now()
		for fid, downloader := range rc.downloaders {
			if !downloader.completedTime.IsZero() {
				if downloader.completedTime.Before(oldestTime) {
					oldestFid, oldestTime = fid, downloader.completedTime
				}
			}
		}
		if oldestFid != "" {
			oldDownloader := rc.downloaders[oldestFid]
			delete(rc.downloaders, oldestFid)
			oldDownloader.destroy()
		} else {
			// Every downloader is still in flight; skip prefetching this chunk.
			return
		}
	}

	cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, false)
	cacher.wg.Add(1)
	go cacher.startCaching()
	cacher.wg.Wait() // wait only until the goroutine has started and holds its lock
	rc.downloaders[fileId] = cacher
}
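
// ReadChunkAt reads a chunk into buffer at the given offset, preferring an
// in-flight downloader, then the shared chunk cache, and finally starting a
// new download.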
func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
	rc.Lock()
	defer rc.Unlock()
	if cacher, found := rc.downloaders[fileId]; found {
		return cacher.readChunkAt(buffer, offset)
	}
	if shouldCache || rc.lookupFileIdFn == nil {
		n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
		if n > 0 {
			return n, err
		}
	}

	// If all downloader slots are taken, evict the oldest completed one.
	glog.V(0).Infof("downloader1 %d", len(rc.downloaders))
	if len(rc.downloaders) >= rc.limit {
		oldestFid, oldestTime := "", time.Now()
		for fid, downloader := range rc.downloaders {
			if !downloader.completedTime.IsZero() {
				if downloader.completedTime.Before(oldestTime) {
					oldestFid, oldestTime = fid, downloader.completedTime
				}
			}
		}
		if oldestFid != "" {
			oldDownloader := rc.downloaders[oldestFid]
			delete(rc.downloaders, oldestFid)
			oldDownloader.destroy()
		}
	}

	cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
	cacher.wg.Add(1)
	go cacher.startCaching()
	cacher.wg.Wait() // wait only until the goroutine has started and holds its lock
	rc.downloaders[fileId] = cacher

	return cacher.readChunkAt(buffer, offset)
}
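
// destroy releases every downloader's buffer. The cache must not be used
// afterwards.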
func (rc *ReaderCache) destroy() {
	rc.Lock()
	defer rc.Unlock()
	for _, downloader := range rc.downloaders {
		downloader.destroy()
	}
}
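
// newSingleChunkCacher prepares a downloader for one chunk; the download
// itself starts when startCaching is called.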
func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
	return &SingleChunkCacher{
		parent:      parent,
		chunkFileId: fileId,
		cipherKey:   cipherKey,
		isGzipped:   isGzipped,
		chunkSize:   chunkSize,
		shouldCache: shouldCache,
	}
}
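
// startCaching downloads the chunk while holding the write lock, so readers
// calling readChunkAt block until the data (or an error) is ready.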
func (s *SingleChunkCacher) startCaching() {
	s.Lock()
	defer s.Unlock()
	s.wg.Done() // signal the caller that this goroutine has started

	urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
	if err != nil {
		s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
		return
	}

	s.data = mem.Allocate(s.chunkSize)
	_, s.err = retriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
	if s.err != nil {
		mem.Free(s.data)
		s.data = nil
		return
	}
	s.completedTime = time.Now()

	if s.shouldCache {
		s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
	}
}
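
// destroy frees the chunk buffer. It is only called while the parent
// ReaderCache lock is held.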
func (s *SingleChunkCacher) destroy() {
	if s.data != nil {
		mem.Free(s.data)
		s.data = nil
	}
}
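
// readChunkAt copies the downloaded chunk into buf starting at offset. It
// blocks until startCaching has finished with this chunk.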
func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
	s.RLock()
	defer s.RUnlock()

	// Surface a download failure before touching s.data, which is nil on error.
	if s.err != nil {
		return 0, s.err
	}

	return copy(buf, s.data[offset:]), nil
}