diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 0fbbde782..b3d4ae895 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -38,6 +38,7 @@ type UploadOption struct { RetryForever bool Md5 string BytesBuffer *bytes.Buffer + SourceUrl string // optional: for logging when reading from a remote source } type UploadResult struct { @@ -135,9 +136,11 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi } else { data, err = io.ReadAll(reader) if err != nil { + glog.V(0).Infof("upload read input %s: %v", uploadOption.SourceUrl, err) err = fmt.Errorf("read input: %w", err) return } + glog.V(4).Infof("upload read %d bytes from %s", len(data), uploadOption.SourceUrl) } doUploadFunc := func() error { diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 564203470..962bbf30a 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -3,11 +3,14 @@ package filersink import ( "bytes" "context" + "errors" "fmt" + "io" "os" "path/filepath" "strings" "sync" + "time" "github.com/schollz/progressbar/v3" "google.golang.org/protobuf/proto" @@ -220,7 +223,7 @@ func (fs *FilerSink) uploadManifestChunk(path string, sourceMtime int64, sourceF glog.V(1).Infof("skip retrying stale source manifest %s for %s: %v", sourceFileId, path, uploadErr) return false } - glog.V(0).Infof("upload source manifest %v: %v", sourceFileId, uploadErr) + glog.V(0).Infof("replicate manifest %s for %s: %v", sourceFileId, path, uploadErr) return true }) if err != nil { @@ -237,6 +240,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, return "", fmt.Errorf("upload data: %w", err) } + eofBackoff := time.Duration(0) retryName := fmt.Sprintf("replicate chunk %s", sourceChunk.GetFileIdString()) err = util.RetryUntil(retryName, func() error { filename, header, resp, readErr := fs.filerSource.ReadPart(sourceChunk.GetFileIdString()) @@ -245,6 +249,11 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, } defer util_http.CloseResponse(resp) + sourceUrl := "" + if resp.Request != nil && resp.Request.URL != nil { + sourceUrl = resp.Request.URL.String() + } + currentFileId, uploadResult, uploadErr, _ := uploader.UploadWithRetry( fs, &filer_pb.AssignVolumeRequest{ @@ -263,6 +272,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, MimeType: header.Get("Content-Type"), PairMap: nil, RetryForever: false, + SourceUrl: sourceUrl, }, func(host, fileId string) string { fileUrl := fs.buildUploadUrl(host, fileId) @@ -278,6 +288,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, return fmt.Errorf("upload result: %v", uploadResult.Error) } + eofBackoff = 0 fileId = currentFileId return nil }, func(uploadErr error) (shouldContinue bool) { @@ -285,7 +296,13 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, glog.V(1).Infof("skip retrying stale source %s for %s: %v", sourceChunk.GetFileIdString(), path, uploadErr) return false } - glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), uploadErr) + if isEofError(uploadErr) { + eofBackoff = nextEofBackoff(eofBackoff) + glog.V(0).Infof("source connection interrupted while replicating %s for %s, backing off %v: %v", sourceChunk.GetFileIdString(), path, eofBackoff, uploadErr) + time.Sleep(eofBackoff) + } else { + glog.V(0).Infof("replicate %s for %s: %v", sourceChunk.GetFileIdString(), path, uploadErr) + } return true }) if err != nil { @@ -295,6 +312,28 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, return fileId, nil } +const maxEofBackoff = 2 * time.Minute + +// nextEofBackoff returns the next backoff duration for unexpected EOF errors. +// It starts at 10s, doubles each time, and caps at 2 minutes. +func nextEofBackoff(current time.Duration) time.Duration { + if current < 10*time.Second { + return 10 * time.Second + } + current *= 2 + if current > maxEofBackoff { + current = maxEofBackoff + } + return current +} + +func isEofError(err error) bool { + if err == nil { + return false + } + return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) +} + func (fs *FilerSink) buildUploadUrl(host, fileId string) string { if fs.writeChunkByFiler { return fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId) diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index fa9a285d9..bb110d1cf 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -107,7 +107,13 @@ func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrls func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Header, resp *http.Response, err error) { if fs.proxyByFiler { - return util_http.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "") + filename, header, resp, err = util_http.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "") + if err != nil { + glog.V(0).Infof("read part %s via filer proxy %s: %v", fileId, fs.address, err) + } else { + glog.V(4).Infof("read part %s via filer proxy %s content-length:%s", fileId, fs.address, header.Get("Content-Length")) + } + return } fileUrls, err := fs.LookupFileId(context.Background(), fileId) @@ -118,8 +124,9 @@ func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Hea for _, fileUrl := range fileUrls { filename, header, resp, err = util_http.DownloadFile(fileUrl, "") if err != nil { - glog.V(1).Infof("fail to read from %s: %v", fileUrl, err) + glog.V(0).Infof("fail to read part %s from %s: %v", fileId, fileUrl, err) } else { + glog.V(4).Infof("read part %s from %s content-length:%s", fileId, fileUrl, header.Get("Content-Length")) break } }