diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index e9e787d83..5f5388dca 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -102,7 +102,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. cipherKey = util.GenCipherKey() } - err = doTraverseBfsAndSaving(commandEnv, writer, path, *verbose, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { + err = doTraverseBfsAndSaving(commandEnv, writer, path, *verbose, func(ctx context.Context, entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { if !entry.Entry.IsDirectory { ext := filepath.Ext(entry.Entry.Name) if encrypted, encErr := util.Encrypt([]byte(entry.Entry.Name), cipherKey); encErr == nil { @@ -116,7 +116,11 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. return } - outputChan <- bytes + select { + case outputChan <- bytes: + case <-ctx.Done(): + return ctx.Err() + } return nil }, func(outputChan chan interface{}) error { sizeBuf := make([]byte, 4) @@ -143,14 +147,21 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. } -func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error, saveFn func(outputChan chan interface{}) error) error { +func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, genFn func(ctx context.Context, entry *filer_pb.FullEntry, outputChan chan interface{}) error, saveFn func(outputChan chan interface{}) error) error { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() var wg sync.WaitGroup wg.Add(1) outputChan := make(chan interface{}, 1024) saveErrChan := make(chan error, 1) go func() { - saveErrChan <- saveFn(outputChan) + saveErr := saveFn(outputChan) + saveErrChan <- saveErr + if saveErr != nil { + cancel() + } wg.Done() }() @@ -160,7 +171,7 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, var hasErr atomic.Bool // also save the directory itself (path) if it exists in the filer - if e, getErr := filer_pb.GetEntry(context.Background(), filerClient, util.FullPath(path)); getErr != nil { + if e, getErr := filer_pb.GetEntry(ctx, filerClient, util.FullPath(path)); getErr != nil { // Entry not found is expected and can be ignored; log other errors. if !errors.Is(getErr, filer_pb.ErrNotFound) { fmt.Fprintf(writer, "failed to get entry %s: %v\n", path, getErr) @@ -172,7 +183,7 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, Entry: e, } - if genErr := genFn(protoMessage, outputChan); genErr != nil { + if genErr := genFn(ctx, protoMessage, outputChan); genErr != nil { once.Do(func() { firstErr = genErr hasErr.Store(true) @@ -187,7 +198,6 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, } } - ctx, cancel := context.WithCancel(context.Background()) defer cancel() err := filer_pb.TraverseBfs(ctx, filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) error { @@ -205,7 +215,7 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, // fail-fast: stop traversal once an error is observed. return firstErr } - if genErr := genFn(protoMessage, outputChan); genErr != nil { + if genErr := genFn(ctx, protoMessage, outputChan); genErr != nil { once.Do(func() { firstErr = genErr hasErr.Store(true)