diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go index 13b272fbf..4fe0e45a9 100644 --- a/weed/shell/command_fs_meta_notify.go +++ b/weed/shell/command_fs_meta_notify.go @@ -50,7 +50,7 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i var dirCount, fileCount uint64 - err = doTraverse(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error { + err = doTraverseBFS(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error { if entry.IsDirectory { dirCount++ diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index e710fe297..dd5e9defb 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -6,12 +6,15 @@ import ( "fmt" "io" "os" + "sync" + "sync/atomic" "time" + "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/golang/protobuf/proto" ) func init() { @@ -75,9 +78,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. var dirCount, fileCount uint64 - sizeBuf := make([]byte, 4) - - err = doTraverse(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error { + err = doTraverseBFS(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error { protoMessage := &filer_pb.FullEntry{ Dir: string(parentPath), @@ -89,15 +90,16 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. return fmt.Errorf("marshall error: %v", err) } + sizeBuf := make([]byte, 4) util.Uint32toBytes(sizeBuf, uint32(len(bytes))) dst.Write(sizeBuf) dst.Write(bytes) if entry.IsDirectory { - dirCount++ + atomic.AddUint64(&dirCount, 1) } else { - fileCount++ + atomic.AddUint64(&fileCount, 1) } if *verbose { @@ -118,7 +120,45 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. }) } -func doTraverse(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) { +func doTraverseBFS(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, + parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) { + + K := 5 + + var jobQueueWg sync.WaitGroup + queue := util.NewQueue() + jobQueueWg.Add(1) + queue.Enqueue(parentPath) + var isTerminating bool + + for i := 0; i < K; i++ { + go func() { + for { + if isTerminating { + break + } + t := queue.Dequeue() + if t == nil { + time.Sleep(329 * time.Millisecond) + continue + } + dir := t.(filer2.FullPath) + processErr := processOneDirectory(ctx, writer, client, dir, queue, &jobQueueWg, fn) + if processErr != nil { + err = processErr + } + jobQueueWg.Done() + } + }() + } + jobQueueWg.Wait() + isTerminating = true + return +} + +func processOneDirectory(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, + parentPath filer2.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, + fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) { paginatedCount := -1 startFromFileName := "" @@ -150,12 +190,10 @@ func doTraverse(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFi if parentPath == "/" { subDir = "/" + entry.Name } - if err = doTraverse(ctx, writer, client, filer2.FullPath(subDir), fn); err != nil { - return err - } + jobQueueWg.Add(1) + queue.Enqueue(filer2.FullPath(subDir)) } startFromFileName = entry.Name - } } diff --git a/weed/util/queue.go b/weed/util/queue.go new file mode 100644 index 000000000..31d9d1769 --- /dev/null +++ b/weed/util/queue.go @@ -0,0 +1,61 @@ +package util + +import "sync" + +type node struct { + data interface{} + next *node +} + +type Queue struct { + head *node + tail *node + count int + sync.RWMutex +} + +func NewQueue() *Queue { + q := &Queue{} + return q +} + +func (q *Queue) Len() int { + q.RLock() + defer q.RUnlock() + return q.count +} + +func (q *Queue) Enqueue(item interface{}) { + q.Lock() + defer q.Unlock() + + n := &node{data: item} + + if q.tail == nil { + q.tail = n + q.head = n + } else { + q.tail.next = n + q.tail = n + } + q.count++ +} + +func (q *Queue) Dequeue() interface{} { + q.Lock() + defer q.Unlock() + + if q.head == nil { + return nil + } + + n := q.head + q.head = n.next + + if q.head == nil { + q.tail = nil + } + q.count-- + + return n.data +} \ No newline at end of file