diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go index 747d1104d..08dc33f02 100644 --- a/weed/filer/leveldb/leveldb_store.go +++ b/weed/filer/leveldb/leveldb_store.go @@ -43,6 +43,11 @@ func (store *LevelDBStore) Initialize(configuration weed_util.Configuration, pre return store.initialize(dir) } +// For adhoc usage +func (store *LevelDBStore) CustomInitialize(dir string) (err error) { + return store.initialize(dir) +} + func (store *LevelDBStore) initialize(dir string) (err error) { glog.Infof("filer store dir: %s", dir) os.MkdirAll(dir, 0755) diff --git a/weed/shell/command_fs_meta_snapshots_create.go b/weed/shell/command_fs_meta_snapshots_create.go index 1a58dbf81..adc07e804 100644 --- a/weed/shell/command_fs_meta_snapshots_create.go +++ b/weed/shell/command_fs_meta_snapshots_create.go @@ -1,16 +1,25 @@ package shell import ( + "context" "flag" "fmt" - cp "github.com/otiai10/copy" "github.com/seaweedfs/seaweedfs/weed/filer" + filer_leveldb "github.com/seaweedfs/seaweedfs/weed/filer/leveldb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" - "github.com/syndtr/goleveldb/leveldb" + "github.com/seaweedfs/seaweedfs/weed/util/mem" + "google.golang.org/protobuf/proto" "io" + "os" + "path/filepath" + "time" ) +const LevelDbPath = "/tmp/snapshots.db" +const SnapshotsParentPath = "/seaweedfs_meta_snapshots/" +const DateFormat = "2006-01-02" + func init() { Commands = append(Commands, &commandFsMetaSnapshotsCreate{}) } @@ -25,62 +34,200 @@ func (c *commandFsMetaSnapshotsCreate) Name() string { func (c *commandFsMetaSnapshotsCreate) Help() string { return `create snapshots of meta data from given time range. - fs.meta.snapshots.create -s yyyy-mm-dd -e yyyy-mm-dd -o + fs.meta.snapshots.create -s=2022-08-09 -e=2022-10-12 -f 7 create snapshot starting from 2022-08-09 ending at 2022-10-12 every seven days. //These snapshot maybe later used to backup the system to certain timestamp. - ` } -func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { - db, err := leveldb.OpenFile("/root/go/repos/seaweedfs/weed/snapshots.db", nil) +func processMetaDataEvents(store *filer_leveldb.LevelDBStore, data []byte, snapshotCnt int, snapshotCheckPoints []time.Time, homeDir string) (err error) { + var event filer_pb.SubscribeMetadataResponse + err = proto.Unmarshal(data, &event) if err != nil { return err } - defer db.Close() - fsMetaSnapshotsCreateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) - if err = fsMetaSnapshotsCreateCommand.Parse(args); err != nil { + fmt.Printf("%+v\n", event) + eventTime := event.TsNs + println(time.Unix(0, eventTime).Format(DateFormat)) + for snapshotCnt < len(snapshotCheckPoints) && time.Unix(0, eventTime).After(snapshotCheckPoints[snapshotCnt]) { + snapshotPath := homeDir + SnapshotsParentPath + snapshotCheckPoints[snapshotCnt].Format(DateFormat) + err = CreateIfNotExists(snapshotPath, 0755) + if err != nil { + return err + } + println("generating snapshots of metadata at: " + snapshotPath) + err = generateSnapshots(homeDir+LevelDbPath, snapshotPath) + if err != nil { + return err + } + snapshotCnt++ + } + if snapshotCnt == len(snapshotCheckPoints) { + return nil + } + ctx := context.Background() + if filer_pb.IsEmpty(&event) { + return nil + } else if filer_pb.IsCreate(&event) { + println("+", util.FullPath(event.EventNotification.NewParentPath).Child(event.EventNotification.NewEntry.Name)) + entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry) + return store.InsertEntry(ctx, entry) + } else if filer_pb.IsDelete(&event) { + println("-", util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name)) + return store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name)) + } else if filer_pb.IsUpdate(&event) { + println("~", util.FullPath(event.EventNotification.NewParentPath).Child(event.EventNotification.NewEntry.Name)) + entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry) + return store.UpdateEntry(ctx, entry) + } else { + // renaming + println("-", util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name)) + if err := store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name)); err != nil { + return err + } + println("+", util.FullPath(event.EventNotification.NewParentPath).Child(event.EventNotification.NewEntry.Name)) + return store.InsertEntry(ctx, filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)) + } + return nil +} + +func generateSnapshots(scrDir, dest string) error { + entries, err := os.ReadDir(scrDir) + if err != nil { return err } + for _, entry := range entries { + sourcePath := filepath.Join(scrDir, entry.Name()) + destPath := filepath.Join(dest, entry.Name()) - path := filer.SystemLogDir - count := 0 - err = filer_pb.TraverseBfs(commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) { - if !entry.IsDirectory { - fmt.Println("%+v", entry) - err = db.Put([]byte(entry.GetName()), entry.Content, nil) - count++ - if count%2 == 0 { - snapshotName := fmt.Sprintf("/root/go/repos/seaweedfs/weed/snapshots-%d.db", count/2) - e := cp.Copy("/root/go/repos/seaweedfs/weed/snapshots.db", snapshotName) - println(snapshotName) - if e != nil { - println("failed to generate " + snapshotName + e.Error()) - } + fileInfo, err := os.Stat(sourcePath) + if err != nil { + return err + } + + switch fileInfo.Mode() & os.ModeType { + case os.ModeDir: + if err := CreateIfNotExists(destPath, 0755); err != nil { + return err + } + if err := generateSnapshots(sourcePath, destPath); err != nil { + return err + } + default: + if err := Copy(sourcePath, destPath); err != nil { + return err } } - }) - verifying, err := leveldb.OpenFile("/root/go/repos/seaweedfs/weed/snapshots-1.db", nil) + } + return nil +} + +func Copy(srcFile, dstFile string) error { + out, err := os.Create(dstFile) + if err != nil { + return err + } + + defer out.Close() + + in, err := os.Open(srcFile) + defer in.Close() + if err != nil { + return err + } + + _, err = io.Copy(out, in) + if err != nil { + return err + } + + return nil +} + +func Exists(filePath string) bool { + if _, err := os.Stat(filePath); os.IsNotExist(err) { + return false + } + + return true +} + +func CreateIfNotExists(dir string, perm os.FileMode) error { + if Exists(dir) { + return nil + } + + if err := os.MkdirAll(dir, perm); err != nil { + return fmt.Errorf("failed to create directory: '%s', error: '%s'", dir, err.Error()) + } + + return nil +} + +func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + fsMetaSnapshotsCreateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + start := fsMetaSnapshotsCreateCommand.String("s", "", "start date of metadata compaction in yyyy-mm-dd format") + end := fsMetaSnapshotsCreateCommand.String("e", "", "end date of metadata compaction in yyyy-mm-dd format") + frequency := fsMetaSnapshotsCreateCommand.Int("f", 7, "the frequency of generating the metadata snapshots in days") + if err = fsMetaSnapshotsCreateCommand.Parse(args); err != nil { + return err + } + startDate, err := time.Parse(DateFormat, *start) + if err != nil { + return err + } + endDate, err := time.Parse(DateFormat, *end) if err != nil { return err } - iter := verifying.NewIterator(nil, nil) - for iter.Next() { - key := iter.Key() - fmt.Println("1 ---->") - fmt.Println(string(key)) + var snapshotCheckPoints []time.Time + for startDate.Before(endDate) || startDate.Equal(endDate) { + snapshotCheckPoints = append(snapshotCheckPoints, startDate) + startDate = startDate.AddDate(0, 0, *frequency) + } + snapshotCnt := 0 + homeDirname, err := os.UserHomeDir() + if err != nil { + return err } - verifying.Close() - verifying, err = leveldb.OpenFile("/root/go/repos/seaweedfs/weed/snapshots-4.db", nil) + levelDbPath := homeDirname + LevelDbPath + err = os.RemoveAll(levelDbPath) if err != nil { return err } - iter = verifying.NewIterator(nil, nil) - for iter.Next() { - key := iter.Key() - fmt.Println("4 ---->") - fmt.Println(string(key)) + store := &filer_leveldb.LevelDBStore{} + store.CustomInitialize(levelDbPath) + path := filer.SystemLogDir + var processEntry func(entry *filer_pb.Entry, isLast bool) error + processEntry = func(entry *filer_pb.Entry, isLast bool) error { + if entry.IsDirectory { + return filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(path+"/"+entry.Name), "", processEntry) + } + totalSize := filer.FileSize(entry) + buf := mem.Allocate(int(totalSize)) + if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return filer.ReadAll(buf, commandEnv.MasterClient, entry.GetChunks()) + }); err != nil && err != filer_pb.ErrNotFound { + return err + } + idx := uint32(0) + for idx < uint32(totalSize) { + logEntrySize := util.BytesToUint32(buf[idx : idx+4]) + var logEntry filer_pb.LogEntry + err = proto.Unmarshal(buf[idx+4:idx+4+logEntrySize], &logEntry) + if err != nil { + return err + } + idx = idx + 4 + logEntrySize + err = processMetaDataEvents(store, logEntry.Data, snapshotCnt, snapshotCheckPoints, homeDirname) + if err != nil { + return err + } + } + + return nil } - verifying.Close() + + err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(path), "", processEntry) return err }