diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index c0321039b..00a48d41a 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -24,15 +24,17 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path st var wg sync.WaitGroup for chunkIndex, sourceChunk := range sourceChunks { wg.Add(1) - go func(chunk *filer_pb.FileChunk, index int) { - defer wg.Done() - replicatedChunk, e := fs.replicateOneChunk(chunk, path) - if e != nil { - err = e - return - } - replicatedChunks[index] = replicatedChunk - }(sourceChunk, chunkIndex) + fs.executor.Execute(func() { + func(chunk *filer_pb.FileChunk, index int) { + defer wg.Done() + replicatedChunk, e := fs.replicateOneChunk(chunk, path) + if e != nil { + err = e + return + } + replicatedChunks[index] = replicatedChunk + }(sourceChunk, chunkIndex) + }) } wg.Wait() diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 816d18a02..3af5a4a80 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -32,6 +32,7 @@ type FilerSink struct { address string writeChunkByFiler bool isIncremental bool + executor *util.LimitedConcurrentExecutor } func init() { @@ -53,6 +54,7 @@ func (fs *FilerSink) IsIncremental() bool { func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error { fs.isIncremental = configuration.GetBool(prefix + "is_incremental") fs.dataCenter = configuration.GetString(prefix + "dataCenter") + fs.executor = util.NewLimitedConcurrentExecutor(32) return fs.DoInitialize( "", configuration.GetString(prefix+"grpcAddress"),