You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

136 lines
4.1 KiB

  1. package filer2
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "math"
  7. "github.com/golang/protobuf/proto"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. )
  12. func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
  13. for _, chunk := range chunks {
  14. if chunk.IsChunkManifest {
  15. return true
  16. }
  17. }
  18. return false
  19. }
  20. func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manefestResolveErr error) {
  21. // TODO maybe parallel this
  22. for _, chunk := range chunks {
  23. if !chunk.IsChunkManifest {
  24. dataChunks = append(dataChunks, chunk)
  25. continue
  26. }
  27. // IsChunkManifest
  28. data, err := fetchChunk(lookupFileIdFn, chunk.FileId, chunk.CipherKey, chunk.IsCompressed)
  29. if err != nil {
  30. return chunks, nil, fmt.Errorf("fail to read manifest %s: %v", chunk.FileId, err)
  31. }
  32. m := &filer_pb.FileChunkManifest{}
  33. if err := proto.Unmarshal(data, m); err != nil {
  34. return chunks, nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.FileId, err)
  35. }
  36. manifestChunks = append(manifestChunks, chunk)
  37. // recursive
  38. dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, m.Chunks)
  39. if subErr != nil {
  40. return chunks, nil, subErr
  41. }
  42. dataChunks = append(dataChunks, dchunks...)
  43. manifestChunks = append(manifestChunks, mchunks...)
  44. }
  45. return
  46. }
  47. func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
  48. urlString, err := lookupFileIdFn(fileId)
  49. if err != nil {
  50. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
  51. return nil, err
  52. }
  53. var buffer bytes.Buffer
  54. err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) {
  55. buffer.Write(data)
  56. })
  57. if err != nil {
  58. glog.V(0).Infof("read %s failed, err: %v", fileId, err)
  59. return nil, err
  60. }
  61. return buffer.Bytes(), nil
  62. }
  63. func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
  64. return doMaybeManifestize(saveFunc, dataChunks, 10000, mergeIntoManifest)
  65. }
  66. func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) {
  67. var dataChunks []*filer_pb.FileChunk
  68. for _, chunk := range inputChunks {
  69. if !chunk.IsChunkManifest {
  70. dataChunks = append(dataChunks, chunk)
  71. } else {
  72. chunks = append(chunks, chunk)
  73. }
  74. }
  75. manifestBatch := mergeFactor
  76. remaining := len(dataChunks)
  77. for i := 0; i+manifestBatch <= len(dataChunks); i += manifestBatch {
  78. chunk, err := mergefn(saveFunc, dataChunks[i:i+manifestBatch])
  79. if err != nil {
  80. return dataChunks, err
  81. }
  82. chunks = append(chunks, chunk)
  83. remaining -= manifestBatch
  84. }
  85. // remaining
  86. for i := len(dataChunks) - remaining; i < len(dataChunks); i++ {
  87. chunks = append(chunks, dataChunks[i])
  88. }
  89. return
  90. }
  91. func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
  92. // create and serialize the manifest
  93. data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{
  94. Chunks: dataChunks,
  95. })
  96. if serErr != nil {
  97. return nil, fmt.Errorf("serializing manifest: %v", serErr)
  98. }
  99. minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
  100. for k := 0; k < len(dataChunks); k++ {
  101. chunk := dataChunks[k]
  102. if minOffset > int64(chunk.Offset) {
  103. minOffset = chunk.Offset
  104. }
  105. if maxOffset < int64(chunk.Size)+chunk.Offset {
  106. maxOffset = int64(chunk.Size) + chunk.Offset
  107. }
  108. }
  109. manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0)
  110. if err != nil {
  111. return nil, err
  112. }
  113. manifestChunk.IsChunkManifest = true
  114. manifestChunk.Offset = minOffset
  115. manifestChunk.Size = uint64(maxOffset - minOffset)
  116. return
  117. }
  118. type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error)