Browse Source

shell: fix potential deadlock in fs.meta.save BFS traversal

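Refactored doTraverseBfsAndSaving to create a cancellable context up front
and pass it to genFn. If saveFn returns an error, the context is cancelled so
the traversal stops immediately and any genFn call blocked on sending to the
output channel returns ctx.Err() instead of blocking forever.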
pull/7183/merge
Chris Lu 1 day ago
parent
commit
3b05efbdbc
  1. 26
      weed/shell/command_fs_meta_save.go

26
weed/shell/command_fs_meta_save.go

@ -102,7 +102,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
cipherKey = util.GenCipherKey() cipherKey = util.GenCipherKey()
} }
err = doTraverseBfsAndSaving(commandEnv, writer, path, *verbose, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
err = doTraverseBfsAndSaving(commandEnv, writer, path, *verbose, func(ctx context.Context, entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
if !entry.Entry.IsDirectory { if !entry.Entry.IsDirectory {
ext := filepath.Ext(entry.Entry.Name) ext := filepath.Ext(entry.Entry.Name)
if encrypted, encErr := util.Encrypt([]byte(entry.Entry.Name), cipherKey); encErr == nil { if encrypted, encErr := util.Encrypt([]byte(entry.Entry.Name), cipherKey); encErr == nil {
@ -116,7 +116,11 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
return return
} }
outputChan <- bytes
select {
case outputChan <- bytes:
case <-ctx.Done():
return ctx.Err()
}
return nil return nil
}, func(outputChan chan interface{}) error { }, func(outputChan chan interface{}) error {
sizeBuf := make([]byte, 4) sizeBuf := make([]byte, 4)
@ -143,14 +147,21 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
} }
func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error, saveFn func(outputChan chan interface{}) error) error {
func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, genFn func(ctx context.Context, entry *filer_pb.FullEntry, outputChan chan interface{}) error, saveFn func(outputChan chan interface{}) error) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
outputChan := make(chan interface{}, 1024) outputChan := make(chan interface{}, 1024)
saveErrChan := make(chan error, 1) saveErrChan := make(chan error, 1)
go func() { go func() {
saveErrChan <- saveFn(outputChan)
saveErr := saveFn(outputChan)
saveErrChan <- saveErr
if saveErr != nil {
cancel()
}
wg.Done() wg.Done()
}() }()
@ -160,7 +171,7 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer,
var hasErr atomic.Bool var hasErr atomic.Bool
// also save the directory itself (path) if it exists in the filer // also save the directory itself (path) if it exists in the filer
if e, getErr := filer_pb.GetEntry(context.Background(), filerClient, util.FullPath(path)); getErr != nil {
if e, getErr := filer_pb.GetEntry(ctx, filerClient, util.FullPath(path)); getErr != nil {
// Entry not found is expected and can be ignored; log other errors. // Entry not found is expected and can be ignored; log other errors.
if !errors.Is(getErr, filer_pb.ErrNotFound) { if !errors.Is(getErr, filer_pb.ErrNotFound) {
fmt.Fprintf(writer, "failed to get entry %s: %v\n", path, getErr) fmt.Fprintf(writer, "failed to get entry %s: %v\n", path, getErr)
@ -172,7 +183,7 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer,
Entry: e, Entry: e,
} }
if genErr := genFn(protoMessage, outputChan); genErr != nil {
if genErr := genFn(ctx, protoMessage, outputChan); genErr != nil {
once.Do(func() { once.Do(func() {
firstErr = genErr firstErr = genErr
hasErr.Store(true) hasErr.Store(true)
@ -187,7 +198,6 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer,
} }
} }
ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
err := filer_pb.TraverseBfs(ctx, filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) error { err := filer_pb.TraverseBfs(ctx, filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
@ -205,7 +215,7 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer,
// fail-fast: stop traversal once an error is observed. // fail-fast: stop traversal once an error is observed.
return firstErr return firstErr
} }
if genErr := genFn(protoMessage, outputChan); genErr != nil {
if genErr := genFn(ctx, protoMessage, outputChan); genErr != nil {
once.Do(func() { once.Do(func() {
firstErr = genErr firstErr = genErr
hasErr.Store(true) hasErr.Store(true)

Loading…
Cancel
Save