You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

248 lines
6.9 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/filer"
  7. filer_leveldb "github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  11. "google.golang.org/protobuf/proto"
  12. "io"
  13. "os"
  14. "path/filepath"
  15. "time"
  16. )
  17. const LevelDbPath = "/tmp/snapshots.db"
  18. const DateFormat = "2006-01-02"
  19. func init() {
  20. Commands = append(Commands, &commandFsMetaSnapshotsCreate{})
  21. }
  22. type commandFsMetaSnapshotsCreate struct {
  23. }
  24. func (c *commandFsMetaSnapshotsCreate) Name() string {
  25. return "fs.meta.snapshots.create"
  26. }
  27. type SnapshotConfig struct {
  28. dir string
  29. }
  30. func (c SnapshotConfig) GetString(key string) string {
  31. return c.dir
  32. }
  33. func (c SnapshotConfig) GetBool(key string) bool {
  34. panic("implement me")
  35. }
  36. func (c SnapshotConfig) GetInt(key string) int {
  37. panic("implement me")
  38. }
  39. func (c SnapshotConfig) GetStringSlice(key string) []string {
  40. panic("implement me")
  41. }
  42. func (c SnapshotConfig) SetDefault(key string, value interface{}) {
  43. panic("implement me")
  44. }
  45. func (c *commandFsMetaSnapshotsCreate) Help() string {
  46. return `create snapshots of meta data from given time range.
  47. fs.meta.snapshots.create -snapshot-interval=7 -snapshot-cnt=3 -path=/your/path
  48. // fs.meta.snapshots.create will generate desired number of snapshots with desired duration interval from yesterday the generated files will be saved from input path.
  49. // These snapshot maybe later used to backup the system to certain timestamp.
  50. `
  51. }
  52. func processMetaDataEvents(store *filer_leveldb.LevelDBStore, data []byte, snapshotCheckPoints []time.Time, homeDir string, snapshotPath string) (err error) {
  53. var event filer_pb.SubscribeMetadataResponse
  54. err = proto.Unmarshal(data, &event)
  55. if err != nil {
  56. return err
  57. }
  58. eventTime := event.TsNs
  59. snapshotCnt := len(snapshotCheckPoints) - 1
  60. for snapshotCnt >= 0 && time.Unix(0, eventTime).After(snapshotCheckPoints[snapshotCnt]) {
  61. snapshotPath := homeDir + snapshotPath + snapshotCheckPoints[snapshotCnt].Format(DateFormat)
  62. err = CreateIfNotExists(snapshotPath, 0755)
  63. if err != nil {
  64. return err
  65. }
  66. err = generateSnapshots(homeDir+LevelDbPath, snapshotPath)
  67. if err != nil {
  68. return err
  69. }
  70. snapshotCnt++
  71. }
  72. if snapshotCnt == len(snapshotCheckPoints) {
  73. return nil
  74. }
  75. ctx := context.Background()
  76. if filer_pb.IsEmpty(&event) {
  77. return nil
  78. } else if filer_pb.IsCreate(&event) {
  79. entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)
  80. return store.InsertEntry(ctx, entry)
  81. } else if filer_pb.IsDelete(&event) {
  82. return store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name))
  83. } else if filer_pb.IsUpdate(&event) {
  84. entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)
  85. return store.UpdateEntry(ctx, entry)
  86. } else {
  87. if err := store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name)); err != nil {
  88. return err
  89. }
  90. return store.InsertEntry(ctx, filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry))
  91. }
  92. return nil
  93. }
  94. func generateSnapshots(scrDir, dest string) error {
  95. entries, err := os.ReadDir(scrDir)
  96. if err != nil {
  97. return err
  98. }
  99. for _, entry := range entries {
  100. sourcePath := filepath.Join(scrDir, entry.Name())
  101. destPath := filepath.Join(dest, entry.Name())
  102. fileInfo, err := os.Stat(sourcePath)
  103. if err != nil {
  104. return err
  105. }
  106. switch fileInfo.Mode() & os.ModeType {
  107. case os.ModeDir:
  108. if err := CreateIfNotExists(destPath, 0755); err != nil {
  109. return err
  110. }
  111. if err := generateSnapshots(sourcePath, destPath); err != nil {
  112. return err
  113. }
  114. default:
  115. if err := Copy(sourcePath, destPath); err != nil {
  116. return err
  117. }
  118. }
  119. }
  120. return nil
  121. }
  122. func Copy(srcFile, dstFile string) error {
  123. out, err := os.Create(dstFile)
  124. if err != nil {
  125. return err
  126. }
  127. defer out.Close()
  128. in, err := os.Open(srcFile)
  129. defer in.Close()
  130. if err != nil {
  131. return err
  132. }
  133. _, err = io.Copy(out, in)
  134. if err != nil {
  135. return err
  136. }
  137. return nil
  138. }
  139. func Exists(filePath string) bool {
  140. if _, err := os.Stat(filePath); os.IsNotExist(err) {
  141. return false
  142. }
  143. return true
  144. }
  145. func CreateIfNotExists(dir string, perm os.FileMode) error {
  146. if Exists(dir) {
  147. return nil
  148. }
  149. if err := os.MkdirAll(dir, perm); err != nil {
  150. return fmt.Errorf("failed to create directory: '%s', error: '%s'", dir, err.Error())
  151. }
  152. return nil
  153. }
  154. func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  155. fsMetaSnapshotsCreateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  156. snapshotPath := fsMetaSnapshotsCreateCommand.String("path", "", "the path to store generated snapshot files")
  157. snapshotCnt := fsMetaSnapshotsCreateCommand.Int("snapshot-cnt", 3, "number of snapshots generated")
  158. snapshotInterval := fsMetaSnapshotsCreateCommand.Int("snapshot-interval", 7, "the duration interval between each generated snapshot")
  159. if err = fsMetaSnapshotsCreateCommand.Parse(args); err != nil {
  160. return err
  161. }
  162. yesterday := time.Now().Add(-time.Hour * 24).Format(DateFormat)
  163. // ensure snapshot start at yesterday 00:00
  164. snapshotDate, err := time.Parse(DateFormat, yesterday)
  165. if err != nil {
  166. return err
  167. }
  168. var snapshotCheckPoints []time.Time
  169. for i := 0; i < *snapshotCnt; i++ {
  170. snapshotCheckPoints = append(snapshotCheckPoints, snapshotDate)
  171. snapshotDate = snapshotDate.AddDate(0, 0, -1**snapshotInterval)
  172. }
  173. homeDirname, err := os.UserHomeDir()
  174. if err != nil {
  175. return err
  176. }
  177. levelDbPath := homeDirname + LevelDbPath
  178. err = os.RemoveAll(levelDbPath)
  179. if err != nil {
  180. return err
  181. }
  182. store := &filer_leveldb.LevelDBStore{}
  183. config := SnapshotConfig{
  184. dir: levelDbPath,
  185. }
  186. store.Initialize(config, "")
  187. changeLogPath := filer.SystemLogDir
  188. var processEntry func(entry *filer_pb.Entry, isLast bool) error
  189. processEntry = func(entry *filer_pb.Entry, isLast bool) error {
  190. if entry.IsDirectory {
  191. return filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(changeLogPath+"/"+entry.Name), "", processEntry)
  192. }
  193. totalSize := filer.FileSize(entry)
  194. buf := mem.Allocate(int(totalSize))
  195. if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  196. return filer.ReadAll(buf, commandEnv.MasterClient, entry.GetChunks())
  197. }); err != nil && err != filer_pb.ErrNotFound {
  198. return err
  199. }
  200. idx := uint32(0)
  201. for idx < uint32(totalSize) {
  202. logEntrySize := util.BytesToUint32(buf[idx : idx+4])
  203. var logEntry filer_pb.LogEntry
  204. err = proto.Unmarshal(buf[idx+4:idx+4+logEntrySize], &logEntry)
  205. if err != nil {
  206. return err
  207. }
  208. idx = idx + 4 + logEntrySize
  209. err = processMetaDataEvents(store, logEntry.Data, snapshotCheckPoints, homeDirname, *snapshotPath)
  210. if err != nil {
  211. return err
  212. }
  213. }
  214. return nil
  215. }
  216. err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(changeLogPath), "", processEntry)
  217. return err
  218. }