@ -17,7 +17,6 @@ import (
)
)
const LevelDbPath = "/tmp/snapshots.db"
const LevelDbPath = "/tmp/snapshots.db"
const SnapshotsParentPath = "/seaweedfs_meta_snapshots/"
const DateFormat = "2006-01-02"
const DateFormat = "2006-01-02"
func init ( ) {
func init ( ) {
@ -34,28 +33,26 @@ func (c *commandFsMetaSnapshotsCreate) Name() string {
func ( c * commandFsMetaSnapshotsCreate ) Help ( ) string {
func ( c * commandFsMetaSnapshotsCreate ) Help ( ) string {
return ` create snapshots of meta data from given time range .
return ` create snapshots of meta data from given time range .
fs . meta . snapshots . create - s = 2022 - 0 8 - 0 9 - e = 2022 - 10 - 12 - f 7 create snapshot starting from 2022 - 0 8 - 0 9 ending at 2022 - 10 - 12 every seven days .
//These snapshot maybe later used to backup the system to certain timestamp.
fs . meta . snapshots . create - snapshot - interval = 7 - snapshot - cnt = 3 - path = / your / path
// fs.meta.snapshots.create will generate desired number of snapshots with desired duration interval from yesterday the generated files will be saved from input path.
// These snapshot maybe later used to backup the system to certain timestamp.
`
`
}
}
func processMetaDataEvents ( store * filer_leveldb . LevelDBStore , data [ ] byte , snapshotCnt int , snapshotC heckPoints [ ] time . Time , homeDir string ) ( err error ) {
func processMetaDataEvents ( store * filer_leveldb . LevelDBStore , data [ ] byte , snapshotCheckPoints [ ] time . Time , homeDir string , snapshotPath string ) ( err error ) {
var event filer_pb . SubscribeMetadataResponse
var event filer_pb . SubscribeMetadataResponse
err = proto . Unmarshal ( data , & event )
err = proto . Unmarshal ( data , & event )
if err != nil {
if err != nil {
return err
return err
}
}
fmt . Printf ( "%+v\n" , event )
eventTime := event . TsNs
eventTime := event . TsNs
println ( time . Unix ( 0 , eventTime ) . Format ( DateFormat ) )
for snapshotCnt < len ( snapshotCheckPoints ) && time . Unix ( 0 , eventTime ) . After ( snapshotCheckPoints [ snapshotCnt ] ) {
snapshotPath := homeDir + SnapshotsParen tPath + snapshotCheckPoints [ snapshotCnt ] . Format ( DateFormat )
snapshotCnt := len ( snapshotCheckPoints ) - 1
for snapshotCnt >= 0 && time . Unix ( 0 , eventTime ) . After ( snapshotCheckPoints [ snapshotCnt ] ) {
snapshotPath := homeDir + snapsho tPath + snapshotCheckPoints [ snapshotCnt ] . Format ( DateFormat )
err = CreateIfNotExists ( snapshotPath , 0755 )
err = CreateIfNotExists ( snapshotPath , 0755 )
if err != nil {
if err != nil {
return err
return err
}
}
println ( "generating snapshots of metadata at: " + snapshotPath )
err = generateSnapshots ( homeDir + LevelDbPath , snapshotPath )
err = generateSnapshots ( homeDir + LevelDbPath , snapshotPath )
if err != nil {
if err != nil {
return err
return err
@ -69,23 +66,17 @@ func processMetaDataEvents(store *filer_leveldb.LevelDBStore, data []byte, snaps
if filer_pb . IsEmpty ( & event ) {
if filer_pb . IsEmpty ( & event ) {
return nil
return nil
} else if filer_pb . IsCreate ( & event ) {
} else if filer_pb . IsCreate ( & event ) {
println ( "+" , util . FullPath ( event . EventNotification . NewParentPath ) . Child ( event . EventNotification . NewEntry . Name ) )
entry := filer . FromPbEntry ( event . EventNotification . NewParentPath , event . EventNotification . NewEntry )
entry := filer . FromPbEntry ( event . EventNotification . NewParentPath , event . EventNotification . NewEntry )
return store . InsertEntry ( ctx , entry )
return store . InsertEntry ( ctx , entry )
} else if filer_pb . IsDelete ( & event ) {
} else if filer_pb . IsDelete ( & event ) {
println ( "-" , util . FullPath ( event . Directory ) . Child ( event . EventNotification . OldEntry . Name ) )
return store . DeleteEntry ( ctx , util . FullPath ( event . Directory ) . Child ( event . EventNotification . OldEntry . Name ) )
return store . DeleteEntry ( ctx , util . FullPath ( event . Directory ) . Child ( event . EventNotification . OldEntry . Name ) )
} else if filer_pb . IsUpdate ( & event ) {
} else if filer_pb . IsUpdate ( & event ) {
println ( "~" , util . FullPath ( event . EventNotification . NewParentPath ) . Child ( event . EventNotification . NewEntry . Name ) )
entry := filer . FromPbEntry ( event . EventNotification . NewParentPath , event . EventNotification . NewEntry )
entry := filer . FromPbEntry ( event . EventNotification . NewParentPath , event . EventNotification . NewEntry )
return store . UpdateEntry ( ctx , entry )
return store . UpdateEntry ( ctx , entry )
} else {
} else {
// renaming
println ( "-" , util . FullPath ( event . Directory ) . Child ( event . EventNotification . OldEntry . Name ) )
if err := store . DeleteEntry ( ctx , util . FullPath ( event . Directory ) . Child ( event . EventNotification . OldEntry . Name ) ) ; err != nil {
if err := store . DeleteEntry ( ctx , util . FullPath ( event . Directory ) . Child ( event . EventNotification . OldEntry . Name ) ) ; err != nil {
return err
return err
}
}
println ( "+" , util . FullPath ( event . EventNotification . NewParentPath ) . Child ( event . EventNotification . NewEntry . Name ) )
return store . InsertEntry ( ctx , filer . FromPbEntry ( event . EventNotification . NewParentPath , event . EventNotification . NewEntry ) )
return store . InsertEntry ( ctx , filer . FromPbEntry ( event . EventNotification . NewParentPath , event . EventNotification . NewEntry ) )
}
}
return nil
return nil
@ -166,26 +157,18 @@ func CreateIfNotExists(dir string, perm os.FileMode) error {
func ( c * commandFsMetaSnapshotsCreate ) Do ( args [ ] string , commandEnv * CommandEnv , writer io . Writer ) ( err error ) {
func ( c * commandFsMetaSnapshotsCreate ) Do ( args [ ] string , commandEnv * CommandEnv , writer io . Writer ) ( err error ) {
fsMetaSnapshotsCreateCommand := flag . NewFlagSet ( c . Name ( ) , flag . ContinueOnError )
fsMetaSnapshotsCreateCommand := flag . NewFlagSet ( c . Name ( ) , flag . ContinueOnError )
star t := fsMetaSnapshotsCreateCommand . String ( "s " , "" , "start date of metadata compaction in yyyy-mm-dd format " )
end := fsMetaSnapshotsCreateCommand . String ( "e" , "" , "end date of metadata compaction in yyyy-mm-dd format ")
frequency := fsMetaSnapshotsCreateCommand . Int ( "f " , 7 , "the frequency of generating the metadata snapshots in days " )
snapsho tP ath := fsMetaSnapshotsCreateCommand . String ( "path " , "" , "the path to store generated snapshot files " )
snapshotCnt := fsMetaSnapshotsCreateCommand . Int ( "snapshot-cnt" , 3 , "number of snapshots generated ")
snapshotInterval := fsMetaSnapshotsCreateCommand . Int ( "snapshot-interval " , 7 , "the duration interval between each generated snapshot " )
if err = fsMetaSnapshotsCreateCommand . Parse ( args ) ; err != nil {
if err = fsMetaSnapshotsCreateCommand . Parse ( args ) ; err != nil {
return err
return err
}
}
startDate , err := time . Parse ( DateFormat , * start )
if err != nil {
return err
}
endDate , err := time . Parse ( DateFormat , * end )
if err != nil {
return err
}
snapshotdate := time . Now ( ) . AddDate ( 0 , 0 , - 1 )
var snapshotCheckPoints [ ] time . Time
var snapshotCheckPoints [ ] time . Time
for startDate . Before ( endDate ) || startDate . Equal ( endDate ) {
snapshotCheckPoints = append ( snapshotCheckPoints , startD ate )
startDate = startD ate . AddDate ( 0 , 0 , * frequency )
for i := 0 ; i < * snapshotCnt ; i ++ {
snapshotCheckPoints = append ( snapshotCheckPoints , snapshotdate )
snapshotdate = snapshotdate . AddDate ( 0 , 0 , - 1 * * snapshotInterval )
}
}
snapshotCnt := 0
homeDirname , err := os . UserHomeDir ( )
homeDirname , err := os . UserHomeDir ( )
if err != nil {
if err != nil {
return err
return err
@ -197,11 +180,11 @@ func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv,
}
}
store := & filer_leveldb . LevelDBStore { }
store := & filer_leveldb . LevelDBStore { }
store . CustomInitialize ( levelDbPath )
store . CustomInitialize ( levelDbPath )
p ath := filer . SystemLogDir
changeLogP ath := filer . SystemLogDir
var processEntry func ( entry * filer_pb . Entry , isLast bool ) error
var processEntry func ( entry * filer_pb . Entry , isLast bool ) error
processEntry = func ( entry * filer_pb . Entry , isLast bool ) error {
processEntry = func ( entry * filer_pb . Entry , isLast bool ) error {
if entry . IsDirectory {
if entry . IsDirectory {
return filer_pb . ReadDirAllEntries ( commandEnv , util . FullPath ( p ath+ "/" + entry . Name ) , "" , processEntry )
return filer_pb . ReadDirAllEntries ( commandEnv , util . FullPath ( changeLogP ath+ "/" + entry . Name ) , "" , processEntry )
}
}
totalSize := filer . FileSize ( entry )
totalSize := filer . FileSize ( entry )
buf := mem . Allocate ( int ( totalSize ) )
buf := mem . Allocate ( int ( totalSize ) )
@ -219,7 +202,7 @@ func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv,
return err
return err
}
}
idx = idx + 4 + logEntrySize
idx = idx + 4 + logEntrySize
err = processMetaDataEvents ( store , logEntry . Data , snapshotCnt , snapshotC heckPoints , homeDirname )
err = processMetaDataEvents ( store , logEntry . Data , snapshotCheckPoints , homeDirname , * snapshotPath )
if err != nil {
if err != nil {
return err
return err
}
}
@ -228,6 +211,6 @@ func (c *commandFsMetaSnapshotsCreate) Do(args []string, commandEnv *CommandEnv,
return nil
return nil
}
}
err = filer_pb . ReadDirAllEntries ( commandEnv , util . FullPath ( p ath) , "" , processEntry )
err = filer_pb . ReadDirAllEntries ( commandEnv , util . FullPath ( changeLogP ath) , "" , processEntry )
return err
return err
}
}