|
|
@ -12,9 +12,22 @@ import ( |
|
|
|
|
|
|
|
type DirectoryHandleId uint64 |
|
|
|
|
|
|
|
const ( |
|
|
|
directoryStreamBaseOffset = 2 // . & ..
|
|
|
|
) |
|
|
|
|
|
|
|
type DirectoryHandle struct { |
|
|
|
isFinished bool |
|
|
|
lastEntryName string |
|
|
|
entryStream []*filer.Entry |
|
|
|
entryStreamOffset uint64 |
|
|
|
} |
|
|
|
|
|
|
|
func (dh *DirectoryHandle) reset() { |
|
|
|
*dh = DirectoryHandle{ |
|
|
|
isFinished: false, |
|
|
|
entryStream: []*filer.Entry{}, |
|
|
|
entryStreamOffset: directoryStreamBaseOffset, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
type DirectoryHandleToInode struct { |
|
|
@ -37,10 +50,8 @@ func (wfs *WFS) AcquireDirectoryHandle() (DirectoryHandleId, *DirectoryHandle) { |
|
|
|
|
|
|
|
wfs.dhmap.Lock() |
|
|
|
defer wfs.dhmap.Unlock() |
|
|
|
dh := &DirectoryHandle{ |
|
|
|
isFinished: false, |
|
|
|
lastEntryName: "", |
|
|
|
} |
|
|
|
dh := new(DirectoryHandle) |
|
|
|
dh.reset() |
|
|
|
wfs.dhmap.dir2inode[DirectoryHandleId(fh)] = dh |
|
|
|
return DirectoryHandleId(fh), dh |
|
|
|
} |
|
|
@ -51,11 +62,8 @@ func (wfs *WFS) GetDirectoryHandle(dhid DirectoryHandleId) *DirectoryHandle { |
|
|
|
if dh, found := wfs.dhmap.dir2inode[dhid]; found { |
|
|
|
return dh |
|
|
|
} |
|
|
|
dh := &DirectoryHandle{ |
|
|
|
isFinished: false, |
|
|
|
lastEntryName: "", |
|
|
|
} |
|
|
|
|
|
|
|
dh := new(DirectoryHandle) |
|
|
|
dh.reset() |
|
|
|
wfs.dhmap.dir2inode[dhid] = dh |
|
|
|
return dh |
|
|
|
} |
|
|
@ -130,13 +138,12 @@ func (wfs *WFS) ReadDirPlus(cancel <-chan struct{}, input *fuse.ReadIn, out *fus |
|
|
|
} |
|
|
|
|
|
|
|
func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPlusMode bool) fuse.Status { |
|
|
|
|
|
|
|
dh := wfs.GetDirectoryHandle(DirectoryHandleId(input.Fh)) |
|
|
|
if input.Offset == 0 { |
|
|
|
dh.isFinished = false |
|
|
|
dh.lastEntryName = "" |
|
|
|
} else { |
|
|
|
if dh.isFinished { |
|
|
|
dh.reset() |
|
|
|
} else if dh.isFinished && input.Offset >= directoryStreamBaseOffset { |
|
|
|
entryCurrentIndex := input.Offset - dh.entryStreamOffset |
|
|
|
if uint64(len(dh.entryStream)) <= entryCurrentIndex { |
|
|
|
return fuse.OK |
|
|
|
} |
|
|
|
} |
|
|
@ -148,29 +155,17 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl |
|
|
|
} |
|
|
|
|
|
|
|
var dirEntry fuse.DirEntry |
|
|
|
if input.Offset == 0 { |
|
|
|
if !isPlusMode { |
|
|
|
out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."}) |
|
|
|
out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."}) |
|
|
|
} else { |
|
|
|
out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."}) |
|
|
|
out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."}) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
processEachEntryFn := func(entry *filer.Entry, isLast bool) bool { |
|
|
|
processEachEntryFn := func(entry *filer.Entry) bool { |
|
|
|
dirEntry.Name = entry.Name() |
|
|
|
dirEntry.Mode = toSyscallMode(entry.Mode) |
|
|
|
if !isPlusMode { |
|
|
|
inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, false) |
|
|
|
inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, isPlusMode) |
|
|
|
dirEntry.Ino = inode |
|
|
|
if !isPlusMode { |
|
|
|
if !out.AddDirEntry(dirEntry) { |
|
|
|
isEarlyTerminated = true |
|
|
|
return false |
|
|
|
} |
|
|
|
} else { |
|
|
|
inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, true) |
|
|
|
dirEntry.Ino = inode |
|
|
|
entryOut := out.AddDirLookupEntry(dirEntry) |
|
|
|
if entryOut == nil { |
|
|
|
isEarlyTerminated = true |
|
|
@ -182,36 +177,59 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl |
|
|
|
} |
|
|
|
wfs.outputFilerEntry(entryOut, inode, entry) |
|
|
|
} |
|
|
|
dh.lastEntryName = entry.Name() |
|
|
|
return true |
|
|
|
} |
|
|
|
|
|
|
|
entryChan := make(chan *filer.Entry, 128) |
|
|
|
var err error |
|
|
|
go func() { |
|
|
|
if err = meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath, entryChan); err != nil { |
|
|
|
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) |
|
|
|
if input.Offset < directoryStreamBaseOffset { |
|
|
|
if !isPlusMode { |
|
|
|
if input.Offset == 0 { |
|
|
|
out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."}) |
|
|
|
} |
|
|
|
close(entryChan) |
|
|
|
}() |
|
|
|
hasData := false |
|
|
|
for entry := range entryChan { |
|
|
|
hasData = true |
|
|
|
processEachEntryFn(entry, false) |
|
|
|
out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."}) |
|
|
|
} else { |
|
|
|
if input.Offset == 0 { |
|
|
|
out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."}) |
|
|
|
} |
|
|
|
if err != nil { |
|
|
|
return fuse.EIO |
|
|
|
out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."}) |
|
|
|
} |
|
|
|
input.Offset = directoryStreamBaseOffset |
|
|
|
} |
|
|
|
|
|
|
|
if !hasData { |
|
|
|
listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, dh.lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool { |
|
|
|
return processEachEntryFn(entry, false) |
|
|
|
var lastEntryName string |
|
|
|
if input.Offset >= directoryStreamBaseOffset { |
|
|
|
if input.Offset > directoryStreamBaseOffset { |
|
|
|
entryPreviousIndex := (input.Offset - dh.entryStreamOffset) - 1 |
|
|
|
if uint64(len(dh.entryStream)) > entryPreviousIndex { |
|
|
|
lastEntryName = dh.entryStream[entryPreviousIndex].Name() |
|
|
|
dh.entryStream = dh.entryStream[entryPreviousIndex:] |
|
|
|
dh.entryStreamOffset = input.Offset - 1 |
|
|
|
} |
|
|
|
} |
|
|
|
entryCurrentIndex := input.Offset - dh.entryStreamOffset |
|
|
|
for uint64(len(dh.entryStream)) > entryCurrentIndex { |
|
|
|
entry := dh.entryStream[entryCurrentIndex] |
|
|
|
if processEachEntryFn(entry) { |
|
|
|
lastEntryName = entry.Name() |
|
|
|
entryCurrentIndex++ |
|
|
|
} else { |
|
|
|
// early terminated
|
|
|
|
return fuse.OK |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
var err error |
|
|
|
if err = meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath, nil); err != nil { |
|
|
|
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) |
|
|
|
} |
|
|
|
listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool { |
|
|
|
dh.entryStream = append(dh.entryStream, entry) |
|
|
|
return processEachEntryFn(entry) |
|
|
|
}) |
|
|
|
if listErr != nil { |
|
|
|
glog.Errorf("list meta cache: %v", listErr) |
|
|
|
return fuse.EIO |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if !isEarlyTerminated { |
|
|
|
dh.isFinished = true |
|
|
|