Browse Source

filer.sync: add exponential backoff on unexpected EOF during replication (#8557)

* filer.sync: add exponential backoff on unexpected EOF during replication

When the source volume server drops connections under high traffic,
filer.sync retries aggressively (every 1-6s), hammering the already
overloaded source. This adds a longer exponential backoff (10s to 2min)
specifically for "unexpected EOF" errors, reducing pressure on the
source while still retrying indefinitely until success.

Also adds more logging throughout the replication path:
- Log source URL and error at V(0) when ReadPart or io.ReadAll fails
- Log content-length and byte counts at V(4) on success
- Log backoff duration in retry messages

Fixes #8542

* filer.sync: extract backoff helper and fix 2-minute cap

- Extract nextEofBackoff() and isEofError() helpers to deduplicate
  the backoff logic between fetchAndWrite and uploadManifestChunk
- Fix the cap: previously 80s would double to 160s and pass the
  < 2min check uncapped. Now doubles first, then clamps to 2min.

* filer.sync: log source URL instead of empty upload URL on read errors

UploadUrl is not populated until after the reader is consumed, so the
V(0) and V(4) logs were printing an empty string. Add SourceUrl field
to UploadOption and populate it from the HTTP response in fetchAndWrite.

* filer.sync: guard isEofError against nil error

* filer.sync: use errors.Is for EOF detection, fix log wording

- Replace broad substring matching ("read input", "unexpected EOF")
  with errors.Is(err, io.ErrUnexpectedEOF) and errors.Is(err, io.EOF)
  so only actual EOF errors trigger the longer backoff
- Fix awkward log phrasing: "interrupted replicate" → "interrupted
  while replicating"

* filer.sync: remove EOF backoff from uploadManifestChunk

uploadManifestChunk reads from an in-memory bytes.Reader, so any EOF
errors there are from the destination side, not a broken source stream.
The long source-oriented backoff is inappropriate; let RetryUntil
handle destination retries at its normal cadence.

---------

Co-authored-by: Copilot <copilot@github.com>
master
Chris Lu 17 hours ago
committed by GitHub
parent
commit
0647f66bb5
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 3
      weed/operation/upload_content.go
  2. 43
      weed/replication/sink/filersink/fetch_write.go
  3. 11
      weed/replication/source/filer_source.go

3
weed/operation/upload_content.go

@ -38,6 +38,7 @@ type UploadOption struct {
RetryForever bool
Md5 string
BytesBuffer *bytes.Buffer
SourceUrl string // optional: for logging when reading from a remote source
}
type UploadResult struct {
@ -135,9 +136,11 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi
} else {
data, err = io.ReadAll(reader)
if err != nil {
glog.V(0).Infof("upload read input %s: %v", uploadOption.SourceUrl, err)
err = fmt.Errorf("read input: %w", err)
return
}
glog.V(4).Infof("upload read %d bytes from %s", len(data), uploadOption.SourceUrl)
}
doUploadFunc := func() error {

43
weed/replication/sink/filersink/fetch_write.go

@ -3,11 +3,14 @@ package filersink
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/schollz/progressbar/v3"
"google.golang.org/protobuf/proto"
@ -220,7 +223,7 @@ func (fs *FilerSink) uploadManifestChunk(path string, sourceMtime int64, sourceF
glog.V(1).Infof("skip retrying stale source manifest %s for %s: %v", sourceFileId, path, uploadErr)
return false
}
glog.V(0).Infof("upload source manifest %v: %v", sourceFileId, uploadErr)
glog.V(0).Infof("replicate manifest %s for %s: %v", sourceFileId, path, uploadErr)
return true
})
if err != nil {
@ -237,6 +240,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string,
return "", fmt.Errorf("upload data: %w", err)
}
eofBackoff := time.Duration(0)
retryName := fmt.Sprintf("replicate chunk %s", sourceChunk.GetFileIdString())
err = util.RetryUntil(retryName, func() error {
filename, header, resp, readErr := fs.filerSource.ReadPart(sourceChunk.GetFileIdString())
@ -245,6 +249,11 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string,
}
defer util_http.CloseResponse(resp)
sourceUrl := ""
if resp.Request != nil && resp.Request.URL != nil {
sourceUrl = resp.Request.URL.String()
}
currentFileId, uploadResult, uploadErr, _ := uploader.UploadWithRetry(
fs,
&filer_pb.AssignVolumeRequest{
@ -263,6 +272,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string,
MimeType: header.Get("Content-Type"),
PairMap: nil,
RetryForever: false,
SourceUrl: sourceUrl,
},
func(host, fileId string) string {
fileUrl := fs.buildUploadUrl(host, fileId)
@ -278,6 +288,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string,
return fmt.Errorf("upload result: %v", uploadResult.Error)
}
eofBackoff = 0
fileId = currentFileId
return nil
}, func(uploadErr error) (shouldContinue bool) {
@ -285,7 +296,13 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string,
glog.V(1).Infof("skip retrying stale source %s for %s: %v", sourceChunk.GetFileIdString(), path, uploadErr)
return false
}
glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), uploadErr)
if isEofError(uploadErr) {
eofBackoff = nextEofBackoff(eofBackoff)
glog.V(0).Infof("source connection interrupted while replicating %s for %s, backing off %v: %v", sourceChunk.GetFileIdString(), path, eofBackoff, uploadErr)
time.Sleep(eofBackoff)
} else {
glog.V(0).Infof("replicate %s for %s: %v", sourceChunk.GetFileIdString(), path, uploadErr)
}
return true
})
if err != nil {
@ -295,6 +312,28 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string,
return fileId, nil
}
const maxEofBackoff = 2 * time.Minute
// nextEofBackoff returns the next backoff duration for unexpected EOF errors.
// It starts at 10s, doubles each time, and caps at 2 minutes.
func nextEofBackoff(current time.Duration) time.Duration {
if current < 10*time.Second {
return 10 * time.Second
}
current *= 2
if current > maxEofBackoff {
current = maxEofBackoff
}
return current
}
func isEofError(err error) bool {
if err == nil {
return false
}
return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF)
}
func (fs *FilerSink) buildUploadUrl(host, fileId string) string {
if fs.writeChunkByFiler {
return fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId)

11
weed/replication/source/filer_source.go

@ -107,7 +107,13 @@ func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrls
func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Header, resp *http.Response, err error) {
if fs.proxyByFiler {
return util_http.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "")
filename, header, resp, err = util_http.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "")
if err != nil {
glog.V(0).Infof("read part %s via filer proxy %s: %v", fileId, fs.address, err)
} else {
glog.V(4).Infof("read part %s via filer proxy %s content-length:%s", fileId, fs.address, header.Get("Content-Length"))
}
return
}
fileUrls, err := fs.LookupFileId(context.Background(), fileId)
@ -118,8 +124,9 @@ func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Hea
for _, fileUrl := range fileUrls {
filename, header, resp, err = util_http.DownloadFile(fileUrl, "")
if err != nil {
glog.V(1).Infof("fail to read from %s: %v", fileUrl, err)
glog.V(0).Infof("fail to read part %s from %s: %v", fileId, fileUrl, err)
} else {
glog.V(4).Infof("read part %s from %s content-length:%s", fileId, fileUrl, header.Get("Content-Length"))
break
}
}

Loading…
Cancel
Save