From af658bf1e81b2e41c64df9630ddf69acf5ec21d0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 10 Feb 2026 19:40:23 -0800 Subject: [PATCH] filersink: address manifest sync review feedback --- .../replication/sink/filersink/fetch_write.go | 80 +++++++++++-------- weed/replication/sink/filersink/filer_sink.go | 4 +- 2 files changed, 49 insertions(+), 35 deletions(-) diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index c0ffa5b2c..564203470 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -24,10 +24,13 @@ import ( util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) -func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path string, sourceMtime int64) (replicatedChunks []*filer_pb.FileChunk, err error) { +func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_pb.FileChunk, path string, sourceMtime int64) (replicatedChunks []*filer_pb.FileChunk, err error) { if len(sourceChunks) == 0 { return } + if ctx == nil { + ctx = context.Background() + } // a simple progress bar. Not ideal. Fix me. var bar *progressbar.ProgressBar @@ -55,23 +58,28 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path st } errLock.Unlock() } + hasError := func() bool { + errLock.Lock() + defer errLock.Unlock() + return err != nil + } var wg sync.WaitGroup for chunkIndex, sourceChunk := range sourceChunks { + if hasError() { + break + } + if sourceChunk.IsChunkManifest { - retryName := fmt.Sprintf("replicate manifest chunk %s", sourceChunk.GetFileIdString()) - retryErr := util.Retry(retryName, func() error { - replicatedChunk, e := fs.replicateOneManifestChunk(sourceChunk, path, sourceMtime) - if e != nil { - return e - } - replicatedChunks[chunkIndex] = replicatedChunk - if bar != nil { - bar.Add(1) - } - return nil - }) - setError(retryErr) + replicatedChunk, replicateErr := fs.replicateOneManifestChunk(ctx, sourceChunk, path, sourceMtime) + if replicateErr != nil { + setError(replicateErr) + break + } + replicatedChunks[chunkIndex] = replicatedChunk + if bar != nil { + bar.Add(1) + } continue } @@ -79,18 +87,24 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path st index, source := chunkIndex, sourceChunk fs.executor.Execute(func() { defer wg.Done() - util.Retry("replicate chunks", func() error { - replicatedChunk, e := fs.replicateOneChunk(source, path, sourceMtime) + var replicatedChunk *filer_pb.FileChunk + retryErr := util.Retry("replicate chunks", func() error { + chunk, e := fs.replicateOneChunk(source, path, sourceMtime) if e != nil { - setError(e) return e } - replicatedChunks[index] = replicatedChunk - if bar != nil { - bar.Add(1) - } + replicatedChunk = chunk return nil }) + if retryErr != nil { + setError(retryErr) + return + } + + replicatedChunks[index] = replicatedChunk + if bar != nil { + bar.Add(1) + } }) } wg.Wait() @@ -119,13 +133,13 @@ func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, path str }, nil } -func (fs *FilerSink) replicateOneManifestChunk(sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (*filer_pb.FileChunk, error) { - resolvedChunks, err := filer.ResolveOneChunkManifest(context.Background(), fs.filerSource.LookupFileId, sourceChunk) +func (fs *FilerSink) replicateOneManifestChunk(ctx context.Context, sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (*filer_pb.FileChunk, error) { + resolvedChunks, err := filer.ResolveOneChunkManifest(ctx, fs.filerSource.LookupFileId, sourceChunk) if err != nil { return nil, fmt.Errorf("resolve manifest %s: %w", sourceChunk.GetFileIdString(), err) } - replicatedResolvedChunks, err := fs.replicateChunks(resolvedChunks, path, sourceMtime) + replicatedResolvedChunks, err := fs.replicateChunks(ctx, resolvedChunks, path, sourceMtime) if err != nil { return nil, fmt.Errorf("replicate manifest data chunks %s: %w", sourceChunk.GetFileIdString(), err) } @@ -188,11 +202,7 @@ func (fs *FilerSink) uploadManifestChunk(path string, sourceMtime int64, sourceF RetryForever: false, }, func(host, fileId string) string { - fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - if fs.writeChunkByFiler { - fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId) - } - return fileUrl + return fs.buildUploadUrl(host, fileId) }, bytes.NewReader(manifestData), ) @@ -255,10 +265,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, RetryForever: false, }, func(host, fileId string) string { - fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - if fs.writeChunkByFiler { - fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId) - } + fileUrl := fs.buildUploadUrl(host, fileId) glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header) return fileUrl }, @@ -288,6 +295,13 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, return fileId, nil } +func (fs *FilerSink) buildUploadUrl(host, fileId string) string { + if fs.writeChunkByFiler { + return fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId) + } + return fmt.Sprintf("http://%s/%s", host, fileId) +} + func (fs *FilerSink) hasSourceNewerVersion(targetPath string, sourceMtime int64) bool { if sourceMtime <= 0 || fs.filerSource == nil { return false diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 280f71c86..324bd7bd9 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -127,7 +127,7 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [ } } - replicatedChunks, err := fs.replicateChunks(entry.GetChunks(), key, getEntryMtime(entry)) + replicatedChunks, err := fs.replicateChunks(context.Background(), entry.GetChunks(), key, getEntryMtime(entry)) if err != nil { // only warning here since the source chunk may have been deleted already @@ -211,7 +211,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent } // replicate the chunks that are new in the source - replicatedChunks, err := fs.replicateChunks(newChunks, key, getEntryMtime(newEntry)) + replicatedChunks, err := fs.replicateChunks(context.Background(), newChunks, key, getEntryMtime(newEntry)) if err != nil { glog.Warningf("replicate entry chunks %s: %v", key, err) return true, nil