From 0452ae6a6c4a2c9543c3e614672017983ca3c179 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 4 Oct 2022 11:35:07 -0700 Subject: [PATCH] filer.sync: limit concurrency when fetching file chunks fix https://github.com/seaweedfs/seaweedfs/issues/3787 --- .../replication/sink/filersink/fetch_write.go | 20 ++++++++++--------- weed/replication/sink/filersink/filer_sink.go | 2 ++ 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index c0321039b..00a48d41a 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -24,15 +24,17 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path st var wg sync.WaitGroup for chunkIndex, sourceChunk := range sourceChunks { wg.Add(1) - go func(chunk *filer_pb.FileChunk, index int) { - defer wg.Done() - replicatedChunk, e := fs.replicateOneChunk(chunk, path) - if e != nil { - err = e - return - } - replicatedChunks[index] = replicatedChunk - }(sourceChunk, chunkIndex) + fs.executor.Execute(func() { + func(chunk *filer_pb.FileChunk, index int) { + defer wg.Done() + replicatedChunk, e := fs.replicateOneChunk(chunk, path) + if e != nil { + err = e + return + } + replicatedChunks[index] = replicatedChunk + }(sourceChunk, chunkIndex) + }) } wg.Wait() diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 816d18a02..3af5a4a80 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -32,6 +32,7 @@ type FilerSink struct { address string writeChunkByFiler bool isIncremental bool + executor *util.LimitedConcurrentExecutor } func init() { @@ -53,6 +54,7 @@ func (fs *FilerSink) IsIncremental() bool { func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error { fs.isIncremental = configuration.GetBool(prefix + "is_incremental") fs.dataCenter = configuration.GetString(prefix + "dataCenter") + fs.executor = util.NewLimitedConcurrentExecutor(32) return fs.DoInitialize( "", configuration.GetString(prefix+"grpcAddress"),