You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

261 lines
7.3 KiB

5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
3 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/wdclient"
  6. "io"
  7. "math"
  8. "net/url"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/golang/protobuf/proto"
  13. "github.com/chrislusf/seaweedfs/weed/glog"
  14. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. )
  17. const (
  18. ManifestBatch = 3
  19. )
  20. var bytesBufferPool = sync.Pool{
  21. New: func() interface{} {
  22. return new(bytes.Buffer)
  23. },
  24. }
  25. func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
  26. for _, chunk := range chunks {
  27. if chunk.IsChunkManifest {
  28. return true
  29. }
  30. }
  31. return false
  32. }
  33. func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk) {
  34. for _, c := range chunks {
  35. if c.IsChunkManifest {
  36. manifestChunks = append(manifestChunks, c)
  37. } else {
  38. nonManifestChunks = append(nonManifestChunks, c)
  39. }
  40. }
  41. return
  42. }
  43. func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
  44. // TODO maybe parallel this
  45. for _, chunk := range chunks {
  46. if max(chunk.Offset, startOffset) >= min(chunk.Offset+int64(chunk.Size), stopOffset) {
  47. continue
  48. }
  49. if !chunk.IsChunkManifest {
  50. dataChunks = append(dataChunks, chunk)
  51. continue
  52. }
  53. resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk)
  54. if err != nil {
  55. return chunks, nil, err
  56. }
  57. manifestChunks = append(manifestChunks, chunk)
  58. // recursive
  59. dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
  60. if subErr != nil {
  61. return chunks, nil, subErr
  62. }
  63. dataChunks = append(dataChunks, dchunks...)
  64. manifestChunks = append(manifestChunks, mchunks...)
  65. }
  66. return
  67. }
  68. func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
  69. if !chunk.IsChunkManifest {
  70. return
  71. }
  72. // IsChunkManifest
  73. bytesBuffer := bytesBufferPool.Get().(*bytes.Buffer)
  74. defer bytesBufferPool.Put(bytesBuffer)
  75. err := fetchWholeChunk(bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
  76. if err != nil {
  77. return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
  78. }
  79. m := &filer_pb.FileChunkManifest{}
  80. if err := proto.Unmarshal(bytesBuffer.Bytes(), m); err != nil {
  81. return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err)
  82. }
  83. // recursive
  84. filer_pb.AfterEntryDeserialization(m.Chunks)
  85. return m.Chunks, nil
  86. }
  87. // TODO fetch from cache for weed mount?
  88. func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error {
  89. urlStrings, err := lookupFileIdFn(fileId)
  90. if err != nil {
  91. glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
  92. return err
  93. }
  94. err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, cipherKey, isGzipped, true, 0, 0)
  95. if err != nil {
  96. return err
  97. }
  98. return nil
  99. }
  100. func retriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
  101. var shouldRetry bool
  102. for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
  103. for _, urlString := range urlStrings {
  104. n = 0
  105. if strings.Contains(urlString, "%") {
  106. urlString = url.PathEscape(urlString)
  107. }
  108. shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
  109. if n < len(buffer) {
  110. x := copy(buffer[n:], data)
  111. n += x
  112. }
  113. })
  114. if !shouldRetry {
  115. break
  116. }
  117. if err != nil {
  118. glog.V(0).Infof("read %s failed, err: %v", urlString, err)
  119. } else {
  120. break
  121. }
  122. }
  123. if err != nil && shouldRetry {
  124. glog.V(0).Infof("retry reading in %v", waitTime)
  125. time.Sleep(waitTime)
  126. } else {
  127. break
  128. }
  129. }
  130. return n, err
  131. }
  132. func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
  133. var shouldRetry bool
  134. var totalWritten int
  135. for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
  136. for _, urlString := range urlStrings {
  137. var localProcesed int
  138. shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
  139. if totalWritten > localProcesed {
  140. toBeSkipped := totalWritten - localProcesed
  141. if len(data) <= toBeSkipped {
  142. localProcesed += len(data)
  143. return // skip if already processed
  144. }
  145. data = data[toBeSkipped:]
  146. localProcesed += toBeSkipped
  147. }
  148. writer.Write(data)
  149. localProcesed += len(data)
  150. totalWritten += len(data)
  151. })
  152. if !shouldRetry {
  153. break
  154. }
  155. if err != nil {
  156. glog.V(0).Infof("read %s failed, err: %v", urlString, err)
  157. } else {
  158. break
  159. }
  160. }
  161. if err != nil && shouldRetry {
  162. glog.V(0).Infof("retry reading in %v", waitTime)
  163. time.Sleep(waitTime)
  164. } else {
  165. break
  166. }
  167. }
  168. return err
  169. }
  170. func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
  171. return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest)
  172. }
  173. func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) {
  174. var dataChunks []*filer_pb.FileChunk
  175. for _, chunk := range inputChunks {
  176. if !chunk.IsChunkManifest {
  177. dataChunks = append(dataChunks, chunk)
  178. } else {
  179. chunks = append(chunks, chunk)
  180. }
  181. }
  182. remaining := len(dataChunks)
  183. for i := 0; i+mergeFactor <= len(dataChunks); i += mergeFactor {
  184. chunk, err := mergefn(saveFunc, dataChunks[i:i+mergeFactor])
  185. if err != nil {
  186. return dataChunks, err
  187. }
  188. chunks = append(chunks, chunk)
  189. remaining -= mergeFactor
  190. }
  191. // remaining
  192. for i := len(dataChunks) - remaining; i < len(dataChunks); i++ {
  193. chunks = append(chunks, dataChunks[i])
  194. }
  195. return
  196. }
  197. func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
  198. filer_pb.BeforeEntrySerialization(dataChunks)
  199. // create and serialize the manifest
  200. data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{
  201. Chunks: dataChunks,
  202. })
  203. if serErr != nil {
  204. return nil, fmt.Errorf("serializing manifest: %v", serErr)
  205. }
  206. minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
  207. for _, chunk := range dataChunks {
  208. if minOffset > int64(chunk.Offset) {
  209. minOffset = chunk.Offset
  210. }
  211. if maxOffset < int64(chunk.Size)+chunk.Offset {
  212. maxOffset = int64(chunk.Size) + chunk.Offset
  213. }
  214. }
  215. manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0)
  216. if err != nil {
  217. return nil, err
  218. }
  219. manifestChunk.IsChunkManifest = true
  220. manifestChunk.Offset = minOffset
  221. manifestChunk.Size = uint64(maxOffset - minOffset)
  222. return
  223. }
  224. type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error)