You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

349 lines
11 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. package shell
  2. import (
  3. "context"
  4. "errors"
  5. "flag"
  6. "fmt"
  7. "github.com/seaweedfs/seaweedfs/weed/filer"
  8. filer_leveldb "github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  10. "github.com/seaweedfs/seaweedfs/weed/util"
  11. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  12. "google.golang.org/protobuf/proto"
  13. "io"
  14. "os"
  15. "path/filepath"
  16. "sort"
  17. "strings"
  18. "time"
  19. )
  20. const LevelDbPath = "tmp/snapshots.db"
  21. const DateFormat = "2006-01-02"
  22. const SnapshotDirPostFix = "-snapshot"
  23. func init() {
  24. Commands = append(Commands, &commandFsMetaSnapshotsCreate{})
  25. }
  26. type commandFsMetaSnapshotsCreate struct {
  27. }
  28. func (c *commandFsMetaSnapshotsCreate) Name() string {
  29. return "fs.meta.snapshots.create"
  30. }
  31. type SnapshotConfig struct {
  32. dir string
  33. }
  34. func (c SnapshotConfig) GetString(key string) string {
  35. return c.dir
  36. }
  37. func (c SnapshotConfig) GetBool(key string) bool {
  38. panic("implement me")
  39. }
  40. func (c SnapshotConfig) GetInt(key string) int {
  41. panic("implement me")
  42. }
  43. func (c SnapshotConfig) GetStringSlice(key string) []string {
  44. panic("implement me")
  45. }
  46. func (c SnapshotConfig) SetDefault(key string, value interface{}) {
  47. panic("implement me")
  48. }
  49. func (c *commandFsMetaSnapshotsCreate) Help() string {
  50. return `create snapshots of meta data from given time range.
  51. fs.meta.snapshots.create -interval-days=7 -count=3 -path=/your/path
  52. // fs.meta.snapshots.create will generate desired number of snapshots with desired duration interval from yesterday the generated files will be saved from input path.
  53. // These snapshot maybe later used to backup the system to certain timestamp.
  54. // path input is relative to home directory.
  55. `
  56. }
  57. func processMetaDataEvents(store *filer_leveldb.LevelDBStore, data []byte, unfinshiedSnapshotCnt int, snapshotCheckPoints []time.Time, homeDir string, snapshotPath string) (todoCnt int, err error) {
  58. var event filer_pb.SubscribeMetadataResponse
  59. err = proto.Unmarshal(data, &event)
  60. if err != nil {
  61. return unfinshiedSnapshotCnt, err
  62. }
  63. eventTime := event.TsNs
  64. for unfinshiedSnapshotCnt >= 0 && time.Unix(0, eventTime).After(snapshotCheckPoints[unfinshiedSnapshotCnt]) {
  65. snapshotPath := filepath.Join(homeDir, snapshotPath, snapshotCheckPoints[unfinshiedSnapshotCnt].Format(DateFormat)+SnapshotDirPostFix)
  66. err = createIfNotExists(snapshotPath, 0755)
  67. if err != nil {
  68. return unfinshiedSnapshotCnt, err
  69. }
  70. err = generateSnapshots(homeDir+LevelDbPath, snapshotPath)
  71. if err != nil {
  72. return unfinshiedSnapshotCnt, err
  73. }
  74. unfinshiedSnapshotCnt--
  75. }
  76. if unfinshiedSnapshotCnt < 0 {
  77. return unfinshiedSnapshotCnt, nil
  78. }
  79. ctx := context.Background()
  80. if filer_pb.IsEmpty(&event) {
  81. return unfinshiedSnapshotCnt, nil
  82. } else if filer_pb.IsCreate(&event) {
  83. entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)
  84. return unfinshiedSnapshotCnt, store.InsertEntry(ctx, entry)
  85. } else if filer_pb.IsDelete(&event) {
  86. return unfinshiedSnapshotCnt, store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name))
  87. } else if filer_pb.IsUpdate(&event) {
  88. entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)
  89. return unfinshiedSnapshotCnt, store.UpdateEntry(ctx, entry)
  90. } else {
  91. if err := store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name)); err != nil {
  92. return unfinshiedSnapshotCnt, err
  93. }
  94. return unfinshiedSnapshotCnt, store.InsertEntry(ctx, filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry))
  95. }
  96. return unfinshiedSnapshotCnt, nil
  97. }
  98. func generateSnapshots(scrDir, dest string) error {
  99. entries, err := os.ReadDir(scrDir)
  100. if err != nil {
  101. return err
  102. }
  103. for _, entry := range entries {
  104. sourcePath := filepath.Join(scrDir, entry.Name())
  105. destPath := filepath.Join(dest, entry.Name())
  106. fileInfo, err := os.Stat(sourcePath)
  107. if err != nil {
  108. return err
  109. }
  110. switch fileInfo.Mode() & os.ModeType {
  111. case os.ModeDir:
  112. if err := createIfNotExists(destPath, 0755); err != nil {
  113. return err
  114. }
  115. if err := generateSnapshots(sourcePath, destPath); err != nil {
  116. return err
  117. }
  118. default:
  119. if err := copy(sourcePath, destPath); err != nil {
  120. return err
  121. }
  122. }
  123. }
  124. return nil
  125. }
  126. func copy(srcFile, dstFile string) error {
  127. out, err := os.Create(dstFile)
  128. if err != nil {
  129. return err
  130. }
  131. defer out.Close()
  132. in, err := os.Open(srcFile)
  133. defer in.Close()
  134. if err != nil {
  135. return err
  136. }
  137. _, err = io.Copy(out, in)
  138. if err != nil {
  139. return err
  140. }
  141. return nil
  142. }
  143. func exists(filePath string) bool {
  144. if _, err := os.Stat(filePath); os.IsNotExist(err) {
  145. return false
  146. }
  147. return true
  148. }
  149. func createIfNotExists(dir string, perm os.FileMode) error {
  150. if exists(dir) {
  151. return nil
  152. }
  153. if err := os.MkdirAll(dir, perm); err != nil {
  154. return fmt.Errorf("failed to create directory: '%s', error: '%s'", dir, err.Error())
  155. }
  156. return nil
  157. }
  158. func computeRequirementsFromDirectory(previousSnapshots []os.DirEntry, homeDirectory string, snapshotPath string, count int, durationDays int) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, err error) {
  159. lastSnapshotDate, err := time.Parse(DateFormat, previousSnapshots[len(previousSnapshots)-1].Name()[:len(DateFormat)])
  160. if err != nil {
  161. return snapshotsToRemove, snapshotsToGenerate, err
  162. }
  163. yesterday := time.Now().Add(-time.Hour * 24)
  164. yesterdayStr := yesterday.Format(DateFormat)
  165. // ensure snapshot start at yesterday 00:00
  166. yesterday, err = time.Parse(DateFormat, yesterdayStr)
  167. if err != nil {
  168. return snapshotsToRemove, snapshotsToGenerate, err
  169. }
  170. gapDays := int(yesterday.Sub(lastSnapshotDate).Hours() / 24)
  171. // gap too small no snapshot will be generated
  172. if gapDays < durationDays {
  173. return snapshotsToRemove, snapshotsToGenerate, errors.New(fmt.Sprintf("last snapshot was generated at %v no need to generate new snapshots", lastSnapshotDate.Format(DateFormat)))
  174. } else if gapDays > durationDays*count {
  175. // gap too large generate from yesterday
  176. // and remove all previous snapshots
  177. _, snapshotsToGenerate, err = computeRequirementsFromEmpty(homeDirectory, count, durationDays)
  178. for _, file := range previousSnapshots {
  179. snapshotsToRemove = append(snapshotsToRemove, filepath.Join(homeDirectory, snapshotPath, file.Name()))
  180. }
  181. return
  182. }
  183. snapshotDate := lastSnapshotDate.AddDate(0, 0, 1*durationDays)
  184. for snapshotDate.Before(yesterday) || snapshotDate.Equal(yesterday) {
  185. snapshotsToGenerate = append(snapshotsToGenerate, snapshotDate)
  186. snapshotDate = lastSnapshotDate.AddDate(0, 0, 1*durationDays)
  187. }
  188. totalCount := len(previousSnapshots) + len(snapshotsToGenerate)
  189. toRemoveIdx := 0
  190. for toRemoveIdx < len(previousSnapshots) && totalCount-toRemoveIdx > count {
  191. snapshotsToRemove = append(snapshotsToRemove, filepath.Join(homeDirectory, snapshotPath, previousSnapshots[toRemoveIdx].Name()))
  192. toRemoveIdx += 1
  193. }
  194. return
  195. }
  196. func computeRequirementsFromEmpty(homeDirectory string, count int, durationDays int) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, err error) {
  197. yesterday := time.Now().Add(-time.Hour * 24).Format(DateFormat)
  198. // ensure snapshot start at yesterday 00:00
  199. snapshotDate, err := time.Parse(DateFormat, yesterday)
  200. if err != nil {
  201. return snapshotsToRemove, snapshotsToGenerate, err
  202. }
  203. for i := 0; i < count; i++ {
  204. snapshotsToGenerate = append(snapshotsToGenerate, snapshotDate)
  205. snapshotDate = snapshotDate.AddDate(0, 0, -1*durationDays)
  206. }
  207. return snapshotsToRemove, snapshotsToGenerate, nil
  208. }
  209. // compute number of snapshot need to be generated and number of snapshots to remove from give directory.
  210. func computeRequirements(homeDirectory string, snapshotPath string, count int, durationDays int) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, err error) {
  211. snapshotDirectory := filepath.Join(homeDirectory, snapshotPath)
  212. files, _ := os.ReadDir(snapshotDirectory)
  213. if len(files) == 0 {
  214. return computeRequirementsFromEmpty(homeDirectory, count, durationDays)
  215. }
  216. // sort files by name
  217. sort.Slice(files, func(i, j int) bool {
  218. return files[i].Name() < files[j].Name()
  219. })
  220. // filter for snapshots file only
  221. var prevSnapshotFiles []os.DirEntry
  222. for _, file := range files {
  223. if strings.HasSuffix(file.Name(), SnapshotDirPostFix) {
  224. prevSnapshotFiles = append(prevSnapshotFiles, file)
  225. }
  226. }
  227. return computeRequirementsFromDirectory(prevSnapshotFiles, homeDirectory, snapshotPath, count, durationDays)
  228. }
  229. func setupLevelDb(levelDbPath string) (store *filer_leveldb.LevelDBStore, err error) {
  230. err = os.RemoveAll(levelDbPath)
  231. if err != nil {
  232. return &filer_leveldb.LevelDBStore{}, err
  233. }
  234. config := SnapshotConfig{
  235. dir: levelDbPath,
  236. }
  237. store.Initialize(config, "")
  238. return
  239. }
  240. func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv, _writer io.Writer) (err error) {
  241. fsMetaSnapshotsCreateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  242. snapshotPath := fsMetaSnapshotsCreateCommand.String("path", "", "the path to store generated snapshot files")
  243. count := fsMetaSnapshotsCreateCommand.Int("count", 3, "number of snapshots generated")
  244. intervalDays := fsMetaSnapshotsCreateCommand.Int("interval-days", 7, "the duration interval between each generated snapshot")
  245. if err = fsMetaSnapshotsCreateCommand.Parse(args); err != nil {
  246. return err
  247. }
  248. homeDirname, err := os.UserHomeDir()
  249. if err != nil {
  250. return err
  251. }
  252. snapshotsToRemove, snapshotsToGenerate, err := computeRequirements(homeDirname, *snapshotPath, *count, *intervalDays)
  253. if err != nil {
  254. return err
  255. }
  256. levelDbPath := filepath.Join(homeDirname, LevelDbPath)
  257. store, err := setupLevelDb(levelDbPath)
  258. if err != nil {
  259. return err
  260. }
  261. unfinishedSnapshotCnt := len(snapshotsToGenerate) - 1
  262. changeLogPath := filer.SystemLogDir
  263. var processEntry func(entry *filer_pb.Entry, isLast bool) error
  264. processEntry = func(entry *filer_pb.Entry, isLast bool) error {
  265. if entry.IsDirectory {
  266. return filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(changeLogPath+"/"+entry.Name), "", processEntry)
  267. }
  268. totalSize := filer.FileSize(entry)
  269. buf := mem.Allocate(int(totalSize))
  270. if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  271. return filer.ReadAll(buf, commandEnv.MasterClient, entry.GetChunks())
  272. }); err != nil && err != filer_pb.ErrNotFound {
  273. return err
  274. }
  275. idx := uint32(0)
  276. for idx < uint32(totalSize) {
  277. logEntrySize := util.BytesToUint32(buf[idx : idx+4])
  278. var logEntry filer_pb.LogEntry
  279. err = proto.Unmarshal(buf[idx+4:idx+4+logEntrySize], &logEntry)
  280. if err != nil {
  281. return err
  282. }
  283. idx = idx + 4 + logEntrySize
  284. unfinishedSnapshotCnt, err = processMetaDataEvents(store, logEntry.Data, unfinishedSnapshotCnt, snapshotsToGenerate, homeDirname, *snapshotPath)
  285. if err != nil {
  286. return err
  287. }
  288. }
  289. return err
  290. }
  291. err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(changeLogPath), "", processEntry)
  292. if err != nil {
  293. return err
  294. }
  295. // edge case
  296. // there might be unfinished snapshot left over in the duration gaps.
  297. // process meta event only triggers snapshots when there are event after the snapshot time
  298. for unfinishedSnapshotCnt >= 0 {
  299. generatePath := filepath.Join(homeDirname, *snapshotPath, snapshotsToGenerate[unfinishedSnapshotCnt].Format(DateFormat))
  300. err = createIfNotExists(generatePath, 0755)
  301. if err != nil {
  302. return err
  303. }
  304. err = generateSnapshots(levelDbPath, generatePath)
  305. if err != nil {
  306. return err
  307. }
  308. unfinishedSnapshotCnt--
  309. }
  310. // remove previous snapshot if needed.
  311. for _, snapshot := range snapshotsToRemove {
  312. err = os.RemoveAll(snapshot)
  313. if err != nil {
  314. return err
  315. }
  316. }
  317. return nil
  318. }