
ADHOC: take snapshots of metadata

pull/4038/head
root, 2 years ago · commit 3dd482d3c2

  1. weed/filer/leveldb/leveldb_store.go (5)
  2. weed/shell/command_fs_meta_snapshots_create.go (223)

weed/filer/leveldb/leveldb_store.go (5)

@@ -43,6 +43,11 @@ func (store *LevelDBStore) Initialize(configuration weed_util.Configuration, pre
 	return store.initialize(dir)
 }
 
+// For adhoc usage
+func (store *LevelDBStore) CustomInitialize(dir string) (err error) {
+	return store.initialize(dir)
+}
+
 func (store *LevelDBStore) initialize(dir string) (err error) {
 	glog.Infof("filer store dir: %s", dir)
 	os.MkdirAll(dir, 0755)
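
The only change to the store itself is the new CustomInitialize, which skips the configuration lookup so a caller can point a LevelDBStore at an arbitrary directory. A minimal sketch of how the snapshot command below uses it; the helper names openScratchStore and replayCreate are illustrative and not part of the commit:

// Sketch only: open a LevelDB-backed filer store at a scratch directory via the
// new CustomInitialize, then replay one "create" event into it, the same way
// processMetaDataEvents in the new shell command does.
package shell

import (
	"context"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	filer_leveldb "github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

// openScratchStore is a hypothetical helper; the commit calls CustomInitialize inline.
func openScratchStore(dir string) (*filer_leveldb.LevelDBStore, error) {
	store := &filer_leveldb.LevelDBStore{}
	if err := store.CustomInitialize(dir); err != nil {
		return nil, err
	}
	return store, nil
}

// replayCreate converts a protobuf entry from a metadata event and inserts it into the store.
func replayCreate(store *filer_leveldb.LevelDBStore, parentPath string, pbEntry *filer_pb.Entry) error {
	return store.InsertEntry(context.Background(), filer.FromPbEntry(parentPath, pbEntry))
}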

weed/shell/command_fs_meta_snapshots_create.go (223)

@@ -1,16 +1,25 @@
 package shell
 
 import (
+	"context"
 	"flag"
 	"fmt"
-	cp "github.com/otiai10/copy"
 	"github.com/seaweedfs/seaweedfs/weed/filer"
+	filer_leveldb "github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/util"
-	"github.com/syndtr/goleveldb/leveldb"
+	"github.com/seaweedfs/seaweedfs/weed/util/mem"
+	"google.golang.org/protobuf/proto"
 	"io"
+	"os"
+	"path/filepath"
+	"time"
 )
 
+const LevelDbPath = "/tmp/snapshots.db"
+const SnapshotsParentPath = "/seaweedfs_meta_snapshots/"
+const DateFormat = "2006-01-02"
+
 func init() {
 	Commands = append(Commands, &commandFsMetaSnapshotsCreate{})
 }
@@ -25,62 +34,200 @@ func (c *commandFsMetaSnapshotsCreate) Name() string {
 func (c *commandFsMetaSnapshotsCreate) Help() string {
 	return `create snapshots of metadata within a given time range.
 
 	fs.meta.snapshots.create -s yyyy-mm-dd -e yyyy-mm-dd -f <frequency in days>
 	fs.meta.snapshots.create -s=2022-08-09 -e=2022-10-12 -f 7	create snapshots starting from 2022-08-09, ending at 2022-10-12, one every seven days.
 	These snapshots may later be used to restore the system to a given timestamp.
 `
 }
-func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
-	db, err := leveldb.OpenFile("/root/go/repos/seaweedfs/weed/snapshots.db", nil)
+func processMetaDataEvents(store *filer_leveldb.LevelDBStore, data []byte, snapshotCnt int, snapshotCheckPoints []time.Time, homeDir string) (err error) {
+	var event filer_pb.SubscribeMetadataResponse
+	err = proto.Unmarshal(data, &event)
 	if err != nil {
 		return err
 	}
-	defer db.Close()
-	fsMetaSnapshotsCreateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
-	if err = fsMetaSnapshotsCreateCommand.Parse(args); err != nil {
+	fmt.Printf("%+v\n", event)
+	eventTime := event.TsNs
+	println(time.Unix(0, eventTime).Format(DateFormat))
+	for snapshotCnt < len(snapshotCheckPoints) && time.Unix(0, eventTime).After(snapshotCheckPoints[snapshotCnt]) {
+		snapshotPath := homeDir + SnapshotsParentPath + snapshotCheckPoints[snapshotCnt].Format(DateFormat)
+		err = CreateIfNotExists(snapshotPath, 0755)
+		if err != nil {
 			return err
 		}
+		println("generating snapshots of metadata at: " + snapshotPath)
+		err = generateSnapshots(homeDir+LevelDbPath, snapshotPath)
+		if err != nil {
+			return err
+		}
+		snapshotCnt++
+	}
+	if snapshotCnt == len(snapshotCheckPoints) {
+		return nil
+	}
+	ctx := context.Background()
+	if filer_pb.IsEmpty(&event) {
+		return nil
+	} else if filer_pb.IsCreate(&event) {
+		println("+", util.FullPath(event.EventNotification.NewParentPath).Child(event.EventNotification.NewEntry.Name))
+		entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)
+		return store.InsertEntry(ctx, entry)
+	} else if filer_pb.IsDelete(&event) {
+		println("-", util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name))
+		return store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name))
+	} else if filer_pb.IsUpdate(&event) {
+		println("~", util.FullPath(event.EventNotification.NewParentPath).Child(event.EventNotification.NewEntry.Name))
+		entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)
+		return store.UpdateEntry(ctx, entry)
+	} else {
+		// renaming
+		println("-", util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name))
+		if err := store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name)); err != nil {
+			return err
+		}
+		println("+", util.FullPath(event.EventNotification.NewParentPath).Child(event.EventNotification.NewEntry.Name))
+		return store.InsertEntry(ctx, filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry))
+	}
+	return nil
+}
+
+func generateSnapshots(scrDir, dest string) error {
+	entries, err := os.ReadDir(scrDir)
+	if err != nil {
+		return err
+	}
+	for _, entry := range entries {
+		sourcePath := filepath.Join(scrDir, entry.Name())
+		destPath := filepath.Join(dest, entry.Name())
-	path := filer.SystemLogDir
-	count := 0
-	err = filer_pb.TraverseBfs(commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) {
-		if !entry.IsDirectory {
-			fmt.Println("%+v", entry)
-			err = db.Put([]byte(entry.GetName()), entry.Content, nil)
-			count++
-			if count%2 == 0 {
-				snapshotName := fmt.Sprintf("/root/go/repos/seaweedfs/weed/snapshots-%d.db", count/2)
-				e := cp.Copy("/root/go/repos/seaweedfs/weed/snapshots.db", snapshotName)
-				println(snapshotName)
-				if e != nil {
-					println("failed to generate " + snapshotName + e.Error())
-				}
+		fileInfo, err := os.Stat(sourcePath)
+		if err != nil {
+			return err
+		}
+		switch fileInfo.Mode() & os.ModeType {
+		case os.ModeDir:
+			if err := CreateIfNotExists(destPath, 0755); err != nil {
+				return err
+			}
+			if err := generateSnapshots(sourcePath, destPath); err != nil {
+				return err
+			}
+		default:
+			if err := Copy(sourcePath, destPath); err != nil {
+				return err
+			}
+		}
-	})
-	verifying, err := leveldb.OpenFile("/root/go/repos/seaweedfs/weed/snapshots-1.db", nil)
+	}
+	return nil
+}
+
+func Copy(srcFile, dstFile string) error {
+	out, err := os.Create(dstFile)
+	if err != nil {
+		return err
+	}
+	defer out.Close()
+	in, err := os.Open(srcFile)
+	defer in.Close()
+	if err != nil {
+		return err
+	}
+	_, err = io.Copy(out, in)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func Exists(filePath string) bool {
+	if _, err := os.Stat(filePath); os.IsNotExist(err) {
+		return false
+	}
+	return true
+}
+
+func CreateIfNotExists(dir string, perm os.FileMode) error {
+	if Exists(dir) {
+		return nil
+	}
+	if err := os.MkdirAll(dir, perm); err != nil {
+		return fmt.Errorf("failed to create directory: '%s', error: '%s'", dir, err.Error())
+	}
+	return nil
+}
+
+func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+	fsMetaSnapshotsCreateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+	start := fsMetaSnapshotsCreateCommand.String("s", "", "start date of metadata compaction in yyyy-mm-dd format")
+	end := fsMetaSnapshotsCreateCommand.String("e", "", "end date of metadata compaction in yyyy-mm-dd format")
+	frequency := fsMetaSnapshotsCreateCommand.Int("f", 7, "the frequency of generating the metadata snapshots in days")
+	if err = fsMetaSnapshotsCreateCommand.Parse(args); err != nil {
+		return err
+	}
+	startDate, err := time.Parse(DateFormat, *start)
+	if err != nil {
+		return err
+	}
+	endDate, err := time.Parse(DateFormat, *end)
+	if err != nil {
+		return err
+	}
-	iter := verifying.NewIterator(nil, nil)
-	for iter.Next() {
-		key := iter.Key()
-		fmt.Println("1 ---->")
-		fmt.Println(string(key))
+	var snapshotCheckPoints []time.Time
+	for startDate.Before(endDate) || startDate.Equal(endDate) {
+		snapshotCheckPoints = append(snapshotCheckPoints, startDate)
+		startDate = startDate.AddDate(0, 0, *frequency)
 	}
+	snapshotCnt := 0
+	homeDirname, err := os.UserHomeDir()
+	if err != nil {
+		return err
+	}
-	verifying.Close()
-	verifying, err = leveldb.OpenFile("/root/go/repos/seaweedfs/weed/snapshots-4.db", nil)
+	levelDbPath := homeDirname + LevelDbPath
+	err = os.RemoveAll(levelDbPath)
+	if err != nil {
+		return err
+	}
-	iter = verifying.NewIterator(nil, nil)
-	for iter.Next() {
-		key := iter.Key()
-		fmt.Println("4 ---->")
-		fmt.Println(string(key))
+	store := &filer_leveldb.LevelDBStore{}
+	store.CustomInitialize(levelDbPath)
+	path := filer.SystemLogDir
+	var processEntry func(entry *filer_pb.Entry, isLast bool) error
+	processEntry = func(entry *filer_pb.Entry, isLast bool) error {
+		if entry.IsDirectory {
+			return filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(path+"/"+entry.Name), "", processEntry)
+		}
+		totalSize := filer.FileSize(entry)
+		buf := mem.Allocate(int(totalSize))
+		if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+			return filer.ReadAll(buf, commandEnv.MasterClient, entry.GetChunks())
+		}); err != nil && err != filer_pb.ErrNotFound {
+			return err
+		}
+		idx := uint32(0)
+		for idx < uint32(totalSize) {
+			logEntrySize := util.BytesToUint32(buf[idx : idx+4])
+			var logEntry filer_pb.LogEntry
+			err = proto.Unmarshal(buf[idx+4:idx+4+logEntrySize], &logEntry)
+			if err != nil {
+				return err
+			}
+			idx = idx + 4 + logEntrySize
+			err = processMetaDataEvents(store, logEntry.Data, snapshotCnt, snapshotCheckPoints, homeDirname)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	}
-	verifying.Close()
+	err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(path), "", processEntry)
 	return err
 }
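
Taken together, the new Do wipes any previous scratch store under $HOME/tmp/snapshots.db, re-creates it through CustomInitialize, then walks the filer's metadata log files under filer.SystemLogDir and replays every logged event into that store; whenever an event's timestamp passes the next checkpoint date, the whole LevelDB directory is copied into $HOME/seaweedfs_meta_snapshots/<yyyy-mm-dd>. Two pieces of that flow, distilled into a standalone sketch; the function names are illustrative, since the commit inlines both loops:

// Sketch, not part of the commit: the checkpoint schedule produced by the
// -s/-e/-f flags, and the length-prefixed framing of the metadata log files
// that Do replays (a 4-byte size decoded with util.BytesToUint32, followed by
// a marshaled filer_pb.LogEntry).
package shell

import (
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"google.golang.org/protobuf/proto"
)

// snapshotCheckpoints returns one snapshot date every frequencyDays from start
// through end, inclusive; this is the same schedule Do builds with AddDate.
func snapshotCheckpoints(start, end time.Time, frequencyDays int) []time.Time {
	var checkpoints []time.Time
	for !start.After(end) {
		checkpoints = append(checkpoints, start)
		start = start.AddDate(0, 0, frequencyDays)
	}
	return checkpoints
}

// replayLogBuffer walks one metadata log file already read into buf and hands
// each decoded filer_pb.LogEntry to handle, mirroring the inner loop of processEntry.
func replayLogBuffer(buf []byte, handle func(*filer_pb.LogEntry) error) error {
	idx := uint32(0)
	for idx+4 <= uint32(len(buf)) {
		size := util.BytesToUint32(buf[idx : idx+4])
		var logEntry filer_pb.LogEntry
		if err := proto.Unmarshal(buf[idx+4:idx+4+size], &logEntry); err != nil {
			return err
		}
		idx += 4 + size
		if err := handle(&logEntry); err != nil {
			return err
		}
	}
	return nil
}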