You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

217 lines
5.0 KiB

more solid weed mount (#4089) * compare chunks by timestamp * fix slab clearing error * fix test compilation * move oldest chunk to sealed, instead of by fullness * lock on fh.entryViewCache * remove verbose logs * revert slat clearing * less logs * less logs * track write and read by timestamp * remove useless logic * add entry lock on file handle release * use mem chunk only, swap file chunk has problems * comment out code that maybe used later * add debug mode to compare data read and write * more efficient readResolvedChunks with linked list * small optimization * fix test compilation * minor fix on writer * add SeparateGarbageChunks * group chunks into sections * turn off debug mode * fix tests * fix tests * tmp enable swap file chunk * Revert "tmp enable swap file chunk" This reverts commit 985137ec472924e4815f258189f6ca9f2168a0a7. * simple refactoring * simple refactoring * do not re-use swap file chunk. Sealed chunks should not be re-used. * comment out debugging facilities * either mem chunk or swap file chunk is fine now * remove orderedMutex as *semaphore.Weighted not found impactful * optimize size calculation for changing large files * optimize performance to avoid going through the long list of chunks * still problems with swap file chunk * rename * tiny optimization * swap file chunk save only successfully read data * fix * enable both mem and swap file chunk * resolve chunks with range * rename * fix chunk interval list * also change file handle chunk group when adding chunks * pick in-active chunk with time-decayed counter * fix compilation * avoid nil with empty fh.entry * refactoring * rename * rename * refactor visible intervals to *list.List * refactor chunkViews to *list.List * add IntervalList for generic interval list * change visible interval to use IntervalList in generics * cahnge chunkViews to *IntervalList[*ChunkView] * use NewFileChunkSection to create * rename variables * refactor * fix renaming leftover * renaming * renaming * add 
insert interval * interval list adds lock * incrementally add chunks to readers Fixes: 1. set start and stop offset for the value object 2. clone the value object 3. use pointer instead of copy-by-value when passing to interval.Value 4. use insert interval since adding chunk could be out of order * fix tests compilation * fix tests compilation
2 years ago
more solid weed mount (#4089) * compare chunks by timestamp * fix slab clearing error * fix test compilation * move oldest chunk to sealed, instead of by fullness * lock on fh.entryViewCache * remove verbose logs * revert slat clearing * less logs * less logs * track write and read by timestamp * remove useless logic * add entry lock on file handle release * use mem chunk only, swap file chunk has problems * comment out code that maybe used later * add debug mode to compare data read and write * more efficient readResolvedChunks with linked list * small optimization * fix test compilation * minor fix on writer * add SeparateGarbageChunks * group chunks into sections * turn off debug mode * fix tests * fix tests * tmp enable swap file chunk * Revert "tmp enable swap file chunk" This reverts commit 985137ec472924e4815f258189f6ca9f2168a0a7. * simple refactoring * simple refactoring * do not re-use swap file chunk. Sealed chunks should not be re-used. * comment out debugging facilities * either mem chunk or swap file chunk is fine now * remove orderedMutex as *semaphore.Weighted not found impactful * optimize size calculation for changing large files * optimize performance to avoid going through the long list of chunks * still problems with swap file chunk * rename * tiny optimization * swap file chunk save only successfully read data * fix * enable both mem and swap file chunk * resolve chunks with range * rename * fix chunk interval list * also change file handle chunk group when adding chunks * pick in-active chunk with time-decayed counter * fix compilation * avoid nil with empty fh.entry * refactoring * rename * rename * refactor visible intervals to *list.List * refactor chunkViews to *list.List * add IntervalList for generic interval list * change visible interval to use IntervalList in generics * cahnge chunkViews to *IntervalList[*ChunkView] * use NewFileChunkSection to create * rename variables * refactor * fix renaming leftover * renaming * renaming * add 
insert interval * interval list adds lock * incrementally add chunks to readers Fixes: 1. set start and stop offset for the value object 2. clone the value object 3. use pointer instead of copy-by-value when passing to interval.Value 4. use insert interval since adding chunk could be out of order * fix tests compilation * fix tests compilation
2 years ago
more solid weed mount (#4089) * compare chunks by timestamp * fix slab clearing error * fix test compilation * move oldest chunk to sealed, instead of by fullness * lock on fh.entryViewCache * remove verbose logs * revert slat clearing * less logs * less logs * track write and read by timestamp * remove useless logic * add entry lock on file handle release * use mem chunk only, swap file chunk has problems * comment out code that maybe used later * add debug mode to compare data read and write * more efficient readResolvedChunks with linked list * small optimization * fix test compilation * minor fix on writer * add SeparateGarbageChunks * group chunks into sections * turn off debug mode * fix tests * fix tests * tmp enable swap file chunk * Revert "tmp enable swap file chunk" This reverts commit 985137ec472924e4815f258189f6ca9f2168a0a7. * simple refactoring * simple refactoring * do not re-use swap file chunk. Sealed chunks should not be re-used. * comment out debugging facilities * either mem chunk or swap file chunk is fine now * remove orderedMutex as *semaphore.Weighted not found impactful * optimize size calculation for changing large files * optimize performance to avoid going through the long list of chunks * still problems with swap file chunk * rename * tiny optimization * swap file chunk save only successfully read data * fix * enable both mem and swap file chunk * resolve chunks with range * rename * fix chunk interval list * also change file handle chunk group when adding chunks * pick in-active chunk with time-decayed counter * fix compilation * avoid nil with empty fh.entry * refactoring * rename * rename * refactor visible intervals to *list.List * refactor chunkViews to *list.List * add IntervalList for generic interval list * change visible interval to use IntervalList in generics * cahnge chunkViews to *IntervalList[*ChunkView] * use NewFileChunkSection to create * rename variables * refactor * fix renaming leftover * renaming * renaming * add 
insert interval * interval list adds lock * incrementally add chunks to readers Fixes: 1. set start and stop offset for the value object 2. clone the value object 3. use pointer instead of copy-by-value when passing to interval.Value 4. use insert interval since adding chunk could be out of order * fix tests compilation * fix tests compilation
2 years ago
  1. package filer
  2. import (
  3. "fmt"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  8. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  9. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  10. )
  11. type ReaderCache struct {
  12. chunkCache chunk_cache.ChunkCache
  13. lookupFileIdFn wdclient.LookupFileIdFunctionType
  14. sync.Mutex
  15. downloaders map[string]*SingleChunkCacher
  16. limit int
  17. }
  18. type SingleChunkCacher struct {
  19. sync.Mutex
  20. parent *ReaderCache
  21. chunkFileId string
  22. data []byte
  23. err error
  24. cipherKey []byte
  25. isGzipped bool
  26. chunkSize int
  27. shouldCache bool
  28. wg sync.WaitGroup
  29. cacheStartedCh chan struct{}
  30. completedTimeNew int64
  31. }
  32. func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
  33. return &ReaderCache{
  34. limit: limit,
  35. chunkCache: chunkCache,
  36. lookupFileIdFn: lookupFileIdFn,
  37. downloaders: make(map[string]*SingleChunkCacher),
  38. }
  39. }
  40. func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
  41. if rc.lookupFileIdFn == nil {
  42. return
  43. }
  44. rc.Lock()
  45. defer rc.Unlock()
  46. if len(rc.downloaders) >= rc.limit {
  47. return
  48. }
  49. for x := chunkViews; x != nil; x = x.Next {
  50. chunkView := x.Value
  51. if _, found := rc.downloaders[chunkView.FileId]; found {
  52. continue
  53. }
  54. if len(rc.downloaders) >= rc.limit {
  55. // abort when slots are filled
  56. return
  57. }
  58. // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.ViewOffset)
  59. // cache this chunk if not yet
  60. cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
  61. go cacher.startCaching()
  62. <-cacher.cacheStartedCh
  63. rc.downloaders[chunkView.FileId] = cacher
  64. }
  65. return
  66. }
  67. func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
  68. rc.Lock()
  69. if cacher, found := rc.downloaders[fileId]; found {
  70. if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
  71. rc.Unlock()
  72. return n, err
  73. }
  74. }
  75. if shouldCache || rc.lookupFileIdFn == nil {
  76. n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
  77. if n > 0 {
  78. rc.Unlock()
  79. return n, err
  80. }
  81. }
  82. // clean up old downloaders
  83. if len(rc.downloaders) >= rc.limit {
  84. oldestFid, oldestTime := "", time.Now().UnixNano()
  85. for fid, downloader := range rc.downloaders {
  86. completedTime := atomic.LoadInt64(&downloader.completedTimeNew)
  87. if completedTime > 0 && completedTime < oldestTime {
  88. oldestFid, oldestTime = fid, completedTime
  89. }
  90. }
  91. if oldestFid != "" {
  92. oldDownloader := rc.downloaders[oldestFid]
  93. delete(rc.downloaders, oldestFid)
  94. oldDownloader.destroy()
  95. }
  96. }
  97. // glog.V(4).Infof("cache1 %s", fileId)
  98. cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
  99. go cacher.startCaching()
  100. <-cacher.cacheStartedCh
  101. rc.downloaders[fileId] = cacher
  102. rc.Unlock()
  103. return cacher.readChunkAt(buffer, offset)
  104. }
  105. func (rc *ReaderCache) UnCache(fileId string) {
  106. rc.Lock()
  107. defer rc.Unlock()
  108. // glog.V(4).Infof("uncache %s", fileId)
  109. if downloader, found := rc.downloaders[fileId]; found {
  110. downloader.destroy()
  111. delete(rc.downloaders, fileId)
  112. }
  113. }
  114. func (rc *ReaderCache) destroy() {
  115. rc.Lock()
  116. defer rc.Unlock()
  117. for _, downloader := range rc.downloaders {
  118. downloader.destroy()
  119. }
  120. }
  121. func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
  122. return &SingleChunkCacher{
  123. parent: parent,
  124. chunkFileId: fileId,
  125. cipherKey: cipherKey,
  126. isGzipped: isGzipped,
  127. chunkSize: chunkSize,
  128. shouldCache: shouldCache,
  129. cacheStartedCh: make(chan struct{}),
  130. }
  131. }
  132. func (s *SingleChunkCacher) startCaching() {
  133. s.wg.Add(1)
  134. defer s.wg.Done()
  135. s.Lock()
  136. defer s.Unlock()
  137. s.cacheStartedCh <- struct{}{} // means this has been started
  138. urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
  139. if err != nil {
  140. s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
  141. return
  142. }
  143. s.data = mem.Allocate(s.chunkSize)
  144. _, s.err = retriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
  145. if s.err != nil {
  146. mem.Free(s.data)
  147. s.data = nil
  148. return
  149. }
  150. if s.shouldCache {
  151. s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
  152. }
  153. atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
  154. return
  155. }
  156. func (s *SingleChunkCacher) destroy() {
  157. // wait for all reads to finish before destroying the data
  158. s.wg.Wait()
  159. s.Lock()
  160. defer s.Unlock()
  161. if s.data != nil {
  162. mem.Free(s.data)
  163. s.data = nil
  164. close(s.cacheStartedCh)
  165. }
  166. }
  167. func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
  168. s.wg.Add(1)
  169. defer s.wg.Done()
  170. s.Lock()
  171. defer s.Unlock()
  172. if s.err != nil {
  173. return 0, s.err
  174. }
  175. if len(s.data) <= int(offset) {
  176. return 0, nil
  177. }
  178. return copy(buf, s.data[offset:]), nil
  179. }