From 6fbbc785745af3427028dfa78a26466bdfaed41f Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 2 Mar 2022 13:50:46 -0800 Subject: [PATCH] stream reading a whole chunk --- weed/filer/filechunk_manifest.go | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index f2f1c7f16..c74af226c 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -3,12 +3,12 @@ package filer import ( "bytes" "fmt" - "github.com/chrislusf/seaweedfs/weed/util/mem" "github.com/chrislusf/seaweedfs/weed/wdclient" "io" "math" "net/url" "strings" + "sync" "time" "github.com/golang/protobuf/proto" @@ -19,9 +19,15 @@ import ( ) const ( - ManifestBatch = 10000 + ManifestBatch = 3 ) +var bytesBufferPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} + func HasChunkManifest(chunks []*filer_pb.FileChunk) bool { for _, chunk := range chunks { if chunk.IsChunkManifest { @@ -78,14 +84,14 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c } // IsChunkManifest - data := mem.Allocate(int(chunk.Size)) - defer mem.Free(data) - _, err := fetchChunk(data, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed) + bytesBuffer := bytesBufferPool.Get().(*bytes.Buffer) + defer bytesBufferPool.Put(bytesBuffer) + err := fetchWholeChunk(bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed) if err != nil { return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err) } m := &filer_pb.FileChunkManifest{} - if err := proto.Unmarshal(data, m); err != nil { + if err := proto.Unmarshal(bytesBuffer.Bytes(), m); err != nil { return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err) } @@ -95,13 +101,17 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c } // TODO fetch from cache for weed mount? -func fetchChunk(data []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) (int, error) { +func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error { urlStrings, err := lookupFileIdFn(fileId) if err != nil { glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) - return 0, err + return err + } + err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, cipherKey, isGzipped, true, 0, 0) + if err != nil { + return err } - return retriedFetchChunkData(data, urlStrings, cipherKey, isGzipped, true, 0) + return nil } func retriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {