diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index d9d0331be..60a5c538b 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -5,8 +5,6 @@ import ( "fmt" "io" "math" - "net/url" - "strings" "sync" "time" @@ -122,44 +120,7 @@ func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunction glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) return 0, err } - return retriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset) -} - -func retriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) { - - var shouldRetry bool - - for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 { - for _, urlString := range urlStrings { - n = 0 - if strings.Contains(urlString, "%") { - urlString = url.PathEscape(urlString) - } - shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) { - if n < len(buffer) { - x := copy(buffer[n:], data) - n += x - } - }) - if !shouldRetry { - break - } - if err != nil { - glog.V(0).Infof("read %s failed, err: %v", urlString, err) - } else { - break - } - } - if err != nil && shouldRetry { - glog.V(0).Infof("retry reading in %v", waitTime) - time.Sleep(waitTime) - } else { - break - } - } - - return n, err - + return util.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset) } func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) { diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go index 27d40a78b..f40bb1285 100644 --- a/weed/filer/reader_cache.go +++ b/weed/filer/reader_cache.go @@ -2,6 +2,7 @@ package filer import ( "fmt" + "github.com/seaweedfs/seaweedfs/weed/util" "sync" "sync/atomic" "time" @@ -170,7 +171,7 @@ func (s *SingleChunkCacher) startCaching() { s.data = mem.Allocate(s.chunkSize) - _, s.err = retriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0) + _, s.err = util.RetriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0) if s.err != nil { mem.Free(s.data) s.data = nil diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 73a2a219c..51a82fb2e 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -176,7 +176,7 @@ func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer return err } - n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk) + n, err := util.RetriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk) if err != nil { return err } diff --git a/weed/util/http_util.go b/weed/util/http_util.go index ef4b29158..d1505f673 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -10,6 +10,7 @@ import ( "net/http" "net/url" "strings" + "time" "github.com/seaweedfs/seaweedfs/weed/glog" ) @@ -450,3 +451,40 @@ func (r *CountingReader) Read(p []byte) (n int, err error) { r.BytesRead += n return n, err } + +func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) { + + var shouldRetry bool + + for waitTime := time.Second; waitTime < RetryWaitTime; waitTime += waitTime / 2 { + for _, urlString := range urlStrings { + n = 0 + if strings.Contains(urlString, "%") { + urlString = url.PathEscape(urlString) + } + shouldRetry, err = ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) { + if n < len(buffer) { + x := copy(buffer[n:], data) + n += x + } + }) + if !shouldRetry { + break + } + if err != nil { + glog.V(0).Infof("read %s failed, err: %v", urlString, err) + } else { + break + } + } + if err != nil && shouldRetry { + glog.V(0).Infof("retry reading in %v", waitTime) + time.Sleep(waitTime) + } else { + break + } + } + + return n, err + +}