diff --git a/weed/pb/filer_pb/filer_client_bfs.go b/weed/pb/filer_pb/filer_client_bfs.go index a3b4e98c9..9425b84d7 100644 --- a/weed/pb/filer_pb/filer_client_bfs.go +++ b/weed/pb/filer_pb/filer_client_bfs.go @@ -13,57 +13,97 @@ import ( ) func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *Entry)) (err error) { + return TraverseBfsWithContext(context.Background(), filerClient, parentPath, func(parentPath util.FullPath, entry *Entry) error { + fn(parentPath, entry) + return nil + }) +} + +func TraverseBfsWithContext(ctx context.Context, filerClient FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *Entry) error) (err error) { K := 5 - var jobQueueWg sync.WaitGroup + ctx, cancel := context.WithCancel(ctx) + defer cancel() + queue := util.NewQueue[util.FullPath]() - jobQueueWg.Add(1) + var pending sync.WaitGroup + pending.Add(1) queue.Enqueue(parentPath) - terminates := make([]chan bool, K) + var once sync.Once + var firstErr error + + enqueue := func(p util.FullPath) bool { + // Stop expanding traversal once canceled (e.g. first error encountered). + if ctx.Err() != nil { + return false + } + pending.Add(1) + queue.Enqueue(p) + return true + } + + done := make(chan struct{}) + var workers sync.WaitGroup for i := 0; i < K; i++ { - terminates[i] = make(chan bool) - go func(j int) { + workers.Add(1) + go func() { + defer workers.Done() for { select { - case <-terminates[j]: + case <-done: return default: - t := queue.Dequeue() - if t == "" { - time.Sleep(329 * time.Millisecond) + } + + dir := queue.Dequeue() + if dir == "" { + // queue is empty for now + select { + case <-done: + return + case <-time.After(50 * time.Millisecond): continue } - dir := t - processErr := processOneDirectory(filerClient, dir, queue, &jobQueueWg, fn) + } + + // Always mark the directory as done so the closer can finish. + if ctx.Err() == nil { + processErr := processOneDirectory(ctx, filerClient, dir, enqueue, fn) if processErr != nil { - err = processErr + once.Do(func() { + firstErr = processErr + cancel() + }) } - jobQueueWg.Done() } + pending.Done() } - }(i) + }() } - jobQueueWg.Wait() - for i := 0; i < K; i++ { - close(terminates[i]) - } - return + + pending.Wait() + close(done) + + workers.Wait() + + return firstErr } -func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queue *util.Queue[util.FullPath], jobQueueWg *sync.WaitGroup, fn func(parentPath util.FullPath, entry *Entry)) (err error) { +func processOneDirectory(ctx context.Context, filerClient FilerClient, parentPath util.FullPath, enqueue func(p util.FullPath) bool, fn func(parentPath util.FullPath, entry *Entry) error) (err error) { - return ReadDirAllEntries(context.Background(), filerClient, parentPath, "", func(entry *Entry, isLast bool) error { + return ReadDirAllEntries(ctx, filerClient, parentPath, "", func(entry *Entry, isLast bool) error { - fn(parentPath, entry) + if err := fn(parentPath, entry); err != nil { + return err + } if entry.IsDirectory { subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name) if parentPath == "/" { subDir = "/" + entry.Name } - jobQueueWg.Add(1) - queue.Enqueue(util.FullPath(subDir)) + enqueue(util.FullPath(subDir)) } return nil }) diff --git a/weed/server/master_grpc_server_raft.go b/weed/server/master_grpc_server_raft.go index 4aab7fe6e..aa7c6ff28 100644 --- a/weed/server/master_grpc_server_raft.go +++ b/weed/server/master_grpc_server_raft.go @@ -14,22 +14,47 @@ func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_ resp := &master_pb.RaftListClusterServersResponse{} ms.Topo.RaftServerAccessLock.RLock() - if ms.Topo.HashicorpRaft == nil { + if ms.Topo.HashicorpRaft == nil && ms.Topo.RaftServer == nil { ms.Topo.RaftServerAccessLock.RUnlock() return resp, nil } - servers := ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers - _, leaderId := ms.Topo.HashicorpRaft.LeaderWithID() - ms.Topo.RaftServerAccessLock.RUnlock() + if ms.Topo.HashicorpRaft != nil { + servers := ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers + _, leaderId := ms.Topo.HashicorpRaft.LeaderWithID() + ms.Topo.RaftServerAccessLock.RUnlock() - for _, server := range servers { + for _, server := range servers { + resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{ + Id: string(server.ID), + Address: string(server.Address), + Suffrage: server.Suffrage.String(), + IsLeader: server.ID == leaderId, + }) + } + } else if ms.Topo.RaftServer != nil { + peers := ms.Topo.RaftServer.Peers() + leader := ms.Topo.RaftServer.Leader() + currentServerName := ms.Topo.RaftServer.Name() + ms.Topo.RaftServerAccessLock.RUnlock() + + // Add the current server itself (Peers() only returns other peers) resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{ - Id: string(server.ID), - Address: string(server.Address), - Suffrage: server.Suffrage.String(), - IsLeader: server.ID == leaderId, + Id: currentServerName, + Address: string(ms.option.Master), + Suffrage: "Voter", + IsLeader: currentServerName == leader, }) + + // Add all other peers + for _, peer := range peers { + resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{ + Id: peer.Name, + Address: peer.ConnectionString, + Suffrage: "Voter", + IsLeader: peer.Name == leader, + }) + } } return resp, nil } diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index 377c5e12e..0c4b51eb9 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -118,14 +118,21 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. outputChan <- bytes return nil - }, func(outputChan chan interface{}) { + }, func(outputChan chan interface{}) error { sizeBuf := make([]byte, 4) for item := range outputChan { b := item.([]byte) util.Uint32toBytes(sizeBuf, uint32(len(b))) - dst.Write(sizeBuf) - dst.Write(b) + _, err := dst.Write(sizeBuf) + if err != nil { + return err + } + _, err = dst.Write(b) + if err != nil { + return err + } } + return nil }) if err == nil { @@ -136,17 +143,21 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. } -func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error, saveFn func(outputChan chan interface{})) error { +func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error, saveFn func(outputChan chan interface{}) error) error { var wg sync.WaitGroup wg.Add(1) outputChan := make(chan interface{}, 1024) + saveErrChan := make(chan error, 1) go func() { - saveFn(outputChan) + saveErrChan <- saveFn(outputChan) wg.Done() }() var dirCount, fileCount uint64 + var once sync.Once + var firstErr error + var hasErr atomic.Bool // also save the directory itself (path) if it exists in the filer if e, getErr := filer_pb.GetEntry(context.Background(), filerClient, util.FullPath(path)); getErr != nil { @@ -160,8 +171,16 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, Dir: parentDir, Entry: e, } + if hasErr.Load() { + // fail-fast: do not continue emitting partial results after first error + return firstErr + } if genErr := genFn(protoMessage, outputChan); genErr != nil { - fmt.Fprintf(writer, "marshall error: %v\n", genErr) + once.Do(func() { + firstErr = genErr + hasErr.Store(true) + }) + return genErr } else { if e.IsDirectory { atomic.AddUint64(&dirCount, 1) @@ -171,10 +190,12 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, } } - err := filer_pb.TraverseBfs(filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := filer_pb.TraverseBfsWithContext(ctx, filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) error { if strings.HasPrefix(string(parentPath), filer.SystemLogDir) { - return + return nil } protoMessage := &filer_pb.FullEntry{ @@ -182,9 +203,17 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, Entry: entry, } - if err := genFn(protoMessage, outputChan); err != nil { - fmt.Fprintf(writer, "marshall error: %v\n", err) - return + if hasErr.Load() { + // fail-fast: stop traversal once an error is observed. + return firstErr + } + if genErr := genFn(protoMessage, outputChan); genErr != nil { + once.Do(func() { + firstErr = genErr + hasErr.Store(true) + cancel() + }) + return genErr } if entry.IsDirectory { @@ -197,11 +226,20 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, println(parentPath.Child(entry.Name)) } + return nil }) close(outputChan) wg.Wait() + saveErr := <-saveErrChan + + if firstErr != nil { + return firstErr + } + if saveErr != nil { + return saveErr + } if err == nil && writer != nil { fmt.Fprintf(writer, "total %d directories, %d files\n", dirCount, fileCount) diff --git a/weed/shell/command_fs_verify.go b/weed/shell/command_fs_verify.go index 78f154405..c09f0536a 100644 --- a/weed/shell/command_fs_verify.go +++ b/weed/shell/command_fs_verify.go @@ -300,7 +300,7 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errC } return nil }, - func(outputChan chan interface{}) { + func(outputChan chan interface{}) error { var wg sync.WaitGroup itemErrCount := atomic.NewUint64(0) for itemEntry := range outputChan { @@ -315,5 +315,6 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errC } wg.Wait() errCount = itemErrCount.Load() + return nil }) } diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 5d95f369a..14209f0d6 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -266,7 +266,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m } return nil }, - func(outputChan chan interface{}) { + func(outputChan chan interface{}) error { buffer := make([]byte, readbufferSize) for item := range outputChan { i := item.(*Item) @@ -274,8 +274,12 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m util.Uint64toBytes(buffer, i.fileKey) util.Uint32toBytes(buffer[8:], i.cookie) util.Uint32toBytes(buffer[12:], uint32(len(i.path))) - f.Write(buffer) - f.Write([]byte(i.path)) + if _, err := f.Write(buffer); err != nil { + return err + } + if _, err := f.Write([]byte(i.path)); err != nil { + return err + } } else if *c.findMissingChunksInFiler && len(c.volumeIds) == 0 { fmt.Fprintf(c.writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path) if purgeAbsent { @@ -284,6 +288,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m } } } + return nil }) } @@ -482,10 +487,10 @@ func (c *commandVolumeFsck) readFilerFileIdFile(volumeId uint32, fn func(needleI break } if readErr != nil { - return readErr + return fmt.Errorf("read fid header for volume %d: %w", volumeId, readErr) } if readSize != readbufferSize { - return fmt.Errorf("readSize mismatch") + return fmt.Errorf("read fid header size mismatch for volume %d: got %d want %d", volumeId, readSize, readbufferSize) } item.fileKey = util.BytesToUint64(buffer[:8]) item.cookie = util.BytesToUint32(buffer[8:12]) @@ -493,10 +498,10 @@ func (c *commandVolumeFsck) readFilerFileIdFile(volumeId uint32, fn func(needleI pathBytes := make([]byte, int(pathSize)) n, err := io.ReadFull(br, pathBytes) if err != nil { - fmt.Fprintf(c.writer, "%d,%x%08x in unexpected error: %v\n", volumeId, item.fileKey, item.cookie, err) + return fmt.Errorf("read fid path for volume %d,%x%08x: %w", volumeId, item.fileKey, item.cookie, err) } if n != int(pathSize) { - fmt.Fprintf(c.writer, "%d,%x%08x %d unexpected file name size %d\n", volumeId, item.fileKey, item.cookie, pathSize, n) + return fmt.Errorf("read fid path size mismatch for volume %d,%x%08x: got %d want %d", volumeId, item.fileKey, item.cookie, n, pathSize) } item.path = util.FullPath(pathBytes) needleId := types.NeedleId(item.fileKey) @@ -748,10 +753,10 @@ func writeToFile(bytes []byte, fileName string) error { flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC dst, err := os.OpenFile(fileName, flags, 0644) if err != nil { - return nil + return err } defer dst.Close() - dst.Write(bytes) - return nil + _, err = dst.Write(bytes) + return err }