247 lines
7.0 KiB

5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
3 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/wdclient"
  6. "io"
  7. "math"
  8. "net/url"
  9. "strings"
  10. "time"
  11. "github.com/golang/protobuf/proto"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. )
  // ManifestBatch is the number of data chunks that MaybeManifestize folds
// into one manifest chunk; it is passed to doMaybeManifestize as the merge factor.
const ManifestBatch = 10000
  19. func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
  20. for _, chunk := range chunks {
  21. if chunk.IsChunkManifest {
  22. return true
  23. }
  24. }
  25. return false
  26. }
  27. func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk) {
  28. for _, c := range chunks {
  29. if c.IsChunkManifest {
  30. manifestChunks = append(manifestChunks, c)
  31. } else {
  32. nonManifestChunks = append(nonManifestChunks, c)
  33. }
  34. }
  35. return
  36. }
  37. func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
  38. // TODO maybe parallel this
  39. for _, chunk := range chunks {
  40. if max(chunk.Offset, startOffset) >= min(chunk.Offset+int64(chunk.Size), stopOffset) {
  41. continue
  42. }
  43. if !chunk.IsChunkManifest {
  44. dataChunks = append(dataChunks, chunk)
  45. continue
  46. }
  47. resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk)
  48. if err != nil {
  49. return chunks, nil, err
  50. }
  51. manifestChunks = append(manifestChunks, chunk)
  52. // recursive
  53. dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
  54. if subErr != nil {
  55. return chunks, nil, subErr
  56. }
  57. dataChunks = append(dataChunks, dchunks...)
  58. manifestChunks = append(manifestChunks, mchunks...)
  59. }
  60. return
  61. }
  62. func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
  63. if !chunk.IsChunkManifest {
  64. return
  65. }
  66. // IsChunkManifest
  67. data, err := fetchChunk(lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
  68. if err != nil {
  69. return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
  70. }
  71. m := &filer_pb.FileChunkManifest{}
  72. if err := proto.Unmarshal(data, m); err != nil {
  73. return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err)
  74. }
  75. // recursive
  76. filer_pb.AfterEntryDeserialization(m.Chunks)
  77. return m.Chunks, nil
  78. }
  79. // TODO fetch from cache for weed mount?
  80. func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
  81. urlStrings, err := lookupFileIdFn(fileId)
  82. if err != nil {
  83. glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
  84. return nil, err
  85. }
  86. return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0)
  87. }
  88. func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) {
  89. var err error
  90. var shouldRetry bool
  91. receivedData := make([]byte, 0, size)
  92. for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
  93. for _, urlString := range urlStrings {
  94. receivedData = receivedData[:0]
  95. if strings.Contains(urlString, "%") {
  96. urlString = url.PathEscape(urlString)
  97. }
  98. shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
  99. receivedData = append(receivedData, data...)
  100. })
  101. if !shouldRetry {
  102. break
  103. }
  104. if err != nil {
  105. glog.V(0).Infof("read %s failed, err: %v", urlString, err)
  106. } else {
  107. break
  108. }
  109. }
  110. if err != nil && shouldRetry {
  111. glog.V(0).Infof("retry reading in %v", waitTime)
  112. time.Sleep(waitTime)
  113. } else {
  114. break
  115. }
  116. }
  117. return receivedData, err
  118. }
  119. func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
  120. var shouldRetry bool
  121. var totalWritten int
  122. for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
  123. for _, urlString := range urlStrings {
  124. var localProcesed int
  125. shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
  126. if totalWritten > localProcesed {
  127. toBeSkipped := totalWritten - localProcesed
  128. if len(data) <= toBeSkipped {
  129. localProcesed += len(data)
  130. return // skip if already processed
  131. }
  132. data = data[toBeSkipped:]
  133. localProcesed += toBeSkipped
  134. }
  135. writer.Write(data)
  136. localProcesed += len(data)
  137. totalWritten += len(data)
  138. })
  139. if !shouldRetry {
  140. break
  141. }
  142. if err != nil {
  143. glog.V(0).Infof("read %s failed, err: %v", urlString, err)
  144. } else {
  145. break
  146. }
  147. }
  148. if err != nil && shouldRetry {
  149. glog.V(0).Infof("retry reading in %v", waitTime)
  150. time.Sleep(waitTime)
  151. } else {
  152. break
  153. }
  154. }
  155. return err
  156. }
  157. func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
  158. return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest)
  159. }
  160. func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) {
  161. var dataChunks []*filer_pb.FileChunk
  162. for _, chunk := range inputChunks {
  163. if !chunk.IsChunkManifest {
  164. dataChunks = append(dataChunks, chunk)
  165. } else {
  166. chunks = append(chunks, chunk)
  167. }
  168. }
  169. remaining := len(dataChunks)
  170. for i := 0; i+mergeFactor <= len(dataChunks); i += mergeFactor {
  171. chunk, err := mergefn(saveFunc, dataChunks[i:i+mergeFactor])
  172. if err != nil {
  173. return dataChunks, err
  174. }
  175. chunks = append(chunks, chunk)
  176. remaining -= mergeFactor
  177. }
  178. // remaining
  179. for i := len(dataChunks) - remaining; i < len(dataChunks); i++ {
  180. chunks = append(chunks, dataChunks[i])
  181. }
  182. return
  183. }
  184. func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
  185. filer_pb.BeforeEntrySerialization(dataChunks)
  186. // create and serialize the manifest
  187. data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{
  188. Chunks: dataChunks,
  189. })
  190. if serErr != nil {
  191. return nil, fmt.Errorf("serializing manifest: %v", serErr)
  192. }
  193. minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
  194. for _, chunk := range dataChunks {
  195. if minOffset > int64(chunk.Offset) {
  196. minOffset = chunk.Offset
  197. }
  198. if maxOffset < int64(chunk.Size)+chunk.Offset {
  199. maxOffset = int64(chunk.Size) + chunk.Offset
  200. }
  201. }
  202. manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0)
  203. if err != nil {
  204. return nil, err
  205. }
  206. manifestChunk.IsChunkManifest = true
  207. manifestChunk.Offset = minOffset
  208. manifestChunk.Size = uint64(maxOffset - minOffset)
  209. return
  210. }
  211. type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error)