Browse Source

refactor

pull/1293/head
Chris Lu 5 years ago
parent
commit
9f9826e95a
  1. 62
      weed/pb/filer_pb/filer_client_bfs.go
  2. 2
      weed/shell/command_fs_meta_notify.go
  3. 57
      weed/shell/command_fs_meta_save.go

62
weed/pb/filer_pb/filer_client_bfs.go

@ -0,0 +1,62 @@
package filer_pb
import (
"fmt"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/util"
)
func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
K := 5
var jobQueueWg sync.WaitGroup
queue := util.NewQueue()
jobQueueWg.Add(1)
queue.Enqueue(parentPath)
var isTerminating bool
for i := 0; i < K; i++ {
go func() {
for {
if isTerminating {
break
}
t := queue.Dequeue()
if t == nil {
time.Sleep(329 * time.Millisecond)
continue
}
dir := t.(util.FullPath)
processErr := processOneDirectory(filerClient, dir, queue, &jobQueueWg, fn)
if processErr != nil {
err = processErr
}
jobQueueWg.Done()
}
}()
}
jobQueueWg.Wait()
isTerminating = true
return
}
func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
return ReadDirAllEntries(filerClient, parentPath, "", func(entry *Entry, isLast bool) {
fn(parentPath, entry)
if entry.IsDirectory {
subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name)
if parentPath == "/" {
subDir = "/" + entry.Name
}
jobQueueWg.Add(1)
queue.Enqueue(util.FullPath(subDir))
}
})
}

2
weed/shell/command_fs_meta_notify.go

@ -43,7 +43,7 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i
var dirCount, fileCount uint64 var dirCount, fileCount uint64
err = doTraverseBfs(writer, commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) {
err = filer_pb.TraverseBfs(commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) {
if entry.IsDirectory { if entry.IsDirectory {
dirCount++ dirCount++

57
weed/shell/command_fs_meta_save.go

@ -96,7 +96,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
} }
func doTraverseBfsAndSaving(commandEnv *CommandEnv, writer io.Writer, path string, verbose bool, saveFn func(outputChan chan interface{}), genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error) error {
func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, saveFn func(outputChan chan interface{}), genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error) error {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
@ -108,7 +108,7 @@ func doTraverseBfsAndSaving(commandEnv *CommandEnv, writer io.Writer, path strin
var dirCount, fileCount uint64 var dirCount, fileCount uint64
err := doTraverseBfs(writer, commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) {
err := filer_pb.TraverseBfs(filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) {
protoMessage := &filer_pb.FullEntry{ protoMessage := &filer_pb.FullEntry{
Dir: string(parentPath), Dir: string(parentPath),
@ -141,56 +141,3 @@ func doTraverseBfsAndSaving(commandEnv *CommandEnv, writer io.Writer, path strin
} }
return err return err
} }
func doTraverseBfs(writer io.Writer, filerClient filer_pb.FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *filer_pb.Entry)) (err error) {
K := 5
var jobQueueWg sync.WaitGroup
queue := util.NewQueue()
jobQueueWg.Add(1)
queue.Enqueue(parentPath)
var isTerminating bool
for i := 0; i < K; i++ {
go func() {
for {
if isTerminating {
break
}
t := queue.Dequeue()
if t == nil {
time.Sleep(329 * time.Millisecond)
continue
}
dir := t.(util.FullPath)
processErr := processOneDirectory(writer, filerClient, dir, queue, &jobQueueWg, fn)
if processErr != nil {
err = processErr
}
jobQueueWg.Done()
}
}()
}
jobQueueWg.Wait()
isTerminating = true
return
}
func processOneDirectory(writer io.Writer, filerClient filer_pb.FilerClient, parentPath util.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, fn func(parentPath util.FullPath, entry *filer_pb.Entry)) (err error) {
return filer_pb.ReadDirAllEntries(filerClient, parentPath, "", func(entry *filer_pb.Entry, isLast bool) {
fn(parentPath, entry)
if entry.IsDirectory {
subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name)
if parentPath == "/" {
subDir = "/" + entry.Name
}
jobQueueWg.Add(1)
queue.Enqueue(util.FullPath(subDir))
}
})
}
Loading…
Cancel
Save