From 73840fb076eac4c63dae30494dcd4cc9967bb8fc Mon Sep 17 00:00:00 2001 From: root Date: Thu, 15 Dec 2022 22:39:30 -0800 Subject: [PATCH] cover edge cases --- .../shell/command_fs_meta_snapshots_create.go | 52 ++++++++++++------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/weed/shell/command_fs_meta_snapshots_create.go b/weed/shell/command_fs_meta_snapshots_create.go index 9cf1c3a75..e3f95f3a6 100644 --- a/weed/shell/command_fs_meta_snapshots_create.go +++ b/weed/shell/command_fs_meta_snapshots_create.go @@ -63,47 +63,46 @@ func (c *commandFsMetaSnapshotsCreate) Help() string { ` } -func processMetaDataEvents(store *filer_leveldb.LevelDBStore, data []byte, snapshotCheckPoints []time.Time, homeDir string, snapshotPath string) (err error) { +func processMetaDataEvents(store *filer_leveldb.LevelDBStore, data []byte, unfinshiedSnapshotCnt int, snapshotCheckPoints []time.Time, homeDir string, snapshotPath string) (todoCnt int, err error) { var event filer_pb.SubscribeMetadataResponse err = proto.Unmarshal(data, &event) if err != nil { - return err + return unfinshiedSnapshotCnt, err } eventTime := event.TsNs - snapshotCnt := len(snapshotCheckPoints) - 1 - for snapshotCnt >= 0 && time.Unix(0, eventTime).After(snapshotCheckPoints[snapshotCnt]) { - snapshotPath := homeDir + snapshotPath + snapshotCheckPoints[snapshotCnt].Format(DateFormat) + for unfinshiedSnapshotCnt >= 0 && time.Unix(0, eventTime).After(snapshotCheckPoints[unfinshiedSnapshotCnt]) { + snapshotPath := filepath.Join(homeDir, snapshotPath, snapshotCheckPoints[unfinshiedSnapshotCnt].Format(DateFormat)) err = CreateIfNotExists(snapshotPath, 0755) if err != nil { - return err + return unfinshiedSnapshotCnt, err } err = generateSnapshots(homeDir+LevelDbPath, snapshotPath) if err != nil { - return err + return unfinshiedSnapshotCnt, err } - snapshotCnt++ + unfinshiedSnapshotCnt-- } - if snapshotCnt == len(snapshotCheckPoints) { - return nil + if unfinshiedSnapshotCnt < 0 { + return unfinshiedSnapshotCnt, nil } ctx := context.Background() if filer_pb.IsEmpty(&event) { - return nil + return unfinshiedSnapshotCnt, nil } else if filer_pb.IsCreate(&event) { entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry) - return store.InsertEntry(ctx, entry) + return unfinshiedSnapshotCnt, store.InsertEntry(ctx, entry) } else if filer_pb.IsDelete(&event) { - return store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name)) + return unfinshiedSnapshotCnt, store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name)) } else if filer_pb.IsUpdate(&event) { entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry) - return store.UpdateEntry(ctx, entry) + return unfinshiedSnapshotCnt, store.UpdateEntry(ctx, entry) } else { if err := store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name)); err != nil { - return err + return unfinshiedSnapshotCnt, err } - return store.InsertEntry(ctx, filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)) + return unfinshiedSnapshotCnt, store.InsertEntry(ctx, filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)) } - return nil + return unfinshiedSnapshotCnt, nil } func generateSnapshots(scrDir, dest string) error { @@ -198,6 +197,7 @@ func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv, snapshotCheckPoints = append(snapshotCheckPoints, snapshotDate) snapshotDate = snapshotDate.AddDate(0, 0, -1**snapshotInterval) } + homeDirname, err := os.UserHomeDir() if err != nil { return err @@ -212,6 +212,7 @@ func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv, dir: levelDbPath, } store.Initialize(config, "") + unfinishedSnapshotCnt := len(snapshotCheckPoints) - 1 changeLogPath := filer.SystemLogDir var processEntry func(entry *filer_pb.Entry, isLast bool) error processEntry = func(entry *filer_pb.Entry, isLast bool) error { @@ -234,10 +235,25 @@ func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv, return err } idx = idx + 4 + logEntrySize - err = processMetaDataEvents(store, logEntry.Data, snapshotCheckPoints, homeDirname, *snapshotPath) + unfinishedSnapshotCnt, err = processMetaDataEvents(store, logEntry.Data, unfinishedSnapshotCnt, snapshotCheckPoints, homeDirname, *snapshotPath) + if err != nil { + return err + } + } + // edge case + // there might be unfinished snapshot left over in the duration gaps. + // process meta event only triggers snapshots when there are event after the snapshot time + for unfinishedSnapshotCnt >= 0 { + generatePath := filepath.Join(homeDirname, *snapshotPath, snapshotCheckPoints[unfinishedSnapshotCnt].Format(DateFormat)) + err = CreateIfNotExists(generatePath, 0755) + if err != nil { + return err + } + err = generateSnapshots(levelDbPath, generatePath) if err != nil { return err } + unfinishedSnapshotCnt-- } return nil