Browse Source

fix

pull/4038/head
root 2 years ago
parent
commit
b7394a48f0
  1. 189
      weed/shell/command_fs_meta_snapshots_create.go
  2. 86
      weed/shell/command_fs_meta_snapshots_create_scheduler.go
  3. 64
      weed/shell/command_fs_meta_snapshots_create_scheduler_test.go

189
weed/shell/command_fs_meta_snapshots_create.go

@ -2,7 +2,6 @@ package shell
import (
"context"
"errors"
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer"
@ -15,7 +14,6 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"time"
)
@ -67,46 +65,71 @@ func (c *commandFsMetaSnapshotsCreate) Help() string {
`
}
func processMetaDataEvents(store *filer_leveldb.LevelDBStore, data []byte, unfinshiedSnapshotCnt int, snapshotCheckPoints []time.Time, homeDir string, snapshotPath string) (todoCnt int, err error) {
func processMetaDataEvents(store *filer_leveldb.LevelDBStore, data []byte, count int, snapshotCheckPoints []time.Time, homeDir string, snapshotPath string) (SnapshotCount int, err error) {
var event filer_pb.SubscribeMetadataResponse
err = proto.Unmarshal(data, &event)
if err != nil {
return unfinshiedSnapshotCnt, err
return count, err
}
eventTime := event.TsNs
for unfinshiedSnapshotCnt >= 0 && time.Unix(0, eventTime).After(snapshotCheckPoints[unfinshiedSnapshotCnt]) {
snapshotPath := filepath.Join(homeDir, snapshotPath, snapshotCheckPoints[unfinshiedSnapshotCnt].Format(DateFormat)+SnapshotDirPostFix)
for count < len(snapshotCheckPoints) && time.Unix(0, eventTime).After(snapshotCheckPoints[count].Add(-time.Microsecond)) {
snapshotPath := filepath.Join(homeDir, snapshotPath, snapshotCheckPoints[count].Format(DateFormat)+SnapshotDirPostFix)
err = createIfNotExists(snapshotPath, 0755)
if err != nil {
return unfinshiedSnapshotCnt, err
return count, err
}
err = generateSnapshots(homeDir+LevelDbPath, snapshotPath)
if err != nil {
return unfinshiedSnapshotCnt, err
return count, err
}
unfinshiedSnapshotCnt--
count++
}
if unfinshiedSnapshotCnt < 0 {
return unfinshiedSnapshotCnt, nil
if count < 0 {
return count, nil
}
ctx := context.Background()
if filer_pb.IsEmpty(&event) {
return unfinshiedSnapshotCnt, nil
return count, nil
} else if filer_pb.IsCreate(&event) {
entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)
return unfinshiedSnapshotCnt, store.InsertEntry(ctx, entry)
return count, store.InsertEntry(ctx, entry)
} else if filer_pb.IsDelete(&event) {
return unfinshiedSnapshotCnt, store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name))
return count, store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name))
} else if filer_pb.IsUpdate(&event) {
entry := filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry)
return unfinshiedSnapshotCnt, store.UpdateEntry(ctx, entry)
return count, store.UpdateEntry(ctx, entry)
} else {
if err := store.DeleteEntry(ctx, util.FullPath(event.Directory).Child(event.EventNotification.OldEntry.Name)); err != nil {
return unfinshiedSnapshotCnt, err
return count, err
}
return unfinshiedSnapshotCnt, store.InsertEntry(ctx, filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry))
return count, store.InsertEntry(ctx, filer.FromPbEntry(event.EventNotification.NewParentPath, event.EventNotification.NewEntry))
}
return unfinshiedSnapshotCnt, nil
return count, nil
}
func processEntryLog(entry *filer_pb.Entry, commandEnv *CommandEnv, snapshotCount int, store *filer_leveldb.LevelDBStore, snapshotsToGenerate []time.Time, homeDirname string, snapshotPath string) (count int, err error) {
totalSize := filer.FileSize(entry)
buf := mem.Allocate(int(totalSize))
if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.ReadAll(buf, commandEnv.MasterClient, entry.GetChunks())
}); err != nil && err != filer_pb.ErrNotFound {
return snapshotCount, err
}
idx := uint32(0)
for idx < uint32(totalSize) {
logEntrySize := util.BytesToUint32(buf[idx : idx+4])
var logEntry filer_pb.LogEntry
err = proto.Unmarshal(buf[idx+4:idx+4+logEntrySize], &logEntry)
if err != nil {
return snapshotCount, err
}
idx = idx + 4 + logEntrySize
snapshotCount, err = processMetaDataEvents(store, logEntry.Data, snapshotCount, snapshotsToGenerate, homeDirname, snapshotPath)
if err != nil {
return snapshotCount, err
}
}
return snapshotCount, err
}
func generateSnapshots(scrDir, dest string) error {
@ -181,85 +204,18 @@ func createIfNotExists(dir string, perm os.FileMode) error {
return nil
}
func computeRequirementsFromDirectory(previousSnapshots []os.DirEntry, homeDirectory string, snapshotPath string, count int, durationDays int) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, err error) {
lastSnapshotDate, err := time.Parse(DateFormat, previousSnapshots[len(previousSnapshots)-1].Name()[:len(DateFormat)])
func setupLevelDb(levelDbPath string, levelDbBootstrapPath string) (store *filer_leveldb.LevelDBStore, err error) {
err = os.RemoveAll(levelDbPath)
if err != nil {
return snapshotsToRemove, snapshotsToGenerate, err
return &filer_leveldb.LevelDBStore{}, err
}
yesterday := time.Now().Add(-time.Hour * 24)
yesterdayStr := yesterday.Format(DateFormat)
// ensure snapshot start at yesterday 00:00
yesterday, err = time.Parse(DateFormat, yesterdayStr)
if len(levelDbBootstrapPath) != 0 {
// copy the latest snapshot as starting point
err = generateSnapshots(levelDbBootstrapPath, levelDbPath)
if err != nil {
return snapshotsToRemove, snapshotsToGenerate, err
}
gapDays := int(yesterday.Sub(lastSnapshotDate).Hours() / 24)
// gap too small no snapshot will be generated
if gapDays < durationDays {
return snapshotsToRemove, snapshotsToGenerate, errors.New(fmt.Sprintf("last snapshot was generated at %v no need to generate new snapshots", lastSnapshotDate.Format(DateFormat)))
} else if gapDays > durationDays*count {
// gap too large generate from yesterday
// and remove all previous snapshots
_, snapshotsToGenerate, err = computeRequirementsFromEmpty(homeDirectory, count, durationDays)
for _, file := range previousSnapshots {
snapshotsToRemove = append(snapshotsToRemove, filepath.Join(homeDirectory, snapshotPath, file.Name()))
}
return
}
snapshotDate := lastSnapshotDate.AddDate(0, 0, 1*durationDays)
for snapshotDate.Before(yesterday) || snapshotDate.Equal(yesterday) {
snapshotsToGenerate = append(snapshotsToGenerate, snapshotDate)
snapshotDate = lastSnapshotDate.AddDate(0, 0, 1*durationDays)
}
totalCount := len(previousSnapshots) + len(snapshotsToGenerate)
toRemoveIdx := 0
for toRemoveIdx < len(previousSnapshots) && totalCount-toRemoveIdx > count {
snapshotsToRemove = append(snapshotsToRemove, filepath.Join(homeDirectory, snapshotPath, previousSnapshots[toRemoveIdx].Name()))
toRemoveIdx += 1
}
return
}
func computeRequirementsFromEmpty(homeDirectory string, count int, durationDays int) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, err error) {
yesterday := time.Now().Add(-time.Hour * 24).Format(DateFormat)
// ensure snapshot start at yesterday 00:00
snapshotDate, err := time.Parse(DateFormat, yesterday)
if err != nil {
return snapshotsToRemove, snapshotsToGenerate, err
}
for i := 0; i < count; i++ {
snapshotsToGenerate = append(snapshotsToGenerate, snapshotDate)
snapshotDate = snapshotDate.AddDate(0, 0, -1*durationDays)
}
return snapshotsToRemove, snapshotsToGenerate, nil
}
// compute number of snapshot need to be generated and number of snapshots to remove from give directory.
func computeRequirements(homeDirectory string, snapshotPath string, count int, durationDays int) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, err error) {
snapshotDirectory := filepath.Join(homeDirectory, snapshotPath)
files, _ := os.ReadDir(snapshotDirectory)
if len(files) == 0 {
return computeRequirementsFromEmpty(homeDirectory, count, durationDays)
}
// sort files by name
sort.Slice(files, func(i, j int) bool {
return files[i].Name() < files[j].Name()
})
// filter for snapshots file only
var prevSnapshotFiles []os.DirEntry
for _, file := range files {
if strings.HasSuffix(file.Name(), SnapshotDirPostFix) {
prevSnapshotFiles = append(prevSnapshotFiles, file)
}
}
return computeRequirementsFromDirectory(prevSnapshotFiles, homeDirectory, snapshotPath, count, durationDays)
}
func setupLevelDb(levelDbPath string) (store *filer_leveldb.LevelDBStore, err error) {
err = os.RemoveAll(levelDbPath)
if err != nil {
return &filer_leveldb.LevelDBStore{}, err
}
config := SnapshotConfig{
dir: levelDbPath,
}
@ -279,43 +235,36 @@ func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv,
if err != nil {
return err
}
snapshotsToRemove, snapshotsToGenerate, err := computeRequirements(homeDirname, *snapshotPath, *count, *intervalDays)
snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err := computeRequirements(homeDirname, *snapshotPath, *count, *intervalDays)
if err != nil {
return err
}
levelDbPath := filepath.Join(homeDirname, LevelDbPath)
store, err := setupLevelDb(levelDbPath)
store, err := setupLevelDb(levelDbPath, filepath.Join(homeDirname, *snapshotPath, levelDbBootstrapPath))
if err != nil {
return err
}
unfinishedSnapshotCnt := len(snapshotsToGenerate) - 1
// sort to make sure we are processing ascending list of snapshots to generate
sort.Slice(snapshotsToGenerate, func(i, j int) bool {
return snapshotsToGenerate[i].Before(snapshotsToGenerate[j])
})
changeLogPath := filer.SystemLogDir
var processEntry func(entry *filer_pb.Entry, isLast bool) error
var levelDbBootstrapDate string
if levelDbBootstrapPath != "" {
levelDbBootstrapDate = levelDbBootstrapPath[:len(DateFormat)]
}
snapshotCount := 0
processEntry = func(entry *filer_pb.Entry, isLast bool) error {
if entry.IsDirectory {
return filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(changeLogPath+"/"+entry.Name), "", processEntry)
}
totalSize := filer.FileSize(entry)
buf := mem.Allocate(int(totalSize))
if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.ReadAll(buf, commandEnv.MasterClient, entry.GetChunks())
}); err != nil && err != filer_pb.ErrNotFound {
return err
}
idx := uint32(0)
for idx < uint32(totalSize) {
logEntrySize := util.BytesToUint32(buf[idx : idx+4])
var logEntry filer_pb.LogEntry
err = proto.Unmarshal(buf[idx+4:idx+4+logEntrySize], &logEntry)
if err != nil {
return err
}
idx = idx + 4 + logEntrySize
unfinishedSnapshotCnt, err = processMetaDataEvents(store, logEntry.Data, unfinishedSnapshotCnt, snapshotsToGenerate, homeDirname, *snapshotPath)
if err != nil {
return err
// skip logs prior to the latest previous snapshot
if entry.GetName() <= levelDbBootstrapDate {
println(entry.GetName())
return nil
}
return filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(changeLogPath+"/"+entry.Name), "", processEntry)
}
snapshotCount, err = processEntryLog(entry, commandEnv, snapshotCount, store, snapshotsToGenerate, homeDirname, *snapshotPath)
return err
}
@ -325,9 +274,9 @@ func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv,
}
// edge case
// there might be unfinished snapshot left over in the duration gaps.
// process meta event only triggers snapshots when there are event after the snapshot time
for unfinishedSnapshotCnt >= 0 {
generatePath := filepath.Join(homeDirname, *snapshotPath, snapshotsToGenerate[unfinishedSnapshotCnt].Format(DateFormat))
// process meta event only triggers snapshots when there are event after the snapshot time.
for snapshotCount < len(snapshotsToGenerate) {
generatePath := filepath.Join(homeDirname, *snapshotPath, snapshotsToGenerate[snapshotCount].Format(DateFormat))
err = createIfNotExists(generatePath, 0755)
if err != nil {
return err
@ -336,9 +285,9 @@ func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv,
if err != nil {
return err
}
unfinishedSnapshotCnt--
snapshotCount++
}
// remove previous snapshot if needed.
// remove previous snapshots.
for _, snapshot := range snapshotsToRemove {
err = os.RemoveAll(snapshot)
if err != nil {

86
weed/shell/command_fs_meta_snapshots_create_scheduler.go

@ -0,0 +1,86 @@
package shell
import (
"errors"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"time"
)
func computeRequirementsFromDirectory(previousSnapshots []string, homeDirectory string, snapshotPath string, count int, durationDays int, targetDate time.Time) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, levelDbBootstrapPath string, err error) {
levelDbBootstrapPath = previousSnapshots[len(previousSnapshots)-1]
lastSnapshotDate, err := time.Parse(DateFormat, levelDbBootstrapPath[:len(DateFormat)])
if err != nil {
return
}
if err != nil {
return
}
// the snapshots cover the last nanosecond of the current date
lastSnapshotDate = lastSnapshotDate.AddDate(0, 0, 1).Add(-1 * time.Nanosecond)
gapDuration := targetDate.Sub(lastSnapshotDate)
oneSnapshotInterval := 24 * time.Hour * time.Duration(durationDays)
totalSnapshotsInterval := 24 * time.Hour * time.Duration(durationDays*count)
// gap too small no snapshot will be generated
if gapDuration < oneSnapshotInterval {
return snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, errors.New(fmt.Sprintf("last snapshot was generated at %v no need to generate new snapshots", lastSnapshotDate.Format(DateFormat)))
} else if gapDuration > totalSnapshotsInterval {
// gap too large generate from targetDate
// and remove all previous snapshots
_, snapshotsToGenerate, _, err = computeRequirementsFromEmpty(count, durationDays, targetDate)
for _, file := range previousSnapshots {
snapshotsToRemove = append(snapshotsToRemove, filepath.Join(homeDirectory, snapshotPath, file))
}
return
}
snapshotDate := lastSnapshotDate.AddDate(0, 0, durationDays)
for snapshotDate.Before(targetDate) || snapshotDate.Equal(targetDate) {
snapshotsToGenerate = append(snapshotsToGenerate, snapshotDate)
snapshotDate = snapshotDate.AddDate(0, 0, durationDays)
}
totalCount := len(previousSnapshots) + len(snapshotsToGenerate)
toRemoveIdx := 0
for toRemoveIdx < len(previousSnapshots) && totalCount-toRemoveIdx > count {
snapshotsToRemove = append(snapshotsToRemove, filepath.Join(homeDirectory, snapshotPath, previousSnapshots[toRemoveIdx]))
toRemoveIdx += 1
}
return
}
func computeRequirementsFromEmpty(count int, durationDays int, targetDate time.Time) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, levelDbBootstrapPath string, err error) {
snapshotDate := targetDate
for i := 0; i < count; i++ {
snapshotsToGenerate = append(snapshotsToGenerate, snapshotDate)
snapshotDate = snapshotDate.AddDate(0, 0, -1*durationDays)
}
return snapshotsToRemove, snapshotsToGenerate, "", nil
}
// compute number of snapshot need to be generated and number of snapshots to remove from give directory.
func computeRequirements(homeDirectory string, snapshotPath string, count int, durationDays int) (snapshotsToRemove []string, snapshotsToGenerate []time.Time, levelDbBootstrapPath string, err error) {
snapshotDirectory := filepath.Join(homeDirectory, snapshotPath)
files, _ := os.ReadDir(snapshotDirectory)
// sort files by name
sort.Slice(files, func(i, j int) bool {
return files[i].Name() < files[j].Name()
})
// filter for snapshots file name only
var prevSnapshotFiles []string
for _, file := range files {
if strings.HasSuffix(file.Name(), SnapshotDirPostFix) {
prevSnapshotFiles = append(prevSnapshotFiles, file.Name())
}
}
curDate := time.Now()
curDateStr := curDate.Format(DateFormat)
// ensure snapshot start at today 00:00 - 1ns
today, err := time.Parse(DateFormat, curDateStr)
targetDate := today.Add(-1 * time.Nanosecond)
if len(prevSnapshotFiles) == 0 {
return computeRequirementsFromEmpty(count, durationDays, targetDate)
}
return computeRequirementsFromDirectory(prevSnapshotFiles, homeDirectory, snapshotPath, count, durationDays, targetDate)
}

64
weed/shell/command_fs_meta_snapshots_create_scheduler_test.go

@ -0,0 +1,64 @@
package shell
import (
"github.com/stretchr/testify/assert"
"path/filepath"
"reflect"
"testing"
"time"
)
func TestComputeRequirementsFromDirectory(t *testing.T) {
homeDir := "home"
snapDir := "test"
// case1: we have previous snapshots, target date is relative close to the latest snapshot date, we will use previous snapshots to generate future snapshots and remove some previous snapshots.
var prevSnapshots []string
prevSnapshots = append(prevSnapshots, "2022-01-01")
prevSnapshots = append(prevSnapshots, "2022-02-01")
prevSnapshots = append(prevSnapshots, "2022-03-01")
targetDate, _ := time.Parse(DateFormat, "2022-04-01")
targetDate = targetDate.Add(-1 * time.Nanosecond)
snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err := computeRequirementsFromDirectory(prevSnapshots, homeDir, snapDir, 3, 15, targetDate)
assert.Nil(t, err)
assert.Equal(t, levelDbBootstrapPath, "2022-03-01", "latest previous snapshot should be 2022-03-01")
expectedToRemoveSnapshots := []string{filepath.Join(homeDir, snapDir, "2022-01-01"), filepath.Join(homeDir, snapDir, "2022-02-01")}
assert.True(t, reflect.DeepEqual(snapshotsToRemove, expectedToRemoveSnapshots))
expectedSnapshotsGenerationDate := []string{"2022-03-16", "2022-03-31"}
for i, date := range expectedSnapshotsGenerationDate {
assert.Equal(t, snapshotsToGenerate[i].Format(DateFormat), date)
}
// case2: we have previous snapshots, target date is too close to the latest snapshot date, no change will happen.
targetDate, _ = time.Parse(DateFormat, "2022-03-02")
snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err = computeRequirementsFromDirectory(prevSnapshots, "home", "test", 3, 15, targetDate)
assert.NotNil(t, err)
assert.Containsf(t, err.Error(), "no need to generate new snapshots", "expected error containing %q, got %s", "no need to generate new snapshots", err)
assert.Empty(t, snapshotsToRemove)
assert.Empty(t, snapshotsToGenerate)
assert.Equal(t, levelDbBootstrapPath, "2022-03-01", "latest previous snapshot should be 2022-03-01")
// case3: we have previous snapshots, target date is too far out to the latest snapshot date, all previous snapshots will be removed, snapshots will be generated going backward starting on target date.
targetDate, _ = time.Parse(DateFormat, "2022-12-01")
targetDate = targetDate.Add(-1 * time.Nanosecond)
snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err = computeRequirementsFromDirectory(prevSnapshots, "home", "test", 3, 15, targetDate)
assert.Nil(t, err)
expectedToRemoveSnapshots = []string{filepath.Join(homeDir, snapDir, "2022-01-01"), filepath.Join(homeDir, snapDir, "2022-02-01"), filepath.Join(homeDir, snapDir, "2022-03-01")}
assert.True(t, reflect.DeepEqual(snapshotsToRemove, expectedToRemoveSnapshots))
// we still need to skip all logs prior to 2022-03-02
assert.Equal(t, levelDbBootstrapPath, "2022-03-01", "latest previous snapshot should be 2022-03-01")
expectedSnapshotsGenerationDate = []string{"2022-11-30", "2022-11-15", "2022-10-31"}
for i, date := range expectedSnapshotsGenerationDate {
assert.Equal(t, snapshotsToGenerate[i].Format(DateFormat), date)
}
// case4: the target date is exactly n snapshots away from previous snapshot date
targetDate, _ = time.Parse(DateFormat, "2022-03-04")
targetDate = targetDate.Add(-1 * time.Nanosecond)
snapshotsToRemove, snapshotsToGenerate, levelDbBootstrapPath, err = computeRequirementsFromDirectory(prevSnapshots, "home", "test", 3, 1, targetDate)
expectedToRemoveSnapshots = []string{filepath.Join(homeDir, snapDir, "2022-01-01"), filepath.Join(homeDir, snapDir, "2022-02-01")}
assert.True(t, reflect.DeepEqual(snapshotsToRemove, expectedToRemoveSnapshots))
expectedSnapshotsGenerationDate = []string{"2022-03-02", "2022-03-03"}
for i, date := range expectedSnapshotsGenerationDate {
assert.Equal(t, snapshotsToGenerate[i].Format(DateFormat), date)
}
}
Loading…
Cancel
Save