Browse Source

Merge b646d386ae into bd419fda51

pull/4038/merge
Lanqing Yang 5 days ago
committed by GitHub
parent
commit
fe4885aca0
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 302
      weed/shell/command_fs_meta_snapshots_create.go
  2. 86
      weed/shell/command_fs_meta_snapshots_create_scheduler.go
  3. 64
      weed/shell/command_fs_meta_snapshots_create_scheduler_test.go

302
weed/shell/command_fs_meta_snapshots_create.go

@ -0,0 +1,302 @@
package shell
import (
"context"
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer"
filer_leveldb "github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"google.golang.org/protobuf/proto"
"io"
"os"
"path/filepath"
"sort"
"time"
)
// LevelDbPath is the working level db directory name (relative to the user
// home directory) used to accumulate replayed filer meta data events.
const LevelDbPath = "snapshots.db"

// DateFormat is the reference layout used to name and parse snapshot dates.
const DateFormat = "2006-01-02"

// SnapshotDirPostFix is appended to a snapshot date to form the snapshot
// directory name, e.g. "2022-01-01-snapshot".
const SnapshotDirPostFix = "-snapshot"
// init registers this command with the shell's global command registry.
func init() {
	Commands = append(Commands, &commandFsMetaSnapshotsCreate{})
}
// commandFsMetaSnapshotsCreate implements the fs.meta.snapshots.create shell command.
type commandFsMetaSnapshotsCreate struct {
}

// Name returns the shell command name used to invoke this command.
func (c *commandFsMetaSnapshotsCreate) Name() string {
	return "fs.meta.snapshots.create"
}
// SnapshotConfig adapts a single directory value to the configuration
// interface the level db store expects. Only GetString is functional;
// the other accessors panic because the store is not expected to call
// them in this context — NOTE(review): confirm against the store's
// Initialize implementation.
type SnapshotConfig struct {
	dir string
}

// GetString returns the configured directory regardless of key; assumes the
// store only ever asks for its directory setting — TODO confirm.
func (c SnapshotConfig) GetString(key string) string {
	return c.dir
}

// GetBool is intentionally unimplemented.
func (c SnapshotConfig) GetBool(key string) bool {
	panic("implement me")
}

// GetInt is intentionally unimplemented.
func (c SnapshotConfig) GetInt(key string) int {
	panic("implement me")
}

// GetStringSlice is intentionally unimplemented.
func (c SnapshotConfig) GetStringSlice(key string) []string {
	panic("implement me")
}

