From 3b05efbdbcea15ef5b96a1d3d059c3ebbfabb5f8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 29 Jan 2026 14:42:10 -0800 Subject: [PATCH] shell: fix potential deadlock in fs.meta.save BFS traversal Refactored doTraverseBfsAndSaving to use context cancellation. If the saving process fails, the traversal is stopped immediately to prevent workers from blocking on the output channel. --- weed/shell/command_fs_meta_save.go | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index e9e787d83..5f5388dca 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -102,7 +102,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. cipherKey = util.GenCipherKey() } - err = doTraverseBfsAndSaving(commandEnv, writer, path, *verbose, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { + err = doTraverseBfsAndSaving(commandEnv, writer, path, *verbose, func(ctx context.Context, entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { if !entry.Entry.IsDirectory { ext := filepath.Ext(entry.Entry.Name) if encrypted, encErr := util.Encrypt([]byte(entry.Entry.Name), cipherKey); encErr == nil { @@ -116,7 +116,11 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. return } - outputChan <- bytes + select { + case outputChan <- bytes: + case <-ctx.Done(): + return ctx.Err() + } return nil }, func(outputChan chan interface{}) error { sizeBuf := make([]byte, 4) @@ -143,14 +147,21 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. } -func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error, saveFn func(outputChan chan interface{}) error) error { +func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, genFn func(ctx context.Context, entry *filer_pb.FullEntry, outputChan chan interface{}) error, saveFn func(outputChan chan interface{}) error) error { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() var wg sync.WaitGroup wg.Add(1) outputChan := make(chan interface{}, 1024) saveErrChan := make(chan error, 1) go func() { - saveErrChan <- saveFn(outputChan) + saveErr := saveFn(outputChan) + saveErrChan <- saveErr + if saveErr != nil { + cancel() + } wg.Done() }() @@ -160,7 +171,7 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, var hasErr atomic.Bool // also save the directory itself (path) if it exists in the filer - if e, getErr := filer_pb.GetEntry(context.Background(), filerClient, util.FullPath(path)); getErr != nil { + if e, getErr := filer_pb.GetEntry(ctx, filerClient, util.FullPath(path)); getErr != nil { // Entry not found is expected and can be ignored; log other errors. if !errors.Is(getErr, filer_pb.ErrNotFound) { fmt.Fprintf(writer, "failed to get entry %s: %v\n", path, getErr) @@ -172,7 +183,7 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, Entry: e, } - if genErr := genFn(protoMessage, outputChan); genErr != nil { + if genErr := genFn(ctx, protoMessage, outputChan); genErr != nil { once.Do(func() { firstErr = genErr hasErr.Store(true) @@ -187,7 +198,6 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, } } - ctx, cancel := context.WithCancel(context.Background()) defer cancel() err := filer_pb.TraverseBfs(ctx, filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) error { @@ -205,7 +215,7 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, // fail-fast: stop traversal once an error is observed. return firstErr } - if genErr := genFn(protoMessage, outputChan); genErr != nil { + if genErr := genFn(ctx, protoMessage, outputChan); genErr != nil { once.Do(func() { firstErr = genErr hasErr.Store(true)