From 2b229e98cee862b66f9aa988e53da6b1ec79d716 Mon Sep 17 00:00:00 2001
From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>
Date: Fri, 17 Nov 2023 18:25:09 +0500
Subject: [PATCH] fix: add doDeleteFile option for filer backup

Add a doDeleteFiles flag (default false) to the filer backup options and
pass it to genProcessFunction. The filer sync caller passes true, and an
incremental sink forces the value to false. Deletions and in-place
updates are applied to the destination only when doDeleteFiles is true;
otherwise delete events are ignored.

---
 weed/command/filer_backup.go |  5 +++--
 weed/command/filer_sync.go   | 19 +++++++++++--------
 2 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index cd63f49f7..691b1c0b5 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -20,6 +20,7 @@ type FilerBackupOptions struct {
 	excludeFileName *string
 	debug           *bool
 	proxyByFiler    *bool
+	doDeleteFiles   *bool
 	timeAgo         *time.Duration
 	retentionDays   *int
 }
@@ -35,10 +36,10 @@ func init() {
 	filerBackupOptions.excludePaths = cmdFilerBackup.Flag.String("filerExcludePaths", "", "exclude directories to sync on filer")
 	filerBackupOptions.excludeFileName = cmdFilerBackup.Flag.String("filerExcludeFileName", "", "exclude file names that match the regexp to sync on filer")
 	filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
+	filerBackupOptions.doDeleteFiles = cmdFilerBackup.Flag.Bool("doDeleteFiles", false, "delete files on the destination")
 	filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
 	filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
 	filerBackupOptions.retentionDays = cmdFilerBackup.Flag.Int("retentionDays", 0, "incremental backup retention days")
-
 }
 
 var cmdFilerBackup = &Command{
@@ -129,7 +130,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
 		*backupOption.proxyByFiler)
 	dataSink.SetSourceFiler(filerSource)
 
-	processEventFn := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, debug)
+	processEventFn := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug)
 
 	processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
 		glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 38d29cdc6..33dcf4073 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -251,7 +251,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
 	filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
 	filerSink.SetSourceFiler(filerSource)
 
-	persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, filerSink, debug)
+	persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, filerSink, true, debug)
 
 	processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
 		message := resp.EventNotification
@@ -369,7 +369,7 @@ func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature
 
 }
 
-func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, reExcludeFileName *regexp.Regexp, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
+func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, reExcludeFileName *regexp.Regexp, dataSink sink.ReplicationSink, doDeleteFiles bool, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
 	// process function
 	processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
 		message := resp.EventNotification
@@ -397,16 +397,19 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
 		if reExcludeFileName != nil && reExcludeFileName.MatchString(message.NewEntry.Name) {
 			return nil
 		}
+		if dataSink.IsIncremental() {
+			doDeleteFiles = false
+		}
 		// handle deletions
 		if filer_pb.IsDelete(resp) {
+			if !doDeleteFiles {
+				return nil
+			}
 			if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
 				return nil
 			}
 			key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
-			if !dataSink.IsIncremental() {
-				return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
-			}
-			return nil
+			return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
 		}
 
 		// handle new entries
@@ -432,7 +435,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
 			// old key is in the watched directory
 			if strings.HasPrefix(string(sourceNewKey), sourcePath) {
 				// new key is also in the watched directory
-				if !dataSink.IsIncremental() {
+				if doDeleteFiles {
 					oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
 					message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
 					foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
@@ -455,7 +458,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
 				}
 			} else {
 				// new key is outside of the watched directory
-				if !dataSink.IsIncremental() {
+				if doDeleteFiles {
 					key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
 					return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
 				}