You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

214 lines
5.6 KiB

5 years ago
5 years ago
5 years ago
5 years ago
  1. package shell
  2. import (
  3. "flag"
  4. "fmt"
  5. "io"
  6. "os"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/golang/protobuf/proto"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. )
  14. func init() {
  15. Commands = append(Commands, &commandFsMetaSave{})
  16. }
  17. type commandFsMetaSave struct {
  18. }
  19. func (c *commandFsMetaSave) Name() string {
  20. return "fs.meta.save"
  21. }
  22. func (c *commandFsMetaSave) Help() string {
  23. return `save all directory and file meta data to a local file for metadata backup.
  24. fs.meta.save / # save from the root
  25. fs.meta.save -v -o t.meta / # save from the root, output to t.meta file.
  26. fs.meta.save /path/to/save # save from the directory /path/to/save
  27. fs.meta.save . # save from current directory
  28. fs.meta.save # save from current directory
  29. The meta data will be saved into a local <filer_host>-<port>-<time>.meta file.
  30. These meta data can be later loaded by fs.meta.load command,
  31. This assumes there are no deletions, so this is different from taking a snapshot.
  32. `
  33. }
  34. func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  35. fsMetaSaveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  36. verbose := fsMetaSaveCommand.Bool("v", false, "print out each processed files")
  37. outputFileName := fsMetaSaveCommand.String("o", "", "output the meta data to this file")
  38. chunksFileName := fsMetaSaveCommand.String("chunks", "", "output all the chunks to this file")
  39. if err = fsMetaSaveCommand.Parse(args); err != nil {
  40. return nil
  41. }
  42. path, parseErr := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args()))
  43. if parseErr != nil {
  44. return parseErr
  45. }
  46. if *outputFileName != "" {
  47. fileName := *outputFileName
  48. if fileName == "" {
  49. t := time.Now()
  50. fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta",
  51. commandEnv.option.FilerHost, commandEnv.option.FilerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second())
  52. }
  53. return doTraverseBfsAndSaving(fileName, commandEnv, writer, path, *verbose, func(dst io.Writer, outputChan chan []byte) {
  54. sizeBuf := make([]byte, 4)
  55. for b := range outputChan {
  56. util.Uint32toBytes(sizeBuf, uint32(len(b)))
  57. dst.Write(sizeBuf)
  58. dst.Write(b)
  59. }
  60. }, func(entry *filer_pb.FullEntry, outputChan chan []byte) (err error) {
  61. bytes, err := proto.Marshal(entry)
  62. if err != nil {
  63. fmt.Fprintf(writer, "marshall error: %v\n", err)
  64. return
  65. }
  66. outputChan <- bytes
  67. return nil
  68. })
  69. }
  70. if *chunksFileName != "" {
  71. return doTraverseBfsAndSaving(*chunksFileName, commandEnv, writer, path, *verbose, func(dst io.Writer, outputChan chan []byte) {
  72. for b := range outputChan {
  73. dst.Write(b)
  74. }
  75. }, func(entry *filer_pb.FullEntry, outputChan chan []byte) (err error) {
  76. for _, chunk := range entry.Entry.Chunks {
  77. dir := entry.Dir
  78. if dir == "/" {
  79. dir = ""
  80. }
  81. outputLine := fmt.Sprintf("%d\t%s\t%s/%s\n", chunk.Fid.FileKey, chunk.FileId, dir, entry.Entry.Name)
  82. outputChan <- []byte(outputLine)
  83. }
  84. return nil
  85. })
  86. }
  87. return err
  88. }
  89. func doTraverseBfsAndSaving(fileName string, commandEnv *CommandEnv, writer io.Writer, path string, verbose bool,
  90. saveFn func(dst io.Writer, outputChan chan []byte),
  91. genFn func(entry *filer_pb.FullEntry, outputChan chan []byte) error) error {
  92. dst, openErr := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  93. if openErr != nil {
  94. return fmt.Errorf("failed to create file %s: %v", fileName, openErr)
  95. }
  96. defer dst.Close()
  97. var wg sync.WaitGroup
  98. wg.Add(1)
  99. outputChan := make(chan []byte, 1024)
  100. go func() {
  101. saveFn(dst, outputChan)
  102. wg.Done()
  103. }()
  104. var dirCount, fileCount uint64
  105. err := doTraverseBfs(writer, commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) {
  106. protoMessage := &filer_pb.FullEntry{
  107. Dir: string(parentPath),
  108. Entry: entry,
  109. }
  110. if err := genFn(protoMessage, outputChan); err != nil {
  111. fmt.Fprintf(writer, "marshall error: %v\n", err)
  112. return
  113. }
  114. if entry.IsDirectory {
  115. atomic.AddUint64(&dirCount, 1)
  116. } else {
  117. atomic.AddUint64(&fileCount, 1)
  118. }
  119. if verbose {
  120. println(parentPath.Child(entry.Name))
  121. }
  122. })
  123. close(outputChan)
  124. wg.Wait()
  125. if err == nil {
  126. fmt.Fprintf(writer, "total %d directories, %d files\n", dirCount, fileCount)
  127. }
  128. return err
  129. }
  130. func doTraverseBfs(writer io.Writer, filerClient filer_pb.FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *filer_pb.Entry)) (err error) {
  131. K := 5
  132. var jobQueueWg sync.WaitGroup
  133. queue := util.NewQueue()
  134. jobQueueWg.Add(1)
  135. queue.Enqueue(parentPath)
  136. var isTerminating bool
  137. for i := 0; i < K; i++ {
  138. go func() {
  139. for {
  140. if isTerminating {
  141. break
  142. }
  143. t := queue.Dequeue()
  144. if t == nil {
  145. time.Sleep(329 * time.Millisecond)
  146. continue
  147. }
  148. dir := t.(util.FullPath)
  149. processErr := processOneDirectory(writer, filerClient, dir, queue, &jobQueueWg, fn)
  150. if processErr != nil {
  151. err = processErr
  152. }
  153. jobQueueWg.Done()
  154. }
  155. }()
  156. }
  157. jobQueueWg.Wait()
  158. isTerminating = true
  159. return
  160. }
  161. func processOneDirectory(writer io.Writer, filerClient filer_pb.FilerClient, parentPath util.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, fn func(parentPath util.FullPath, entry *filer_pb.Entry)) (err error) {
  162. return filer_pb.ReadDirAllEntries(filerClient, parentPath, "", func(entry *filer_pb.Entry, isLast bool) {
  163. fn(parentPath, entry)
  164. if entry.IsDirectory {
  165. subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name)
  166. if parentPath == "/" {
  167. subDir = "/" + entry.Name
  168. }
  169. jobQueueWg.Add(1)
  170. queue.Enqueue(util.FullPath(subDir))
  171. }
  172. })
  173. }