diff --git a/weed/filer/meta_replay.go b/weed/filer/meta_replay.go index 51c4e6987..0432e17de 100644 --- a/weed/filer/meta_replay.go +++ b/weed/filer/meta_replay.go @@ -2,6 +2,7 @@ package filer import ( "context" + "sync" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -35,3 +36,40 @@ func Replay(filerStore FilerStore, resp *filer_pb.SubscribeMetadataResponse) err return nil } + + +// ParallelProcessDirectoryStructure processes each entry in parallel, and also ensure parent directories are processed first. +// This also assumes the parent directories are in the entryChan already. +func ParallelProcessDirectoryStructure(entryChan chan *Entry, concurrency int, eachEntryFn func(entry *Entry)(error)) (firstErr error) { + + executors := util.NewLimitedConcurrentExecutor(concurrency) + + var wg sync.WaitGroup + for entry := range entryChan { + wg.Add(1) + if entry.IsDirectory() { + func() { + defer wg.Done() + if err := eachEntryFn(entry); err != nil { + if firstErr == nil { + firstErr = err + } + } + }() + } else { + executors.Execute(func() { + defer wg.Done() + if err := eachEntryFn(entry); err != nil { + if firstErr == nil { + firstErr = err + } + } + }) + } + if firstErr != nil { + break + } + } + wg.Wait() + return +}