// SnapshotConfig is a minimal configuration adapter handed to
// LevelDBStore.Initialize (see setupLevelDb). Only GetString is
// meaningfully implemented: it returns the store directory for every
// key. The remaining accessors are not expected to be called during
// snapshot creation and panic if they ever are.
type SnapshotConfig struct {
	dir string // directory backing the working LevelDB store
}

// GetString returns the configured directory regardless of the key asked for.
func (c SnapshotConfig) GetString(key string) string {
	return c.dir
}

// GetBool is not expected to be called; panics to surface misuse.
func (c SnapshotConfig) GetBool(key string) bool {
	panic("implement me")
}

// GetInt is not expected to be called; panics to surface misuse.
func (c SnapshotConfig) GetInt(key string) int {
	panic("implement me")
}

// GetStringSlice is not expected to be called; panics to surface misuse.
func (c SnapshotConfig) GetStringSlice(key string) []string {
	panic("implement me")
}

// SetDefault is not expected to be called; panics to surface misuse.
func (c SnapshotConfig) SetDefault(key string, value interface{}) {
	panic("implement me")
}
+` +} + +func processMetaDataEvents(store *filer_leveldb.LevelDBStore, data []byte, count int, snapshotCheckPoints []time.Time, homeDir string, snapshotPath string) (SnapshotCount int, err error) { + var event filer_pb.SubscribeMetadataResponse + err = proto.Unmarshal(data, &event) + if err != nil { + return count, err + } + eventTime := event.TsNs + for count < len(snapshotCheckPoints) && time.Unix(0, eventTime).After(snapshotCheckPoints[count].Add(-time.Microsecond)) { + snapshotPath := filepath.Join(homeDir, snapshotPath, snapshotCheckPoints[count].Format(DateFormat)+SnapshotDirPostFix) + err = createIfNotExists(snapshotPath, 0755) + if err != nil { + return count, err + } + err = generateSnapshots(filepath.Join(homeDir, LevelDbPath), snapshotPath) + if err != nil { + return count, err + } + count++ + } + if count < 0 { + return count, nil + } + ctx := context.Background() + if filer_pb.IsEmpty(&event) { + return count, nil + } else if filer_pb.IsCreate(&event) { + entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry) + return count, store.InsertEntry(ctx, entry) + } else if filer_pb.IsDelete(&event) { + return count, store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name)) + } else if filer_pb.IsUpdate(&event) { + entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry) + return count, store.UpdateEntry(ctx, entry) + } else { + if err := store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name)); err != nil { + return count, err + } + return count, store.InsertEntry(ctx, filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)) + } + return count, nil +} + +func processEntryLog(entry *filer_pb.Entry, commandEnv *CommandEnv, snapshotCount int, store *filer_leveldb.LevelDBStore, snapshotsToGenerate []time.Time, homeDirname string, snapshotPath string) (count 
int, err error) { + totalSize := filer.FileSize(entry) + buf := mem.Allocate(int(totalSize)) + if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return filer.ReadAll(buf, commandEnv.MasterClient, entry.GetChunks()) + }); err != nil && err != filer_pb.ErrNotFound { + return snapshotCount, err + } + idx := uint32(0) + for idx < uint32(totalSize) { + logEntrySize := util.BytesToUint32(buf[idx : idx+4]) + var logEntry filer_pb.LogEntry + err = proto.Unmarshal(buf[idx+4:idx+4+logEntrySize], &logEntry) + if err != nil { + return snapshotCount, err + } + idx = idx + 4 + logEntrySize + snapshotCount, err = processMetaDataEvents(store, logEntry.Data, snapshotCount, snapshotsToGenerate, homeDirname, snapshotPath) + if err != nil { + return snapshotCount, err + } + } + return snapshotCount, err +} + +func generateSnapshots(scrDir, dest string) error { + entries, err := os.ReadDir(scrDir) + if err := createIfNotExists(dest, 0755); err != nil { + return err + } + if err != nil { + return err + } + for _, entry := range entries { + sourcePath := filepath.Join(scrDir, entry.Name()) + destPath := filepath.Join(dest, entry.Name()) + + fileInfo, err := os.Stat(sourcePath) + if err != nil { + return err + } + + switch fileInfo.Mode() & os.ModeType { + case os.ModeDir: + if err := createIfNotExists(destPath, 0755); err != nil { + return err + } + if err := generateSnapshots(sourcePath, destPath); err != nil { + return err + } + default: + if err := copy(sourcePath, destPath); err != nil { + return err + } + } + } + return nil +} + +func copy(srcFile, dstFile string) error { + out, err := os.Create(dstFile) + if err != nil { + return err + } + + defer out.Close() + + in, err := os.Open(srcFile) + defer in.Close() + if err != nil { + return err + } + + _, err = io.Copy(out, in) + if err != nil { + return err + } + + return nil +} + +func exists(filePath string) bool { + if _, err := os.Stat(filePath); os.IsNotExist(err) { + return false + } + 
return true +} + +func createIfNotExists(dir string, perm os.FileMode) error { + if exists(dir) { + return nil + } + + if err := os.MkdirAll(dir, perm); err != nil { + return fmt.Errorf("failed to create directory: '%s', error: '%s'", dir, err.Error()) + } + + return nil +} + +func setupLevelDb(levelDbPath string, levelDbBootstrapPath string) (store *filer_leveldb.LevelDBStore, err error) { + store = &filer_leveldb.LevelDBStore{} + err = os.RemoveAll(levelDbPath) + if err != nil { + return + } + if len(levelDbBootstrapPath) != 0 { + // copy the latest snapshot as starting point + err = generateSnapshots(levelDbBootstrapPath, levelDbPath) + if err != nil { + return + } + } + config := SnapshotConfig{ + dir: levelDbPath, + } + + store.Initialize(config, "") + return +} + +func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv, _writer io.Writer) (err error) { + fsMetaSnapshotsCreateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + snapshotPath := fsMetaSnapshotsCreateCommand.String("path", "", "the path to store generated snapshot files") + count := fsMetaSnapshotsCreateCommand.Int("count", 3, "number of snapshots generated") + intervalDays := fsMetaSnapshotsCreateCommand.Int("interval-days", 7, "the duration interval between each generated snapshot") + if err = fsMetaSnapshotsCreateCommand.Parse(args); err != nil { + return err + } + homeDirname, err := os.UserHomeDir() + if err != nil { + return err + } + snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err := computeRequirements(homeDirname, *snapshotPath, *count, *intervalDays) + if err != nil { + return err + } + levelDbPath := filepath.Join(homeDirname, LevelDbPath) + store, err := setupLevelDb(levelDbPath, filepath.Join(homeDirname, *snapshotPath, levelDbBootstrapPath)) + if err != nil { + return err + } + // sort to make sure we are processing ascending list of snapshots to generate + sort.Slice(snapshotsToGenerate, func(i, j int) bool { + return 
snapshotsToGenerate[i].Before(snapshotsToGenerate[j]) + }) + changeLogPath := filer.SystemLogDir + var processEntry func(entry *filer_pb.Entry, isLast bool) error + var levelDbBootstrapDate string + if levelDbBootstrapPath != "" { + levelDbBootstrapDate = levelDbBootstrapPath[:len(DateFormat)] + } + snapshotCount := 0 + processEntry = func(entry *filer_pb.Entry, isLast bool) error { + if entry.IsDirectory { + // skip logs prior to the latest previous snapshot + if levelDbBootstrapDate != "" && entry.GetName() <= levelDbBootstrapDate { + return nil + } + return filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(changeLogPath+"/"+entry.Name), "", processEntry) + } + snapshotCount, err = processEntryLog(entry, commandEnv, snapshotCount, store, snapshotsToGenerate, homeDirname, *snapshotPath) + return err + } + + err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(changeLogPath), "", processEntry) + if err != nil { + return err + } + // edge case + // there might be unfinished snapshot left over in the duration gaps. + // process meta event only triggers snapshots when there are event after the snapshot time. + for snapshotCount < len(snapshotsToGenerate) { + generatePath := filepath.Join(homeDirname, *snapshotPath, snapshotsToGenerate[snapshotCount].Format(DateFormat)+SnapshotDirPostFix) + err = createIfNotExists(generatePath, 0755) + if err != nil { + return err + } + err = generateSnapshots(levelDbPath, generatePath) + if err != nil { + return err + } + snapshotCount++ + } + // remove previous snapshots. 
+ for _, snapshot := range snapshotsToRemove { + err = os.RemoveAll(snapshot) + if err != nil { + return err + } + } + return nil +} diff --git a/weed/shell/command_fs_meta_snapshots_create_scheduler.go b/weed/shell/command_fs_meta_snapshots_create_scheduler.go new file mode 100644 index 000000000..962445007 --- /dev/null +++ b/weed/shell/command_fs_meta_snapshots_create_scheduler.go @@ -0,0 +1,86 @@ +package shell + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "sort" + "strings" + "time" +) + +func computeRequirementsFromDirectory(previousSnapshots []string, homeDirectory string, snapshotPath string, count int, durationDays int, targetDate time.Time) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, levelDbBootstrapPath string, err error) { + levelDbBootstrapPath = previousSnapshots[len(previousSnapshots)-1] + lastSnapshotDate, err := time.Parse(DateFormat, levelDbBootstrapPath[:len(DateFormat)]) + if err != nil { + return + } + if err != nil { + return + } + // the snapshots cover the last nanosecond of the current date + lastSnapshotDate = lastSnapshotDate.AddDate(0, 0, 1).Add(-1 * time.Nanosecond) + gapDuration := targetDate.Sub(lastSnapshotDate) + oneSnapshotInterval := 24 * time.Hour * time.Duration(durationDays) + totalSnapshotsInterval := 24 * time.Hour * time.Duration(durationDays*count) + // gap too small no snapshot will be generated + if gapDuration < oneSnapshotInterval { + return snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, errors.New(fmt.Sprintf("last snapshot was generated at %v no need to generate new snapshots", lastSnapshotDate.Format(DateFormat))) + } else if gapDuration > totalSnapshotsInterval { + // gap too large generate from targetDate + // and remove all previous snapshots + _, snapshotsToGenerate, _, err = computeRequirementsFromEmpty(count, durationDays, targetDate) + for _, file := range previousSnapshots { + snapshotsToRemove = append(snapshotsToRemove, filepath.Join(homeDirectory, snapshotPath, 
file)) + } + return + } + snapshotDate := lastSnapshotDate.AddDate(0, 0, durationDays) + for snapshotDate.Before(targetDate) || snapshotDate.Equal(targetDate) { + snapshotsToGenerate = append(snapshotsToGenerate, snapshotDate) + snapshotDate = snapshotDate.AddDate(0, 0, durationDays) + } + totalCount := len(previousSnapshots) + len(snapshotsToGenerate) + toRemoveIdx := 0 + for toRemoveIdx < len(previousSnapshots) && totalCount-toRemoveIdx > count { + snapshotsToRemove = append(snapshotsToRemove, filepath.Join(homeDirectory, snapshotPath, previousSnapshots[toRemoveIdx])) + toRemoveIdx += 1 + } + return +} + +func computeRequirementsFromEmpty(count int, durationDays int, targetDate time.Time) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, levelDbBootstrapPath string, err error) { + snapshotDate := targetDate + for i := 0; i < count; i++ { + snapshotsToGenerate = append(snapshotsToGenerate, snapshotDate) + snapshotDate = snapshotDate.AddDate(0, 0, -1*durationDays) + } + return snapshotsToRemove, snapshotsToGenerate, "", nil +} + +// compute number of snapshot need to be generated and number of snapshots to remove from give directory. 
+func computeRequirements(homeDirectory string, snapshotPath string, count int, durationDays int) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, levelDbBootstrapPath string, err error) { + snapshotDirectory := filepath.Join(homeDirectory, snapshotPath) + files, _ := os.ReadDir(snapshotDirectory) + // sort files by name + sort.Slice(files, func(i, j int) bool { + return files[i].Name() < files[j].Name() + }) + // filter for snapshots file name only + var prevSnapshotFiles []string + for _, file := range files { + if strings.HasSuffix(file.Name(), SnapshotDirPostFix) { + prevSnapshotFiles = append(prevSnapshotFiles, file.Name()) + } + } + curDate := time.Now() + curDateStr := curDate.Format(DateFormat) + // ensure snapshot start at today 00:00 - 1ns + today, err := time.Parse(DateFormat, curDateStr) + targetDate := today.Add(-1 * time.Nanosecond) + if len(prevSnapshotFiles) == 0 { + return computeRequirementsFromEmpty(count, durationDays, targetDate) + } + return computeRequirementsFromDirectory(prevSnapshotFiles, homeDirectory, snapshotPath, count, durationDays, targetDate) +} diff --git a/weed/shell/command_fs_meta_snapshots_create_scheduler_test.go b/weed/shell/command_fs_meta_snapshots_create_scheduler_test.go new file mode 100644 index 000000000..9a2159aa6 --- /dev/null +++ b/weed/shell/command_fs_meta_snapshots_create_scheduler_test.go @@ -0,0 +1,64 @@ +package shell + +import ( + "github.com/stretchr/testify/assert" + "path/filepath" + "reflect" + "testing" + "time" +) + +func TestComputeRequirementsFromDirectory(t *testing.T) { + homeDir := "home" + snapDir := "test" + // case1: we have previous snapshots, target date is relative close to the latest snapshot date, we will use previous snapshots to generate future snapshots and remove some previous snapshots. 
+ var prevSnapshots []string + prevSnapshots = append(prevSnapshots, "2022-01-01") + prevSnapshots = append(prevSnapshots, "2022-02-01") + prevSnapshots = append(prevSnapshots, "2022-03-01") + targetDate, _ := time.Parse(DateFormat, "2022-04-01") + targetDate = targetDate.Add(-1 * time.Nanosecond) + snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err := computeRequirementsFromDirectory(prevSnapshots, homeDir, snapDir, 3, 15, targetDate) + assert.Nil(t, err) + assert.Equal(t, levelDbBootstrapPath, "2022-03-01", "latest previous snapshot should be 2022-03-01") + expectedToRemoveSnapshots := []string{filepath.Join(homeDir, snapDir, "2022-01-01"), filepath.Join(homeDir, snapDir, "2022-02-01")} + assert.True(t, reflect.DeepEqual(snapshotsToRemove, expectedToRemoveSnapshots)) + expectedSnapshotsGenerationDate := []string{"2022-03-16", "2022-03-31"} + for i, date := range expectedSnapshotsGenerationDate { + assert.Equal(t, snapshotsToGenerate[i].Format(DateFormat), date) + } + + // case2: we have previous snapshots, target date is too close to the latest snapshot date, no change will happen. + targetDate, _ = time.Parse(DateFormat, "2022-03-02") + snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err = computeRequirementsFromDirectory(prevSnapshots, "home", "test", 3, 15, targetDate) + assert.NotNil(t, err) + assert.Containsf(t, err.Error(), "no need to generate new snapshots", "expected error containing %q, got %s", "no need to generate new snapshots", err) + assert.Empty(t, snapshotsToRemove) + assert.Empty(t, snapshotsToGenerate) + assert.Equal(t, levelDbBootstrapPath, "2022-03-01", "latest previous snapshot should be 2022-03-01") + + // case3: we have previous snapshots, target date is too far out to the latest snapshot date, all previous snapshots will be removed, snapshots will be generated going backward starting on target date. 
+ targetDate, _ = time.Parse(DateFormat, "2022-12-01") + targetDate = targetDate.Add(-1 * time.Nanosecond) + snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err = computeRequirementsFromDirectory(prevSnapshots, "home", "test", 3, 15, targetDate) + assert.Nil(t, err) + expectedToRemoveSnapshots = []string{filepath.Join(homeDir, snapDir, "2022-01-01"), filepath.Join(homeDir, snapDir, "2022-02-01"), filepath.Join(homeDir, snapDir, "2022-03-01")} + assert.True(t, reflect.DeepEqual(snapshotsToRemove, expectedToRemoveSnapshots)) + // we still need to skip all logs prior to 2022-03-02 + assert.Equal(t, levelDbBootstrapPath, "2022-03-01", "latest previous snapshot should be 2022-03-01") + expectedSnapshotsGenerationDate = []string{"2022-11-30", "2022-11-15", "2022-10-31"} + for i, date := range expectedSnapshotsGenerationDate { + assert.Equal(t, snapshotsToGenerate[i].Format(DateFormat), date) + } + + // case4: the target date is exactly n snapshots away from previous snapshot date + targetDate, _ = time.Parse(DateFormat, "2022-03-04") + targetDate = targetDate.Add(-1 * time.Nanosecond) + snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err = computeRequirementsFromDirectory(prevSnapshots, "home", "test", 3, 1, targetDate) + expectedToRemoveSnapshots = []string{filepath.Join(homeDir, snapDir, "2022-01-01"), filepath.Join(homeDir, snapDir, "2022-02-01")} + assert.True(t, reflect.DeepEqual(snapshotsToRemove, expectedToRemoveSnapshots)) + expectedSnapshotsGenerationDate = []string{"2022-03-02", "2022-03-03"} + for i, date := range expectedSnapshotsGenerationDate { + assert.Equal(t, snapshotsToGenerate[i].Format(DateFormat), date) + } +}