You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

264 lines
7.8 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/filer"
  7. filer_leveldb "github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  11. "google.golang.org/protobuf/proto"
  12. "io"
  13. "os"
  14. "path/filepath"
  15. "time"
  16. )
  17. const LevelDbPath = "/tmp/snapshots.db"
  18. const DateFormat = "2006-01-02"
  19. func init() {
  20. Commands = append(Commands, &commandFsMetaSnapshotsCreate{})
  21. }
  22. type commandFsMetaSnapshotsCreate struct {
  23. }
  24. func (c *commandFsMetaSnapshotsCreate) Name() string {
  25. return "fs.meta.snapshots.create"
  26. }
  27. type SnapshotConfig struct {
  28. dir string
  29. }
  30. func (c SnapshotConfig) GetString(key string) string {
  31. return c.dir
  32. }
  33. func (c SnapshotConfig) GetBool(key string) bool {
  34. panic("implement me")
  35. }
  36. func (c SnapshotConfig) GetInt(key string) int {
  37. panic("implement me")
  38. }
  39. func (c SnapshotConfig) GetStringSlice(key string) []string {
  40. panic("implement me")
  41. }
  42. func (c SnapshotConfig) SetDefault(key string, value interface{}) {
  43. panic("implement me")
  44. }
  45. func (c *commandFsMetaSnapshotsCreate) Help() string {
  46. return `create snapshots of meta data from given time range.
  47. fs.meta.snapshots.create -snapshot-interval=7 -snapshot-cnt=3 -path=/your/path
  48. // fs.meta.snapshots.create will generate desired number of snapshots with desired duration interval from yesterday the generated files will be saved from input path.
  49. // These snapshot maybe later used to backup the system to certain timestamp.
  50. `
  51. }
  52. func processMetaDataEvents(store *filer_leveldb.LevelDBStore, data []byte, unfinshiedSnapshotCnt int, snapshotCheckPoints []time.Time, homeDir string, snapshotPath string) (todoCnt int, err error) {
  53. var event filer_pb.SubscribeMetadataResponse
  54. err = proto.Unmarshal(data, &event)
  55. if err != nil {
  56. return unfinshiedSnapshotCnt, err
  57. }
  58. eventTime := event.TsNs
  59. for unfinshiedSnapshotCnt >= 0 && time.Unix(0, eventTime).After(snapshotCheckPoints[unfinshiedSnapshotCnt]) {
  60. snapshotPath := filepath.Join(homeDir, snapshotPath, snapshotCheckPoints[unfinshiedSnapshotCnt].Format(DateFormat))
  61. err = CreateIfNotExists(snapshotPath, 0755)
  62. if err != nil {
  63. return unfinshiedSnapshotCnt, err
  64. }
  65. err = generateSnapshots(homeDir+LevelDbPath, snapshotPath)
  66. if err != nil {
  67. return unfinshiedSnapshotCnt, err
  68. }
  69. unfinshiedSnapshotCnt--
  70. }
  71. if unfinshiedSnapshotCnt < 0 {
  72. return unfinshiedSnapshotCnt, nil
  73. }
  74. ctx := context.Background()
  75. if filer_pb.IsEmpty(&event) {
  76. return unfinshiedSnapshotCnt, nil
  77. } else if filer_pb.IsCreate(&event) {
  78. entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)
  79. return unfinshiedSnapshotCnt, store.InsertEntry(ctx, entry)
  80. } else if filer_pb.IsDelete(&event) {
  81. return unfinshiedSnapshotCnt, store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name))
  82. } else if filer_pb.IsUpdate(&event) {
  83. entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)
  84. return unfinshiedSnapshotCnt, store.UpdateEntry(ctx, entry)
  85. } else {
  86. if err := store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name)); err != nil {
  87. return unfinshiedSnapshotCnt, err
  88. }
  89. return unfinshiedSnapshotCnt, store.InsertEntry(ctx, filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry))
  90. }
  91. return unfinshiedSnapshotCnt, nil
  92. }
  93. func generateSnapshots(scrDir, dest string) error {
  94. entries, err := os.ReadDir(scrDir)
  95. if err != nil {
  96. return err
  97. }
  98. for _, entry := range entries {
  99. sourcePath := filepath.Join(scrDir, entry.Name())
  100. destPath := filepath.Join(dest, entry.Name())
  101. fileInfo, err := os.Stat(sourcePath)
  102. if err != nil {
  103. return err
  104. }
  105. switch fileInfo.Mode() & os.ModeType {
  106. case os.ModeDir:
  107. if err := CreateIfNotExists(destPath, 0755); err != nil {
  108. return err
  109. }
  110. if err := generateSnapshots(sourcePath, destPath); err != nil {
  111. return err
  112. }
  113. default:
  114. if err := Copy(sourcePath, destPath); err != nil {
  115. return err
  116. }
  117. }
  118. }
  119. return nil
  120. }
  121. func Copy(srcFile, dstFile string) error {
  122. out, err := os.Create(dstFile)
  123. if err != nil {
  124. return err
  125. }
  126. defer out.Close()
  127. in, err := os.Open(srcFile)
  128. defer in.Close()
  129. if err != nil {
  130. return err
  131. }
  132. _, err = io.Copy(out, in)
  133. if err != nil {
  134. return err
  135. }
  136. return nil
  137. }
  138. func Exists(filePath string) bool {
  139. if _, err := os.Stat(filePath); os.IsNotExist(err) {
  140. return false
  141. }
  142. return true
  143. }
  144. func CreateIfNotExists(dir string, perm os.FileMode) error {
  145. if Exists(dir) {
  146. return nil
  147. }
  148. if err := os.MkdirAll(dir, perm); err != nil {
  149. return fmt.Errorf("failed to create directory: '%s', error: '%s'", dir, err.Error())
  150. }
  151. return nil
  152. }
  153. func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  154. fsMetaSnapshotsCreateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  155. snapshotPath := fsMetaSnapshotsCreateCommand.String("path", "", "the path to store generated snapshot files")
  156. snapshotCnt := fsMetaSnapshotsCreateCommand.Int("snapshot-cnt", 3, "number of snapshots generated")
  157. snapshotInterval := fsMetaSnapshotsCreateCommand.Int("snapshot-interval", 7, "the duration interval between each generated snapshot")
  158. if err = fsMetaSnapshotsCreateCommand.Parse(args); err != nil {
  159. return err
  160. }
  161. yesterday := time.Now().Add(-time.Hour * 24).Format(DateFormat)
  162. // ensure snapshot start at yesterday 00:00
  163. snapshotDate, err := time.Parse(DateFormat, yesterday)
  164. if err != nil {
  165. return err
  166. }
  167. var snapshotCheckPoints []time.Time
  168. for i := 0; i < *snapshotCnt; i++ {
  169. snapshotCheckPoints = append(snapshotCheckPoints, snapshotDate)
  170. snapshotDate = snapshotDate.AddDate(0, 0, -1**snapshotInterval)
  171. }
  172. homeDirname, err := os.UserHomeDir()
  173. if err != nil {
  174. return err
  175. }
  176. levelDbPath := homeDirname + LevelDbPath
  177. err = os.RemoveAll(levelDbPath)
  178. if err != nil {
  179. return err
  180. }
  181. store := &filer_leveldb.LevelDBStore{}
  182. config := SnapshotConfig{
  183. dir: levelDbPath,
  184. }
  185. store.Initialize(config, "")
  186. unfinishedSnapshotCnt := len(snapshotCheckPoints) - 1
  187. changeLogPath := filer.SystemLogDir
  188. var processEntry func(entry *filer_pb.Entry, isLast bool) error
  189. processEntry = func(entry *filer_pb.Entry, isLast bool) error {
  190. if entry.IsDirectory {
  191. return filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(changeLogPath+"/"+entry.Name), "", processEntry)
  192. }
  193. totalSize := filer.FileSize(entry)
  194. buf := mem.Allocate(int(totalSize))
  195. if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  196. return filer.ReadAll(buf, commandEnv.MasterClient, entry.GetChunks())
  197. }); err != nil && err != filer_pb.ErrNotFound {
  198. return err
  199. }
  200. idx := uint32(0)
  201. for idx < uint32(totalSize) {
  202. logEntrySize := util.BytesToUint32(buf[idx : idx+4])
  203. var logEntry filer_pb.LogEntry
  204. err = proto.Unmarshal(buf[idx+4:idx+4+logEntrySize], &logEntry)
  205. if err != nil {
  206. return err
  207. }
  208. idx = idx + 4 + logEntrySize
  209. unfinishedSnapshotCnt, err = processMetaDataEvents(store, logEntry.Data, unfinishedSnapshotCnt, snapshotCheckPoints, homeDirname, *snapshotPath)
  210. if err != nil {
  211. return err
  212. }
  213. }
  214. // edge case
  215. // there might be unfinished snapshot left over in the duration gaps.
  216. // process meta event only triggers snapshots when there are event after the snapshot time
  217. for unfinishedSnapshotCnt >= 0 {
  218. generatePath := filepath.Join(homeDirname, *snapshotPath, snapshotCheckPoints[unfinishedSnapshotCnt].Format(DateFormat))
  219. err = CreateIfNotExists(generatePath, 0755)
  220. if err != nil {
  221. return err
  222. }
  223. err = generateSnapshots(levelDbPath, generatePath)
  224. if err != nil {
  225. return err
  226. }
  227. unfinishedSnapshotCnt--
  228. }
  229. return nil
  230. }
  231. err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(changeLogPath), "", processEntry)
  232. return err
  233. }