// SetDefault is intentionally unimplemented.
func (c SnapshotConfig) SetDefault(key string, value interface{}) {
	panic("implement me")
}
// Help returns the usage text shown for this command. The wording of the
// original text is cleaned up; the flags and behavior described are unchanged.
func (c *commandFsMetaSnapshotsCreate) Help() string {
	return `create snapshots of meta data from given time range.
fs.meta.snapshots.create -interval-days=7 -count=3 -path=/your/path
// fs.meta.snapshots.create generates the desired number of snapshots, one per interval, counting back from yesterday; the generated files are saved under the given path.
// These snapshots may later be used to restore the system to a certain timestamp.
// The path input is relative to the home directory.
`
}
// processMetaDataEvents applies one serialized meta data event to the level
// db store, first materializing a snapshot for every pending checkpoint that
// the event's timestamp has passed.
//
// count is the number of checkpoints already materialized; the (possibly
// advanced) value is returned so the caller can thread it through successive
// calls. The dead `if count < 0` guard and the unreachable trailing return
// from the original were removed; behavior is otherwise unchanged.
func processMetaDataEvents(store *filer_leveldb.LevelDBStore, data []byte, count int, snapshotCheckPoints []time.Time, homeDir string, snapshotPath string) (SnapshotCount int, err error) {
	var event filer_pb.SubscribeMetadataResponse
	if err = proto.Unmarshal(data, &event); err != nil {
		return count, err
	}
	eventTime := event.TsNs
	// An event later than a pending checkpoint means the level db now holds
	// every change up to that checkpoint, so write the snapshot out. The
	// microsecond slack tolerates events right on the checkpoint boundary.
	for count < len(snapshotCheckPoints) && time.Unix(0, eventTime).After(snapshotCheckPoints[count].Add(-time.Microsecond)) {
		snapshotDest := filepath.Join(homeDir, snapshotPath, snapshotCheckPoints[count].Format(DateFormat)+SnapshotDirPostFix)
		if err = createIfNotExists(snapshotDest, 0755); err != nil {
			return count, err
		}
		if err = generateSnapshots(filepath.Join(homeDir, LevelDbPath), snapshotDest); err != nil {
			return count, err
		}
		count++
	}
	ctx := context.Background()
	switch {
	case filer_pb.IsEmpty(&event):
		return count, nil
	case filer_pb.IsCreate(&event):
		entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)
		return count, store.InsertEntry(ctx, entry)
	case filer_pb.IsDelete(&event):
		return count, store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name))
	case filer_pb.IsUpdate(&event):
		entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)
		return count, store.UpdateEntry(ctx, entry)
	default:
		// Rename: drop the old entry and insert the new one.
		if err = store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name)); err != nil {
			return count, err
		}
		return count, store.InsertEntry(ctx, filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry))
	}
}
// processEntryLog reads the whole content of one meta data log file entry,
// replays each contained log event into the level db store, and advances the
// snapshot checkpoint counter as events cross checkpoint times.
//
// Fix: the buffer obtained from mem.Allocate was never returned to the pool;
// a defer now releases it on every exit path.
func processEntryLog(entry *filer_pb.Entry, commandEnv *CommandEnv, snapshotCount int, store *filer_leveldb.LevelDBStore, snapshotsToGenerate []time.Time, homeDirname string, snapshotPath string) (count int, err error) {
	totalSize := filer.FileSize(entry)
	buf := mem.Allocate(int(totalSize))
	defer mem.Free(buf)
	// ErrNotFound is tolerated: a missing chunk leaves that region unread,
	// matching the original best-effort behavior.
	if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		return filer.ReadAll(buf, commandEnv.MasterClient, entry.GetChunks())
	}); err != nil && err != filer_pb.ErrNotFound {
		return snapshotCount, err
	}
	idx := uint32(0)
	for idx < uint32(totalSize) {
		// Each record is a 4-byte length prefix followed by a LogEntry proto.
		logEntrySize := util.BytesToUint32(buf[idx : idx+4])
		var logEntry filer_pb.LogEntry
		if err = proto.Unmarshal(buf[idx+4:idx+4+logEntrySize], &logEntry); err != nil {
			return snapshotCount, err
		}
		idx = idx + 4 + logEntrySize
		snapshotCount, err = processMetaDataEvents(store, logEntry.Data, snapshotCount, snapshotsToGenerate, homeDirname, snapshotPath)
		if err != nil {
			return snapshotCount, err
		}
	}
	return snapshotCount, err
}
// generateSnapshots recursively copies the directory tree rooted at srcDir
// into dest, creating dest and any sub-directories as needed. Symlinks and
// other special files are treated as regular files and copied by content.
//
// Fixes: the misspelled srcDir parameter name, and the error ordering — the
// ReadDir error is now checked before dest is created, so a failed read no
// longer leaves an empty destination directory behind.
func generateSnapshots(srcDir, dest string) error {
	entries, err := os.ReadDir(srcDir)
	if err != nil {
		return err
	}
	if err := createIfNotExists(dest, 0755); err != nil {
		return err
	}
	for _, entry := range entries {
		sourcePath := filepath.Join(srcDir, entry.Name())
		destPath := filepath.Join(dest, entry.Name())
		fileInfo, err := os.Stat(sourcePath)
		if err != nil {
			return err
		}
		switch fileInfo.Mode() & os.ModeType {
		case os.ModeDir:
			if err := createIfNotExists(destPath, 0755); err != nil {
				return err
			}
			if err := generateSnapshots(sourcePath, destPath); err != nil {
				return err
			}
		default:
			if err := copy(sourcePath, destPath); err != nil {
				return err
			}
		}
	}
	return nil
}
// copy duplicates srcFile's content into dstFile, truncating dstFile if it
// already exists. (The name shadows the builtin copy; kept for the existing
// callers in this file.)
//
// Fixes: the source is opened and validated before the destination is
// created (the original created dstFile even when srcFile was unreadable,
// and deferred in.Close() before checking the open error); the destination's
// Close error is now returned so a failed flush is not silently dropped.
func copy(srcFile, dstFile string) error {
	in, err := os.Open(srcFile)
	if err != nil {
		return err
	}
	defer in.Close()
	out, err := os.Create(dstFile)
	if err != nil {
		return err
	}
	if _, err = io.Copy(out, in); err != nil {
		out.Close()
		return err
	}
	return out.Close()
}
// exists reports whether filePath can be stat'ed. As in the original, any
// Stat failure other than "does not exist" (e.g. permission denied) is
// treated as the path existing.
func exists(filePath string) bool {
	_, err := os.Stat(filePath)
	return !os.IsNotExist(err)
}
// createIfNotExists ensures dir exists as a directory with the given
// permissions. os.MkdirAll already succeeds without error when the directory
// exists, so no separate existence probe is needed; additionally, if a
// regular file occupies the path, an error is now returned instead of the
// path being silently accepted as present.
func createIfNotExists(dir string, perm os.FileMode) error {
	if err := os.MkdirAll(dir, perm); err != nil {
		return fmt.Errorf("failed to create directory: '%s', error: '%s'", dir, err.Error())
	}
	return nil
}
// setupLevelDb wipes and recreates the level db working directory at
// levelDbPath, optionally seeding it from a previous snapshot directory,
// and initializes the store on it.
//
// Fix: the store.Initialize error was silently discarded; it is now
// propagated to the caller.
func setupLevelDb(levelDbPath string, levelDbBootstrapPath string) (store *filer_leveldb.LevelDBStore, err error) {
	store = &filer_leveldb.LevelDBStore{}
	// Always start from a clean directory.
	if err = os.RemoveAll(levelDbPath); err != nil {
		return
	}
	if len(levelDbBootstrapPath) != 0 {
		// Copy the latest snapshot as the starting point.
		if err = generateSnapshots(levelDbBootstrapPath, levelDbPath); err != nil {
			return
		}
	}
	config := SnapshotConfig{
		dir: levelDbPath,
	}
	err = store.Initialize(config, "")
	return
}
// Do generates `count` meta data snapshots spaced `interval-days` apart,
// ending yesterday, under <home>/<path>. When a previous snapshot exists it
// is used to bootstrap the level db so only newer change-log entries are
// replayed; stale snapshots beyond `count` are removed at the end.
func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv, _writer io.Writer) (err error) {
	fsMetaSnapshotsCreateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	snapshotPath := fsMetaSnapshotsCreateCommand.String("path", "", "the path to store generated snapshot files")
	count := fsMetaSnapshotsCreateCommand.Int("count", 3, "number of snapshots generated")
	intervalDays := fsMetaSnapshotsCreateCommand.Int("interval-days", 7, "the duration interval between each generated snapshot")
	if err = fsMetaSnapshotsCreateCommand.Parse(args); err != nil {
		return err
	}
	homeDirname, err := os.UserHomeDir()
	if err != nil {
		return err
	}
	snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err := computeRequirements(homeDirname, *snapshotPath, *count, *intervalDays)
	if err != nil {
		return err
	}
	levelDbPath := filepath.Join(homeDirname, LevelDbPath)
	// Only resolve the bootstrap directory when a previous snapshot exists.
	// Fix: joining an empty bootstrap name would otherwise yield the snapshot
	// root itself, and setupLevelDb would wrongly seed the level db from it.
	levelDbBootstrapFullPath := ""
	if levelDbBootstrapPath != "" {
		levelDbBootstrapFullPath = filepath.Join(homeDirname, *snapshotPath, levelDbBootstrapPath)
	}
	store, err := setupLevelDb(levelDbPath, levelDbBootstrapFullPath)
	if err != nil {
		return err
	}
	// Sort so snapshots are materialized in ascending date order.
	sort.Slice(snapshotsToGenerate, func(i, j int) bool {
		return snapshotsToGenerate[i].Before(snapshotsToGenerate[j])
	})
	changeLogPath := filer.SystemLogDir
	var processEntry func(entry *filer_pb.Entry, isLast bool) error
	var levelDbBootstrapDate string
	if levelDbBootstrapPath != "" {
		levelDbBootstrapDate = levelDbBootstrapPath[:len(DateFormat)]
	}
	snapshotCount := 0
	processEntry = func(entry *filer_pb.Entry, isLast bool) error {
		if entry.IsDirectory {
			// Skip log directories dated at or before the bootstrap snapshot:
			// those changes are already captured in the seeded level db.
			if levelDbBootstrapDate != "" && entry.GetName() <= levelDbBootstrapDate {
				return nil
			}
			return filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(changeLogPath+"/"+entry.Name), "", processEntry)
		}
		snapshotCount, err = processEntryLog(entry, commandEnv, snapshotCount, store, snapshotsToGenerate, homeDirname, *snapshotPath)
		return err
	}
	err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(changeLogPath), "", processEntry)
	if err != nil {
		return err
	}
	// Edge case: a checkpoint is only materialized when an event after it is
	// seen; flush any remaining checkpoints that had no later events.
	for snapshotCount < len(snapshotsToGenerate) {
		generatePath := filepath.Join(homeDirname, *snapshotPath, snapshotsToGenerate[snapshotCount].Format(DateFormat)+SnapshotDirPostFix)
		if err = createIfNotExists(generatePath, 0755); err != nil {
			return err
		}
		if err = generateSnapshots(levelDbPath, generatePath); err != nil {
			return err
		}
		snapshotCount++
	}
	// Remove previous snapshots that are now out of the retention window.
	for _, snapshot := range snapshotsToRemove {
		if err = os.RemoveAll(snapshot); err != nil {
			return err
		}
	}
	return nil
}

