Browse Source

list entries while reading from remote

pull/2685/head
chrislu 3 years ago
parent
commit
49b84b6e2a
  1. 27
      weed/mount/meta_cache/meta_cache_init.go
  2. 2
      weed/mount/weedfs.go
  3. 2
      weed/mount/weedfs_dir_lookup.go
  4. 32
      weed/mount/weedfs_dir_read.go

27
weed/mount/meta_cache/meta_cache_init.go

@ -10,24 +10,32 @@ import (
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath, entryChan chan *filer.Entry) error {
currentPath := dirPath
for { for {
// the directory children are already cached // the directory children are already cached
// so no need for this and upper directories // so no need for this and upper directories
if mc.isCachedFn(dirPath) {
if mc.isCachedFn(currentPath) {
return nil return nil
} }
if err := doEnsureVisited(mc, client, dirPath); err != nil {
return err
if entryChan != nil && dirPath == currentPath {
if err := doEnsureVisited(mc, client, currentPath, entryChan); err != nil {
return err
}
} else {
if err := doEnsureVisited(mc, client, currentPath, nil); err != nil {
return err
}
} }
// continue to parent directory // continue to parent directory
if dirPath != "/" {
parent, _ := dirPath.DirAndName()
dirPath = util.FullPath(parent)
if currentPath != "/" {
parent, _ := currentPath.DirAndName()
currentPath = util.FullPath(parent)
} else { } else {
break break
} }
@ -37,7 +45,7 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full
} }
func doEnsureVisited(mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error {
func doEnsureVisited(mc *MetaCache, client filer_pb.FilerClient, path util.FullPath, entryChan chan *filer.Entry) error {
glog.V(4).Infof("ReadDirAllEntries %s ...", path) glog.V(4).Infof("ReadDirAllEntries %s ...", path)
@ -51,6 +59,9 @@ func doEnsureVisited(mc *MetaCache, client filer_pb.FilerClient, path util.FullP
glog.V(0).Infof("read %s: %v", entry.FullPath, err) glog.V(0).Infof("read %s: %v", entry.FullPath, err)
return err return err
} }
if entryChan != nil {
entryChan <- entry
}
return nil return nil
}) })
}) })

2
weed/mount/weedfs.go

@ -140,7 +140,7 @@ func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.St
} }
// read from async meta cache // read from async meta cache
meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir), nil)
cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
if cacheErr == filer_pb.ErrNotFound { if cacheErr == filer_pb.ErrNotFound {
return nil, fuse.ENOENT return nil, fuse.ENOENT

2
weed/mount/weedfs_dir_lookup.go

@ -24,7 +24,7 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin
fullFilePath := dirPath.Child(name) fullFilePath := dirPath.Child(name)
visitErr := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath)
visitErr := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath, nil)
if visitErr != nil { if visitErr != nil {
glog.Errorf("dir Lookup %s: %v", dirPath, visitErr) glog.Errorf("dir Lookup %s: %v", dirPath, visitErr)
return fuse.EIO return fuse.EIO

32
weed/mount/weedfs_dir_read.go

@ -179,17 +179,33 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
return true return true
} }
if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil {
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
return fuse.EIO
entryChan := make(chan *filer.Entry, 128)
var err error
go func() {
if err = meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath, entryChan); err != nil {
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
}
close(entryChan)
}()
hasData := false
for entry := range entryChan {
hasData = true
processEachEntryFn(entry, false)
} }
listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, dh.lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
return processEachEntryFn(entry, false)
})
if listErr != nil {
glog.Errorf("list meta cache: %v", listErr)
if err != nil {
return fuse.EIO return fuse.EIO
} }
if !hasData {
listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, dh.lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
return processEachEntryFn(entry, false)
})
if listErr != nil {
glog.Errorf("list meta cache: %v", listErr)
return fuse.EIO
}
}
if !isEarlyTerminated { if !isEarlyTerminated {
dh.isFinished = true dh.isFinished = true
} }

Loading…
Cancel
Save