You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

260 lines
7.5 KiB

5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
3 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/util/mem"
  6. "github.com/chrislusf/seaweedfs/weed/wdclient"
  7. "io"
  8. "math"
  9. "net/url"
  10. "strings"
  11. "time"
  12. "github.com/golang/protobuf/proto"
  13. "github.com/chrislusf/seaweedfs/weed/glog"
  14. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. )
  17. const (
  18. ManifestBatch = 10000
  19. )
  20. func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
  21. for _, chunk := range chunks {
  22. if chunk.IsChunkManifest {
  23. return true
  24. }
  25. }
  26. return false
  27. }
  28. func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk) {
  29. for _, c := range chunks {
  30. if c.IsChunkManifest {
  31. manifestChunks = append(manifestChunks, c)
  32. } else {
  33. nonManifestChunks = append(nonManifestChunks, c)
  34. }
  35. }
  36. return
  37. }
  38. func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
  39. // TODO maybe parallel this
  40. for _, chunk := range chunks {
  41. if max(chunk.Offset, startOffset) >= min(chunk.Offset+int64(chunk.Size), stopOffset) {
  42. continue
  43. }
  44. if !chunk.IsChunkManifest {
  45. dataChunks = append(dataChunks, chunk)
  46. continue
  47. }
  48. resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk)
  49. if err != nil {
  50. return chunks, nil, err
  51. }
  52. manifestChunks = append(manifestChunks, chunk)
  53. // recursive
  54. dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
  55. if subErr != nil {
  56. return chunks, nil, subErr
  57. }
  58. dataChunks = append(dataChunks, dchunks...)
  59. manifestChunks = append(manifestChunks, mchunks...)
  60. }
  61. return
  62. }
  63. func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
  64. if !chunk.IsChunkManifest {
  65. return
  66. }
  67. // IsChunkManifest
  68. data := mem.Allocate(int(chunk.Size))
  69. defer mem.Free(data)
  70. _, err := fetchChunk(data, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
  71. if err != nil {
  72. return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
  73. }
  74. m := &filer_pb.FileChunkManifest{}
  75. if err := proto.Unmarshal(data, m); err != nil {
  76. return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err)
  77. }
  78. // recursive
  79. filer_pb.AfterEntryDeserialization(m.Chunks)
  80. return m.Chunks, nil
  81. }
  82. // TODO fetch from cache for weed mount?
  83. func fetchChunk(data []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) (int, error) {
  84. urlStrings, err := lookupFileIdFn(fileId)
  85. if err != nil {
  86. glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
  87. return 0, err
  88. }
  89. return retriedFetchChunkData(data, urlStrings, cipherKey, isGzipped, true, 0)
  90. }
  91. func fetchChunkRange(data []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) {
  92. urlStrings, err := lookupFileIdFn(fileId)
  93. if err != nil {
  94. glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
  95. return 0, err
  96. }
  97. return retriedFetchChunkData(data, urlStrings, cipherKey, isGzipped, false, offset)
  98. }
  99. func retriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
  100. var shouldRetry bool
  101. for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
  102. for _, urlString := range urlStrings {
  103. n = 0
  104. if strings.Contains(urlString, "%") {
  105. urlString = url.PathEscape(urlString)
  106. }
  107. shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
  108. if n < len(buffer) {
  109. x := copy(buffer[n:], data)
  110. n += x
  111. }
  112. })
  113. if !shouldRetry {
  114. break
  115. }
  116. if err != nil {
  117. glog.V(0).Infof("read %s failed, err: %v", urlString, err)
  118. } else {
  119. break
  120. }
  121. }
  122. if err != nil && shouldRetry {
  123. glog.V(0).Infof("retry reading in %v", waitTime)
  124. time.Sleep(waitTime)
  125. } else {
  126. break
  127. }
  128. }
  129. return n, err
  130. }
  131. func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
  132. var shouldRetry bool
  133. var totalWritten int
  134. for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
  135. for _, urlString := range urlStrings {
  136. var localProcesed int
  137. shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
  138. if totalWritten > localProcesed {
  139. toBeSkipped := totalWritten - localProcesed
  140. if len(data) <= toBeSkipped {
  141. localProcesed += len(data)
  142. return // skip if already processed
  143. }
  144. data = data[toBeSkipped:]
  145. localProcesed += toBeSkipped
  146. }
  147. writer.Write(data)
  148. localProcesed += len(data)
  149. totalWritten += len(data)
  150. })
  151. if !shouldRetry {
  152. break
  153. }
  154. if err != nil {
  155. glog.V(0).Infof("read %s failed, err: %v", urlString, err)
  156. } else {
  157. break
  158. }
  159. }
  160. if err != nil && shouldRetry {
  161. glog.V(0).Infof("retry reading in %v", waitTime)
  162. time.Sleep(waitTime)
  163. } else {
  164. break
  165. }
  166. }
  167. return err
  168. }
  169. func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
  170. return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest)
  171. }
  172. func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) {
  173. var dataChunks []*filer_pb.FileChunk
  174. for _, chunk := range inputChunks {
  175. if !chunk.IsChunkManifest {
  176. dataChunks = append(dataChunks, chunk)
  177. } else {
  178. chunks = append(chunks, chunk)
  179. }
  180. }
  181. remaining := len(dataChunks)
  182. for i := 0; i+mergeFactor <= len(dataChunks); i += mergeFactor {
  183. chunk, err := mergefn(saveFunc, dataChunks[i:i+mergeFactor])
  184. if err != nil {
  185. return dataChunks, err
  186. }
  187. chunks = append(chunks, chunk)
  188. remaining -= mergeFactor
  189. }
  190. // remaining
  191. for i := len(dataChunks) - remaining; i < len(dataChunks); i++ {
  192. chunks = append(chunks, dataChunks[i])
  193. }
  194. return
  195. }
  196. func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
  197. filer_pb.BeforeEntrySerialization(dataChunks)
  198. // create and serialize the manifest
  199. data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{
  200. Chunks: dataChunks,
  201. })
  202. if serErr != nil {
  203. return nil, fmt.Errorf("serializing manifest: %v", serErr)
  204. }
  205. minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
  206. for _, chunk := range dataChunks {
  207. if minOffset > int64(chunk.Offset) {
  208. minOffset = chunk.Offset
  209. }
  210. if maxOffset < int64(chunk.Size)+chunk.Offset {
  211. maxOffset = int64(chunk.Size) + chunk.Offset
  212. }
  213. }
  214. manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0)
  215. if err != nil {
  216. return nil, err
  217. }
  218. manifestChunk.IsChunkManifest = true
  219. manifestChunk.Offset = minOffset
  220. manifestChunk.Size = uint64(maxOffset - minOffset)
  221. return
  222. }
  223. type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error)