You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

233 lines
7.0 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/filer"
  7. filer_leveldb "github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  11. "google.golang.org/protobuf/proto"
  12. "io"
  13. "os"
  14. "path/filepath"
  15. "time"
  16. )
  17. const LevelDbPath = "/tmp/snapshots.db"
  18. const SnapshotsParentPath = "/seaweedfs_meta_snapshots/"
  19. const DateFormat = "2006-01-02"
  20. func init() {
  21. Commands = append(Commands, &commandFsMetaSnapshotsCreate{})
  22. }
  23. type commandFsMetaSnapshotsCreate struct {
  24. }
  25. func (c *commandFsMetaSnapshotsCreate) Name() string {
  26. return "fs.meta.snapshots.create"
  27. }
  28. func (c *commandFsMetaSnapshotsCreate) Help() string {
  29. return `create snapshots of meta data from given time range.
  30. fs.meta.snapshots.create -s=2022-08-09 -e=2022-10-12 -f 7 create snapshot starting from 2022-08-09 ending at 2022-10-12 every seven days.
  31. //These snapshot maybe later used to backup the system to certain timestamp.
  32. `
  33. }
  34. func processMetaDataEvents(store *filer_leveldb.LevelDBStore, data []byte, snapshotCnt int, snapshotCheckPoints []time.Time, homeDir string) (err error) {
  35. var event filer_pb.SubscribeMetadataResponse
  36. err = proto.Unmarshal(data, &event)
  37. if err != nil {
  38. return err
  39. }
  40. fmt.Printf("%+v\n", event)
  41. eventTime := event.TsNs
  42. println(time.Unix(0, eventTime).Format(DateFormat))
  43. for snapshotCnt < len(snapshotCheckPoints) && time.Unix(0, eventTime).After(snapshotCheckPoints[snapshotCnt]) {
  44. snapshotPath := homeDir + SnapshotsParentPath + snapshotCheckPoints[snapshotCnt].Format(DateFormat)
  45. err = CreateIfNotExists(snapshotPath, 0755)
  46. if err != nil {
  47. return err
  48. }
  49. println("generating snapshots of metadata at: " + snapshotPath)
  50. err = generateSnapshots(homeDir+LevelDbPath, snapshotPath)
  51. if err != nil {
  52. return err
  53. }
  54. snapshotCnt++
  55. }
  56. if snapshotCnt == len(snapshotCheckPoints) {
  57. return nil
  58. }
  59. ctx := context.Background()
  60. if filer_pb.IsEmpty(&event) {
  61. return nil
  62. } else if filer_pb.IsCreate(&event) {
  63. println("+", util.FullPath(event.EventNotification.NewParentPath).Child(event.EventNotification.NewEntry.Name))
  64. entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)
  65. return store.InsertEntry(ctx, entry)
  66. } else if filer_pb.IsDelete(&event) {
  67. println("-", util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name))
  68. return store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name))
  69. } else if filer_pb.IsUpdate(&event) {
  70. println("~", util.FullPath(event.EventNotification.NewParentPath).Child(event.EventNotification.NewEntry.Name))
  71. entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)
  72. return store.UpdateEntry(ctx, entry)
  73. } else {
  74. // renaming
  75. println("-", util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name))
  76. if err := store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name)); err != nil {
  77. return err
  78. }
  79. println("+", util.FullPath(event.EventNotification.NewParentPath).Child(event.EventNotification.NewEntry.Name))
  80. return store.InsertEntry(ctx, filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry))
  81. }
  82. return nil
  83. }
  84. func generateSnapshots(scrDir, dest string) error {
  85. entries, err := os.ReadDir(scrDir)
  86. if err != nil {
  87. return err
  88. }
  89. for _, entry := range entries {
  90. sourcePath := filepath.Join(scrDir, entry.Name())
  91. destPath := filepath.Join(dest, entry.Name())
  92. fileInfo, err := os.Stat(sourcePath)
  93. if err != nil {
  94. return err
  95. }
  96. switch fileInfo.Mode() & os.ModeType {
  97. case os.ModeDir:
  98. if err := CreateIfNotExists(destPath, 0755); err != nil {
  99. return err
  100. }
  101. if err := generateSnapshots(sourcePath, destPath); err != nil {
  102. return err
  103. }
  104. default:
  105. if err := Copy(sourcePath, destPath); err != nil {
  106. return err
  107. }
  108. }
  109. }
  110. return nil
  111. }
  112. func Copy(srcFile, dstFile string) error {
  113. out, err := os.Create(dstFile)
  114. if err != nil {
  115. return err
  116. }
  117. defer out.Close()
  118. in, err := os.Open(srcFile)
  119. defer in.Close()
  120. if err != nil {
  121. return err
  122. }
  123. _, err = io.Copy(out, in)
  124. if err != nil {
  125. return err
  126. }
  127. return nil
  128. }
  129. func Exists(filePath string) bool {
  130. if _, err := os.Stat(filePath); os.IsNotExist(err) {
  131. return false
  132. }
  133. return true
  134. }
  135. func CreateIfNotExists(dir string, perm os.FileMode) error {
  136. if Exists(dir) {
  137. return nil
  138. }
  139. if err := os.MkdirAll(dir, perm); err != nil {
  140. return fmt.Errorf("failed to create directory: '%s', error: '%s'", dir, err.Error())
  141. }
  142. return nil
  143. }
  144. func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  145. fsMetaSnapshotsCreateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  146. start := fsMetaSnapshotsCreateCommand.String("s", "", "start date of metadata compaction in yyyy-mm-dd format")
  147. end := fsMetaSnapshotsCreateCommand.String("e", "", "end date of metadata compaction in yyyy-mm-dd format")
  148. frequency := fsMetaSnapshotsCreateCommand.Int("f", 7, "the frequency of generating the metadata snapshots in days")
  149. if err = fsMetaSnapshotsCreateCommand.Parse(args); err != nil {
  150. return err
  151. }
  152. startDate, err := time.Parse(DateFormat, *start)
  153. if err != nil {
  154. return err
  155. }
  156. endDate, err := time.Parse(DateFormat, *end)
  157. if err != nil {
  158. return err
  159. }
  160. var snapshotCheckPoints []time.Time
  161. for startDate.Before(endDate) || startDate.Equal(endDate) {
  162. snapshotCheckPoints = append(snapshotCheckPoints, startDate)
  163. startDate = startDate.AddDate(0, 0, *frequency)
  164. }
  165. snapshotCnt := 0
  166. homeDirname, err := os.UserHomeDir()
  167. if err != nil {
  168. return err
  169. }
  170. levelDbPath := homeDirname + LevelDbPath
  171. err = os.RemoveAll(levelDbPath)
  172. if err != nil {
  173. return err
  174. }
  175. store := &filer_leveldb.LevelDBStore{}
  176. store.CustomInitialize(levelDbPath)
  177. path := filer.SystemLogDir
  178. var processEntry func(entry *filer_pb.Entry, isLast bool) error
  179. processEntry = func(entry *filer_pb.Entry, isLast bool) error {
  180. if entry.IsDirectory {
  181. return filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(path+"/"+entry.Name), "", processEntry)
  182. }
  183. totalSize := filer.FileSize(entry)
  184. buf := mem.Allocate(int(totalSize))
  185. if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  186. return filer.ReadAll(buf, commandEnv.MasterClient, entry.GetChunks())
  187. }); err != nil && err != filer_pb.ErrNotFound {
  188. return err
  189. }
  190. idx := uint32(0)
  191. for idx < uint32(totalSize) {
  192. logEntrySize := util.BytesToUint32(buf[idx : idx+4])
  193. var logEntry filer_pb.LogEntry
  194. err = proto.Unmarshal(buf[idx+4:idx+4+logEntrySize], &logEntry)
  195. if err != nil {
  196. return err
  197. }
  198. idx = idx + 4 + logEntrySize
  199. err = processMetaDataEvents(store, logEntry.Data, snapshotCnt, snapshotCheckPoints, homeDirname)
  200. if err != nil {
  201. return err
  202. }
  203. }
  204. return nil
  205. }
  206. err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(path), "", processEntry)
  207. return err
  208. }