86
weed/shell/command_fs_meta_snapshots_create_scheduler.go

@ -0,0 +1,86 @@
package shell
import (
"errors"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"time"
)
// computeRequirementsFromDirectory decides, given the snapshots already on
// disk, which snapshot dates still need to be generated, which existing
// snapshot directories must be removed to stay within count, and which
// previous snapshot the level db should be bootstrapped from.
//
// Fix: a duplicated `if err != nil { return }` after the date parse was
// removed; behavior is unchanged.
func computeRequirementsFromDirectory(previousSnapshots []string, homeDirectory string, snapshotPath string, count int, durationDays int, targetDate time.Time) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, levelDbBootstrapPath string, err error) {
	// previousSnapshots is sorted by name, so the last entry is the latest.
	levelDbBootstrapPath = previousSnapshots[len(previousSnapshots)-1]
	lastSnapshotDate, err := time.Parse(DateFormat, levelDbBootstrapPath[:len(DateFormat)])
	if err != nil {
		return
	}
	// A snapshot covers its whole day, up to the last nanosecond.
	lastSnapshotDate = lastSnapshotDate.AddDate(0, 0, 1).Add(-1 * time.Nanosecond)
	gapDuration := targetDate.Sub(lastSnapshotDate)
	oneSnapshotInterval := 24 * time.Hour * time.Duration(durationDays)
	totalSnapshotsInterval := 24 * time.Hour * time.Duration(durationDays*count)
	if gapDuration < oneSnapshotInterval {
		// Gap too small: no snapshot will be generated.
		return snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, errors.New(fmt.Sprintf("last snapshot was generated at %v no need to generate new snapshots", lastSnapshotDate.Format(DateFormat)))
	} else if gapDuration > totalSnapshotsInterval {
		// Gap too large: regenerate everything backward from targetDate and
		// remove all previous snapshots.
		_, snapshotsToGenerate, _, err = computeRequirementsFromEmpty(count, durationDays, targetDate)
		for _, file := range previousSnapshots {
			snapshotsToRemove = append(snapshotsToRemove, filepath.Join(homeDirectory, snapshotPath, file))
		}
		return
	}
	// Walk forward from the last snapshot in durationDays steps.
	snapshotDate := lastSnapshotDate.AddDate(0, 0, durationDays)
	for snapshotDate.Before(targetDate) || snapshotDate.Equal(targetDate) {
		snapshotsToGenerate = append(snapshotsToGenerate, snapshotDate)
		snapshotDate = snapshotDate.AddDate(0, 0, durationDays)
	}
	// Trim the oldest previous snapshots so the total stays within count.
	totalCount := len(previousSnapshots) + len(snapshotsToGenerate)
	toRemoveIdx := 0
	for toRemoveIdx < len(previousSnapshots) && totalCount-toRemoveIdx > count {
		snapshotsToRemove = append(snapshotsToRemove, filepath.Join(homeDirectory, snapshotPath, previousSnapshots[toRemoveIdx]))
		toRemoveIdx += 1
	}
	return
}
func computeRequirementsFromEmpty(count int, durationDays int, targetDate time.Time) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, levelDbBootstrapPath string, err error) {
snapshotDate := targetDate
for i := 0; i < count; i++ {
snapshotsToGenerate = append(snapshotsToGenerate, snapshotDate)
snapshotDate = snapshotDate.AddDate(0, 0, -1*durationDays)
}
return snapshotsToRemove, snapshotsToGenerate, "", nil
}
// compute number of snapshot need to be generated and number of snapshots to remove from give directory.
func computeRequirements(homeDirectory string, snapshotPath string, count int, durationDays int) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, levelDbBootstrapPath string, err error) {
snapshotDirectory := filepath.Join(homeDirectory, snapshotPath)
files, _ := os.ReadDir(snapshotDirectory)
// sort files by name
sort.Slice(files, func(i, j int) bool {
return files[i].Name() < files[j].Name()
})
// filter for snapshots file name only
var prevSnapshotFiles []string
for _, file := range files {
if strings.HasSuffix(file.Name(), SnapshotDirPostFix) {
prevSnapshotFiles = append(prevSnapshotFiles, file.Name())
}
}
curDate := time.Now()
curDateStr := curDate.Format(DateFormat)
// ensure snapshot start at today 00:00 - 1ns
today, err := time.Parse(DateFormat, curDateStr)
targetDate := today.Add(-1 * time.Nanosecond)
if len(prevSnapshotFiles) == 0 {
return computeRequirementsFromEmpty(count, durationDays, targetDate)
}
return computeRequirementsFromDirectory(prevSnapshotFiles, homeDirectory, snapshotPath, count, durationDays, targetDate)
}

