From be0379f6fd6e6707e59423cdc606f5493da7df41 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Tue, 10 Feb 2026 19:06:35 -0800
Subject: [PATCH] Fix filer.sync retry on stale chunk (#8298)

* Fix filer.sync stale chunk uploads

* Tweak filersink stale logging
---
 weed/operation/upload_content.go              |  13 +-
 .../replication/sink/filersink/fetch_write.go | 166 +++++++++++++-----
 .../sink/filersink/fetch_write_test.go        |  79 +++++++++
 weed/replication/sink/filersink/filer_sink.go |  11 +-
 4 files changed, 219 insertions(+), 50 deletions(-)
 create mode 100644 weed/replication/sink/filersink/fetch_write_test.go

diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 9f96c851a..0fbbde782 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -129,6 +129,17 @@ func newUploader(httpClient HTTPClient) *Uploader {
 // UploadWithRetry will retry both assigning volume request and uploading content
 // The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume.
 func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) {
+	bytesReader, ok := reader.(*util.BytesReader)
+	if ok {
+		data = bytesReader.Bytes
+	} else {
+		data, err = io.ReadAll(reader)
+		if err != nil {
+			err = fmt.Errorf("read input: %w", err)
+			return
+		}
+	}
+
 	doUploadFunc := func() error {
 
 		var host string
@@ -158,7 +169,7 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi
 		uploadOption.Jwt = auth
 
 		var uploadErr error
-		uploadResult, uploadErr, data = uploader.doUpload(context.Background(), reader, uploadOption)
+		uploadResult, uploadErr = uploader.retriedUploadData(context.Background(), data, uploadOption)
 		return uploadErr
 	}
 	if uploadOption.RetryForever {
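The hunk above addresses the upload half of the bug: UploadWithRetry previously handed the raw reader to every attempt, but the first attempt drains the stream, so any retry would re-send empty or truncated bytes. Buffering the content once up front (reusing the buffer when the caller already passed a *util.BytesReader) lets each attempt re-send identical data. A minimal, self-contained sketch of this buffer-then-retry pattern; the names here are hypothetical stand-ins, not the actual SeaweedFS uploader API:

package main

import (
	"bytes"
	"fmt"
	"io"
)

// uploadOnce stands in for a single network attempt.
type uploadOnce func(data []byte) error

// uploadWithRetry drains reader exactly once, then re-sends the same
// buffered bytes on each attempt, so retries never see a consumed stream.
func uploadWithRetry(reader io.Reader, attempts int, try uploadOnce) error {
	data, err := io.ReadAll(reader) // buffer up front; a stream cannot be re-read on retry
	if err != nil {
		return fmt.Errorf("read input: %w", err)
	}
	var lastErr error
	for i := 0; i < attempts; i++ {
		if lastErr = try(data); lastErr == nil {
			return nil
		}
	}
	return lastErr
}

func main() {
	calls := 0
	err := uploadWithRetry(bytes.NewReader([]byte("chunk")), 3, func(data []byte) error {
		calls++
		if calls < 3 {
			return fmt.Errorf("attempt %d: transient error", calls)
		}
		fmt.Printf("attempt %d uploaded %d bytes\n", calls, len(data))
		return nil
	})
	if err != nil {
		fmt.Println("failed:", err)
	}
}

The trade-off is holding one full chunk in memory per upload, which stays bounded by the chunk size.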
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index 1bcb36a5f..f0399c577 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -1,9 +1,11 @@
 package filersink
 
 import (
+	"context"
 	"fmt"
 	"os"
 	"path/filepath"
+	"strings"
 	"sync"
 
 	"github.com/schollz/progressbar/v3"
@@ -11,11 +13,11 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/util"
 	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
 )
 
-func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path string) (replicatedChunks []*filer_pb.FileChunk, err error) {
+func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path string, sourceMtime int64) (replicatedChunks []*filer_pb.FileChunk, err error) {
 	if len(sourceChunks) == 0 {
 		return
 	}
@@ -46,7 +48,7 @@
 		fs.executor.Execute(func() {
 			defer wg.Done()
 			util.Retry("replicate chunks", func() error {
-				replicatedChunk, e := fs.replicateOneChunk(source, path)
+				replicatedChunk, e := fs.replicateOneChunk(source, path, sourceMtime)
 				if e != nil {
 					err = e
 					return e
@@ -65,9 +67,9 @@
 	return
 }
 
-func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, path string) (*filer_pb.FileChunk, error) {
+func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (*filer_pb.FileChunk, error) {
 
-	fileId, err := fs.fetchAndWrite(sourceChunk, path)
+	fileId, err := fs.fetchAndWrite(sourceChunk, path, sourceMtime)
 	if err != nil {
 		return nil, fmt.Errorf("copy %s: %v", sourceChunk.GetFileIdString(), err)
 	}
@@ -84,60 +86,130 @@ func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, path str
 	}, nil
 }
 
-func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) (fileId string, err error) {
-
-	filename, header, resp, err := fs.filerSource.ReadPart(sourceChunk.GetFileIdString())
-	if err != nil {
-		return "", fmt.Errorf("read part %s: %v", sourceChunk.GetFileIdString(), err)
-	}
-	defer util_http.CloseResponse(resp)
-
+func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (fileId string, err error) {
 	uploader, err := operation.NewUploader()
 	if err != nil {
 		glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err)
 		return "", fmt.Errorf("upload data: %w", err)
 	}
 
-	fileId, uploadResult, err, _ := uploader.UploadWithRetry(
-		fs,
-		&filer_pb.AssignVolumeRequest{
-			Count:       1,
-			Replication: fs.replication,
-			Collection:  fs.collection,
-			TtlSec:      fs.ttlSec,
-			DataCenter:  fs.dataCenter,
-			DiskType:    fs.diskType,
-			Path:        path,
-		},
-		&operation.UploadOption{
-			Filename:          filename,
-			Cipher:            false,
-			IsInputCompressed: "gzip" == header.Get("Content-Encoding"),
-			MimeType:          header.Get("Content-Type"),
-			PairMap:           nil,
-			RetryForever:      true,
-		},
-		func(host, fileId string) string {
-			fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
-			if fs.writeChunkByFiler {
-				fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId)
-			}
-			glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
-			return fileUrl
-		},
-		resp.Body,
-	)
+	retryName := fmt.Sprintf("replicate chunk %s", sourceChunk.GetFileIdString())
+	err = util.RetryUntil(retryName, func() error {
+		filename, header, resp, readErr := fs.filerSource.ReadPart(sourceChunk.GetFileIdString())
+		if readErr != nil {
+			return fmt.Errorf("read part %s: %w", sourceChunk.GetFileIdString(), readErr)
+		}
+		defer util_http.CloseResponse(resp)
+
+		currentFileId, uploadResult, uploadErr, _ := uploader.UploadWithRetry(
+			fs,
+			&filer_pb.AssignVolumeRequest{
+				Count:       1,
+				Replication: fs.replication,
+				Collection:  fs.collection,
+				TtlSec:      fs.ttlSec,
+				DataCenter:  fs.dataCenter,
+				DiskType:    fs.diskType,
+				Path:        path,
+			},
+			&operation.UploadOption{
+				Filename:          filename,
+				Cipher:            false,
+				IsInputCompressed: "gzip" == header.Get("Content-Encoding"),
+				MimeType:          header.Get("Content-Type"),
+				PairMap:           nil,
+				RetryForever:      false,
+			},
+			func(host, fileId string) string {
+				fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+				if fs.writeChunkByFiler {
+					fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId)
+				}
+				glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
+				return fileUrl
+			},
+			resp.Body,
+		)
+		if uploadErr != nil {
+			return fmt.Errorf("upload data: %w", uploadErr)
+		}
+		if uploadResult.Error != "" {
+			return fmt.Errorf("upload result: %v", uploadResult.Error)
+		}
+
+		fileId = currentFileId
+		return nil
+	}, func(uploadErr error) (shouldContinue bool) {
+		if fs.hasSourceNewerVersion(path, sourceMtime) {
+			glog.V(1).Infof("skip retrying stale source %s for %s: %v", sourceChunk.GetFileIdString(), path, uploadErr)
+			return false
+		}
+		glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), uploadErr)
+		return true
+	})
+	if err != nil {
+		return "", err
+	}
+
+	return fileId, nil
+}
+
+func (fs *FilerSink) hasSourceNewerVersion(targetPath string, sourceMtime int64) bool {
+	if sourceMtime <= 0 || fs.filerSource == nil {
+		return false
+	}
+
+	sourcePath, ok := fs.targetPathToSourcePath(targetPath)
+	if !ok {
+		return false
+	}
+	sourceEntry, err := filer_pb.GetEntry(context.Background(), fs.filerSource, sourcePath)
 	if err != nil {
-		glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err)
-		return "", fmt.Errorf("upload data: %w", err)
+		glog.V(1).Infof("lookup source entry %s: %v", sourcePath, err)
+		return false
 	}
-	if uploadResult.Error != "" {
-		glog.V(0).Infof("upload failure %v: %v", filename, err)
-		return "", fmt.Errorf("upload result: %v", uploadResult.Error)
+	if sourceEntry == nil {
+		glog.V(1).Infof("source entry %s no longer exists", sourcePath)
+		return true
 	}
 
-	return
+	return sourceEntry.Attributes != nil && sourceEntry.Attributes.Mtime > sourceMtime
+}
+
+func (fs *FilerSink) targetPathToSourcePath(targetPath string) (util.FullPath, bool) {
+	if fs.filerSource == nil {
+		return "", false
+	}
+
+	normalizePath := func(p string) string {
+		p = strings.TrimSuffix(p, "/")
+		if p == "" {
+			return "/"
+		}
+		return p
+	}
+
+	sourceRoot := normalizePath(fs.filerSource.Dir)
+	targetRoot := normalizePath(fs.dir)
+	targetPath = normalizePath(targetPath)
+
+	var relative string
+	switch {
+	case targetRoot == "/":
+		relative = strings.TrimPrefix(targetPath, "/")
+	case targetPath == targetRoot:
+		relative = ""
+	case strings.HasPrefix(targetPath, targetRoot+"/"):
+		relative = targetPath[len(targetRoot)+1:]
+	default:
+		return "", false
+	}
+
+	if relative == "" {
+		return util.FullPath(sourceRoot), true
+	}
+	return util.FullPath(sourceRoot).Child(relative), true
+}
 
 var _ = filer_pb.FilerClient(&FilerSink{})
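Two behavior changes above are worth calling out: ReadPart now happens inside the retry closure, so every attempt gets a fresh response body instead of a half-consumed one, and the blind RetryForever flag gives way to util.RetryUntil with a continuation callback that consults hasSourceNewerVersion. Once the source entry has been rewritten or deleted after this replication began, no retry can ever succeed, so the sync loop gives up and moves on. A standalone sketch of that retry-until-predicate control flow; retryUntil here is a hypothetical stand-in that mirrors the shape of the weed/util helper, not its actual implementation:

package main

import (
	"errors"
	"fmt"
	"time"
)

// retryUntil keeps calling job until it succeeds or shouldContinue
// reports that retrying is pointless (e.g. the source became stale).
func retryUntil(name string, job func() error, shouldContinue func(err error) bool) error {
	for wait := 10 * time.Millisecond; ; wait *= 2 {
		err := job()
		if err == nil {
			return nil
		}
		if !shouldContinue(err) {
			return fmt.Errorf("%s: giving up: %w", name, err)
		}
		time.Sleep(wait) // simple exponential backoff between attempts
	}
}

func main() {
	sourceMtime := int64(100) // mtime captured when replication started
	latestMtime := int64(100) // mtime of the live source entry
	attempt := 0
	err := retryUntil("replicate chunk", func() error {
		attempt++
		if attempt == 2 {
			latestMtime = 200 // the source file is rewritten while we retry
		}
		return errors.New("stale chunk: volume no longer has this needle")
	}, func(err error) bool {
		// Mirrors hasSourceNewerVersion: keep going only while the
		// source entry we replicated from is still the current one.
		return latestMtime <= sourceMtime
	})
	fmt.Println(err) // gives up on the attempt after the source moved on
}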
diff --git a/weed/replication/sink/filersink/fetch_write_test.go b/weed/replication/sink/filersink/fetch_write_test.go
new file mode 100644
index 000000000..3238d8fae
--- /dev/null
+++ b/weed/replication/sink/filersink/fetch_write_test.go
@@ -0,0 +1,79 @@
+package filersink
+
+import (
+	"testing"
+
+	"github.com/seaweedfs/seaweedfs/weed/replication/source"
+	"github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+func TestTargetPathToSourcePath(t *testing.T) {
+	tests := []struct {
+		name       string
+		targetRoot string
+		sourceRoot string
+		targetPath string
+		wantPath   util.FullPath
+		wantOK     bool
+	}{
+		{
+			name:       "basic mapping",
+			targetRoot: "/target",
+			sourceRoot: "/source",
+			targetPath: "/target/path/file.txt",
+			wantPath:   "/source/path/file.txt",
+			wantOK:     true,
+		},
+		{
+			name:       "trailing slash roots",
+			targetRoot: "/target/",
+			sourceRoot: "/source/",
+			targetPath: "/target/path/file.txt",
+			wantPath:   "/source/path/file.txt",
+			wantOK:     true,
+		},
+		{
+			name:       "root target mapping",
+			targetRoot: "/",
+			sourceRoot: "/source",
+			targetPath: "/path/file.txt",
+			wantPath:   "/source/path/file.txt",
+			wantOK:     true,
+		},
+		{
+			name:       "target root itself",
+			targetRoot: "/target",
+			sourceRoot: "/source",
+			targetPath: "/target",
+			wantPath:   "/source",
+			wantOK:     true,
+		},
+		{
+			name:       "outside target root",
+			targetRoot: "/target",
+			sourceRoot: "/source",
+			targetPath: "/other/path/file.txt",
+			wantPath:   "",
+			wantOK:     false,
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			fs := &FilerSink{
+				dir: tc.targetRoot,
+				filerSource: &source.FilerSource{
+					Dir: tc.sourceRoot,
+				},
+			}
+
+			gotPath, ok := fs.targetPathToSourcePath(tc.targetPath)
+			if ok != tc.wantOK {
+				t.Fatalf("ok mismatch: got %v, want %v", ok, tc.wantOK)
+			}
+			if gotPath != tc.wantPath {
+				t.Fatalf("path mismatch: got %q, want %q", gotPath, tc.wantPath)
+			}
+		})
+	}
+}
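The new table-driven test pins down the path-mapping edge cases: trailing slashes on either root, a bare "/" target root, the target root itself, and a path outside the replicated tree. That last case is why targetPathToSourcePath matches against targetRoot+"/" rather than a bare prefix: a naive strings.HasPrefix would wrongly capture sibling trees such as /target2. A small standalone demo of the pitfall, using a hypothetical relativeTo that mirrors the switch in the patch:

package main

import (
	"fmt"
	"strings"
)

// relativeTo returns the path of targetPath relative to targetRoot,
// reporting false when targetPath is not inside the root at all.
func relativeTo(targetPath, targetRoot string) (string, bool) {
	switch {
	case targetPath == targetRoot:
		return "", true // the root itself maps to an empty relative path
	case strings.HasPrefix(targetPath, targetRoot+"/"):
		return targetPath[len(targetRoot)+1:], true
	default:
		return "", false
	}
}

func main() {
	fmt.Println(relativeTo("/target/a.txt", "/target"))  // "a.txt" true
	fmt.Println(relativeTo("/target2/a.txt", "/target")) // "" false: sibling tree, not inside the root
}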
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 74e2b3a5a..280f71c86 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -127,7 +127,7 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
 		}
 	}
 
-	replicatedChunks, err := fs.replicateChunks(entry.GetChunks(), key)
+	replicatedChunks, err := fs.replicateChunks(entry.GetChunks(), key, getEntryMtime(entry))
 
 	if err != nil {
 		// only warning here since the source chunk may have been deleted already
@@ -211,7 +211,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
 	}
 
 	// replicate the chunks that are new in the source
-	replicatedChunks, err := fs.replicateChunks(newChunks, key)
+	replicatedChunks, err := fs.replicateChunks(newChunks, key, getEntryMtime(newEntry))
 	if err != nil {
 		glog.Warningf("replicate entry chunks %s: %v", key, err)
 		return true, nil
@@ -261,3 +261,10 @@ func compareChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunc
 
 	return
 }
+
+func getEntryMtime(entry *filer_pb.Entry) int64 {
+	if entry == nil || entry.Attributes == nil {
+		return 0
+	}
+	return entry.Attributes.Mtime
+}
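getEntryMtime is a nil-safe accessor: CreateEntry and UpdateEntry use it to capture the source entry's mtime at the moment replication starts, and that captured value is what the retry predicate later compares against the live source entry. A simplified end-to-end sketch of the lifecycle, with hypothetical types that mirror hasSourceNewerVersion's three outcomes (still current: keep retrying; rewritten: stop; deleted: stop):

package main

import "fmt"

type entry struct{ mtime int64 }

// getMtime mirrors getEntryMtime: nil-safe, zero means "unknown".
func getMtime(e *entry) int64 {
	if e == nil {
		return 0
	}
	return e.mtime
}

// hasNewerVersion reports whether retrying a chunk upload is pointless
// because the source entry changed after capturedMtime was taken.
func hasNewerVersion(live *entry, capturedMtime int64) bool {
	if capturedMtime <= 0 {
		return false // unknown capture time: never skip, keep retrying
	}
	if live == nil {
		return true // source entry deleted: retrying cannot succeed
	}
	return live.mtime > capturedMtime
}

func main() {
	captured := getMtime(&entry{mtime: 100}) // taken as replication starts
	fmt.Println(hasNewerVersion(&entry{mtime: 100}, captured)) // false: still current, retry
	fmt.Println(hasNewerVersion(&entry{mtime: 250}, captured)) // true: overwritten, skip retry
	fmt.Println(hasNewerVersion(nil, captured))                // true: deleted, skip retry
}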