diff --git a/weed/mount/meta_cache/meta_cache_init.go b/weed/mount/meta_cache/meta_cache_init.go index 679cb953d..4ef1757d1 100644 --- a/weed/mount/meta_cache/meta_cache_init.go +++ b/weed/mount/meta_cache/meta_cache_init.go @@ -68,8 +68,9 @@ func doEnsureVisited(mc *MetaCache, client filer_pb.FilerClient, path util.FullP if err != nil { err = fmt.Errorf("list %s: %v", path, err) + } else { + mc.markCachedFn(path) } - mc.markCachedFn(path) return err } diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go index a1b4ac0d5..1baa68fa2 100644 --- a/weed/mount/weedfs_dir_read.go +++ b/weed/mount/weedfs_dir_read.go @@ -12,9 +12,22 @@ import ( type DirectoryHandleId uint64 +const ( + directoryStreamBaseOffset = 2 // . & .. +) + type DirectoryHandle struct { - isFinished bool - lastEntryName string + isFinished bool + entryStream []*filer.Entry + entryStreamOffset uint64 +} + +func (dh *DirectoryHandle) reset() { + *dh = DirectoryHandle{ + isFinished: false, + entryStream: []*filer.Entry{}, + entryStreamOffset: directoryStreamBaseOffset, + } } type DirectoryHandleToInode struct { @@ -37,10 +50,8 @@ func (wfs *WFS) AcquireDirectoryHandle() (DirectoryHandleId, *DirectoryHandle) { wfs.dhmap.Lock() defer wfs.dhmap.Unlock() - dh := &DirectoryHandle{ - isFinished: false, - lastEntryName: "", - } + dh := new(DirectoryHandle) + dh.reset() wfs.dhmap.dir2inode[DirectoryHandleId(fh)] = dh return DirectoryHandleId(fh), dh } @@ -51,11 +62,8 @@ func (wfs *WFS) GetDirectoryHandle(dhid DirectoryHandleId) *DirectoryHandle { if dh, found := wfs.dhmap.dir2inode[dhid]; found { return dh } - dh := &DirectoryHandle{ - isFinished: false, - lastEntryName: "", - } - + dh := new(DirectoryHandle) + dh.reset() wfs.dhmap.dir2inode[dhid] = dh return dh } @@ -130,13 +138,12 @@ func (wfs *WFS) ReadDirPlus(cancel <-chan struct{}, input *fuse.ReadIn, out *fus } func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPlusMode bool) fuse.Status { - dh := wfs.GetDirectoryHandle(DirectoryHandleId(input.Fh)) if input.Offset == 0 { - dh.isFinished = false - dh.lastEntryName = "" - } else { - if dh.isFinished { + dh.reset() + } else if dh.isFinished && input.Offset >= dh.entryStreamOffset { + entryCurrentIndex := input.Offset - dh.entryStreamOffset + if uint64(len(dh.entryStream)) <= entryCurrentIndex { return fuse.OK } } @@ -148,29 +155,17 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl } var dirEntry fuse.DirEntry - if input.Offset == 0 { - if !isPlusMode { - out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."}) - out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."}) - } else { - out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."}) - out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."}) - } - } - - processEachEntryFn := func(entry *filer.Entry, isLast bool) bool { + processEachEntryFn := func(entry *filer.Entry) bool { dirEntry.Name = entry.Name() dirEntry.Mode = toSyscallMode(entry.Mode) + inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, isPlusMode) + dirEntry.Ino = inode if !isPlusMode { - inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, false) - dirEntry.Ino = inode if !out.AddDirEntry(dirEntry) { isEarlyTerminated = true return false } } else { - inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, true) - dirEntry.Ino = inode entryOut := out.AddDirLookupEntry(dirEntry) if entryOut == nil { isEarlyTerminated = true @@ -182,35 +177,59 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl } wfs.outputFilerEntry(entryOut, inode, entry) } - dh.lastEntryName = entry.Name() return true } - entryChan := make(chan *filer.Entry, 128) - var err error - go func() { - if err = meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath, entryChan); err != nil { - glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) + if input.Offset < directoryStreamBaseOffset { + if !isPlusMode { + if input.Offset == 0 { + out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."}) + } + out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."}) + } else { + if input.Offset == 0 { + out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."}) + } + out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."}) } - close(entryChan) - }() - hasData := false - for entry := range entryChan { - hasData = true - processEachEntryFn(entry, false) - } - if err != nil { - return fuse.EIO + input.Offset = directoryStreamBaseOffset } - if !hasData { - listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, dh.lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool { - return processEachEntryFn(entry, false) - }) - if listErr != nil { - glog.Errorf("list meta cache: %v", listErr) - return fuse.EIO + var lastEntryName string + if input.Offset >= dh.entryStreamOffset { + if input.Offset > dh.entryStreamOffset { + entryPreviousIndex := (input.Offset - dh.entryStreamOffset) - 1 + if uint64(len(dh.entryStream)) > entryPreviousIndex { + lastEntryName = dh.entryStream[entryPreviousIndex].Name() + dh.entryStream = dh.entryStream[entryPreviousIndex:] + dh.entryStreamOffset = input.Offset - 1 + } } + entryCurrentIndex := input.Offset - dh.entryStreamOffset + for uint64(len(dh.entryStream)) > entryCurrentIndex { + entry := dh.entryStream[entryCurrentIndex] + if processEachEntryFn(entry) { + lastEntryName = entry.Name() + entryCurrentIndex++ + } else { + // early terminated + return fuse.OK + } + } + } + + var err error + if err = meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath, nil); err != nil { + glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) + return fuse.EIO + } + listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool { + dh.entryStream = append(dh.entryStream, entry) + return processEachEntryFn(entry) + }) + if listErr != nil { + glog.Errorf("list meta cache: %v", listErr) + return fuse.EIO } if !isEarlyTerminated {