Browse Source

Merge pull request #645 from chrislusf/vasto_filer

merge back filer related refactoring
pull/655/head
Chris Lu 7 years ago
committed by GitHub
parent
commit
4d5e1e5947
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 27
      weed/command/mount_std.go
  2. 19
      weed/filer/client_operations.go
  3. 8
      weed/filer/embedded_filer/directory.go
  4. 22
      weed/filer/embedded_filer/directory_in_map.go
  5. 8
      weed/filer/embedded_filer/directory_test.go
  6. 34
      weed/filer/embedded_filer/filer_embedded.go
  7. 10
      weed/filer/embedded_filer/files_in_leveldb.go
  8. 11
      weed/filer/filer.go
  9. 10
      weed/filer/flat_namespace/flat_namespace_filer.go
  10. 14
      weed/filer/postgres_store/postgres_native.go
  11. 21
      weed/filer/postgres_store/postgres_store.go
  12. 45
      weed/filer/vasto_store/design.txt
  13. 9
      weed/server/filer_server_handlers_api.go
  14. 6
      weed/server/filer_ui/templates.go

27
weed/command/mount_std.go

@ -59,27 +59,27 @@ func (WFS) Root() (fs.Node, error) {
var fileIdMap = make(map[uint64]filer.FileId) var fileIdMap = make(map[uint64]filer.FileId)
type Dir struct { type Dir struct {
Id uint64
Path string Path string
DirentMap map[string]*fuse.Dirent DirentMap map[string]*fuse.Dirent
} }
func (dir *Dir) Attr(context context.Context, attr *fuse.Attr) error { func (dir *Dir) Attr(context context.Context, attr *fuse.Attr) error {
attr.Inode = dir.Id
attr.Mode = os.ModeDir | 0555 attr.Mode = os.ModeDir | 0555
return nil return nil
} }
func (dir *Dir) Lookup(ctx context.Context, name string) (fs.Node, error) { func (dir *Dir) Lookup(ctx context.Context, name string) (fs.Node, error) {
if dirent, ok := dir.DirentMap[name]; ok {
if dirent.Type == fuse.DT_File {
return &File{Id: dirent.Inode, FileId: fileIdMap[dirent.Inode], Name: dirent.Name}, nil
if entry, err := filer.LookupDirectoryEntry(*mountOptions.filer, dir.Path, name); err == nil {
if !entry.Found {
return nil, fuse.ENOENT
}
if entry.FileId != "" {
return &File{FileId: filer.FileId(entry.FileId), Name: name}, nil
} else {
return &Dir{Path: path.Join(dir.Path, name)}, nil
} }
return &Dir{
Id: dirent.Inode,
Path: path.Join(dir.Path, dirent.Name),
}, nil
} }
return nil, fuse.ENOENT return nil, fuse.ENOENT
} }
@ -90,17 +90,16 @@ func (dir *Dir) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
} }
if dirs, e := filer.ListDirectories(*mountOptions.filer, dir.Path); e == nil { if dirs, e := filer.ListDirectories(*mountOptions.filer, dir.Path); e == nil {
for _, d := range dirs.Directories { for _, d := range dirs.Directories {
dirId := uint64(d.Id)
dirent := fuse.Dirent{Inode: dirId, Name: d.Name, Type: fuse.DT_Dir}
dirent := fuse.Dirent{Name: string(d), Type: fuse.DT_Dir}
ret = append(ret, dirent) ret = append(ret, dirent)
dir.DirentMap[d.Name] = &dirent
dir.DirentMap[string(d)] = &dirent
} }
} }
if files, e := filer.ListFiles(*mountOptions.filer, dir.Path, ""); e == nil { if files, e := filer.ListFiles(*mountOptions.filer, dir.Path, ""); e == nil {
for _, f := range files.Files { for _, f := range files.Files {
if fileId, e := storage.ParseFileId(string(f.Id)); e == nil { if fileId, e := storage.ParseFileId(string(f.Id)); e == nil {
fileInode := uint64(fileId.VolumeId)<<48 + fileId.Key fileInode := uint64(fileId.VolumeId)<<48 + fileId.Key
dirent := fuse.Dirent{Inode: fileInode, Name: f.Name, Type: fuse.DT_File}
dirent := fuse.Dirent{Name: f.Name, Type: fuse.DT_File}
ret = append(ret, dirent) ret = append(ret, dirent)
dir.DirentMap[f.Name] = &dirent dir.DirentMap[f.Name] = &dirent
fileIdMap[fileInode] = f.Id fileIdMap[fileInode] = f.Id
@ -120,13 +119,11 @@ func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
} }
type File struct { type File struct {
Id uint64
FileId filer.FileId FileId filer.FileId
Name string Name string
} }
func (file *File) Attr(context context.Context, attr *fuse.Attr) error { func (file *File) Attr(context context.Context, attr *fuse.Attr) error {
attr.Inode = file.Id
attr.Mode = 0444 attr.Mode = 0444
ret, err := filer.GetFileSize(*mountOptions.filer, string(file.FileId)) ret, err := filer.GetFileSize(*mountOptions.filer, string(file.FileId))
if err == nil { if err == nil {

19
weed/filer/client_operations.go

@ -65,7 +65,7 @@ func GetFileContent(server string, fileId string) (ret *GetFileContentResult, er
} }
type ListDirectoriesResult struct { type ListDirectoriesResult struct {
Directories []DirectoryEntry
Directories []DirectoryName
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
} }
@ -80,6 +80,23 @@ func ListDirectories(server string, directory string) (ret *ListDirectoriesResul
return nil, err return nil, err
} }
type LookupDirectoryEntryResult struct {
Found bool
FileId string
Error string `json:"error,omitempty"`
}
func LookupDirectoryEntry(server string, directory string, name string) (ret *LookupDirectoryEntryResult, err error) {
ret = new(LookupDirectoryEntryResult)
if err := call(server, ApiRequest{Command: "lookupDirectoryEntry", Directory: directory, FileName: name}, ret); err == nil {
if ret.Error != "" {
return nil, errors.New(ret.Error)
}
return ret, nil
}
return nil, err
}
func DeleteDirectoryOrFile(server string, path string, isDir bool) error { func DeleteDirectoryOrFile(server string, path string, isDir bool) error {
destUrl := fmt.Sprintf("http://%s%s", server, path) destUrl := fmt.Sprintf("http://%s%s", server, path)
if isDir { if isDir {

8
weed/filer/embedded_filer/directory.go

@ -5,11 +5,11 @@ import (
) )
type DirectoryManager interface { type DirectoryManager interface {
FindDirectory(dirPath string) (filer.DirectoryId, error)
ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error)
MakeDirectory(currentDirPath string, dirName string) (filer.DirectoryId, error)
FindDirectory(dirPath string) (DirectoryId, error)
ListDirectories(dirPath string) (dirs []filer.DirectoryName, err error)
MakeDirectory(currentDirPath string, dirName string) (DirectoryId, error)
MoveUnderDirectory(oldDirPath string, newParentDirPath string) error MoveUnderDirectory(oldDirPath string, newParentDirPath string) error
DeleteDirectory(dirPath string) error DeleteDirectory(dirPath string) error
//functions used by FUSE //functions used by FUSE
FindDirectoryById(filer.DirectoryId, error)
FindDirectoryById(DirectoryId, error)
} }

22
weed/filer/embedded_filer/directory_in_map.go

@ -16,12 +16,14 @@ import (
var writeLock sync.Mutex //serialize changes to dir.log var writeLock sync.Mutex //serialize changes to dir.log
type DirectoryId int32
type DirectoryEntryInMap struct { type DirectoryEntryInMap struct {
sync.Mutex sync.Mutex
Name string Name string
Parent *DirectoryEntryInMap Parent *DirectoryEntryInMap
subDirectories map[string]*DirectoryEntryInMap subDirectories map[string]*DirectoryEntryInMap
Id filer.DirectoryId
Id DirectoryId
} }
func (de *DirectoryEntryInMap) getChild(dirName string) (*DirectoryEntryInMap, bool) { func (de *DirectoryEntryInMap) getChild(dirName string) (*DirectoryEntryInMap, bool) {
@ -45,18 +47,18 @@ func (de *DirectoryEntryInMap) hasChildren() bool {
defer de.Unlock() defer de.Unlock()
return len(de.subDirectories) > 0 return len(de.subDirectories) > 0
} }
func (de *DirectoryEntryInMap) children() (dirNames []filer.DirectoryEntry) {
func (de *DirectoryEntryInMap) children() (dirNames []filer.DirectoryName) {
de.Lock() de.Lock()
defer de.Unlock() defer de.Unlock()
for k, v := range de.subDirectories {
dirNames = append(dirNames, filer.DirectoryEntry{Name: k, Id: v.Id})
for k, _ := range de.subDirectories {
dirNames = append(dirNames, filer.DirectoryName(k))
} }
return dirNames return dirNames
} }
type DirectoryManagerInMap struct { type DirectoryManagerInMap struct {
Root *DirectoryEntryInMap Root *DirectoryEntryInMap
max filer.DirectoryId
max DirectoryId
logFile *os.File logFile *os.File
isLoading bool isLoading bool
} }
@ -113,7 +115,7 @@ func (dm *DirectoryManagerInMap) processEachLine(line string) error {
if pe != nil { if pe != nil {
return pe return pe
} }
if e := dm.loadDirectory(parts[1], filer.DirectoryId(v)); e != nil {
if e := dm.loadDirectory(parts[1], DirectoryId(v)); e != nil {
return e return e
} }
case "mov": case "mov":
@ -172,7 +174,7 @@ func (dm *DirectoryManagerInMap) findDirectory(dirPath string) (*DirectoryEntryI
} }
return dir, nil return dir, nil
} }
func (dm *DirectoryManagerInMap) FindDirectory(dirPath string) (filer.DirectoryId, error) {
func (dm *DirectoryManagerInMap) findDirectoryId(dirPath string) (DirectoryId, error) {
d, e := dm.findDirectory(dirPath) d, e := dm.findDirectory(dirPath)
if e == nil { if e == nil {
return d.Id, nil return d.Id, nil
@ -180,7 +182,7 @@ func (dm *DirectoryManagerInMap) FindDirectory(dirPath string) (filer.DirectoryI
return dm.Root.Id, e return dm.Root.Id, e
} }
func (dm *DirectoryManagerInMap) loadDirectory(dirPath string, dirId filer.DirectoryId) error {
func (dm *DirectoryManagerInMap) loadDirectory(dirPath string, dirId DirectoryId) error {
dirPath = CleanFilePath(dirPath) dirPath = CleanFilePath(dirPath)
if dirPath == "/" { if dirPath == "/" {
return nil return nil
@ -248,7 +250,7 @@ func (dm *DirectoryManagerInMap) makeDirectory(dirPath string) (dir *DirectoryEn
return dir, created return dir, created
} }
func (dm *DirectoryManagerInMap) MakeDirectory(dirPath string) (filer.DirectoryId, error) {
func (dm *DirectoryManagerInMap) MakeDirectory(dirPath string) (DirectoryId, error) {
dir, _ := dm.makeDirectory(dirPath) dir, _ := dm.makeDirectory(dirPath)
return dir.Id, nil return dir.Id, nil
} }
@ -275,7 +277,7 @@ func (dm *DirectoryManagerInMap) MoveUnderDirectory(oldDirPath string, newParent
return nil return nil
} }
func (dm *DirectoryManagerInMap) ListDirectories(dirPath string) (dirNames []filer.DirectoryEntry, err error) {
func (dm *DirectoryManagerInMap) ListDirectories(dirPath string) (dirNames []filer.DirectoryName, err error) {
d, e := dm.findDirectory(dirPath) d, e := dm.findDirectory(dirPath)
if e != nil { if e != nil {
return dirNames, e return dirNames, e

8
weed/filer/embedded_filer/directory_test.go

@ -19,17 +19,17 @@ func TestDirectory(t *testing.T) {
dm.MakeDirectory("/a/b/e/f") dm.MakeDirectory("/a/b/e/f")
dm.MakeDirectory("/a/b/e/f/g") dm.MakeDirectory("/a/b/e/f/g")
dm.MoveUnderDirectory("/a/b/e/f/g", "/a/b", "t") dm.MoveUnderDirectory("/a/b/e/f/g", "/a/b", "t")
if _, err := dm.FindDirectory("/a/b/e/f/g"); err == nil {
if _, err := dm.findDirectoryId("/a/b/e/f/g"); err == nil {
t.Fatal("/a/b/e/f/g should not exist any more after moving") t.Fatal("/a/b/e/f/g should not exist any more after moving")
} }
if _, err := dm.FindDirectory("/a/b/t"); err != nil {
if _, err := dm.findDirectoryId("/a/b/t"); err != nil {
t.Fatal("/a/b/t should exist after moving") t.Fatal("/a/b/t should exist after moving")
} }
if _, err := dm.FindDirectory("/a/b/g"); err == nil {
if _, err := dm.findDirectoryId("/a/b/g"); err == nil {
t.Fatal("/a/b/g should not exist after moving") t.Fatal("/a/b/g should not exist after moving")
} }
dm.MoveUnderDirectory("/a/b/e/f", "/a/b", "") dm.MoveUnderDirectory("/a/b/e/f", "/a/b", "")
if _, err := dm.FindDirectory("/a/b/f"); err != nil {
if _, err := dm.findDirectoryId("/a/b/f"); err != nil {
t.Fatal("/a/b/g should not exist after moving") t.Fatal("/a/b/g should not exist after moving")
} }
dm.MakeDirectory("/a/b/g/h/i") dm.MakeDirectory("/a/b/g/h/i")

34
weed/filer/embedded_filer/filer_embedded.go

@ -45,27 +45,37 @@ func (filer *FilerEmbedded) CreateFile(filePath string, fid string) (err error)
} }
func (filer *FilerEmbedded) FindFile(filePath string) (fid string, err error) { func (filer *FilerEmbedded) FindFile(filePath string) (fid string, err error) {
dir, file := filepath.Split(filePath) dir, file := filepath.Split(filePath)
dirId, e := filer.directories.FindDirectory(dir)
return filer.findFileEntry(dir, file)
}
func (filer *FilerEmbedded) findFileEntry(parentPath string, fileName string) (fid string, err error) {
dirId, e := filer.directories.findDirectoryId(parentPath)
if e != nil { if e != nil {
return "", e return "", e
} }
return filer.files.FindFile(dirId, file)
return filer.files.FindFile(dirId, fileName)
} }
func (filer *FilerEmbedded) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) {
return filer.directories.FindDirectory(dirPath)
func (filer *FilerEmbedded) LookupDirectoryEntry(dirPath string, name string) (found bool, fileId string, err error) {
if _, err = filer.directories.findDirectory(filepath.Join(dirPath, name)); err == nil {
return true, "", nil
}
if fileId, err = filer.findFileEntry(dirPath, name); err == nil {
return true, fileId, nil
}
return false, "", err
} }
func (filer *FilerEmbedded) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) {
func (filer *FilerEmbedded) ListDirectories(dirPath string) (dirs []filer.DirectoryName, err error) {
return filer.directories.ListDirectories(dirPath) return filer.directories.ListDirectories(dirPath)
} }
func (filer *FilerEmbedded) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { func (filer *FilerEmbedded) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
dirId, e := filer.directories.FindDirectory(dirPath)
dirId, e := filer.directories.findDirectoryId(dirPath)
if e != nil { if e != nil {
return nil, e return nil, e
} }
return filer.files.ListFiles(dirId, lastFileName, limit), nil return filer.files.ListFiles(dirId, lastFileName, limit), nil
} }
func (filer *FilerEmbedded) DeleteDirectory(dirPath string, recursive bool) (err error) { func (filer *FilerEmbedded) DeleteDirectory(dirPath string, recursive bool) (err error) {
dirId, e := filer.directories.FindDirectory(dirPath)
dirId, e := filer.directories.findDirectoryId(dirPath)
if e != nil { if e != nil {
return e return e
} }
@ -74,7 +84,7 @@ func (filer *FilerEmbedded) DeleteDirectory(dirPath string, recursive bool) (err
return fmt.Errorf("Fail to delete directory %s: %d sub directories found!", dirPath, len(sub_dirs)) return fmt.Errorf("Fail to delete directory %s: %d sub directories found!", dirPath, len(sub_dirs))
} }
for _, sub := range sub_dirs { for _, sub := range sub_dirs {
if delete_sub_err := filer.DeleteDirectory(filepath.Join(dirPath, sub.Name), recursive); delete_sub_err != nil {
if delete_sub_err := filer.DeleteDirectory(filepath.Join(dirPath, string(sub)), recursive); delete_sub_err != nil {
return delete_sub_err return delete_sub_err
} }
} }
@ -108,7 +118,7 @@ func (filer *FilerEmbedded) DeleteDirectory(dirPath string, recursive bool) (err
func (filer *FilerEmbedded) DeleteFile(filePath string) (fid string, err error) { func (filer *FilerEmbedded) DeleteFile(filePath string) (fid string, err error) {
dir, file := filepath.Split(filePath) dir, file := filepath.Split(filePath)
dirId, e := filer.directories.FindDirectory(dir)
dirId, e := filer.directories.findDirectoryId(dir)
if e != nil { if e != nil {
return "", e return "", e
} }
@ -126,8 +136,8 @@ func (filer *FilerEmbedded) Move(fromPath string, toPath string) error {
filer.mvMutex.Lock() filer.mvMutex.Lock()
defer filer.mvMutex.Unlock() defer filer.mvMutex.Unlock()
if _, dir_err := filer.FindDirectory(fromPath); dir_err == nil {
if _, err := filer.FindDirectory(toPath); err == nil {
if _, dir_err := filer.directories.findDirectoryId(fromPath); dir_err == nil {
if _, err := filer.directories.findDirectoryId(toPath); err == nil {
// move folder under an existing folder // move folder under an existing folder
return filer.directories.MoveUnderDirectory(fromPath, toPath, "") return filer.directories.MoveUnderDirectory(fromPath, toPath, "")
} }
@ -135,7 +145,7 @@ func (filer *FilerEmbedded) Move(fromPath string, toPath string) error {
return filer.directories.MoveUnderDirectory(fromPath, filepath.Dir(toPath), filepath.Base(toPath)) return filer.directories.MoveUnderDirectory(fromPath, filepath.Dir(toPath), filepath.Base(toPath))
} }
if fid, file_err := filer.DeleteFile(fromPath); file_err == nil { if fid, file_err := filer.DeleteFile(fromPath); file_err == nil {
if _, err := filer.FindDirectory(toPath); err == nil {
if _, err := filer.directories.findDirectoryId(toPath); err == nil {
// move file under an existing folder // move file under an existing folder
return filer.CreateFile(filepath.Join(toPath, filepath.Base(fromPath)), fid) return filer.CreateFile(filepath.Join(toPath, filepath.Base(fromPath)), fid)
} }

10
weed/filer/embedded_filer/files_in_leveldb.go

@ -28,7 +28,7 @@ func NewFileListInLevelDb(dir string) (fl *FileListInLevelDb, err error) {
return return
} }
func genKey(dirId filer.DirectoryId, fileName string) []byte {
func genKey(dirId DirectoryId, fileName string) []byte {
ret := make([]byte, 0, 4+len(fileName)) ret := make([]byte, 0, 4+len(fileName))
for i := 3; i >= 0; i-- { for i := 3; i >= 0; i-- {
ret = append(ret, byte(dirId>>(uint(i)*8))) ret = append(ret, byte(dirId>>(uint(i)*8)))
@ -37,11 +37,11 @@ func genKey(dirId filer.DirectoryId, fileName string) []byte {
return ret return ret
} }
func (fl *FileListInLevelDb) CreateFile(dirId filer.DirectoryId, fileName string, fid string) (err error) {
func (fl *FileListInLevelDb) CreateFile(dirId DirectoryId, fileName string, fid string) (err error) {
glog.V(4).Infoln("directory", dirId, "fileName", fileName, "fid", fid) glog.V(4).Infoln("directory", dirId, "fileName", fileName, "fid", fid)
return fl.db.Put(genKey(dirId, fileName), []byte(fid), nil) return fl.db.Put(genKey(dirId, fileName), []byte(fid), nil)
} }
func (fl *FileListInLevelDb) DeleteFile(dirId filer.DirectoryId, fileName string) (fid string, err error) {
func (fl *FileListInLevelDb) DeleteFile(dirId DirectoryId, fileName string) (fid string, err error) {
if fid, err = fl.FindFile(dirId, fileName); err != nil { if fid, err = fl.FindFile(dirId, fileName); err != nil {
if err == leveldb.ErrNotFound { if err == leveldb.ErrNotFound {
return "", nil return "", nil
@ -51,7 +51,7 @@ func (fl *FileListInLevelDb) DeleteFile(dirId filer.DirectoryId, fileName string
err = fl.db.Delete(genKey(dirId, fileName), nil) err = fl.db.Delete(genKey(dirId, fileName), nil)
return fid, err return fid, err
} }
func (fl *FileListInLevelDb) FindFile(dirId filer.DirectoryId, fileName string) (fid string, err error) {
func (fl *FileListInLevelDb) FindFile(dirId DirectoryId, fileName string) (fid string, err error) {
data, e := fl.db.Get(genKey(dirId, fileName), nil) data, e := fl.db.Get(genKey(dirId, fileName), nil)
if e == leveldb.ErrNotFound { if e == leveldb.ErrNotFound {
return "", filer.ErrNotFound return "", filer.ErrNotFound
@ -60,7 +60,7 @@ func (fl *FileListInLevelDb) FindFile(dirId filer.DirectoryId, fileName string)
} }
return string(data), nil return string(data), nil
} }
func (fl *FileListInLevelDb) ListFiles(dirId filer.DirectoryId, lastFileName string, limit int) (files []filer.FileEntry) {
func (fl *FileListInLevelDb) ListFiles(dirId DirectoryId, lastFileName string, limit int) (files []filer.FileEntry) {
glog.V(4).Infoln("directory", dirId, "lastFileName", lastFileName, "limit", limit) glog.V(4).Infoln("directory", dirId, "lastFileName", lastFileName, "limit", limit)
dirKey := genKey(dirId, "") dirKey := genKey(dirId, "")
iter := fl.db.NewIterator(&util.Range{Start: genKey(dirId, lastFileName)}, nil) iter := fl.db.NewIterator(&util.Range{Start: genKey(dirId, lastFileName)}, nil)

11
weed/filer/filer.go

@ -11,12 +11,7 @@ type FileEntry struct {
Id FileId `json:"fid,omitempty"` Id FileId `json:"fid,omitempty"`
} }
type DirectoryId int32
type DirectoryEntry struct {
Name string //dir name without path
Id DirectoryId
}
type DirectoryName string
type Filer interface { type Filer interface {
CreateFile(fullFileName string, fid string) (err error) CreateFile(fullFileName string, fid string) (err error)
@ -24,11 +19,11 @@ type Filer interface {
DeleteFile(fullFileName string) (fid string, err error) DeleteFile(fullFileName string) (fid string, err error)
//Optional functions. embedded filer support these //Optional functions. embedded filer support these
FindDirectory(dirPath string) (dirId DirectoryId, err error)
ListDirectories(dirPath string) (dirs []DirectoryEntry, err error)
ListDirectories(dirPath string) (dirs []DirectoryName, err error)
ListFiles(dirPath string, lastFileName string, limit int) (files []FileEntry, err error) ListFiles(dirPath string, lastFileName string, limit int) (files []FileEntry, err error)
DeleteDirectory(dirPath string, recursive bool) (err error) DeleteDirectory(dirPath string, recursive bool) (err error)
Move(fromPath string, toPath string) (err error) Move(fromPath string, toPath string) (err error)
LookupDirectoryEntry(dirPath string, name string) (found bool, fileId string, err error)
} }
var ErrNotFound = errors.New("filer: no entry is found in filer store") var ErrNotFound = errors.New("filer: no entry is found in filer store")

10
weed/filer/flat_namespace/flat_namespace_filer.go

@ -4,6 +4,7 @@ import (
"errors" "errors"
"github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer"
"path/filepath"
) )
type FlatNamespaceFiler struct { type FlatNamespaceFiler struct {
@ -28,10 +29,13 @@ func (filer *FlatNamespaceFiler) CreateFile(fullFileName string, fid string) (er
func (filer *FlatNamespaceFiler) FindFile(fullFileName string) (fid string, err error) { func (filer *FlatNamespaceFiler) FindFile(fullFileName string) (fid string, err error) {
return filer.store.Get(fullFileName) return filer.store.Get(fullFileName)
} }
func (filer *FlatNamespaceFiler) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) {
return 0, ErrNotImplemented
func (filer *FlatNamespaceFiler) LookupDirectoryEntry(dirPath string, name string) (found bool, fileId string, err error) {
if fileId, err = filer.FindFile(filepath.Join(dirPath, name)); err == nil {
return true, fileId, nil
}
return false, "", err
} }
func (filer *FlatNamespaceFiler) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) {
func (filer *FlatNamespaceFiler) ListDirectories(dirPath string) (dirs []filer.DirectoryName, err error) {
return nil, ErrNotImplemented return nil, ErrNotImplemented
} }
func (filer *FlatNamespaceFiler) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { func (filer *FlatNamespaceFiler) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {

14
weed/filer/postgres_store/postgres_native.go

@ -14,6 +14,8 @@ import (
"strings" "strings"
) )
type DirectoryId int32
func databaseExists(db *sql.DB, databaseName string) (bool, error) { func databaseExists(db *sql.DB, databaseName string) (bool, error) {
sqlStatement := "SELECT datname from pg_database WHERE datname='%s'" sqlStatement := "SELECT datname from pg_database WHERE datname='%s'"
row := db.QueryRow(fmt.Sprintf(sqlStatement, databaseName)) row := db.QueryRow(fmt.Sprintf(sqlStatement, databaseName))
@ -293,13 +295,13 @@ func (s *PostgresStore) delete(uriPath string) error {
return nil return nil
} }
func (s *PostgresStore) lookupDirectory(dirPath string) (filer.DirectoryId, string, error) {
func (s *PostgresStore) lookupDirectory(dirPath string) (DirectoryId, string, error) {
directoryRoot, directoryName := s.mySplitPath(dirPath) directoryRoot, directoryName := s.mySplitPath(dirPath)
sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName=$2", directoriesTableName) sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName=$2", directoriesTableName)
row := s.db.QueryRow(sqlStatement, directoryRoot, directoryName) row := s.db.QueryRow(sqlStatement, directoryRoot, directoryName)
var id filer.DirectoryId
var id DirectoryId
var dirRoot string var dirRoot string
var dirName string var dirName string
err := row.Scan(&id, &dirRoot, &dirName) err := row.Scan(&id, &dirRoot, &dirName)
@ -312,7 +314,7 @@ func (s *PostgresStore) lookupDirectory(dirPath string) (filer.DirectoryId, stri
return id, filepath.Join(dirRoot, dirName), err return id, filepath.Join(dirRoot, dirName), err
} }
func (s *PostgresStore) findDirectories(dirPath string, limit int) (dirs []filer.DirectoryEntry, err error) {
func (s *PostgresStore) findDirectories(dirPath string, limit int) (dirs []filer.DirectoryName, err error) {
sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName != '' ORDER BY id LIMIT $2", directoriesTableName) sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName != '' ORDER BY id LIMIT $2", directoriesTableName)
rows, err := s.db.Query(sqlStatement, dirPath, limit) rows, err := s.db.Query(sqlStatement, dirPath, limit)
@ -323,7 +325,7 @@ func (s *PostgresStore) findDirectories(dirPath string, limit int) (dirs []filer
if rows != nil { if rows != nil {
defer rows.Close() defer rows.Close()
for rows.Next() { for rows.Next() {
var id filer.DirectoryId
var id DirectoryId
var directoryRoot string var directoryRoot string
var directoryName string var directoryName string
@ -331,7 +333,7 @@ func (s *PostgresStore) findDirectories(dirPath string, limit int) (dirs []filer
if scanErr != nil { if scanErr != nil {
err = scanErr err = scanErr
} }
dirs = append(dirs, filer.DirectoryEntry{Name: (directoryName), Id: id})
dirs = append(dirs, filer.DirectoryName(directoryName))
} }
} }
return return
@ -344,7 +346,7 @@ func (s *PostgresStore) safeToDeleteDirectory(dirPath string, recursive bool) bo
sqlStatement := fmt.Sprintf("SELECT id FROM %s WHERE directoryRoot LIKE $1 LIMIT 1", directoriesTableName) sqlStatement := fmt.Sprintf("SELECT id FROM %s WHERE directoryRoot LIKE $1 LIMIT 1", directoriesTableName)
row := s.db.QueryRow(sqlStatement, dirPath+"%") row := s.db.QueryRow(sqlStatement, dirPath+"%")
var id filer.DirectoryId
var id DirectoryId
err := row.Scan(&id) err := row.Scan(&id)
if err != nil { if err != nil {
if err == sql.ErrNoRows { if err == sql.ErrNoRows {

21
weed/filer/postgres_store/postgres_store.go

@ -10,6 +10,7 @@ import (
_ "github.com/lib/pq" _ "github.com/lib/pq"
_ "path/filepath" _ "path/filepath"
"path/filepath"
) )
const ( const (
@ -74,6 +75,17 @@ func (s *PostgresStore) FindFile(fullFilePath string) (fid string, err error) {
return fid, err return fid, err
} }
func (s *PostgresStore) LookupDirectoryEntry(dirPath string, name string) (found bool, fileId string, err error) {
fullPath := filepath.Join(dirPath, name)
if fileId, err = s.FindFile(fullPath); err == nil {
return true, fileId, nil
}
if _, _, err := s.lookupDirectory(fullPath); err == nil {
return true, "", err
}
return false, "", err
}
func (s *PostgresStore) DeleteFile(fullFilePath string) (fid string, err error) { func (s *PostgresStore) DeleteFile(fullFilePath string) (fid string, err error) {
if err != nil { if err != nil {
return "", fmt.Errorf("PostgresStore Delete operation can not parse file path %s: err is %v", fullFilePath, err) return "", fmt.Errorf("PostgresStore Delete operation can not parse file path %s: err is %v", fullFilePath, err)
@ -90,14 +102,9 @@ func (s *PostgresStore) DeleteFile(fullFilePath string) (fid string, err error)
} }
} }
func (s *PostgresStore) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) {
dirId, _, err = s.lookupDirectory(dirPath)
return dirId, err
}
func (s *PostgresStore) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) {
func (s *PostgresStore) ListDirectories(dirPath string) (dirs []filer.DirectoryName, err error) {
dirs, err = s.findDirectories(dirPath, 20)
dirs, err = s.findDirectories(dirPath, 1000)
glog.V(3).Infof("Postgres ListDirs = found %d directories under %s", len(dirs), dirPath) glog.V(3).Infof("Postgres ListDirs = found %d directories under %s", len(dirs), dirPath)

45
weed/filer/vasto_store/design.txt

@ -0,0 +1,45 @@
There are two main components of a filer: directories and files.
My previous approach was to use some sequance number to generate directoryId.
However, this is not scalable. The id generation itself is a bottleneck.
It needs careful locking and deduplication checking to get a directoryId.
In a second design, each directory is deterministically mapped to UUID version 3,
which uses MD5 to map a tuple of <uuid, name> to a version 3 UUID.
However, this UUID3 approach is logically the same as storing the full path.
Storing the full path is the simplest design.
separator is a special byte, 0x00.
When writing a file:
<file parent full path, separator, file name> => fildId, file properties
For folders:
The filer breaks the directory path into folders.
for each folder:
if it is not in cache:
check whether the folder is created in the KVS, if not:
set <folder parent full path, separator, folder name> => directory properties
if no permission for the folder:
break
The filer caches the most recently used folder permissions with a TTL.
So any folder permission change needs to wait TTL interval to take effect.
When listing the directory:
prefix scan of using (the folder full path + separator) as the prefix
The downside:
1. Rename a folder will need to recursively process all sub folders and files.
2. Move a folder will need to recursively process all sub folders and files.
So these operations are not allowed if the folder is not empty.
Allowing:
1. Rename a file
2. Move a file to a different folder
3. Delete an empty folder

9
weed/server/filer_server_handlers_api.go

@ -21,6 +21,13 @@ func (fs *FilerServer) apiHandler(w http.ResponseWriter, r *http.Request) {
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)
} }
switch apiRequest.Command { switch apiRequest.Command {
case "lookupDirectoryEntry":
res := filer.LookupDirectoryEntryResult{}
res.Found, res.FileId, err = fs.filer.LookupDirectoryEntry(apiRequest.Directory, apiRequest.FileName)
if err != nil {
res.Error = err.Error()
}
writeJsonQuiet(w, r, http.StatusOK, res)
case "listDirectories": case "listDirectories":
res := filer.ListDirectoriesResult{} res := filer.ListDirectoriesResult{}
res.Directories, err = fs.filer.ListDirectories(apiRequest.Directory) res.Directories, err = fs.filer.ListDirectories(apiRequest.Directory)
@ -30,7 +37,7 @@ func (fs *FilerServer) apiHandler(w http.ResponseWriter, r *http.Request) {
writeJsonQuiet(w, r, http.StatusOK, res) writeJsonQuiet(w, r, http.StatusOK, res)
case "listFiles": case "listFiles":
res := filer.ListFilesResult{} res := filer.ListFilesResult{}
res.Files, err = fs.filer.ListFiles(apiRequest.Directory, apiRequest.FileName, 100)
res.Files, err = fs.filer.ListFiles(apiRequest.Directory, apiRequest.FileName, 1000)
if err != nil { if err != nil {
res.Error = err.Error() res.Error = err.Error()
} }

6
weed/server/filer_ui/templates.go

@ -28,15 +28,17 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html>
{{$path := .Path }} {{$path := .Path }}
{{ range $dirs_index, $dir := .Directories }} {{ range $dirs_index, $dir := .Directories }}
<li> <li>
<a href= {{ print $path $dir.Name "/"}} >
{{ $dir.Name }}
<a href={{ print $path $dir "/"}} >
{{ $dir }}
</a> </a>
</li> </li>
{{ end }} {{ end }}
{{ range $file_index, $file := .Files }} {{ range $file_index, $file := .Files }}
<li> <li>
<a href={{ print $path $file.Name}} >
{{ $file.Name }} {{ $file.Name }}
</a>
</li> </li>
{{ end }} {{ end }}
</ul> </ul>

Loading…
Cancel
Save