From a5fe6e21bce464017fde036160e986d4c3bb81db Mon Sep 17 00:00:00 2001 From: Max Denushev <mdenushev@ya.ru> Date: Thu, 14 Nov 2024 19:40:55 +0300 Subject: [PATCH] feat(filer.backup): add ignore errors option (#6235) * feat(filer.backup): add ignore errors option * feat(filer.backup): fix 404 error wrap * feat(filer.backup): fix wrapping function * feat(filer.backup): fix wrapping errors in genProcessFunction * Update weed/command/filer_backup.go * Update weed/command/filer_backup.go * Update weed/command/filer_backup.go --------- Co-authored-by: Max Denushev <denushev@tochka.com> Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com> --- weed/command/filer_backup.go | 52 +++++++++++++++++------ weed/command/filer_sync.go | 8 ++-- weed/util/http/http_global_client_util.go | 11 +++-- 3 files changed, 52 insertions(+), 19 deletions(-) diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index 1344dfd2c..380540fd9 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -1,12 +1,15 @@ package command import ( + "errors" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/replication/source" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/http" "google.golang.org/grpc" "regexp" "strings" @@ -14,16 +17,18 @@ import ( ) type FilerBackupOptions struct { - isActivePassive *bool - filer *string - path *string - excludePaths *string - excludeFileName *string - debug *bool - proxyByFiler *bool - doDeleteFiles *bool - timeAgo *time.Duration - retentionDays *int + isActivePassive *bool + filer *string + path *string + excludePaths *string + excludeFileName *string + debug *bool + proxyByFiler *bool + doDeleteFiles *bool + disableErrorRetry *bool + ignore404Error *bool + timeAgo *time.Duration + retentionDays *int } var ( @@ -41,6 +46,8 @@ func init() { filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files") filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") filerBackupOptions.retentionDays = cmdFilerBackup.Flag.Int("retentionDays", 0, "incremental backup retention days") + filerBackupOptions.disableErrorRetry = cmdFilerBackup.Flag.Bool("disableErrorRetry", false, "disables errors retry, only logs will print") + filerBackupOptions.ignore404Error = cmdFilerBackup.Flag.Bool("ignore404Error", true, "ignore 404 errors from filer") } var cmdFilerBackup = &Command{ @@ -130,7 +137,23 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti *backupOption.proxyByFiler) dataSink.SetSourceFiler(filerSource) - processEventFn := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug) + var processEventFn func(*filer_pb.SubscribeMetadataResponse) error + if *backupOption.ignore404Error { + processEventFnGenerated := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug) + processEventFn = func(resp *filer_pb.SubscribeMetadataResponse) error { + err := processEventFnGenerated(resp) + if err == nil { + return nil + } + if errors.Is(err, http.ErrNotFound) { + glog.V(0).Infof("got 404 error, ignore it: %s", err.Error()) + return nil + } + return err + } + } else { + processEventFn = genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug) + } processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error { glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3)) @@ -154,6 +177,11 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti prefix = prefix + "/" } + eventErrorType := pb.RetryForeverOnError + if *backupOption.disableErrorRetry { + eventErrorType = pb.TrivialOnError + } + metadataFollowOption := &pb.MetadataFollowOption{ ClientName: "backup_" + dataSink.GetName(), ClientId: clientId, @@ -164,7 +192,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti DirectoriesToWatch: nil, StartTsNs: startFrom.UnixNano(), StopTsNs: 0, - EventErrorType: pb.RetryForeverOnError, + EventErrorType: eventErrorType, } return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset) diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 90204af4a..8201aa712 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -436,7 +436,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str } key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil { - return fmt.Errorf("create entry1 : %v", err) + return fmt.Errorf("create entry1 : %w", err) } else { return nil } @@ -462,13 +462,13 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str // not able to find old entry if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil { - return fmt.Errorf("delete old entry %v: %v", oldKey, err) + return fmt.Errorf("delete old entry %v: %w", oldKey, err) } } // create the new entry newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) if err := dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures); err != nil { - return fmt.Errorf("create entry2 : %v", err) + return fmt.Errorf("create entry2 : %w", err) } else { return nil } @@ -486,7 +486,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str // new key is in the watched directory key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil { - return fmt.Errorf("create entry3 : %v", err) + return fmt.Errorf("create entry3 : %w", err) } else { return nil } diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go index c3931a790..33d978d9e 100644 --- a/weed/util/http/http_global_client_util.go +++ b/weed/util/http/http_global_client_util.go @@ -5,8 +5,8 @@ import ( "encoding/json" "errors" "fmt" - "github.com/seaweedfs/seaweedfs/weed/util/mem" "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/mem" "io" "net/http" "net/url" @@ -16,6 +16,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" ) +var ErrNotFound = fmt.Errorf("not found") + func Post(url string, values url.Values) ([]byte, error) { r, err := GetGlobalHttpClient().PostForm(url, values) if err != nil { @@ -311,7 +313,10 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte } defer CloseResponse(r) if r.StatusCode >= 400 { - retryable = r.StatusCode == http.StatusNotFound || r.StatusCode >= 499 + if r.StatusCode == http.StatusNotFound { + return true, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrNotFound) + } + retryable = r.StatusCode >= 499 return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status) } @@ -477,4 +482,4 @@ func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, return n, err -} \ No newline at end of file +}