You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

218 lines
5.0 KiB

12 months ago
10 months ago
more solid weed mount (#4089) * compare chunks by timestamp * fix slab clearing error * fix test compilation * move oldest chunk to sealed, instead of by fullness * lock on fh.entryViewCache * remove verbose logs * revert slat clearing * less logs * less logs * track write and read by timestamp * remove useless logic * add entry lock on file handle release * use mem chunk only, swap file chunk has problems * comment out code that maybe used later * add debug mode to compare data read and write * more efficient readResolvedChunks with linked list * small optimization * fix test compilation * minor fix on writer * add SeparateGarbageChunks * group chunks into sections * turn off debug mode * fix tests * fix tests * tmp enable swap file chunk * Revert "tmp enable swap file chunk" This reverts commit 985137ec472924e4815f258189f6ca9f2168a0a7. * simple refactoring * simple refactoring * do not re-use swap file chunk. Sealed chunks should not be re-used. * comment out debugging facilities * either mem chunk or swap file chunk is fine now * remove orderedMutex as *semaphore.Weighted not found impactful * optimize size calculation for changing large files * optimize performance to avoid going through the long list of chunks * still problems with swap file chunk * rename * tiny optimization * swap file chunk save only successfully read data * fix * enable both mem and swap file chunk * resolve chunks with range * rename * fix chunk interval list * also change file handle chunk group when adding chunks * pick in-active chunk with time-decayed counter * fix compilation * avoid nil with empty fh.entry * refactoring * rename * rename * refactor visible intervals to *list.List * refactor chunkViews to *list.List * add IntervalList for generic interval list * change visible interval to use IntervalList in generics * cahnge chunkViews to *IntervalList[*ChunkView] * use NewFileChunkSection to create * rename variables * refactor * fix renaming leftover * renaming * renaming * add 
insert interval * interval list adds lock * incrementally add chunks to readers Fixes: 1. set start and stop offset for the value object 2. clone the value object 3. use pointer instead of copy-by-value when passing to interval.Value 4. use insert interval since adding chunk could be out of order * fix tests compilation * fix tests compilation
2 years ago
more solid weed mount (#4089) * compare chunks by timestamp * fix slab clearing error * fix test compilation * move oldest chunk to sealed, instead of by fullness * lock on fh.entryViewCache * remove verbose logs * revert slat clearing * less logs * less logs * track write and read by timestamp * remove useless logic * add entry lock on file handle release * use mem chunk only, swap file chunk has problems * comment out code that maybe used later * add debug mode to compare data read and write * more efficient readResolvedChunks with linked list * small optimization * fix test compilation * minor fix on writer * add SeparateGarbageChunks * group chunks into sections * turn off debug mode * fix tests * fix tests * tmp enable swap file chunk * Revert "tmp enable swap file chunk" This reverts commit 985137ec472924e4815f258189f6ca9f2168a0a7. * simple refactoring * simple refactoring * do not re-use swap file chunk. Sealed chunks should not be re-used. * comment out debugging facilities * either mem chunk or swap file chunk is fine now * remove orderedMutex as *semaphore.Weighted not found impactful * optimize size calculation for changing large files * optimize performance to avoid going through the long list of chunks * still problems with swap file chunk * rename * tiny optimization * swap file chunk save only successfully read data * fix * enable both mem and swap file chunk * resolve chunks with range * rename * fix chunk interval list * also change file handle chunk group when adding chunks * pick in-active chunk with time-decayed counter * fix compilation * avoid nil with empty fh.entry * refactoring * rename * rename * refactor visible intervals to *list.List * refactor chunkViews to *list.List * add IntervalList for generic interval list * change visible interval to use IntervalList in generics * cahnge chunkViews to *IntervalList[*ChunkView] * use NewFileChunkSection to create * rename variables * refactor * fix renaming leftover * renaming * renaming * add 
insert interval * interval list adds lock * incrementally add chunks to readers Fixes: 1. set start and stop offset for the value object 2. clone the value object 3. use pointer instead of copy-by-value when passing to interval.Value 4. use insert interval since adding chunk could be out of order * fix tests compilation * fix tests compilation
2 years ago
more solid weed mount (#4089) * compare chunks by timestamp * fix slab clearing error * fix test compilation * move oldest chunk to sealed, instead of by fullness * lock on fh.entryViewCache * remove verbose logs * revert slat clearing * less logs * less logs * track write and read by timestamp * remove useless logic * add entry lock on file handle release * use mem chunk only, swap file chunk has problems * comment out code that maybe used later * add debug mode to compare data read and write * more efficient readResolvedChunks with linked list * small optimization * fix test compilation * minor fix on writer * add SeparateGarbageChunks * group chunks into sections * turn off debug mode * fix tests * fix tests * tmp enable swap file chunk * Revert "tmp enable swap file chunk" This reverts commit 985137ec472924e4815f258189f6ca9f2168a0a7. * simple refactoring * simple refactoring * do not re-use swap file chunk. Sealed chunks should not be re-used. * comment out debugging facilities * either mem chunk or swap file chunk is fine now * remove orderedMutex as *semaphore.Weighted not found impactful * optimize size calculation for changing large files * optimize performance to avoid going through the long list of chunks * still problems with swap file chunk * rename * tiny optimization * swap file chunk save only successfully read data * fix * enable both mem and swap file chunk * resolve chunks with range * rename * fix chunk interval list * also change file handle chunk group when adding chunks * pick in-active chunk with time-decayed counter * fix compilation * avoid nil with empty fh.entry * refactoring * rename * rename * refactor visible intervals to *list.List * refactor chunkViews to *list.List * add IntervalList for generic interval list * change visible interval to use IntervalList in generics * cahnge chunkViews to *IntervalList[*ChunkView] * use NewFileChunkSection to create * rename variables * refactor * fix renaming leftover * renaming * renaming * add 
insert interval * interval list adds lock * incrementally add chunks to readers Fixes: 1. set start and stop offset for the value object 2. clone the value object 3. use pointer instead of copy-by-value when passing to interval.Value 4. use insert interval since adding chunk could be out of order * fix tests compilation * fix tests compilation
2 years ago
12 months ago
  1. package filer
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/util"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  9. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  10. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  11. )
  12. type ReaderCache struct {
  13. chunkCache chunk_cache.ChunkCache
  14. lookupFileIdFn wdclient.LookupFileIdFunctionType
  15. sync.Mutex
  16. downloaders map[string]*SingleChunkCacher
  17. limit int
  18. }
  19. type SingleChunkCacher struct {
  20. completedTimeNew int64
  21. sync.Mutex
  22. parent *ReaderCache
  23. chunkFileId string
  24. data []byte
  25. err error
  26. cipherKey []byte
  27. isGzipped bool
  28. chunkSize int
  29. shouldCache bool
  30. wg sync.WaitGroup
  31. cacheStartedCh chan struct{}
  32. }
  33. func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
  34. return &ReaderCache{
  35. limit: limit,
  36. chunkCache: chunkCache,
  37. lookupFileIdFn: lookupFileIdFn,
  38. downloaders: make(map[string]*SingleChunkCacher),
  39. }
  40. }
  41. func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
  42. if rc.lookupFileIdFn == nil {
  43. return
  44. }
  45. rc.Lock()
  46. defer rc.Unlock()
  47. if len(rc.downloaders) >= rc.limit {
  48. return
  49. }
  50. for x := chunkViews; x != nil; x = x.Next {
  51. chunkView := x.Value
  52. if _, found := rc.downloaders[chunkView.FileId]; found {
  53. continue
  54. }
  55. if len(rc.downloaders) >= rc.limit {
  56. // abort when slots are filled
  57. return
  58. }
  59. // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.ViewOffset)
  60. // cache this chunk if not yet
  61. cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
  62. go cacher.startCaching()
  63. <-cacher.cacheStartedCh
  64. rc.downloaders[chunkView.FileId] = cacher
  65. }
  66. return
  67. }
  68. func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
  69. rc.Lock()
  70. if cacher, found := rc.downloaders[fileId]; found {
  71. if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
  72. rc.Unlock()
  73. return n, err
  74. }
  75. }
  76. if shouldCache || rc.lookupFileIdFn == nil {
  77. n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
  78. if n > 0 {
  79. rc.Unlock()
  80. return n, err
  81. }
  82. }
  83. // clean up old downloaders
  84. if len(rc.downloaders) >= rc.limit {
  85. oldestFid, oldestTime := "", time.Now().UnixNano()
  86. for fid, downloader := range rc.downloaders {
  87. completedTime := atomic.LoadInt64(&downloader.completedTimeNew)
  88. if completedTime > 0 && completedTime < oldestTime {
  89. oldestFid, oldestTime = fid, completedTime
  90. }
  91. }
  92. if oldestFid != "" {
  93. oldDownloader := rc.downloaders[oldestFid]
  94. delete(rc.downloaders, oldestFid)
  95. oldDownloader.destroy()
  96. }
  97. }
  98. // glog.V(4).Infof("cache1 %s", fileId)
  99. cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
  100. go cacher.startCaching()
  101. <-cacher.cacheStartedCh
  102. rc.downloaders[fileId] = cacher
  103. rc.Unlock()
  104. return cacher.readChunkAt(buffer, offset)
  105. }
  106. func (rc *ReaderCache) UnCache(fileId string) {
  107. rc.Lock()
  108. defer rc.Unlock()
  109. // glog.V(4).Infof("uncache %s", fileId)
  110. if downloader, found := rc.downloaders[fileId]; found {
  111. downloader.destroy()
  112. delete(rc.downloaders, fileId)
  113. }
  114. }
  115. func (rc *ReaderCache) destroy() {
  116. rc.Lock()
  117. defer rc.Unlock()
  118. for _, downloader := range rc.downloaders {
  119. downloader.destroy()
  120. }
  121. }
  122. func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
  123. return &SingleChunkCacher{
  124. parent: parent,
  125. chunkFileId: fileId,
  126. cipherKey: cipherKey,
  127. isGzipped: isGzipped,
  128. chunkSize: chunkSize,
  129. shouldCache: shouldCache,
  130. cacheStartedCh: make(chan struct{}),
  131. }
  132. }
  133. func (s *SingleChunkCacher) startCaching() {
  134. s.wg.Add(1)
  135. defer s.wg.Done()
  136. s.Lock()
  137. defer s.Unlock()
  138. s.cacheStartedCh <- struct{}{} // means this has been started
  139. urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
  140. if err != nil {
  141. s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
  142. return
  143. }
  144. s.data = mem.Allocate(s.chunkSize)
  145. _, s.err = util.RetriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
  146. if s.err != nil {
  147. mem.Free(s.data)
  148. s.data = nil
  149. return
  150. }
  151. if s.shouldCache {
  152. s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
  153. }
  154. atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
  155. return
  156. }
  157. func (s *SingleChunkCacher) destroy() {
  158. // wait for all reads to finish before destroying the data
  159. s.wg.Wait()
  160. s.Lock()
  161. defer s.Unlock()
  162. if s.data != nil {
  163. mem.Free(s.data)
  164. s.data = nil
  165. close(s.cacheStartedCh)
  166. }
  167. }
  168. func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
  169. s.wg.Add(1)
  170. defer s.wg.Done()
  171. s.Lock()
  172. defer s.Unlock()
  173. if s.err != nil {
  174. return 0, s.err
  175. }
  176. if len(s.data) <= int(offset) {
  177. return 0, nil
  178. }
  179. return copy(buf, s.data[offset:]), nil
  180. }