You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

63 lines
1.5 KiB

  1. package hdfs
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  4. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  5. "github.com/chrislusf/seaweedfs/weed/util"
  6. "sync"
  7. "time"
  8. )
  9. type ListDirectoryFunc func(parentDir util.FullPath, visitFn remote_storage.VisitFunc) error
  10. func TraverseBfs(listDirFn ListDirectoryFunc, parentPath util.FullPath, visitFn remote_storage.VisitFunc) (err error) {
  11. K := 5
  12. var dirQueueWg sync.WaitGroup
  13. dirQueue := util.NewQueue()
  14. dirQueueWg.Add(1)
  15. dirQueue.Enqueue(parentPath)
  16. var isTerminating bool
  17. for i := 0; i < K; i++ {
  18. go func() {
  19. for {
  20. if isTerminating {
  21. break
  22. }
  23. t := dirQueue.Dequeue()
  24. if t == nil {
  25. time.Sleep(329 * time.Millisecond)
  26. continue
  27. }
  28. dir := t.(util.FullPath)
  29. processErr := processOneDirectory(listDirFn, dir, visitFn, dirQueue, &dirQueueWg)
  30. if processErr != nil {
  31. err = processErr
  32. }
  33. dirQueueWg.Done()
  34. }
  35. }()
  36. }
  37. dirQueueWg.Wait()
  38. isTerminating = true
  39. return
  40. }
  41. func processOneDirectory(listDirFn ListDirectoryFunc, parentPath util.FullPath, visitFn remote_storage.VisitFunc, dirQueue *util.Queue, dirQueueWg *sync.WaitGroup) (error) {
  42. return listDirFn(parentPath, func(dir string, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
  43. if err := visitFn(dir, name, isDirectory, remoteEntry); err != nil {
  44. return err
  45. }
  46. if !isDirectory {
  47. return nil
  48. }
  49. dirQueueWg.Add(1)
  50. dirQueue.Enqueue(parentPath.Child(name))
  51. return nil
  52. })
  53. }