@@ -13,6 +13,10 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
+const (
+	ManifestBatch = 10000
+)
+
 func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
 	for _, chunk := range chunks {
 		if chunk.IsChunkManifest {
@@ -51,10 +55,11 @@ func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*fil
 	return
 }
 
+// TODO fetch from cache for weed mount?
 func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
 	urlString, err := lookupFileIdFn(fileId)
 	if err != nil {
-		glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
+		glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
 		return nil, err
 	}
 	var buffer bytes.Buffer
@@ -69,8 +74,8 @@ func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKe
 	return buffer.Bytes(), nil
 }
 
-func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
-	return doMaybeManifestize(saveFunc, dataChunks, 10000, mergeIntoManifest)
+func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
+	return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest)
 }
 
 func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) {
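Note: MaybeManifestize now takes its batch size from the ManifestBatch constant instead of the inline 10000. A minimal caller-side sketch, assuming the same package and imports as the file above; applyManifests is a hypothetical helper and is not part of this change:

// applyManifests is a sketch only: it wraps MaybeManifestize after data chunks
// have been written, falling back to the unmanifested chunk list if
// manifestizing fails (doMaybeManifestize returns dataChunks alongside the error).
func applyManifests(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) []*filer_pb.FileChunk {
	chunks, err := MaybeManifestize(saveFunc, dataChunks)
	if err != nil {
		return dataChunks
	}
	return chunks
}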
@@ -84,15 +89,14 @@ func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*fil
 		}
 	}
 
-	manifestBatch := mergeFactor
 	remaining := len(dataChunks)
-	for i := 0; i+manifestBatch <= len(dataChunks); i += manifestBatch {
-		chunk, err := mergefn(saveFunc, dataChunks[i:i+manifestBatch])
+	for i := 0; i+mergeFactor <= len(dataChunks); i += mergeFactor {
+		chunk, err := mergefn(saveFunc, dataChunks[i:i+mergeFactor])
 		if err != nil {
 			return dataChunks, err
 		}
 		chunks = append(chunks, chunk)
-		remaining -= manifestBatch
+		remaining -= mergeFactor
 	}
 	// remaining
 	for i := len(dataChunks) - remaining; i < len(dataChunks); i++ {
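The loop above merges each full batch of mergeFactor data chunks into one manifest chunk and then passes the tail through untouched. A standalone sketch of that arithmetic (hypothetical helper, not in this change): with 25,000 data chunks and mergeFactor = ManifestBatch = 10000, it yields 2 manifest chunks plus 5,000 leftover data chunks, i.e. 5,002 chunks in total.

// expectedChunkCount is illustration only: it reproduces the counting behavior
// of the batching loop in doMaybeManifestize without building any chunks.
func expectedChunkCount(n, mergeFactor int) int {
	remaining := n
	manifests := 0
	for i := 0; i+mergeFactor <= n; i += mergeFactor {
		manifests++ // one manifest chunk per full batch
		remaining -= mergeFactor
	}
	return manifests + remaining // e.g. expectedChunkCount(25000, 10000) == 5002
}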
@@ -112,8 +116,7 @@ func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer
 	}
 
 	minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
-	for k := 0; k < len(dataChunks); k++ {
-		chunk := dataChunks[k]
+	for _, chunk := range dataChunks {
 		if minOffset > int64(chunk.Offset) {
 			minOffset = chunk.Offset
 		}
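For reference, the range loop in mergeIntoManifest is computing the byte extent the new manifest chunk will cover. A self-contained sketch of that min/max pass, assuming the same package and imports as the file above, that the extent is [minOffset, maxOffset), and that the max side (not visible in this hunk) tracks chunk.Offset + chunk.Size:

// chunkExtent is a sketch only, not part of this change: it returns the byte
// range spanned by a group of data chunks, matching the min-offset logic shown
// above plus an assumed symmetric max-offset side.
func chunkExtent(dataChunks []*filer_pb.FileChunk) (minOffset, maxOffset int64) {
	minOffset, maxOffset = int64(math.MaxInt64), int64(math.MinInt64)
	for _, chunk := range dataChunks {
		if minOffset > chunk.Offset {
			minOffset = chunk.Offset
		}
		if maxOffset < chunk.Offset+int64(chunk.Size) {
			maxOffset = chunk.Offset + int64(chunk.Size)
		}
	}
	return minOffset, maxOffset
}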