diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
index 3454ea72c..a76c92d80 100644
--- a/weed/command/filer_meta_backup.go
+++ b/weed/command/filer_meta_backup.go
@@ -157,26 +157,18 @@ func (metaBackup *FilerMetaBackupOptions) shouldInclude(fullpath string) bool {
 }
 
 func (metaBackup *FilerMetaBackupOptions) traverseMetadata() (err error) {
 
-    var saveErr error
-
-    traverseErr := filer_pb.TraverseBfs(metaBackup, util.FullPath(*metaBackup.filerDirectory), func(parentPath util.FullPath, entry *filer_pb.Entry) {
+    return filer_pb.TraverseBfs(context.Background(), metaBackup, util.FullPath(*metaBackup.filerDirectory), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
         fullpath := string(parentPath.Child(entry.Name))
         if !metaBackup.shouldInclude(fullpath) {
-            return
+            return nil
         }
         println("+", fullpath)
         if err := metaBackup.store.InsertEntry(context.Background(), filer.FromPbEntry(string(parentPath), entry)); err != nil {
-            saveErr = fmt.Errorf("insert entry error: %w\n", err)
-            return
+            return fmt.Errorf("insert entry error: %w", err)
         }
-
+        return nil
     })
-
-    if traverseErr != nil {
-        return fmt.Errorf("traverse: %w", traverseErr)
-    }
-    return saveErr
 }
 
 var (
diff --git a/weed/pb/filer_pb/filer_client_bfs.go b/weed/pb/filer_pb/filer_client_bfs.go
index a3b4e98c9..f6dcdf880 100644
--- a/weed/pb/filer_pb/filer_client_bfs.go
+++ b/weed/pb/filer_pb/filer_client_bfs.go
@@ -12,58 +12,93 @@ import (
     "github.com/seaweedfs/seaweedfs/weed/util"
 )
 
-func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
+func TraverseBfs(ctx context.Context, filerClient FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *Entry) error) (err error) {
     K := 5
 
-    var jobQueueWg sync.WaitGroup
+    ctx, cancel := context.WithCancel(ctx)
+    defer cancel()
+
     queue := util.NewQueue[util.FullPath]()
-    jobQueueWg.Add(1)
+    var pending sync.WaitGroup
+    pending.Add(1)
     queue.Enqueue(parentPath)
-    terminates := make([]chan bool, K)
+
+    var once sync.Once
+    var firstErr error
+
+    enqueue := func(p util.FullPath) bool {
+        // Stop expanding traversal once canceled (e.g. first error encountered).
+        if ctx.Err() != nil {
+            return false
+        }
+        pending.Add(1)
+        queue.Enqueue(p)
+        return true
+    }
+
+    done := make(chan struct{})
+    var workers sync.WaitGroup
 
     for i := 0; i < K; i++ {
-        terminates[i] = make(chan bool)
-        go func(j int) {
+        workers.Add(1)
+        go func() {
+            defer workers.Done()
             for {
                 select {
-                case <-terminates[j]:
+                case <-done:
                     return
                 default:
-                    t := queue.Dequeue()
-                    if t == "" {
-                        time.Sleep(329 * time.Millisecond)
+                }
+
+                dir := queue.Dequeue()
+                if dir == "" {
+                    // queue is empty for now
+                    select {
+                    case <-done:
+                        return
+                    case <-time.After(50 * time.Millisecond):
                         continue
                     }
-                    dir := t
-                    processErr := processOneDirectory(filerClient, dir, queue, &jobQueueWg, fn)
+                }
+
+                // Always mark the directory as done so the closer can finish.
+                if ctx.Err() == nil {
+                    processErr := processOneDirectory(ctx, filerClient, dir, enqueue, fn)
                     if processErr != nil {
-                        err = processErr
+                        once.Do(func() {
+                            firstErr = processErr
+                            cancel()
+                        })
                     }
-                    jobQueueWg.Done()
                 }
+                pending.Done()
             }
-        }(i)
-    }
-    jobQueueWg.Wait()
-    for i := 0; i < K; i++ {
-        close(terminates[i])
+        }()
     }
-    return
+
+    pending.Wait()
+    close(done)
+
+    workers.Wait()
+
+    return firstErr
 }
 
-func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queue *util.Queue[util.FullPath], jobQueueWg *sync.WaitGroup, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
+func processOneDirectory(ctx context.Context, filerClient FilerClient, parentPath util.FullPath, enqueue func(p util.FullPath) bool, fn func(parentPath util.FullPath, entry *Entry) error) (err error) {
 
-    return ReadDirAllEntries(context.Background(), filerClient, parentPath, "", func(entry *Entry, isLast bool) error {
+    return ReadDirAllEntries(ctx, filerClient, parentPath, "", func(entry *Entry, isLast bool) error {
 
-        fn(parentPath, entry)
+        if err := fn(parentPath, entry); err != nil {
+            return err
+        }
 
         if entry.IsDirectory {
             subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name)
             if parentPath == "/" {
                 subDir = "/" + entry.Name
             }
-            jobQueueWg.Add(1)
-            queue.Enqueue(util.FullPath(subDir))
+            if !enqueue(util.FullPath(subDir)) {
+                return ctx.Err()
+            }
         }
 
         return nil
     })
diff --git a/weed/shell/command_fs_merge_volumes.go b/weed/shell/command_fs_merge_volumes.go
index e5ae54285..2ed2c0a61 100644
--- a/weed/shell/command_fs_merge_volumes.go
+++ b/weed/shell/command_fs_merge_volumes.go
@@ -109,9 +109,9 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer
     defer util_http.GetGlobalHttpClient().CloseIdleConnections()
 
     return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
-        return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
+        return filer_pb.TraverseBfs(context.Background(), commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
             if entry.IsDirectory {
-                return
+                return nil
             }
             for _, chunk := range entry.Chunks {
                 chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId)
@@ -141,6 +141,7 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer
                     fmt.Printf("failed to update %s: %v\n", path, err)
                 }
             }
+            return nil
         })
     })
 }
diff --git a/weed/shell/command_fs_meta_change_volume_id.go b/weed/shell/command_fs_meta_change_volume_id.go
index ec7cba729..2eded1afd 100644
--- a/weed/shell/command_fs_meta_change_volume_id.go
+++ b/weed/shell/command_fs_meta_change_volume_id.go
@@ -75,13 +75,13 @@ func (c *commandFsMetaChangeVolumeId) Do(args []string, commandEnv *CommandEnv,
     }
 
     return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-        return filer_pb.TraverseBfs(commandEnv, util.FullPath(*dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
+        return filer_pb.TraverseBfs(context.Background(), commandEnv, util.FullPath(*dir), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
             if !entry.IsDirectory {
                 var hasChanges bool
                 for _, chunk := range entry.Chunks {
                     if chunk.IsChunkManifest {
                         fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name)
-                        return
+                        return nil
                     }
                     chunkVolumeId := chunk.Fid.VolumeId
                     if toVolumeId, found := mapping[needle.VolumeId(chunkVolumeId)]; found {
@@ -102,6 +102,7 @@ func (c *commandFsMetaChangeVolumeId) Do(args []string, commandEnv *CommandEnv,
                     }
                 }
             }
+            return nil
         })
     })
 }
diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go
index ea40b662d..911bff7dc 100644
--- a/weed/shell/command_fs_meta_notify.go
+++ b/weed/shell/command_fs_meta_notify.go
@@ -1,6 +1,7 @@
 package shell
 
 import (
+    "context"
     "fmt"
     "io"
 
@@ -51,7 +52,7 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i
 
     var dirCount, fileCount uint64
 
-    err = filer_pb.TraverseBfs(commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) {
+    err = filer_pb.TraverseBfs(context.Background(), commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
 
         if entry.IsDirectory {
             dirCount++
@@ -69,7 +70,7 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i
         if notifyErr != nil {
             fmt.Fprintf(writer, "fail to notify new entry event for %s: %v\n", parentPath.Child(entry.Name), notifyErr)
         }
-
+        return nil
     })
 
     if err == nil {
diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go
index 377c5e12e..e9e787d83 100644
--- a/weed/shell/command_fs_meta_save.go
+++ b/weed/shell/command_fs_meta_save.go
@@ -118,14 +118,21 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
         outputChan <- bytes
         return nil
 
-    }, func(outputChan chan interface{}) {
+    }, func(outputChan chan interface{}) error {
         sizeBuf := make([]byte, 4)
         for item := range outputChan {
             b := item.([]byte)
             util.Uint32toBytes(sizeBuf, uint32(len(b)))
-            dst.Write(sizeBuf)
-            dst.Write(b)
+            _, err := dst.Write(sizeBuf)
+            if err != nil {
+                return err
+            }
+            _, err = dst.Write(b)
+            if err != nil {
+                return err
+            }
         }
+        return nil
     })
 
     if err == nil {
@@ -136,17 +143,21 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
 }
 
-func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error, saveFn func(outputChan chan interface{})) error {
+func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error, saveFn func(outputChan chan interface{}) error) error {
 
     var wg sync.WaitGroup
     wg.Add(1)
     outputChan := make(chan interface{}, 1024)
+    saveErrChan := make(chan error, 1)
 
     go func() {
-        saveFn(outputChan)
+        saveErrChan <- saveFn(outputChan)
         wg.Done()
     }()
 
     var dirCount, fileCount uint64
+    var once sync.Once
+    var firstErr error
+    var hasErr atomic.Bool
 
     // also save the directory itself (path) if it exists in the filer
     if e, getErr := filer_pb.GetEntry(context.Background(), filerClient, util.FullPath(path)); getErr != nil {
@@ -160,8 +171,13 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer,
             Dir:   parentDir,
             Entry: e,
         }
+
         if genErr := genFn(protoMessage, outputChan); genErr != nil {
-            fmt.Fprintf(writer, "marshall error: %v\n", genErr)
+            once.Do(func() {
+                firstErr = genErr
+                hasErr.Store(true)
+            })
+            return genErr
         } else {
             if e.IsDirectory {
                 atomic.AddUint64(&dirCount, 1)
@@ -171,10 +187,13 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer,
         }
     }
 
-    err := filer_pb.TraverseBfs(filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    err := filer_pb.TraverseBfs(ctx, filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
 
-        if strings.HasPrefix(string(parentPath), filer.SystemLogDir) {
-            return
+        parent := string(parentPath)
+        if parent == filer.SystemLogDir || strings.HasPrefix(parent, filer.SystemLogDir+"/") {
+            return nil
         }
 
         protoMessage := &filer_pb.FullEntry{
@@ -182,9 +201,17 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer,
             Entry: entry,
         }
 
-        if err := genFn(protoMessage, outputChan); err != nil {
-            fmt.Fprintf(writer, "marshall error: %v\n", err)
-            return
+        if hasErr.Load() {
+            // fail-fast: stop traversal once an error is observed.
+            return firstErr
+        }
+        if genErr := genFn(protoMessage, outputChan); genErr != nil {
+            once.Do(func() {
+                firstErr = genErr
+                hasErr.Store(true)
+                cancel()
+            })
+            return genErr
         }
 
         if entry.IsDirectory {
@@ -197,14 +224,23 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer,
             println(parentPath.Child(entry.Name))
         }
 
+        return nil
     })
 
     close(outputChan)
 
     wg.Wait()
+    saveErr := <-saveErrChan
 
-    if err == nil && writer != nil {
+    if err != nil {
+        return err
+    }
+    if saveErr != nil {
+        return saveErr
+    }
+
+    if writer != nil {
         fmt.Fprintf(writer, "total %d directories, %d files\n", dirCount, fileCount)
     }
-    return err
+    return nil
 }
diff --git a/weed/shell/command_fs_verify.go b/weed/shell/command_fs_verify.go
index 78f154405..c09f0536a 100644
--- a/weed/shell/command_fs_verify.go
+++ b/weed/shell/command_fs_verify.go
@@ -300,7 +300,7 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errC
             }
             return nil
         },
-        func(outputChan chan interface{}) {
+        func(outputChan chan interface{}) error {
             var wg sync.WaitGroup
             itemErrCount := atomic.NewUint64(0)
             for itemEntry := range outputChan {
@@ -315,5 +315,6 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errC
             }
             wg.Wait()
             errCount = itemErrCount.Load()
+            return nil
         })
 }
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 5d95f369a..14209f0d6 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -266,7 +266,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
             }
             return nil
         },
-        func(outputChan chan interface{}) {
+        func(outputChan chan interface{}) error {
             buffer := make([]byte, readbufferSize)
             for item := range outputChan {
                 i := item.(*Item)
@@ -274,8 +274,12 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
                     util.Uint64toBytes(buffer, i.fileKey)
                     util.Uint32toBytes(buffer[8:], i.cookie)
                     util.Uint32toBytes(buffer[12:], uint32(len(i.path)))
-                    f.Write(buffer)
-                    f.Write([]byte(i.path))
+                    if _, err := f.Write(buffer); err != nil {
+                        return err
+                    }
+                    if _, err := f.Write([]byte(i.path)); err != nil {
+                        return err
+                    }
                 } else if *c.findMissingChunksInFiler && len(c.volumeIds) == 0 {
                     fmt.Fprintf(c.writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path)
                     if purgeAbsent {
@@ -284,6 +288,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
                     }
                 }
             }
+            return nil
         })
 }
 
@@ -482,10 +487,10 @@ func (c *commandVolumeFsck) readFilerFileIdFile(volumeId uint32, fn func(needleI
             break
         }
         if readErr != nil {
-            return readErr
+            return fmt.Errorf("read fid header for volume %d: %w", volumeId, readErr)
         }
         if readSize != readbufferSize {
-            return fmt.Errorf("readSize mismatch")
+            return fmt.Errorf("read fid header size mismatch for volume %d: got %d want %d", volumeId, readSize, readbufferSize)
         }
         item.fileKey = util.BytesToUint64(buffer[:8])
         item.cookie = util.BytesToUint32(buffer[8:12])
@@ -493,10 +498,10 @@ func (c *commandVolumeFsck) readFilerFileIdFile(volumeId uint32, fn func(needleI
         pathBytes := make([]byte, int(pathSize))
         n, err := io.ReadFull(br, pathBytes)
         if err != nil {
-            fmt.Fprintf(c.writer, "%d,%x%08x in unexpected error: %v\n", volumeId, item.fileKey, item.cookie, err)
+            return fmt.Errorf("read fid path for volume %d,%x%08x: %w", volumeId, item.fileKey, item.cookie, err)
         }
         if n != int(pathSize) {
-            fmt.Fprintf(c.writer, "%d,%x%08x %d unexpected file name size %d\n", volumeId, item.fileKey, item.cookie, pathSize, n)
+            return fmt.Errorf("read fid path size mismatch for volume %d,%x%08x: got %d want %d", volumeId, item.fileKey, item.cookie, n, pathSize)
         }
         item.path = util.FullPath(pathBytes)
         needleId := types.NeedleId(item.fileKey)
@@ -748,10 +753,10 @@ func writeToFile(bytes []byte, fileName string) error {
     flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
     dst, err := os.OpenFile(fileName, flags, 0644)
     if err != nil {
-        return nil
+        return err
     }
     defer dst.Close()
 
-    dst.Write(bytes)
-    return nil
+    _, err = dst.Write(bytes)
+    return err
 }
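Usage note (a minimal caller-side sketch, not part of the patch): with the signature change above, TraverseBfs takes a context and an error-returning callback, cancels the remaining traversal on the first callback error, and returns that error to the caller. The helper below is hypothetical (the package name and countEntries are illustrative); only filer_pb.TraverseBfs, filer_pb.FilerClient, filer_pb.Entry, and util.FullPath come from the patch. Because TraverseBfs still fans work out to several worker goroutines, the callback may run concurrently, hence the atomic counter.

package example // hypothetical illustration package, not in the repo

import (
    "context"
    "fmt"
    "sync/atomic"

    "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
    "github.com/seaweedfs/seaweedfs/weed/util"
)

// countEntries (hypothetical helper) walks dir breadth-first using the updated
// TraverseBfs API. Returning an error from the callback stops the traversal,
// and TraverseBfs reports that first error back to the caller.
func countEntries(ctx context.Context, client filer_pb.FilerClient, dir string) (uint64, error) {
    var count atomic.Uint64
    err := filer_pb.TraverseBfs(ctx, client, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
        if ctx.Err() != nil {
            return ctx.Err() // stop promptly if the caller canceled the context
        }
        count.Add(1)
        fmt.Println(parentPath.Child(entry.Name))
        return nil
    })
    return count.Load(), err
}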