From b961fcd338c1499fd8f1de9a7e77da4e127dfe65 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 13 Aug 2021 11:00:11 -0700 Subject: [PATCH] filer: stream read from volume server, reduce memory usage --- weed/filer/filechunk_manifest.go | 35 ++++++++++++++++++++++++++++++++ weed/filer/stream.go | 10 ++------- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 0c6b0cfe3..2115e7856 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -132,6 +132,41 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool } +func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) { + + var shouldRetry bool + var written int + + for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 { + for _, urlString := range urlStrings { + shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { + writer.Write(data) + written += len(data) + }) + if !shouldRetry { + break + } + if err != nil { + glog.V(0).Infof("read %s failed, err: %v", urlString, err) + if written > 0 { + break + } + } else { + break + } + } + if err != nil && shouldRetry && written > 0 { + glog.V(0).Infof("retry reading in %v", waitTime) + time.Sleep(waitTime) + } else { + break + } + } + + return err + +} + func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) { return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest) } diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 503e6b23f..c61ee3c12 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -16,7 +16,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/wdclient" ) -func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { +func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks) chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size) @@ -40,18 +40,12 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c urlStrings := fileId2Url[chunkView.FileId] start := time.Now() - data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) + err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds()) if err != nil { stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc() return fmt.Errorf("read chunk: %v", err) } - - _, err = w.Write(data) - if err != nil { - stats.FilerRequestCounter.WithLabelValues("chunkDownloadedError").Inc() - return fmt.Errorf("write chunk: %v", err) - } stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc() }