64
weed/shell/command_fs_meta_snapshots_create_scheduler_test.go

@ -0,0 +1,64 @@
package shell
import (
"github.com/stretchr/testify/assert"
"path/filepath"
"reflect"
"testing"
"time"
)
// TestComputeRequirementsFromDirectory exercises the snapshot scheduling
// logic against previous snapshots dated 2022-01-01, 2022-02-01, 2022-03-01.
// Fix: case4 previously never asserted err or the bootstrap path, and some
// cases passed literal "home"/"test" instead of the declared variables.
func TestComputeRequirementsFromDirectory(t *testing.T) {
	homeDir := "home"
	snapDir := "test"
	var prevSnapshots []string
	prevSnapshots = append(prevSnapshots, "2022-01-01")
	prevSnapshots = append(prevSnapshots, "2022-02-01")
	prevSnapshots = append(prevSnapshots, "2022-03-01")

	// case1: the target date is relatively close to the latest snapshot;
	// future snapshots build on the previous ones and the oldest previous
	// snapshots are removed to stay within count.
	targetDate, _ := time.Parse(DateFormat, "2022-04-01")
	targetDate = targetDate.Add(-1 * time.Nanosecond)
	snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err := computeRequirementsFromDirectory(prevSnapshots, homeDir, snapDir, 3, 15, targetDate)
	assert.Nil(t, err)
	assert.Equal(t, levelDbBootstrapPath, "2022-03-01", "latest previous snapshot should be 2022-03-01")
	expectedToRemoveSnapshots := []string{filepath.Join(homeDir, snapDir, "2022-01-01"), filepath.Join(homeDir, snapDir, "2022-02-01")}
	assert.True(t, reflect.DeepEqual(snapshotsToRemove, expectedToRemoveSnapshots))
	expectedSnapshotsGenerationDate := []string{"2022-03-16", "2022-03-31"}
	for i, date := range expectedSnapshotsGenerationDate {
		assert.Equal(t, snapshotsToGenerate[i].Format(DateFormat), date)
	}

	// case2: the target date is too close to the latest snapshot; nothing
	// changes and an explanatory error is returned.
	targetDate, _ = time.Parse(DateFormat, "2022-03-02")
	snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err = computeRequirementsFromDirectory(prevSnapshots, homeDir, snapDir, 3, 15, targetDate)
	assert.NotNil(t, err)
	assert.Containsf(t, err.Error(), "no need to generate new snapshots", "expected error containing %q, got %s", "no need to generate new snapshots", err)
	assert.Empty(t, snapshotsToRemove)
	assert.Empty(t, snapshotsToGenerate)
	assert.Equal(t, levelDbBootstrapPath, "2022-03-01", "latest previous snapshot should be 2022-03-01")

	// case3: the target date is far beyond the previous snapshots; all
	// previous snapshots are removed and new ones are generated backward
	// starting at the target date.
	targetDate, _ = time.Parse(DateFormat, "2022-12-01")
	targetDate = targetDate.Add(-1 * time.Nanosecond)
	snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err = computeRequirementsFromDirectory(prevSnapshots, homeDir, snapDir, 3, 15, targetDate)
	assert.Nil(t, err)
	expectedToRemoveSnapshots = []string{filepath.Join(homeDir, snapDir, "2022-01-01"), filepath.Join(homeDir, snapDir, "2022-02-01"), filepath.Join(homeDir, snapDir, "2022-03-01")}
	assert.True(t, reflect.DeepEqual(snapshotsToRemove, expectedToRemoveSnapshots))
	// logs prior to 2022-03-02 still need to be skipped during replay.
	assert.Equal(t, levelDbBootstrapPath, "2022-03-01", "latest previous snapshot should be 2022-03-01")
	expectedSnapshotsGenerationDate = []string{"2022-11-30", "2022-11-15", "2022-10-31"}
	for i, date := range expectedSnapshotsGenerationDate {
		assert.Equal(t, snapshotsToGenerate[i].Format(DateFormat), date)
	}

	// case4: the target date is exactly n snapshot intervals away from the
	// previous snapshot date.
	targetDate, _ = time.Parse(DateFormat, "2022-03-04")
	targetDate = targetDate.Add(-1 * time.Nanosecond)
	snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err = computeRequirementsFromDirectory(prevSnapshots, homeDir, snapDir, 3, 1, targetDate)
	assert.Nil(t, err)
	assert.Equal(t, levelDbBootstrapPath, "2022-03-01", "latest previous snapshot should be 2022-03-01")
	expectedToRemoveSnapshots = []string{filepath.Join(homeDir, snapDir, "2022-01-01"), filepath.Join(homeDir, snapDir, "2022-02-01")}
	assert.True(t, reflect.DeepEqual(snapshotsToRemove, expectedToRemoveSnapshots))
	expectedSnapshotsGenerationDate = []string{"2022-03-02", "2022-03-03"}
	for i, date := range expectedSnapshotsGenerationDate {
		assert.Equal(t, snapshotsToGenerate[i].Format(DateFormat), date)
	}
}
Loading…
Cancel
Save