From 4bc2b6d62b70cbc322706191e0e98228af4729e3 Mon Sep 17 00:00:00 2001 From: Bruce Zou Date: Wed, 10 Dec 2025 13:13:29 +0800 Subject: [PATCH] fix nfs list with prefix batch scan (#7694) * fix nfs list with prefix batch scan * remove else branch --- weed/mount/weedfs_dir_read.go | 99 ++++++++++++++++++++++++----------- 1 file changed, 69 insertions(+), 30 deletions(-) diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go index 22af4cc5c..492b5422d 100644 --- a/weed/mount/weedfs_dir_read.go +++ b/weed/mount/weedfs_dir_read.go @@ -2,7 +2,6 @@ package mount import ( "context" - "math" "sync" "github.com/hanwen/go-fuse/v2/fuse" @@ -16,6 +15,7 @@ type DirectoryHandleId uint64 const ( directoryStreamBaseOffset = 2 // . & .. + batchSize = 1000 ) // DirectoryHandle represents an open directory handle. @@ -166,11 +166,17 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl } var dirEntry fuse.DirEntry - processEachEntryFn := func(entry *filer.Entry) bool { + + // index is the position in entryStream, used to calculate the offset for next readdir + processEachEntryFn := func(entry *filer.Entry, index int64) bool { dirEntry.Name = entry.Name() dirEntry.Mode = toSyscallMode(entry.Mode) inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, false) dirEntry.Ino = inode + + // Set Off to the next offset so client can resume from correct position + dirEntry.Off = dh.entryStreamOffset + uint64(index) + 1 + if !isPlusMode { if !out.AddDirEntry(dirEntry) { isEarlyTerminated = true @@ -195,59 +201,92 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl if input.Offset < directoryStreamBaseOffset { if !isPlusMode { if input.Offset == 0 { - out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."}) + out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".", Off: 1}) } - out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."}) + out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "..", Off: 2}) } else { if input.Offset == 0 { - out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."}) + out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".", Off: 1}) } - out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."}) + out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "..", Off: 2}) } input.Offset = directoryStreamBaseOffset } var lastEntryName string + + // Read from cache first, then load next batch if needed if input.Offset >= dh.entryStreamOffset { + // Handle case: new handle with non-zero offset but empty cache + // This happens when NFS-Ganesha opens multiple directory handles + if len(dh.entryStream) == 0 && input.Offset > dh.entryStreamOffset { + skipCount := int64(input.Offset - dh.entryStreamOffset) + + if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil { + glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) + return fuse.EIO + } + + // Load entries from beginning to fill cache up to the requested offset + loadErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, skipCount+int64(batchSize), func(entry *filer.Entry) (bool, error) { + dh.entryStream = append(dh.entryStream, entry) + return true, nil + }) + if loadErr != nil { + glog.Errorf("list meta cache: %v", loadErr) + return fuse.EIO + } + } + if input.Offset > dh.entryStreamOffset { entryPreviousIndex := (input.Offset - dh.entryStreamOffset) - 1 if uint64(len(dh.entryStream)) > entryPreviousIndex { lastEntryName = dh.entryStream[entryPreviousIndex].Name() - dh.entryStream = dh.entryStream[entryPreviousIndex:] - dh.entryStreamOffset = input.Offset - 1 } } - entryCurrentIndex := input.Offset - dh.entryStreamOffset - for uint64(len(dh.entryStream)) > entryCurrentIndex { + + entryCurrentIndex := int64(input.Offset - dh.entryStreamOffset) + for int64(len(dh.entryStream)) > entryCurrentIndex { entry := dh.entryStream[entryCurrentIndex] - if processEachEntryFn(entry) { + if processEachEntryFn(entry, entryCurrentIndex) { lastEntryName = entry.Name() entryCurrentIndex++ } else { - // early terminated return fuse.OK } } - } - if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil { - glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) - return fuse.EIO - } - listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) (bool, error) { - dh.entryStream = append(dh.entryStream, entry) - if !processEachEntryFn(entry) { - return false, nil - } - return true, nil - }) - if listErr != nil { - glog.Errorf("list meta cache: %v", listErr) - return fuse.EIO - } + // Cache exhausted, load next batch + if !isEarlyTerminated { + if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil { + glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) + return fuse.EIO + } + + // Batch loading: fetch batchSize entries starting from lastEntryName + loadedCount := 0 + bufferFull := false + loadErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(batchSize), func(entry *filer.Entry) (bool, error) { + currentIndex := int64(len(dh.entryStream)) + dh.entryStream = append(dh.entryStream, entry) + loadedCount++ + if !processEachEntryFn(entry, currentIndex) { + bufferFull = true + return false, nil + } + return true, nil + }) + if loadErr != nil { + glog.Errorf("list meta cache: %v", loadErr) + return fuse.EIO + } - if !isEarlyTerminated { - dh.isFinished = true + // Mark finished only when loading completed normally (not buffer full) + // and we got fewer entries than requested + if !bufferFull && loadedCount < batchSize { + dh.isFinished = true + } + } } return fuse.OK