You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

298 lines
8.9 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/filer"
  7. filer_leveldb "github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  11. "google.golang.org/protobuf/proto"
  12. "io"
  13. "os"
  14. "path/filepath"
  15. "sort"
  16. "time"
  17. )
  // Paths and naming conventions used when creating metadata snapshots.
  const (
  	// LevelDbPath is the location of the scratch LevelDB store. Do joins it
  	// onto the user's home directory.
  	LevelDbPath = "tmp/snapshots.db"

  	// DateFormat is the time layout used to render snapshot dates.
  	DateFormat = "2006-01-02"

  	// SnapshotDirPostFix is appended to a formatted date to build the name
  	// of a snapshot directory.
  	SnapshotDirPostFix = "-snapshot"
  )
  // init appends the fs.meta.snapshots.create command to the Commands list.
  func init() {
  	Commands = append(Commands, &commandFsMetaSnapshotsCreate{})
  }
  // commandFsMetaSnapshotsCreate implements the "fs.meta.snapshots.create"
  // shell command. It carries no state of its own.
  type commandFsMetaSnapshotsCreate struct{}

  // Name returns the name under which the command is invoked.
  func (c *commandFsMetaSnapshotsCreate) Name() string {
  	const commandName = "fs.meta.snapshots.create"
  	return commandName
  }
  // SnapshotConfig is a minimal configuration object that only knows the
  // directory of the LevelDB store. It is handed to LevelDBStore.Initialize
  // by setupLevelDb.
  type SnapshotConfig struct {
  	dir string // directory returned for every string lookup
  }

  // GetString returns the configured directory regardless of key.
  func (cfg SnapshotConfig) GetString(key string) string {
  	return cfg.dir
  }

  // GetBool is not supported and always panics.
  func (cfg SnapshotConfig) GetBool(key string) bool {
  	panic("implement me")
  }

  // GetInt is not supported and always panics.
  func (cfg SnapshotConfig) GetInt(key string) int {
  	panic("implement me")
  }

  // GetStringSlice is not supported and always panics.
  func (cfg SnapshotConfig) GetStringSlice(key string) []string {
  	panic("implement me")
  }

  // SetDefault is not supported and always panics.
  func (cfg SnapshotConfig) SetDefault(key string, value interface{}) {
  	panic("implement me")
  }
  // Help returns the usage text for the fs.meta.snapshots.create command,
  // including an example invocation with the -interval-days, -count and
  // -path flags.
  func (c *commandFsMetaSnapshotsCreate) Help() string {
  	return `create snapshots of meta data from given time range.
fs.meta.snapshots.create -interval-days=7 -count=3 -path=/your/path
// fs.meta.snapshots.create will generate desired number of snapshots with desired duration interval from yesterday the generated files will be saved from input path.
// These snapshot maybe later used to backup the system to certain timestamp.
// path input is relative to home directory.
`
  }
  55. func processMetaDataEvents(store *filer_leveldb.LevelDBStore, data []byte, count int, snapshotCheckPoints []time.Time, homeDir string, snapshotPath string) (SnapshotCount int, err error) {
  56. var event filer_pb.SubscribeMetadataResponse
  57. err = proto.Unmarshal(data, &event)
  58. if err != nil {
  59. return count, err
  60. }
  61. eventTime := event.TsNs
  62. for count < len(snapshotCheckPoints) && time.Unix(0, eventTime).After(snapshotCheckPoints[count].Add(-time.Microsecond)) {
  63. snapshotPath := filepath.Join(homeDir, snapshotPath, snapshotCheckPoints[count].Format(DateFormat)+SnapshotDirPostFix)
  64. err = createIfNotExists(snapshotPath, 0755)
  65. if err != nil {
  66. return count, err
  67. }
  68. err = generateSnapshots(homeDir+LevelDbPath, snapshotPath)
  69. if err != nil {
  70. return count, err
  71. }
  72. count++
  73. }
  74. if count < 0 {
  75. return count, nil
  76. }
  77. ctx := context.Background()
  78. if filer_pb.IsEmpty(&event) {
  79. return count, nil
  80. } else if filer_pb.IsCreate(&event) {
  81. entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)
  82. return count, store.InsertEntry(ctx, entry)
  83. } else if filer_pb.IsDelete(&event) {
  84. return count, store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name))
  85. } else if filer_pb.IsUpdate(&event) {
  86. entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)
  87. return count, store.UpdateEntry(ctx, entry)
  88. } else {
  89. if err := store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name)); err != nil {
  90. return count, err
  91. }
  92. return count, store.InsertEntry(ctx, filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry))
  93. }
  94. return count, nil
  95. }
  96. func processEntryLog(entry *filer_pb.Entry, commandEnv *CommandEnv, snapshotCount int, store *filer_leveldb.LevelDBStore, snapshotsToGenerate []time.Time, homeDirname string, snapshotPath string) (count int, err error) {
  97. totalSize := filer.FileSize(entry)
  98. buf := mem.Allocate(int(totalSize))
  99. if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  100. return filer.ReadAll(buf, commandEnv.MasterClient, entry.GetChunks())
  101. }); err != nil && err != filer_pb.ErrNotFound {
  102. return snapshotCount, err
  103. }
  104. idx := uint32(0)
  105. for idx < uint32(totalSize) {
  106. logEntrySize := util.BytesToUint32(buf[idx : idx+4])
  107. var logEntry filer_pb.LogEntry
  108. err = proto.Unmarshal(buf[idx+4:idx+4+logEntrySize], &logEntry)
  109. if err != nil {
  110. return snapshotCount, err
  111. }
  112. idx = idx + 4 + logEntrySize
  113. snapshotCount, err = processMetaDataEvents(store, logEntry.Data, snapshotCount, snapshotsToGenerate, homeDirname, snapshotPath)
  114. if err != nil {
  115. return snapshotCount, err
  116. }
  117. }
  118. return snapshotCount, err
  119. }
  120. func generateSnapshots(scrDir, dest string) error {
  121. entries, err := os.ReadDir(scrDir)
  122. if err != nil {
  123. return err
  124. }
  125. for _, entry := range entries {
  126. sourcePath := filepath.Join(scrDir, entry.Name())
  127. destPath := filepath.Join(dest, entry.Name())
  128. fileInfo, err := os.Stat(sourcePath)
  129. if err != nil {
  130. return err
  131. }
  132. switch fileInfo.Mode() & os.ModeType {
  133. case os.ModeDir:
  134. if err := createIfNotExists(destPath, 0755); err != nil {
  135. return err
  136. }
  137. if err := generateSnapshots(sourcePath, destPath); err != nil {
  138. return err
  139. }
  140. default:
  141. if err := copy(sourcePath, destPath); err != nil {
  142. return err
  143. }
  144. }
  145. }
  146. return nil
  147. }
  // copy copies the content of srcFile into dstFile, creating dstFile or
  // truncating it when it already exists.
  //
  // The source is opened before the destination is created, so a missing
  // source does not leave an empty destination behind. The error from
  // closing the destination is returned because a failed close can mean the
  // data never reached disk.
  //
  // Note: within this package the name shadows the builtin copy function.
  func copy(srcFile, dstFile string) error {
  	in, err := os.Open(srcFile)
  	if err != nil {
  		return err
  	}
  	defer in.Close()

  	out, err := os.Create(dstFile)
  	if err != nil {
  		return err
  	}
  	if _, err = io.Copy(out, in); err != nil {
  		out.Close() // best effort; the copy error is the one worth reporting
  		return err
  	}
  	return out.Close()
  }
  // exists reports whether filePath can be stat'ed. Only a "does not exist"
  // error counts as absent; any other stat outcome is reported as present.
  func exists(filePath string) bool {
  	_, statErr := os.Stat(filePath)
  	return !os.IsNotExist(statErr)
  }
  // createIfNotExists makes sure dir exists as a directory, creating it and
  // any missing parents with permission perm.
  //
  // os.MkdirAll already succeeds when the directory is present, so no
  // separate existence check is made. That also means a path blocked by a
  // regular file is reported as an error instead of being mistaken for an
  // existing directory. The underlying error is wrapped and can be inspected
  // with errors.Is / errors.As.
  func createIfNotExists(dir string, perm os.FileMode) error {
  	if err := os.MkdirAll(dir, perm); err != nil {
  		return fmt.Errorf("failed to create directory: '%s', error: '%w'", dir, err)
  	}
  	return nil
  }
  180. func setupLevelDb(levelDbPath string, levelDbBootstrapPath string) (store *filer_leveldb.LevelDBStore, err error) {
  181. err = os.RemoveAll(levelDbPath)
  182. if err != nil {
  183. return &filer_leveldb.LevelDBStore{}, err
  184. }
  185. if len(levelDbBootstrapPath) != 0 {
  186. // copy the latest snapshot as starting point
  187. err = generateSnapshots(levelDbBootstrapPath, levelDbPath)
  188. if err != nil {
  189. return
  190. }
  191. }
  192. config := SnapshotConfig{
  193. dir: levelDbPath,
  194. }
  195. store.Initialize(config, "")
  196. return
  197. }
  198. func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv, _writer io.Writer) (err error) {
  199. fsMetaSnapshotsCreateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  200. snapshotPath := fsMetaSnapshotsCreateCommand.String("path", "", "the path to store generated snapshot files")
  201. count := fsMetaSnapshotsCreateCommand.Int("count", 3, "number of snapshots generated")
  202. intervalDays := fsMetaSnapshotsCreateCommand.Int("interval-days", 7, "the duration interval between each generated snapshot")
  203. if err = fsMetaSnapshotsCreateCommand.Parse(args); err != nil {
  204. return err
  205. }
  206. homeDirname, err := os.UserHomeDir()
  207. if err != nil {
  208. return err
  209. }
  210. snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err := computeRequirements(homeDirname, *snapshotPath, *count, *intervalDays)
  211. if err != nil {
  212. return err
  213. }
  214. levelDbPath := filepath.Join(homeDirname, LevelDbPath)
  215. store, err := setupLevelDb(levelDbPath, filepath.Join(homeDirname, *snapshotPath, levelDbBootstrapPath))
  216. if err != nil {
  217. return err
  218. }
  219. // sort to make sure we are processing ascending list of snapshots to generate
  220. sort.Slice(snapshotsToGenerate, func(i, j int) bool {
  221. return snapshotsToGenerate[i].Before(snapshotsToGenerate[j])
  222. })
  223. changeLogPath := filer.SystemLogDir
  224. var processEntry func(entry *filer_pb.Entry, isLast bool) error
  225. var levelDbBootstrapDate string
  226. if levelDbBootstrapPath != "" {
  227. levelDbBootstrapDate = levelDbBootstrapPath[:len(DateFormat)]
  228. }
  229. snapshotCount := 0
  230. processEntry = func(entry *filer_pb.Entry, isLast bool) error {
  231. if entry.IsDirectory {
  232. // skip logs prior to the latest previous snapshot
  233. if entry.GetName() <= levelDbBootstrapDate {
  234. println(entry.GetName())
  235. return nil
  236. }
  237. return filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(changeLogPath+"/"+entry.Name), "", processEntry)
  238. }
  239. snapshotCount, err = processEntryLog(entry, commandEnv, snapshotCount, store, snapshotsToGenerate, homeDirname, *snapshotPath)
  240. return err
  241. }
  242. err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(changeLogPath), "", processEntry)
  243. if err != nil {
  244. return err
  245. }
  246. // edge case
  247. // there might be unfinished snapshot left over in the duration gaps.
  248. // process meta event only triggers snapshots when there are event after the snapshot time.
  249. for snapshotCount < len(snapshotsToGenerate) {
  250. generatePath := filepath.Join(homeDirname, *snapshotPath, snapshotsToGenerate[snapshotCount].Format(DateFormat))
  251. err = createIfNotExists(generatePath, 0755)
  252. if err != nil {
  253. return err
  254. }
  255. err = generateSnapshots(levelDbPath, generatePath)
  256. if err != nil {
  257. return err
  258. }
  259. snapshotCount++
  260. }
  261. // remove previous snapshots.
  262. for _, snapshot := range snapshotsToRemove {
  263. err = os.RemoveAll(snapshot)
  264. if err != nil {
  265. return err
  266. }
  267. }
  268. return nil
  269. }