|
|
@ -5,6 +5,7 @@ import ( |
|
|
"fmt" |
|
|
"fmt" |
|
|
"io" |
|
|
"io" |
|
|
"sync" |
|
|
"sync" |
|
|
|
|
|
"sync/atomic" |
|
|
"time" |
|
|
"time" |
|
|
|
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
@ -23,7 +24,7 @@ func TraverseBfs(ctx context.Context, filerClient FilerClient, parentPath util.F |
|
|
pending.Add(1) |
|
|
pending.Add(1) |
|
|
queue.Enqueue(parentPath) |
|
|
queue.Enqueue(parentPath) |
|
|
|
|
|
|
|
|
var once sync.Once |
|
|
|
|
|
|
|
|
var hasError int32 |
|
|
var firstErr error |
|
|
var firstErr error |
|
|
|
|
|
|
|
|
enqueue := func(p util.FullPath) bool { |
|
|
enqueue := func(p util.FullPath) bool { |
|
|
@ -64,10 +65,10 @@ func TraverseBfs(ctx context.Context, filerClient FilerClient, parentPath util.F |
|
|
if ctx.Err() == nil { |
|
|
if ctx.Err() == nil { |
|
|
processErr := processOneDirectory(ctx, filerClient, dir, enqueue, fn) |
|
|
processErr := processOneDirectory(ctx, filerClient, dir, enqueue, fn) |
|
|
if processErr != nil { |
|
|
if processErr != nil { |
|
|
once.Do(func() { |
|
|
|
|
|
|
|
|
if atomic.CompareAndSwapInt32(&hasError, 0, 1) { |
|
|
firstErr = processErr |
|
|
firstErr = processErr |
|
|
cancel() |
|
|
cancel() |
|
|
}) |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
pending.Done() |
|
|
pending.Done() |
|
|
|