You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

240 lines
7.0 KiB

5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
more solid weed mount (#4089) * compare chunks by timestamp * fix slab clearing error * fix test compilation * move oldest chunk to sealed, instead of by fullness * lock on fh.entryViewCache * remove verbose logs * revert slat clearing * less logs * less logs * track write and read by timestamp * remove useless logic * add entry lock on file handle release * use mem chunk only, swap file chunk has problems * comment out code that maybe used later * add debug mode to compare data read and write * more efficient readResolvedChunks with linked list * small optimization * fix test compilation * minor fix on writer * add SeparateGarbageChunks * group chunks into sections * turn off debug mode * fix tests * fix tests * tmp enable swap file chunk * Revert "tmp enable swap file chunk" This reverts commit 985137ec472924e4815f258189f6ca9f2168a0a7. * simple refactoring * simple refactoring * do not re-use swap file chunk. Sealed chunks should not be re-used. * comment out debugging facilities * either mem chunk or swap file chunk is fine now * remove orderedMutex as *semaphore.Weighted not found impactful * optimize size calculation for changing large files * optimize performance to avoid going through the long list of chunks * still problems with swap file chunk * rename * tiny optimization * swap file chunk save only successfully read data * fix * enable both mem and swap file chunk * resolve chunks with range * rename * fix chunk interval list * also change file handle chunk group when adding chunks * pick in-active chunk with time-decayed counter * fix compilation * avoid nil with empty fh.entry * refactoring * rename * rename * refactor visible intervals to *list.List * refactor chunkViews to *list.List * add IntervalList for generic interval list * change visible interval to use IntervalList in generics * cahnge chunkViews to *IntervalList[*ChunkView] * use NewFileChunkSection to create * rename variables * refactor * fix renaming leftover * renaming * renaming * add insert interval * interval list adds lock * incrementally add chunks to readers Fixes: 1. set start and stop offset for the value object 2. clone the value object 3. use pointer instead of copy-by-value when passing to interval.Value 4. use insert interval since adding chunk could be out of order * fix tests compilation * fix tests compilation
2 years ago
more solid weed mount (#4089) * compare chunks by timestamp * fix slab clearing error * fix test compilation * move oldest chunk to sealed, instead of by fullness * lock on fh.entryViewCache * remove verbose logs * revert slat clearing * less logs * less logs * track write and read by timestamp * remove useless logic * add entry lock on file handle release * use mem chunk only, swap file chunk has problems * comment out code that maybe used later * add debug mode to compare data read and write * more efficient readResolvedChunks with linked list * small optimization * fix test compilation * minor fix on writer * add SeparateGarbageChunks * group chunks into sections * turn off debug mode * fix tests * fix tests * tmp enable swap file chunk * Revert "tmp enable swap file chunk" This reverts commit 985137ec472924e4815f258189f6ca9f2168a0a7. * simple refactoring * simple refactoring * do not re-use swap file chunk. Sealed chunks should not be re-used. * comment out debugging facilities * either mem chunk or swap file chunk is fine now * remove orderedMutex as *semaphore.Weighted not found impactful * optimize size calculation for changing large files * optimize performance to avoid going through the long list of chunks * still problems with swap file chunk * rename * tiny optimization * swap file chunk save only successfully read data * fix * enable both mem and swap file chunk * resolve chunks with range * rename * fix chunk interval list * also change file handle chunk group when adding chunks * pick in-active chunk with time-decayed counter * fix compilation * avoid nil with empty fh.entry * refactoring * rename * rename * refactor visible intervals to *list.List * refactor chunkViews to *list.List * add IntervalList for generic interval list * change visible interval to use IntervalList in generics * cahnge chunkViews to *IntervalList[*ChunkView] * use NewFileChunkSection to create * rename variables * refactor * fix renaming leftover * renaming * renaming * add insert interval * interval list adds lock * incrementally add chunks to readers Fixes: 1. set start and stop offset for the value object 2. clone the value object 3. use pointer instead of copy-by-value when passing to interval.Value 4. use insert interval since adding chunk could be out of order * fix tests compilation * fix tests compilation
2 years ago
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "math"
  7. "sync"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  10. "google.golang.org/protobuf/proto"
  11. "github.com/seaweedfs/seaweedfs/weed/glog"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/util"
  14. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  15. )
  16. const (
  17. ManifestBatch = 10000
  18. )
  19. var bytesBufferPool = sync.Pool{
  20. New: func() interface{} {
  21. return new(bytes.Buffer)
  22. },
  23. }
  24. func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
  25. for _, chunk := range chunks {
  26. if chunk.IsChunkManifest {
  27. return true
  28. }
  29. }
  30. return false
  31. }
  32. func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk) {
  33. for _, c := range chunks {
  34. if c.IsChunkManifest {
  35. manifestChunks = append(manifestChunks, c)
  36. } else {
  37. nonManifestChunks = append(nonManifestChunks, c)
  38. }
  39. }
  40. return
  41. }
  42. func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
  43. // TODO maybe parallel this
  44. for _, chunk := range chunks {
  45. if max(chunk.Offset, startOffset) >= min(chunk.Offset+int64(chunk.Size), stopOffset) {
  46. continue
  47. }
  48. if !chunk.IsChunkManifest {
  49. dataChunks = append(dataChunks, chunk)
  50. continue
  51. }
  52. resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk)
  53. if err != nil {
  54. return dataChunks, nil, err
  55. }
  56. manifestChunks = append(manifestChunks, chunk)
  57. // recursive
  58. subDataChunks, subManifestChunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
  59. if subErr != nil {
  60. return dataChunks, nil, subErr
  61. }
  62. dataChunks = append(dataChunks, subDataChunks...)
  63. manifestChunks = append(manifestChunks, subManifestChunks...)
  64. }
  65. return
  66. }
  67. func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
  68. if !chunk.IsChunkManifest {
  69. return
  70. }
  71. // IsChunkManifest
  72. bytesBuffer := bytesBufferPool.Get().(*bytes.Buffer)
  73. bytesBuffer.Reset()
  74. defer bytesBufferPool.Put(bytesBuffer)
  75. err := fetchWholeChunk(bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
  76. if err != nil {
  77. return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
  78. }
  79. m := &filer_pb.FileChunkManifest{}
  80. if err := proto.Unmarshal(bytesBuffer.Bytes(), m); err != nil {
  81. return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err)
  82. }
  83. // recursive
  84. filer_pb.AfterEntryDeserialization(m.Chunks)
  85. return m.Chunks, nil
  86. }
  87. // TODO fetch from cache for weed mount?
  88. func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error {
  89. urlStrings, err := lookupFileIdFn(fileId)
  90. if err != nil {
  91. glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
  92. return err
  93. }
  94. err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0)
  95. if err != nil {
  96. return err
  97. }
  98. return nil
  99. }
  100. func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) {
  101. urlStrings, err := lookupFileIdFn(fileId)
  102. if err != nil {
  103. glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
  104. return 0, err
  105. }
  106. return util_http.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
  107. }
  108. func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
  109. var shouldRetry bool
  110. var totalWritten int
  111. for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
  112. for _, urlString := range urlStrings {
  113. var localProcessed int
  114. var writeErr error
  115. shouldRetry, err = util_http.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
  116. if totalWritten > localProcessed {
  117. toBeSkipped := totalWritten - localProcessed
  118. if len(data) <= toBeSkipped {
  119. localProcessed += len(data)
  120. return // skip if already processed
  121. }
  122. data = data[toBeSkipped:]
  123. localProcessed += toBeSkipped
  124. }
  125. var writtenCount int
  126. writtenCount, writeErr = writer.Write(data)
  127. localProcessed += writtenCount
  128. totalWritten += writtenCount
  129. })
  130. if !shouldRetry {
  131. break
  132. }
  133. if writeErr != nil {
  134. err = writeErr
  135. break
  136. }
  137. if err != nil {
  138. glog.V(0).Infof("read %s failed, err: %v", urlString, err)
  139. } else {
  140. break
  141. }
  142. }
  143. if err != nil && shouldRetry {
  144. glog.V(0).Infof("retry reading in %v", waitTime)
  145. time.Sleep(waitTime)
  146. } else {
  147. break
  148. }
  149. }
  150. return err
  151. }
  152. func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
  153. return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest)
  154. }
  155. func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) {
  156. var dataChunks []*filer_pb.FileChunk
  157. for _, chunk := range inputChunks {
  158. if !chunk.IsChunkManifest {
  159. dataChunks = append(dataChunks, chunk)
  160. } else {
  161. chunks = append(chunks, chunk)
  162. }
  163. }
  164. remaining := len(dataChunks)
  165. for i := 0; i+mergeFactor <= len(dataChunks); i += mergeFactor {
  166. chunk, err := mergefn(saveFunc, dataChunks[i:i+mergeFactor])
  167. if err != nil {
  168. return dataChunks, err
  169. }
  170. chunks = append(chunks, chunk)
  171. remaining -= mergeFactor
  172. }
  173. // remaining
  174. for i := len(dataChunks) - remaining; i < len(dataChunks); i++ {
  175. chunks = append(chunks, dataChunks[i])
  176. }
  177. return
  178. }
  179. func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
  180. filer_pb.BeforeEntrySerialization(dataChunks)
  181. // create and serialize the manifest
  182. data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{
  183. Chunks: dataChunks,
  184. })
  185. if serErr != nil {
  186. return nil, fmt.Errorf("serializing manifest: %v", serErr)
  187. }
  188. minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
  189. for _, chunk := range dataChunks {
  190. if minOffset > int64(chunk.Offset) {
  191. minOffset = chunk.Offset
  192. }
  193. if maxOffset < int64(chunk.Size)+chunk.Offset {
  194. maxOffset = int64(chunk.Size) + chunk.Offset
  195. }
  196. }
  197. manifestChunk, err = saveFunc(bytes.NewReader(data), "", 0, 0)
  198. if err != nil {
  199. return nil, err
  200. }
  201. manifestChunk.IsChunkManifest = true
  202. manifestChunk.Offset = minOffset
  203. manifestChunk.Size = uint64(maxOffset - minOffset)
  204. return
  205. }
  206. type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error)