From 0647f66bb54144362c053d135687416dfa0a5802 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 8 Mar 2026 14:33:37 -0700 Subject: [PATCH] filer.sync: add exponential backoff on unexpected EOF during replication (#8557) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * filer.sync: add exponential backoff on unexpected EOF during replication When the source volume server drops connections under high traffic, filer.sync retries aggressively (every 1-6s), hammering the already overloaded source. This adds a longer exponential backoff (10s to 2min) specifically for "unexpected EOF" errors, reducing pressure on the source while still retrying indefinitely until success. Also adds more logging throughout the replication path: - Log source URL and error at V(0) when ReadPart or io.ReadAll fails - Log content-length and byte counts at V(4) on success - Log backoff duration in retry messages Fixes #8542 * filer.sync: extract backoff helper and fix 2-minute cap - Extract nextEofBackoff() and isEofError() helpers to deduplicate the backoff logic between fetchAndWrite and uploadManifestChunk - Fix the cap: previously 80s would double to 160s and pass the < 2min check uncapped. Now doubles first, then clamps to 2min. * filer.sync: log source URL instead of empty upload URL on read errors UploadUrl is not populated until after the reader is consumed, so the V(0) and V(4) logs were printing an empty string. Add SourceUrl field to UploadOption and populate it from the HTTP response in fetchAndWrite. * filer.sync: guard isEofError against nil error * filer.sync: use errors.Is for EOF detection, fix log wording - Replace broad substring matching ("read input", "unexpected EOF") with errors.Is(err, io.ErrUnexpectedEOF) and errors.Is(err, io.EOF) so only actual EOF errors trigger the longer backoff - Fix awkward log phrasing: "interrupted replicate" → "interrupted while replicating" * filer.sync: remove EOF backoff from uploadManifestChunk uploadManifestChunk reads from an in-memory bytes.Reader, so any EOF errors there are from the destination side, not a broken source stream. The long source-oriented backoff is inappropriate; let RetryUntil handle destination retries at its normal cadence. --------- Co-authored-by: Copilot --- weed/operation/upload_content.go | 3 ++ .../replication/sink/filersink/fetch_write.go | 43 ++++++++++++++++++- weed/replication/source/filer_source.go | 11 ++++- 3 files changed, 53 insertions(+), 4 deletions(-) diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 0fbbde782..b3d4ae895 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -38,6 +38,7 @@ type UploadOption struct { RetryForever bool Md5 string BytesBuffer *bytes.Buffer + SourceUrl string // optional: for logging when reading from a remote source } type UploadResult struct { @@ -135,9 +136,11 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi } else { data, err = io.ReadAll(reader) if err != nil { + glog.V(0).Infof("upload read input %s: %v", uploadOption.SourceUrl, err) err = fmt.Errorf("read input: %w", err) return } + glog.V(4).Infof("upload read %d bytes from %s", len(data), uploadOption.SourceUrl) } doUploadFunc := func() error { diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 564203470..962bbf30a 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -3,11 +3,14 @@ package filersink import ( "bytes" "context" + "errors" "fmt" + "io" "os" "path/filepath" "strings" "sync" + "time" "github.com/schollz/progressbar/v3" "google.golang.org/protobuf/proto" @@ -220,7 +223,7 @@ func (fs *FilerSink) uploadManifestChunk(path string, sourceMtime int64, sourceF glog.V(1).Infof("skip retrying stale source manifest %s for %s: %v", sourceFileId, path, uploadErr) return false } - glog.V(0).Infof("upload source manifest %v: %v", sourceFileId, uploadErr) + glog.V(0).Infof("replicate manifest %s for %s: %v", sourceFileId, path, uploadErr) return true }) if err != nil { @@ -237,6 +240,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, return "", fmt.Errorf("upload data: %w", err) } + eofBackoff := time.Duration(0) retryName := fmt.Sprintf("replicate chunk %s", sourceChunk.GetFileIdString()) err = util.RetryUntil(retryName, func() error { filename, header, resp, readErr := fs.filerSource.ReadPart(sourceChunk.GetFileIdString()) @@ -245,6 +249,11 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, } defer util_http.CloseResponse(resp) + sourceUrl := "" + if resp.Request != nil && resp.Request.URL != nil { + sourceUrl = resp.Request.URL.String() + } + currentFileId, uploadResult, uploadErr, _ := uploader.UploadWithRetry( fs, &filer_pb.AssignVolumeRequest{ @@ -263,6 +272,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, MimeType: header.Get("Content-Type"), PairMap: nil, RetryForever: false, + SourceUrl: sourceUrl, }, func(host, fileId string) string { fileUrl := fs.buildUploadUrl(host, fileId) @@ -278,6 +288,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, return fmt.Errorf("upload result: %v", uploadResult.Error) } + eofBackoff = 0 fileId = currentFileId return nil }, func(uploadErr error) (shouldContinue bool) { @@ -285,7 +296,13 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, glog.V(1).Infof("skip retrying stale source %s for %s: %v", sourceChunk.GetFileIdString(), path, uploadErr) return false } - glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), uploadErr) + if isEofError(uploadErr) { + eofBackoff = nextEofBackoff(eofBackoff) + glog.V(0).Infof("source connection interrupted while replicating %s for %s, backing off %v: %v", sourceChunk.GetFileIdString(), path, eofBackoff, uploadErr) + time.Sleep(eofBackoff) + } else { + glog.V(0).Infof("replicate %s for %s: %v", sourceChunk.GetFileIdString(), path, uploadErr) + } return true }) if err != nil { @@ -295,6 +312,28 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, return fileId, nil } +const maxEofBackoff = 2 * time.Minute + +// nextEofBackoff returns the next backoff duration for unexpected EOF errors. +// It starts at 10s, doubles each time, and caps at 2 minutes. +func nextEofBackoff(current time.Duration) time.Duration { + if current < 10*time.Second { + return 10 * time.Second + } + current *= 2 + if current > maxEofBackoff { + current = maxEofBackoff + } + return current +} + +func isEofError(err error) bool { + if err == nil { + return false + } + return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) +} + func (fs *FilerSink) buildUploadUrl(host, fileId string) string { if fs.writeChunkByFiler { return fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId) diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index fa9a285d9..bb110d1cf 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -107,7 +107,13 @@ func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrls func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Header, resp *http.Response, err error) { if fs.proxyByFiler { - return util_http.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "") + filename, header, resp, err = util_http.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "") + if err != nil { + glog.V(0).Infof("read part %s via filer proxy %s: %v", fileId, fs.address, err) + } else { + glog.V(4).Infof("read part %s via filer proxy %s content-length:%s", fileId, fs.address, header.Get("Content-Length")) + } + return } fileUrls, err := fs.LookupFileId(context.Background(), fileId) @@ -118,8 +124,9 @@ func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Hea for _, fileUrl := range fileUrls { filename, header, resp, err = util_http.DownloadFile(fileUrl, "") if err != nil { - glog.V(1).Infof("fail to read from %s: %v", fileUrl, err) + glog.V(0).Infof("fail to read part %s from %s: %v", fileId, fileUrl, err) } else { + glog.V(4).Infof("read part %s from %s content-length:%s", fileId, fileUrl, header.Get("Content-Length")) break } }