diff --git a/weed/shell/command_fs_meta_snapshots_create.go b/weed/shell/command_fs_meta_snapshots_create.go
index 800c87287..af915f9cc 100644
--- a/weed/shell/command_fs_meta_snapshots_create.go
+++ b/weed/shell/command_fs_meta_snapshots_create.go
@@ -2,7 +2,6 @@ package shell
 
 import (
 	"context"
-	"errors"
 	"flag"
 	"fmt"
 	"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -15,7 +14,6 @@ import (
 	"os"
 	"path/filepath"
 	"sort"
-	"strings"
 	"time"
 )
 
@@ -67,46 +65,71 @@ func (c *commandFsMetaSnapshotsCreate) Help() string {
 `
 }
 
-func processMetaDataEvents(store *filer_leveldb.LevelDBStore, data []byte, unfinshiedSnapshotCnt int, snapshotCheckPoints []time.Time, homeDir string, snapshotPath string) (todoCnt int, err error) {
+func processMetaDataEvents(store *filer_leveldb.LevelDBStore, data []byte, count int, snapshotCheckPoints []time.Time, homeDir string, snapshotPath string) (SnapshotCount int, err error) {
 	var event filer_pb.SubscribeMetadataResponse
 	err = proto.Unmarshal(data, &event)
 	if err != nil {
-		return unfinshiedSnapshotCnt, err
+		return count, err
 	}
 	eventTime := event.TsNs
-	for unfinshiedSnapshotCnt >= 0 && time.Unix(0, eventTime).After(snapshotCheckPoints[unfinshiedSnapshotCnt]) {
-		snapshotPath := filepath.Join(homeDir, snapshotPath, snapshotCheckPoints[unfinshiedSnapshotCnt].Format(DateFormat)+SnapshotDirPostFix)
+	for count < len(snapshotCheckPoints) && time.Unix(0, eventTime).After(snapshotCheckPoints[count].Add(-time.Microsecond)) {
+		snapshotPath := filepath.Join(homeDir, snapshotPath, snapshotCheckPoints[count].Format(DateFormat)+SnapshotDirPostFix)
 		err = createIfNotExists(snapshotPath, 0755)
 		if err != nil {
-			return unfinshiedSnapshotCnt, err
+			return count, err
 		}
 		err = generateSnapshots(homeDir+LevelDbPath, snapshotPath)
 		if err != nil {
-			return unfinshiedSnapshotCnt, err
+			return count, err
 		}
-		unfinshiedSnapshotCnt--
+		count++
 	}
-	if unfinshiedSnapshotCnt < 0 {
-		return unfinshiedSnapshotCnt, nil
+	if count < 0 {
+		return count, nil
 	}
 	ctx := context.Background()
 	if filer_pb.IsEmpty(&event) {
-		return unfinshiedSnapshotCnt, nil
+		return count, nil
 	} else if filer_pb.IsCreate(&event) {
 		entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)
-		return unfinshiedSnapshotCnt, store.InsertEntry(ctx, entry)
+		return count, store.InsertEntry(ctx, entry)
 	} else if filer_pb.IsDelete(&event) {
-		return unfinshiedSnapshotCnt, store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name))
+		return count, store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name))
 	} else if filer_pb.IsUpdate(&event) {
 		entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)
-		return unfinshiedSnapshotCnt, store.UpdateEntry(ctx, entry)
+		return count, store.UpdateEntry(ctx, entry)
 	} else {
 		if err := store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name)); err != nil {
-			return unfinshiedSnapshotCnt, err
+			return count, err
 		}
-		return unfinshiedSnapshotCnt, store.InsertEntry(ctx, filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry))
+		return count, store.InsertEntry(ctx, filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry))
 	}
-	return unfinshiedSnapshotCnt, nil
+	return count, nil
+}
+
+func processEntryLog(entry *filer_pb.Entry, commandEnv *CommandEnv, snapshotCount int, store *filer_leveldb.LevelDBStore, snapshotsToGenerate []time.Time, homeDirname string, snapshotPath string) (count int, err error) {
+	totalSize := filer.FileSize(entry)
+	buf := mem.Allocate(int(totalSize))
+	if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		return filer.ReadAll(buf, commandEnv.MasterClient, entry.GetChunks())
+	}); err != nil && err != filer_pb.ErrNotFound {
+		return snapshotCount, err
+	}
+	idx := uint32(0)
+	for idx < uint32(totalSize) {
+		logEntrySize := util.BytesToUint32(buf[idx : idx+4])
+		var logEntry filer_pb.LogEntry
+		err = proto.Unmarshal(buf[idx+4:idx+4+logEntrySize], &logEntry)
+		if err != nil {
+			return snapshotCount, err
+		}
+		idx = idx + 4 + logEntrySize
+		snapshotCount, err = processMetaDataEvents(store, logEntry.Data, snapshotCount, snapshotsToGenerate, homeDirname, snapshotPath)
+		if err != nil {
+			return snapshotCount, err
+		}
+	}
+	return snapshotCount, err
 }
 
 func generateSnapshots(scrDir, dest string) error {
@@ -181,85 +204,18 @@ func createIfNotExists(dir string, perm os.FileMode) error {
 	return nil
 }
 
-func computeRequirementsFromDirectory(previousSnapshots []os.DirEntry, homeDirectory string, snapshotPath string, count int, durationDays int) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, err error) {
-	lastSnapshotDate, err := time.Parse(DateFormat, previousSnapshots[len(previousSnapshots)-1].Name()[:len(DateFormat)])
-	if err != nil {
-		return snapshotsToRemove, snapshotsToGenerate, err
-	}
-	yesterday := time.Now().Add(-time.Hour * 24)
-	yesterdayStr := yesterday.Format(DateFormat)
-	// ensure snapshot start at yesterday 00:00
-	yesterday, err = time.Parse(DateFormat, yesterdayStr)
-	if err != nil {
-		return snapshotsToRemove, snapshotsToGenerate, err
-	}
-	gapDays := int(yesterday.Sub(lastSnapshotDate).Hours() / 24)
-	// gap too small no snapshot will be generated
-	if gapDays < durationDays {
-		return snapshotsToRemove, snapshotsToGenerate, errors.New(fmt.Sprintf("last snapshot was generated at %v no need to generate new snapshots", lastSnapshotDate.Format(DateFormat)))
-	} else if gapDays > durationDays*count {
-		// gap too large generate from yesterday
-		// and remove all previous snapshots
-		_, snapshotsToGenerate, err = computeRequirementsFromEmpty(homeDirectory, count, durationDays)
-		for _, file := range previousSnapshots {
-			snapshotsToRemove = append(snapshotsToRemove, filepath.Join(homeDirectory, snapshotPath, file.Name()))
-		}
-		return
-	}
-	snapshotDate := lastSnapshotDate.AddDate(0, 0, 1*durationDays)
-	for snapshotDate.Before(yesterday) || snapshotDate.Equal(yesterday) {
-		snapshotsToGenerate = append(snapshotsToGenerate, snapshotDate)
-		snapshotDate = lastSnapshotDate.AddDate(0, 0, 1*durationDays)
-	}
-	totalCount := len(previousSnapshots) + len(snapshotsToGenerate)
-	toRemoveIdx := 0
-	for toRemoveIdx < len(previousSnapshots) && totalCount-toRemoveIdx > count {
-		snapshotsToRemove = append(snapshotsToRemove, filepath.Join(homeDirectory, snapshotPath, previousSnapshots[toRemoveIdx].Name()))
-		toRemoveIdx += 1
-	}
-	return
-}
-
-func computeRequirementsFromEmpty(homeDirectory string, count int, durationDays int) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, err error) {
-	yesterday := time.Now().Add(-time.Hour * 24).Format(DateFormat)
-	// ensure snapshot start at yesterday 00:00
-	snapshotDate, err := time.Parse(DateFormat, yesterday)
-	if err != nil {
-		return snapshotsToRemove, snapshotsToGenerate, err
-	}
-	for i := 0; i < count; i++ {
-		snapshotsToGenerate = append(snapshotsToGenerate, snapshotDate)
-		snapshotDate = snapshotDate.AddDate(0, 0, -1*durationDays)
-	}
-	return snapshotsToRemove, snapshotsToGenerate, nil
-}
-
-// compute number of snapshot need to be generated and number of snapshots to remove from give directory.
-func computeRequirements(homeDirectory string, snapshotPath string, count int, durationDays int) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, err error) {
-	snapshotDirectory := filepath.Join(homeDirectory, snapshotPath)
-	files, _ := os.ReadDir(snapshotDirectory)
-	if len(files) == 0 {
-		return computeRequirementsFromEmpty(homeDirectory, count, durationDays)
-	}
-	// sort files by name
-	sort.Slice(files, func(i, j int) bool {
-		return files[i].Name() < files[j].Name()
-	})
-	// filter for snapshots file only
-	var prevSnapshotFiles []os.DirEntry
-	for _, file := range files {
-		if strings.HasSuffix(file.Name(), SnapshotDirPostFix) {
-			prevSnapshotFiles = append(prevSnapshotFiles, file)
-		}
-	}
-	return computeRequirementsFromDirectory(prevSnapshotFiles, homeDirectory, snapshotPath, count, durationDays)
-}
-
-func setupLevelDb(levelDbPath string) (store *filer_leveldb.LevelDBStore, err error) {
+func setupLevelDb(levelDbPath string, levelDbBootstrapPath string) (store *filer_leveldb.LevelDBStore, err error) {
 	err = os.RemoveAll(levelDbPath)
 	if err != nil {
 		return &filer_leveldb.LevelDBStore{}, err
 	}
+	if len(levelDbBootstrapPath) != 0 {
+		// copy the latest snapshot as starting point
+		err = generateSnapshots(levelDbBootstrapPath, levelDbPath)
+		if err != nil {
+			return
+		}
+	}
 	config := SnapshotConfig{
 		dir: levelDbPath,
 	}
@@ -279,43 +235,36 @@ func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv,
 	if err != nil {
 		return err
 	}
-	snapshotsToRemove, snapshotsToGenerate, err := computeRequirements(homeDirname, *snapshotPath, *count, *intervalDays)
+	snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err := computeRequirements(homeDirname, *snapshotPath, *count, *intervalDays)
 	if err != nil {
 		return err
 	}
 	levelDbPath := filepath.Join(homeDirname, LevelDbPath)
-	store, err := setupLevelDb(levelDbPath)
+	store, err := setupLevelDb(levelDbPath, filepath.Join(homeDirname, *snapshotPath, levelDbBootstrapPath))
 	if err != nil {
 		return err
 	}
-	unfinishedSnapshotCnt := len(snapshotsToGenerate) - 1
+	// sort to make sure we are processing ascending list of snapshots to generate
+	sort.Slice(snapshotsToGenerate, func(i, j int) bool {
+		return snapshotsToGenerate[i].Before(snapshotsToGenerate[j])
+	})
 	changeLogPath := filer.SystemLogDir
 	var processEntry func(entry *filer_pb.Entry, isLast bool) error
+	var levelDbBootstrapDate string
+	if levelDbBootstrapPath != "" {
+		levelDbBootstrapDate = levelDbBootstrapPath[:len(DateFormat)]
+	}
+	snapshotCount := 0
 	processEntry = func(entry *filer_pb.Entry, isLast bool) error {
 		if entry.IsDirectory {
-			return filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(changeLogPath+"/"+entry.Name), "", processEntry)
-		}
-		totalSize := filer.FileSize(entry)
-		buf := mem.Allocate(int(totalSize))
-		if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-			return filer.ReadAll(buf, commandEnv.MasterClient, entry.GetChunks())
-		}); err != nil && err != filer_pb.ErrNotFound {
-			return err
-		}
-		idx := uint32(0)
-		for idx < uint32(totalSize) {
-			logEntrySize := util.BytesToUint32(buf[idx : idx+4])
-			var logEntry filer_pb.LogEntry
-			err = proto.Unmarshal(buf[idx+4:idx+4+logEntrySize], &logEntry)
-			if err != nil {
-				return err
-			}
-			idx = idx + 4 + logEntrySize
-			unfinishedSnapshotCnt, err = processMetaDataEvents(store, logEntry.Data, unfinishedSnapshotCnt, snapshotsToGenerate, homeDirname, *snapshotPath)
-			if err != nil {
-				return err
+			// skip logs prior to the latest previous snapshot
+			if entry.GetName() <= levelDbBootstrapDate {
+				println(entry.GetName())
+				return nil
 			}
+			return filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(changeLogPath+"/"+entry.Name), "", processEntry)
 		}
+		snapshotCount, err = processEntryLog(entry, commandEnv, snapshotCount, store, snapshotsToGenerate, homeDirname, *snapshotPath)
 		return err
 	}
 
@@ -325,9 +274,9 @@ func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv,
 	}
 	// edge case
 	// there might be unfinished snapshot left over in the duration gaps.
-	// process meta event only triggers snapshots when there are event after the snapshot time
-	for unfinishedSnapshotCnt >= 0 {
-		generatePath := filepath.Join(homeDirname, *snapshotPath, snapshotsToGenerate[unfinishedSnapshotCnt].Format(DateFormat))
+	// process meta event only triggers snapshots when there are event after the snapshot time.
+	for snapshotCount < len(snapshotsToGenerate) {
+		generatePath := filepath.Join(homeDirname, *snapshotPath, snapshotsToGenerate[snapshotCount].Format(DateFormat))
 		err = createIfNotExists(generatePath, 0755)
 		if err != nil {
 			return err
@@ -336,9 +285,9 @@ func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv,
 		if err != nil {
 			return err
 		}
-		unfinishedSnapshotCnt--
+		snapshotCount++
 	}
-	// remove previous snapshot if needed.
+	// remove previous snapshots.
 	for _, snapshot := range snapshotsToRemove {
 		err = os.RemoveAll(snapshot)
 		if err != nil {
diff --git a/weed/shell/command_fs_meta_snapshots_create_scheduler.go b/weed/shell/command_fs_meta_snapshots_create_scheduler.go
new file mode 100644
index 000000000..962445007
--- /dev/null
+++ b/weed/shell/command_fs_meta_snapshots_create_scheduler.go
@@ -0,0 +1,86 @@
+package shell
+
+import (
+	"errors"
+	"fmt"
+	"os"
+	"path/filepath"
+	"sort"
+	"strings"
+	"time"
+)
+
+func computeRequirementsFromDirectory(previousSnapshots []string, homeDirectory string, snapshotPath string, count int, durationDays int, targetDate time.Time) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, levelDbBootstrapPath string, err error) {
+	levelDbBootstrapPath = previousSnapshots[len(previousSnapshots)-1]
+	lastSnapshotDate, err := time.Parse(DateFormat, levelDbBootstrapPath[:len(DateFormat)])
+	if err != nil {
+		return
+	}
+	if err != nil {
+		return
+	}
+	// the snapshots cover the last nanosecond of the current date
+	lastSnapshotDate = lastSnapshotDate.AddDate(0, 0, 1).Add(-1 * time.Nanosecond)
+	gapDuration := targetDate.Sub(lastSnapshotDate)
+	oneSnapshotInterval := 24 * time.Hour * time.Duration(durationDays)
+	totalSnapshotsInterval := 24 * time.Hour * time.Duration(durationDays*count)
+	// gap too small no snapshot will be generated
+	if gapDuration < oneSnapshotInterval {
+		return snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, errors.New(fmt.Sprintf("last snapshot was generated at %v no need to generate new snapshots", lastSnapshotDate.Format(DateFormat)))
+	} else if gapDuration > totalSnapshotsInterval {
+		// gap too large generate from targetDate
+		// and remove all previous snapshots
+		_, snapshotsToGenerate, _, err = computeRequirementsFromEmpty(count, durationDays, targetDate)
+		for _, file := range previousSnapshots {
+			snapshotsToRemove = append(snapshotsToRemove, filepath.Join(homeDirectory, snapshotPath, file))
+		}
+		return
+	}
+	snapshotDate := lastSnapshotDate.AddDate(0, 0, durationDays)
+	for snapshotDate.Before(targetDate) || snapshotDate.Equal(targetDate) {
+		snapshotsToGenerate = append(snapshotsToGenerate, snapshotDate)
+		snapshotDate = snapshotDate.AddDate(0, 0, durationDays)
+	}
+	totalCount := len(previousSnapshots) + len(snapshotsToGenerate)
+	toRemoveIdx := 0
+	for toRemoveIdx < len(previousSnapshots) && totalCount-toRemoveIdx > count {
+		snapshotsToRemove = append(snapshotsToRemove, filepath.Join(homeDirectory, snapshotPath, previousSnapshots[toRemoveIdx]))
+		toRemoveIdx += 1
+	}
+	return
+}
+
+func computeRequirementsFromEmpty(count int, durationDays int, targetDate time.Time) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, levelDbBootstrapPath string, err error) {
+	snapshotDate := targetDate
+	for i := 0; i < count; i++ {
+		snapshotsToGenerate = append(snapshotsToGenerate, snapshotDate)
+		snapshotDate = snapshotDate.AddDate(0, 0, -1*durationDays)
+	}
+	return snapshotsToRemove, snapshotsToGenerate, "", nil
+}
+
+// compute the number of snapshots that need to be generated and the snapshots to remove from the given directory.
+func computeRequirements(homeDirectory string, snapshotPath string, count int, durationDays int) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, levelDbBootstrapPath string, err error) {
+	snapshotDirectory := filepath.Join(homeDirectory, snapshotPath)
+	files, _ := os.ReadDir(snapshotDirectory)
+	// sort files by name
+	sort.Slice(files, func(i, j int) bool {
+		return files[i].Name() < files[j].Name()
+	})
+	// filter for snapshot file names only
+	var prevSnapshotFiles []string
+	for _, file := range files {
+		if strings.HasSuffix(file.Name(), SnapshotDirPostFix) {
+			prevSnapshotFiles = append(prevSnapshotFiles, file.Name())
+		}
+	}
+	curDate := time.Now()
+	curDateStr := curDate.Format(DateFormat)
+	// ensure snapshot start at today 00:00 - 1ns
+	today, err := time.Parse(DateFormat, curDateStr)
+	targetDate := today.Add(-1 * time.Nanosecond)
+	if len(prevSnapshotFiles) == 0 {
+		return computeRequirementsFromEmpty(count, durationDays, targetDate)
+	}
+	return computeRequirementsFromDirectory(prevSnapshotFiles, homeDirectory, snapshotPath, count, durationDays, targetDate)
+}
diff --git a/weed/shell/command_fs_meta_snapshots_create_scheduler_test.go b/weed/shell/command_fs_meta_snapshots_create_scheduler_test.go
new file mode 100644
index 000000000..9a2159aa6
--- /dev/null
+++ b/weed/shell/command_fs_meta_snapshots_create_scheduler_test.go
@@ -0,0 +1,64 @@
+package shell
+
+import (
+	"github.com/stretchr/testify/assert"
+	"path/filepath"
+	"reflect"
+	"testing"
+	"time"
+)
+
+func TestComputeRequirementsFromDirectory(t *testing.T) {
+	homeDir := "home"
+	snapDir := "test"
+	// case1: we have previous snapshots, target date is relatively close to the latest snapshot date, we will use previous snapshots to generate future snapshots and remove some previous snapshots.
+	var prevSnapshots []string
+	prevSnapshots = append(prevSnapshots, "2022-01-01")
+	prevSnapshots = append(prevSnapshots, "2022-02-01")
+	prevSnapshots = append(prevSnapshots, "2022-03-01")
+	targetDate, _ := time.Parse(DateFormat, "2022-04-01")
+	targetDate = targetDate.Add(-1 * time.Nanosecond)
+	snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err := computeRequirementsFromDirectory(prevSnapshots, homeDir, snapDir, 3, 15, targetDate)
+	assert.Nil(t, err)
+	assert.Equal(t, levelDbBootstrapPath, "2022-03-01", "latest previous snapshot should be 2022-03-01")
+	expectedToRemoveSnapshots := []string{filepath.Join(homeDir, snapDir, "2022-01-01"), filepath.Join(homeDir, snapDir, "2022-02-01")}
+	assert.True(t, reflect.DeepEqual(snapshotsToRemove, expectedToRemoveSnapshots))
+	expectedSnapshotsGenerationDate := []string{"2022-03-16", "2022-03-31"}
+	for i, date := range expectedSnapshotsGenerationDate {
+		assert.Equal(t, snapshotsToGenerate[i].Format(DateFormat), date)
+	}
+
+	// case2: we have previous snapshots, target date is too close to the latest snapshot date, no change will happen.
+	targetDate, _ = time.Parse(DateFormat, "2022-03-02")
+	snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err = computeRequirementsFromDirectory(prevSnapshots, "home", "test", 3, 15, targetDate)
+	assert.NotNil(t, err)
+	assert.Containsf(t, err.Error(), "no need to generate new snapshots", "expected error containing %q, got %s", "no need to generate new snapshots", err)
+	assert.Empty(t, snapshotsToRemove)
+	assert.Empty(t, snapshotsToGenerate)
+	assert.Equal(t, levelDbBootstrapPath, "2022-03-01", "latest previous snapshot should be 2022-03-01")
+
+	// case3: we have previous snapshots, target date is too far from the latest snapshot date, all previous snapshots will be removed, snapshots will be generated going backward starting on target date.
+	targetDate, _ = time.Parse(DateFormat, "2022-12-01")
+	targetDate = targetDate.Add(-1 * time.Nanosecond)
+	snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err = computeRequirementsFromDirectory(prevSnapshots, "home", "test", 3, 15, targetDate)
+	assert.Nil(t, err)
+	expectedToRemoveSnapshots = []string{filepath.Join(homeDir, snapDir, "2022-01-01"), filepath.Join(homeDir, snapDir, "2022-02-01"), filepath.Join(homeDir, snapDir, "2022-03-01")}
+	assert.True(t, reflect.DeepEqual(snapshotsToRemove, expectedToRemoveSnapshots))
+	// we still need to skip all logs prior to 2022-03-02
+	assert.Equal(t, levelDbBootstrapPath, "2022-03-01", "latest previous snapshot should be 2022-03-01")
+	expectedSnapshotsGenerationDate = []string{"2022-11-30", "2022-11-15", "2022-10-31"}
+	for i, date := range expectedSnapshotsGenerationDate {
+		assert.Equal(t, snapshotsToGenerate[i].Format(DateFormat), date)
+	}
+
+	// case4: the target date is exactly n snapshots away from previous snapshot date
+	targetDate, _ = time.Parse(DateFormat, "2022-03-04")
+	targetDate = targetDate.Add(-1 * time.Nanosecond)
+	snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err = computeRequirementsFromDirectory(prevSnapshots, "home", "test", 3, 1, targetDate)
+	expectedToRemoveSnapshots = []string{filepath.Join(homeDir, snapDir, "2022-01-01"), filepath.Join(homeDir, snapDir, "2022-02-01")}
+	assert.True(t, reflect.DeepEqual(snapshotsToRemove, expectedToRemoveSnapshots))
+	expectedSnapshotsGenerationDate = []string{"2022-03-02", "2022-03-03"}
+	for i, date := range expectedSnapshotsGenerationDate {
+		assert.Equal(t, snapshotsToGenerate[i].Format(DateFormat), date)
+	}
+}