You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

178 lines
5.2 KiB

5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "math"
  7. "github.com/golang/protobuf/proto"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. )
  12. const (
  13. ManifestBatch = 1000
  14. )
  15. func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
  16. for _, chunk := range chunks {
  17. if chunk.IsChunkManifest {
  18. return true
  19. }
  20. }
  21. return false
  22. }
  23. func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk) {
  24. for _, c := range chunks {
  25. if c.IsChunkManifest {
  26. manifestChunks = append(manifestChunks, c)
  27. } else {
  28. nonManifestChunks = append(nonManifestChunks, c)
  29. }
  30. }
  31. return
  32. }
  33. func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
  34. // TODO maybe parallel this
  35. for _, chunk := range chunks {
  36. if !chunk.IsChunkManifest {
  37. dataChunks = append(dataChunks, chunk)
  38. continue
  39. }
  40. resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk)
  41. if err != nil {
  42. return chunks, nil, err
  43. }
  44. manifestChunks = append(manifestChunks, chunk)
  45. // recursive
  46. dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks)
  47. if subErr != nil {
  48. return chunks, nil, subErr
  49. }
  50. dataChunks = append(dataChunks, dchunks...)
  51. manifestChunks = append(manifestChunks, mchunks...)
  52. }
  53. return
  54. }
  55. func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
  56. if !chunk.IsChunkManifest {
  57. return
  58. }
  59. // IsChunkManifest
  60. data, err := fetchChunk(lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
  61. if err != nil {
  62. return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
  63. }
  64. m := &filer_pb.FileChunkManifest{}
  65. if err := proto.Unmarshal(data, m); err != nil {
  66. return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err)
  67. }
  68. // recursive
  69. filer_pb.AfterEntryDeserialization(m.Chunks)
  70. return m.Chunks, nil
  71. }
  72. // TODO fetch from cache for weed mount?
  73. func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
  74. urlStrings, err := lookupFileIdFn(fileId)
  75. if err != nil {
  76. glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
  77. return nil, err
  78. }
  79. return fetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0)
  80. }
  81. func fetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) {
  82. var err error
  83. var buffer bytes.Buffer
  84. for _, urlString := range urlStrings {
  85. err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
  86. buffer.Write(data)
  87. })
  88. if err != nil {
  89. glog.V(0).Infof("read %s failed, err: %v", urlString, err)
  90. buffer.Reset()
  91. } else {
  92. break
  93. }
  94. }
  95. return buffer.Bytes(), err
  96. }
  97. func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
  98. return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest)
  99. }
  100. func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) {
  101. var dataChunks []*filer_pb.FileChunk
  102. for _, chunk := range inputChunks {
  103. if !chunk.IsChunkManifest {
  104. dataChunks = append(dataChunks, chunk)
  105. } else {
  106. chunks = append(chunks, chunk)
  107. }
  108. }
  109. remaining := len(dataChunks)
  110. for i := 0; i+mergeFactor <= len(dataChunks); i += mergeFactor {
  111. chunk, err := mergefn(saveFunc, dataChunks[i:i+mergeFactor])
  112. if err != nil {
  113. return dataChunks, err
  114. }
  115. chunks = append(chunks, chunk)
  116. remaining -= mergeFactor
  117. }
  118. // remaining
  119. for i := len(dataChunks) - remaining; i < len(dataChunks); i++ {
  120. chunks = append(chunks, dataChunks[i])
  121. }
  122. return
  123. }
  124. func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
  125. filer_pb.BeforeEntrySerialization(dataChunks)
  126. // create and serialize the manifest
  127. data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{
  128. Chunks: dataChunks,
  129. })
  130. if serErr != nil {
  131. return nil, fmt.Errorf("serializing manifest: %v", serErr)
  132. }
  133. minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
  134. for _, chunk := range dataChunks {
  135. if minOffset > int64(chunk.Offset) {
  136. minOffset = chunk.Offset
  137. }
  138. if maxOffset < int64(chunk.Size)+chunk.Offset {
  139. maxOffset = int64(chunk.Size) + chunk.Offset
  140. }
  141. }
  142. manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0)
  143. if err != nil {
  144. return nil, err
  145. }
  146. manifestChunk.IsChunkManifest = true
  147. manifestChunk.Offset = minOffset
  148. manifestChunk.Size = uint64(maxOffset - minOffset)
  149. return
  150. }
  151. type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error)