From 49b84b6e2a5565cc133c184e1a25a4110f76b24b Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 16 Feb 2022 21:32:15 -0800 Subject: [PATCH] list entries while reading from remote --- weed/mount/meta_cache/meta_cache_init.go | 27 ++++++++++++++------ weed/mount/weedfs.go | 2 +- weed/mount/weedfs_dir_lookup.go | 2 +- weed/mount/weedfs_dir_read.go | 32 ++++++++++++++++++------ 4 files changed, 45 insertions(+), 18 deletions(-) diff --git a/weed/mount/meta_cache/meta_cache_init.go b/weed/mount/meta_cache/meta_cache_init.go index cd9c71668..ef14fbb3f 100644 --- a/weed/mount/meta_cache/meta_cache_init.go +++ b/weed/mount/meta_cache/meta_cache_init.go @@ -10,24 +10,32 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error { +func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath, entryChan chan *filer.Entry) error { + + currentPath := dirPath for { // the directory children are already cached // so no need for this and upper directories - if mc.isCachedFn(dirPath) { + if mc.isCachedFn(currentPath) { return nil } - if err := doEnsureVisited(mc, client, dirPath); err != nil { - return err + if entryChan != nil && dirPath == currentPath { + if err := doEnsureVisited(mc, client, currentPath, entryChan); err != nil { + return err + } + } else { + if err := doEnsureVisited(mc, client, currentPath, nil); err != nil { + return err + } } // continue to parent directory - if dirPath != "/" { - parent, _ := dirPath.DirAndName() - dirPath = util.FullPath(parent) + if currentPath != "/" { + parent, _ := currentPath.DirAndName() + currentPath = util.FullPath(parent) } else { break } @@ -37,7 +45,7 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full } -func doEnsureVisited(mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error { +func doEnsureVisited(mc *MetaCache, client filer_pb.FilerClient, path util.FullPath, entryChan chan *filer.Entry) error { glog.V(4).Infof("ReadDirAllEntries %s ...", path) @@ -51,6 +59,9 @@ func doEnsureVisited(mc *MetaCache, client filer_pb.FilerClient, path util.FullP glog.V(0).Infof("read %s: %v", entry.FullPath, err) return err } + if entryChan != nil { + entryChan <- entry + } return nil }) }) diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 51bf641ef..b070fdd6c 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -140,7 +140,7 @@ func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.St } // read from async meta cache - meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir)) + meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir), nil) cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) if cacheErr == filer_pb.ErrNotFound { return nil, fuse.ENOENT diff --git a/weed/mount/weedfs_dir_lookup.go b/weed/mount/weedfs_dir_lookup.go index a4befa7fa..852e08444 100644 --- a/weed/mount/weedfs_dir_lookup.go +++ b/weed/mount/weedfs_dir_lookup.go @@ -24,7 +24,7 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin fullFilePath := dirPath.Child(name) - visitErr := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath) + visitErr := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath, nil) if visitErr != nil { glog.Errorf("dir Lookup %s: %v", dirPath, visitErr) return fuse.EIO diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go index 40726a694..a14f4d960 100644 --- a/weed/mount/weedfs_dir_read.go +++ b/weed/mount/weedfs_dir_read.go @@ -179,17 +179,33 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl return true } - if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil { - glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) - return fuse.EIO + entryChan := make(chan *filer.Entry, 128) + var err error + go func() { + if err = meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath, entryChan); err != nil { + glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) + } + close(entryChan) + }() + hasData := false + for entry := range entryChan { + hasData = true + processEachEntryFn(entry, false) } - listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, dh.lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool { - return processEachEntryFn(entry, false) - }) - if listErr != nil { - glog.Errorf("list meta cache: %v", listErr) + if err != nil { return fuse.EIO } + + if !hasData { + listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, dh.lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool { + return processEachEntryFn(entry, false) + }) + if listErr != nil { + glog.Errorf("list meta cache: %v", listErr) + return fuse.EIO + } + } + if !isEarlyTerminated { dh.isFinished = true }