diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index 3454ea72c..c339e4ba4 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -157,26 +157,18 @@ func (metaBackup *FilerMetaBackupOptions) shouldInclude(fullpath string) bool { } func (metaBackup *FilerMetaBackupOptions) traverseMetadata() (err error) { - var saveErr error - - traverseErr := filer_pb.TraverseBfs(metaBackup, util.FullPath(*metaBackup.filerDirectory), func(parentPath util.FullPath, entry *filer_pb.Entry) { + return filer_pb.TraverseBfs(context.Background(), metaBackup, util.FullPath(*metaBackup.filerDirectory), func(parentPath util.FullPath, entry *filer_pb.Entry) error { fullpath := string(parentPath.Child(entry.Name)) if !metaBackup.shouldInclude(fullpath) { - return + return nil } println("+", fullpath) if err := metaBackup.store.InsertEntry(context.Background(), filer.FromPbEntry(string(parentPath), entry)); err != nil { - saveErr = fmt.Errorf("insert entry error: %w\n", err) - return + return fmt.Errorf("insert entry error: %w\n", err) } - + return nil }) - - if traverseErr != nil { - return fmt.Errorf("traverse: %w", traverseErr) - } - return saveErr } var ( diff --git a/weed/pb/filer_pb/filer_client_bfs.go b/weed/pb/filer_pb/filer_client_bfs.go index 9425b84d7..5a72d5cad 100644 --- a/weed/pb/filer_pb/filer_client_bfs.go +++ b/weed/pb/filer_pb/filer_client_bfs.go @@ -12,14 +12,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) -func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *Entry)) (err error) { - return TraverseBfsWithContext(context.Background(), filerClient, parentPath, func(parentPath util.FullPath, entry *Entry) error { - fn(parentPath, entry) - return nil - }) -} - -func TraverseBfsWithContext(ctx context.Context, filerClient FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *Entry) error) (err error) { +func TraverseBfs(ctx context.Context, filerClient FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *Entry) error) (err error) { K := 5 ctx, cancel := context.WithCancel(ctx) diff --git a/weed/shell/command_fs_merge_volumes.go b/weed/shell/command_fs_merge_volumes.go index e5ae54285..2ed2c0a61 100644 --- a/weed/shell/command_fs_merge_volumes.go +++ b/weed/shell/command_fs_merge_volumes.go @@ -109,9 +109,9 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer defer util_http.GetGlobalHttpClient().CloseIdleConnections() return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error { - return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) { + return filer_pb.TraverseBfs(context.Background(), commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) error { if entry.IsDirectory { - return + return nil } for _, chunk := range entry.Chunks { chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId) @@ -141,6 +141,7 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer fmt.Printf("failed to update %s: %v\n", path, err) } } + return nil }) }) } diff --git a/weed/shell/command_fs_meta_change_volume_id.go b/weed/shell/command_fs_meta_change_volume_id.go index ec7cba729..2eded1afd 100644 --- a/weed/shell/command_fs_meta_change_volume_id.go +++ b/weed/shell/command_fs_meta_change_volume_id.go @@ -75,13 +75,13 @@ func (c *commandFsMetaChangeVolumeId) Do(args []string, commandEnv *CommandEnv, } return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - return filer_pb.TraverseBfs(commandEnv, util.FullPath(*dir), func(parentPath util.FullPath, entry *filer_pb.Entry) { + return filer_pb.TraverseBfs(context.Background(), commandEnv, util.FullPath(*dir), func(parentPath util.FullPath, entry *filer_pb.Entry) error { if !entry.IsDirectory { var hasChanges bool for _, chunk := range entry.Chunks { if chunk.IsChunkManifest { fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name) - return + return nil } chunkVolumeId := chunk.Fid.VolumeId if toVolumeId, found := mapping[needle.VolumeId(chunkVolumeId)]; found { @@ -102,6 +102,7 @@ func (c *commandFsMetaChangeVolumeId) Do(args []string, commandEnv *CommandEnv, } } } + return nil }) }) } diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go index ea40b662d..911bff7dc 100644 --- a/weed/shell/command_fs_meta_notify.go +++ b/weed/shell/command_fs_meta_notify.go @@ -1,6 +1,7 @@ package shell import ( + "context" "fmt" "io" @@ -51,7 +52,7 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i var dirCount, fileCount uint64 - err = filer_pb.TraverseBfs(commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) { + err = filer_pb.TraverseBfs(context.Background(), commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) error { if entry.IsDirectory { dirCount++ @@ -69,7 +70,7 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i if notifyErr != nil { fmt.Fprintf(writer, "fail to notify new entry event for %s: %v\n", parentPath.Child(entry.Name), notifyErr) } - + return nil }) if err == nil { diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index 0c4b51eb9..c6db7c2d2 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -192,7 +192,7 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := filer_pb.TraverseBfsWithContext(ctx, filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) error { + err := filer_pb.TraverseBfs(ctx, filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) error { if strings.HasPrefix(string(parentPath), filer.SystemLogDir) { return nil