You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

223 lines
5.3 KiB

11 months ago
more solid weed mount (#4089) * compare chunks by timestamp * fix slab clearing error * fix test compilation * move oldest chunk to sealed, instead of by fullness * lock on fh.entryViewCache * remove verbose logs * revert slat clearing * less logs * less logs * track write and read by timestamp * remove useless logic * add entry lock on file handle release * use mem chunk only, swap file chunk has problems * comment out code that maybe used later * add debug mode to compare data read and write * more efficient readResolvedChunks with linked list * small optimization * fix test compilation * minor fix on writer * add SeparateGarbageChunks * group chunks into sections * turn off debug mode * fix tests * fix tests * tmp enable swap file chunk * Revert "tmp enable swap file chunk" This reverts commit 985137ec472924e4815f258189f6ca9f2168a0a7. * simple refactoring * simple refactoring * do not re-use swap file chunk. Sealed chunks should not be re-used. * comment out debugging facilities * either mem chunk or swap file chunk is fine now * remove orderedMutex as *semaphore.Weighted not found impactful * optimize size calculation for changing large files * optimize performance to avoid going through the long list of chunks * still problems with swap file chunk * rename * tiny optimization * swap file chunk save only successfully read data * fix * enable both mem and swap file chunk * resolve chunks with range * rename * fix chunk interval list * also change file handle chunk group when adding chunks * pick in-active chunk with time-decayed counter * fix compilation * avoid nil with empty fh.entry * refactoring * rename * rename * refactor visible intervals to *list.List * refactor chunkViews to *list.List * add IntervalList for generic interval list * change visible interval to use IntervalList in generics * cahnge chunkViews to *IntervalList[*ChunkView] * use NewFileChunkSection to create * rename variables * refactor * fix renaming leftover * renaming * renaming * add insert interval * interval list adds lock * incrementally add chunks to readers Fixes: 1. set start and stop offset for the value object 2. clone the value object 3. use pointer instead of copy-by-value when passing to interval.Value 4. use insert interval since adding chunk could be out of order * fix tests compilation * fix tests compilation
2 years ago
more solid weed mount (#4089) * compare chunks by timestamp * fix slab clearing error * fix test compilation * move oldest chunk to sealed, instead of by fullness * lock on fh.entryViewCache * remove verbose logs * revert slat clearing * less logs * less logs * track write and read by timestamp * remove useless logic * add entry lock on file handle release * use mem chunk only, swap file chunk has problems * comment out code that maybe used later * add debug mode to compare data read and write * more efficient readResolvedChunks with linked list * small optimization * fix test compilation * minor fix on writer * add SeparateGarbageChunks * group chunks into sections * turn off debug mode * fix tests * fix tests * tmp enable swap file chunk * Revert "tmp enable swap file chunk" This reverts commit 985137ec472924e4815f258189f6ca9f2168a0a7. * simple refactoring * simple refactoring * do not re-use swap file chunk. Sealed chunks should not be re-used. * comment out debugging facilities * either mem chunk or swap file chunk is fine now * remove orderedMutex as *semaphore.Weighted not found impactful * optimize size calculation for changing large files * optimize performance to avoid going through the long list of chunks * still problems with swap file chunk * rename * tiny optimization * swap file chunk save only successfully read data * fix * enable both mem and swap file chunk * resolve chunks with range * rename * fix chunk interval list * also change file handle chunk group when adding chunks * pick in-active chunk with time-decayed counter * fix compilation * avoid nil with empty fh.entry * refactoring * rename * rename * refactor visible intervals to *list.List * refactor chunkViews to *list.List * add IntervalList for generic interval list * change visible interval to use IntervalList in generics * cahnge chunkViews to *IntervalList[*ChunkView] * use NewFileChunkSection to create * rename variables * refactor * fix renaming leftover * renaming * renaming * add insert interval * interval list adds lock * incrementally add chunks to readers Fixes: 1. set start and stop offset for the value object 2. clone the value object 3. use pointer instead of copy-by-value when passing to interval.Value 4. use insert interval since adding chunk could be out of order * fix tests compilation * fix tests compilation
2 years ago
more solid weed mount (#4089) * compare chunks by timestamp * fix slab clearing error * fix test compilation * move oldest chunk to sealed, instead of by fullness * lock on fh.entryViewCache * remove verbose logs * revert slat clearing * less logs * less logs * track write and read by timestamp * remove useless logic * add entry lock on file handle release * use mem chunk only, swap file chunk has problems * comment out code that maybe used later * add debug mode to compare data read and write * more efficient readResolvedChunks with linked list * small optimization * fix test compilation * minor fix on writer * add SeparateGarbageChunks * group chunks into sections * turn off debug mode * fix tests * fix tests * tmp enable swap file chunk * Revert "tmp enable swap file chunk" This reverts commit 985137ec472924e4815f258189f6ca9f2168a0a7. * simple refactoring * simple refactoring * do not re-use swap file chunk. Sealed chunks should not be re-used. * comment out debugging facilities * either mem chunk or swap file chunk is fine now * remove orderedMutex as *semaphore.Weighted not found impactful * optimize size calculation for changing large files * optimize performance to avoid going through the long list of chunks * still problems with swap file chunk * rename * tiny optimization * swap file chunk save only successfully read data * fix * enable both mem and swap file chunk * resolve chunks with range * rename * fix chunk interval list * also change file handle chunk group when adding chunks * pick in-active chunk with time-decayed counter * fix compilation * avoid nil with empty fh.entry * refactoring * rename * rename * refactor visible intervals to *list.List * refactor chunkViews to *list.List * add IntervalList for generic interval list * change visible interval to use IntervalList in generics * cahnge chunkViews to *IntervalList[*ChunkView] * use NewFileChunkSection to create * rename variables * refactor * fix renaming leftover * renaming * renaming * add insert interval * interval list adds lock * incrementally add chunks to readers Fixes: 1. set start and stop offset for the value object 2. clone the value object 3. use pointer instead of copy-by-value when passing to interval.Value 4. use insert interval since adding chunk could be out of order * fix tests compilation * fix tests compilation
2 years ago
  1. package filer
  2. import (
  3. "fmt"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/glog"
  8. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  9. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  10. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  11. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  12. )
  13. type ReaderCache struct {
  14. chunkCache chunk_cache.ChunkCache
  15. lookupFileIdFn wdclient.LookupFileIdFunctionType
  16. sync.Mutex
  17. downloaders map[string]*SingleChunkCacher
  18. limit int
  19. }
  20. type SingleChunkCacher struct {
  21. completedTimeNew int64
  22. sync.Mutex
  23. parent *ReaderCache
  24. chunkFileId string
  25. data []byte
  26. err error
  27. cipherKey []byte
  28. isGzipped bool
  29. chunkSize int
  30. shouldCache bool
  31. wg sync.WaitGroup
  32. cacheStartedCh chan struct{}
  33. }
  34. func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
  35. return &ReaderCache{
  36. limit: limit,
  37. chunkCache: chunkCache,
  38. lookupFileIdFn: lookupFileIdFn,
  39. downloaders: make(map[string]*SingleChunkCacher),
  40. }
  41. }
  42. func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
  43. if rc.lookupFileIdFn == nil {
  44. return
  45. }
  46. rc.Lock()
  47. defer rc.Unlock()
  48. if len(rc.downloaders) >= rc.limit {
  49. return
  50. }
  51. for x := chunkViews; x != nil; x = x.Next {
  52. chunkView := x.Value
  53. if _, found := rc.downloaders[chunkView.FileId]; found {
  54. continue
  55. }
  56. if rc.chunkCache.IsInCache(chunkView.FileId, true) {
  57. glog.V(4).Infof("%s is in cache", chunkView.FileId)
  58. continue
  59. }
  60. if len(rc.downloaders) >= rc.limit {
  61. // abort when slots are filled
  62. return
  63. }
  64. // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.ViewOffset)
  65. // cache this chunk if not yet
  66. cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), (uint64(chunkView.ViewOffset)+chunkView.ChunkSize) <= rc.chunkCache.GetMaxFilePartSizeInCache())
  67. go cacher.startCaching()
  68. <-cacher.cacheStartedCh
  69. rc.downloaders[chunkView.FileId] = cacher
  70. }
  71. return
  72. }
  73. func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
  74. rc.Lock()
  75. if cacher, found := rc.downloaders[fileId]; found {
  76. if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
  77. rc.Unlock()
  78. return n, err
  79. }
  80. }
  81. if shouldCache || rc.lookupFileIdFn == nil {
  82. n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
  83. if n > 0 {
  84. rc.Unlock()
  85. return n, err
  86. }
  87. }
  88. // clean up old downloaders
  89. if len(rc.downloaders) >= rc.limit {
  90. oldestFid, oldestTime := "", time.Now().UnixNano()
  91. for fid, downloader := range rc.downloaders {
  92. completedTime := atomic.LoadInt64(&downloader.completedTimeNew)
  93. if completedTime > 0 && completedTime < oldestTime {
  94. oldestFid, oldestTime = fid, completedTime
  95. }
  96. }
  97. if oldestFid != "" {
  98. oldDownloader := rc.downloaders[oldestFid]
  99. delete(rc.downloaders, oldestFid)
  100. oldDownloader.destroy()
  101. }
  102. }
  103. // glog.V(4).Infof("cache1 %s", fileId)
  104. cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
  105. go cacher.startCaching()
  106. <-cacher.cacheStartedCh
  107. rc.downloaders[fileId] = cacher
  108. rc.Unlock()
  109. return cacher.readChunkAt(buffer, offset)
  110. }
  111. func (rc *ReaderCache) UnCache(fileId string) {
  112. rc.Lock()
  113. defer rc.Unlock()
  114. // glog.V(4).Infof("uncache %s", fileId)
  115. if downloader, found := rc.downloaders[fileId]; found {
  116. downloader.destroy()
  117. delete(rc.downloaders, fileId)
  118. }
  119. }
  120. func (rc *ReaderCache) destroy() {
  121. rc.Lock()
  122. defer rc.Unlock()
  123. for _, downloader := range rc.downloaders {
  124. downloader.destroy()
  125. }
  126. }
  127. func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
  128. return &SingleChunkCacher{
  129. parent: parent,
  130. chunkFileId: fileId,
  131. cipherKey: cipherKey,
  132. isGzipped: isGzipped,
  133. chunkSize: chunkSize,
  134. shouldCache: shouldCache,
  135. cacheStartedCh: make(chan struct{}),
  136. }
  137. }
  138. func (s *SingleChunkCacher) startCaching() {
  139. s.wg.Add(1)
  140. defer s.wg.Done()
  141. s.Lock()
  142. defer s.Unlock()
  143. s.cacheStartedCh <- struct{}{} // means this has been started
  144. urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
  145. if err != nil {
  146. s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
  147. return
  148. }
  149. s.data = mem.Allocate(s.chunkSize)
  150. _, s.err = util_http.RetriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
  151. if s.err != nil {
  152. mem.Free(s.data)
  153. s.data = nil
  154. return
  155. }
  156. if s.shouldCache {
  157. s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
  158. }
  159. atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
  160. return
  161. }
  162. func (s *SingleChunkCacher) destroy() {
  163. // wait for all reads to finish before destroying the data
  164. s.wg.Wait()
  165. s.Lock()
  166. defer s.Unlock()
  167. if s.data != nil {
  168. mem.Free(s.data)
  169. s.data = nil
  170. close(s.cacheStartedCh)
  171. }
  172. }
  173. func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
  174. s.wg.Add(1)
  175. defer s.wg.Done()
  176. s.Lock()
  177. defer s.Unlock()
  178. if s.err != nil {
  179. return 0, s.err
  180. }
  181. if len(s.data) <= int(offset) {
  182. return 0, nil
  183. }
  184. return copy(buf, s.data[offset:]), nil
  185. }