From 038a025c32f404b52714f6a324492f69ac699866 Mon Sep 17 00:00:00 2001 From: zhaozhi Date: Fri, 19 Feb 2016 15:12:00 +0800 Subject: [PATCH] redis store now support directory manipulations --- go/filer/redis_store/directory_manager.go | 30 +++++- go/filer/redis_store/redis_store.go | 111 ++++++++++++---------- 2 files changed, 88 insertions(+), 53 deletions(-) diff --git a/go/filer/redis_store/directory_manager.go b/go/filer/redis_store/directory_manager.go index cfbbd1970..c4fa53826 100644 --- a/go/filer/redis_store/directory_manager.go +++ b/go/filer/redis_store/directory_manager.go @@ -155,6 +155,8 @@ moving is finished by following steps: 2.delete old dir from its parent's hash table 3.add old dir's directory id into new parent dir's hash table, if newName is not empty, use it 4.if the full old dir path key exists(its a non-empty directory), rename it to full new dir path key + +after moving is done, it returns 0, -1 indicates that source dir is not found, -2 indicates that destination dir is not found */ func (dm *DirectoryManager) MoveUnderDirectory(oldDirPath string, newParentDirPath string, newName string) error { oldDirPath = embedded_filer.CleanFilePath(oldDirPath) @@ -250,6 +252,12 @@ func (dm *DirectoryManager) ListDirectories(dirPath string) (dirNames []filer.Di return dirNames, nil } +//get the amount of directories under a directory +func (dm *DirectoryManager) GetSubDirectoriesNum(dirPath string) (int64, error) { + dirPath = embedded_filer.CleanFilePath(dirPath) + return dm.getSubItemsNum(dm.dirKeyPrefix + dirPath) +} + //use lua script to make directories and then put file for atomic func (dm *DirectoryManager) PutFile(fullFileName string, fid string) error { fullFileName = embedded_filer.CleanFilePath(fullFileName) @@ -310,7 +318,9 @@ func (dm *DirectoryManager) DeleteFile(fullFileName string) (fid string, err err local dirPathKey=KEYS[1] local fname = ARGV[1] local fid=redis.call('hget', dirPathKey, fname) - redis.call('hdel', dirPathKey, fname) + if fid != false then + redis.call('hdel', dirPathKey, fname) + end return fid `) result, err := script.Run(dm.Client, []string{dm.dirFileKeyPrefix + dirPath}, []string{fname}).Result() @@ -356,3 +366,21 @@ func (dm *DirectoryManager) ListFiles(dirPath string, lastFileName string, limit } return files, le } + +//get the amount of files under a directory +func (dm *DirectoryManager) GetFilesNum(dirPath string) (int64, error) { + dirPath = embedded_filer.CleanFilePath(dirPath) + return dm.getSubItemsNum(dm.dirFileKeyPrefix + dirPath) +} + +//get a hash key's fields number +func (dm *DirectoryManager) getSubItemsNum(key string) (int64, error) { + result, err := dm.Client.HLen(key).Result() + if err == redis.Nil { + err = nil + } + if err != nil { + glog.Errorf("hlen %s error:%v", key, err) + } + return result, err +} diff --git a/go/filer/redis_store/redis_store.go b/go/filer/redis_store/redis_store.go index 59317219a..ef6197144 100644 --- a/go/filer/redis_store/redis_store.go +++ b/go/filer/redis_store/redis_store.go @@ -1,8 +1,10 @@ package redis_store import ( + "fmt" + "path/filepath" + "github.com/chrislusf/seaweedfs/go/filer" - "github.com/chrislusf/seaweedfs/go/filer/flat_namespace" redis "gopkg.in/redis.v2" ) @@ -21,53 +23,16 @@ func NewRedisStore(hostPort string, database int) *RedisStore { return &RedisStore{Client: client, dm: dm} } -/* func (s *RedisStore) Get(fullFileName string) (fid string, err error) { - fid, err = s.Client.Get(fullFileName).Result() - if err == redis.Nil { - err = nil - } - return fid, err + return s.dm.FindFile(fullFileName) } func (s *RedisStore) Put(fullFileName string, fid string) (err error) { - _, err = s.Client.Set(fullFileName, fid).Result() - if err == redis.Nil { - err = nil - } - return err + return s.dm.PutFile(fullFileName, fid) } -// Currently the fid is not returned +// Currently the fid is returned func (s *RedisStore) Delete(fullFileName string) (fid string, err error) { - _, err = s.Client.Del(fullFileName).Result() - if err == redis.Nil { - err = nil - } - return "", err -} -*/ -func (s *RedisStore) Get(fullFileName string) (fid string, err error) { - fid, err = s.Client.Get(fullFileName).Result() - if err == redis.Nil { - err = nil - } - return fid, err -} -func (s *RedisStore) Put(fullFileName string, fid string) (err error) { - _, err = s.Client.Set(fullFileName, fid).Result() - if err == redis.Nil { - err = nil - } - return err -} - -// Currently the fid is not returned -func (s *RedisStore) Delete(fullFileName string) (fid string, err error) { - _, err = s.Client.Del(fullFileName).Result() - if err == redis.Nil { - err = nil - } - return "", err + return s.dm.DeleteFile(fullFileName) } func (s *RedisStore) Close() { @@ -76,19 +41,61 @@ func (s *RedisStore) Close() { } } -func (c *RedisStore) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) { - return c.dm.FindDirectory(dirPath) +func (s *RedisStore) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) { + return s.dm.FindDirectory(dirPath) } -func (c *RedisStore) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) { - return c.dm.ListDirectories(dirPath) +func (s *RedisStore) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) { + return s.dm.ListDirectories(dirPath) } -func (c *RedisStore) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { - return nil, flat_namespace.ErrNotImplemented +func (s *RedisStore) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { + return s.dm.ListFiles(dirPath, lastFileName, limit) } -func (c *RedisStore) DeleteDirectory(dirPath string, recursive bool) (err error) { - return flat_namespace.ErrNotImplemented +func (s *RedisStore) DeleteDirectory(dirPath string, recursive bool) (err error) { + if recursive { + nDirs, _ := s.dm.GetSubDirectoriesNum(dirPath) + if nDirs > 0 { + return fmt.Errorf("Fail to delete directory %s: %d sub directories found!", dirPath, nDirs) + } + nFiles, _ := s.dm.GetFilesNum(dirPath) + if nFiles > 0 { + return fmt.Errorf("Fail to delete directory %s: %d files found!", dirPath, nFiles) + + } + + } else { + err = s.dm.DeleteDirectory(dirPath) + } + return } -func (c *RedisStore) Move(fromPath string, toPath string) error { - return flat_namespace.ErrNotImplemented +/* +Move a folder or a file, with 4 Use cases: +mv fromDir toNewDir +mv fromDir toOldDir +mv fromFile toDir +mv fromFile toFile +*/ +func (s *RedisStore) Move(fromPath string, toPath string) error { + //first check whether fromPath is a directory + fromDid, _ := s.dm.FindDirectory(fromPath) + if fromDid > 0 { + toDid, _ := s.dm.FindDirectory(toPath) + if toDid > 0 { + //move under an existing dir + return s.dm.MoveUnderDirectory(fromPath, toPath, "") + } + //move to a new dir + return s.dm.MoveUnderDirectory(fromPath, filepath.Dir(toPath), filepath.Base(toPath)) + } + //whether fromPath is a file path + if fid, err := s.dm.DeleteFile(fromPath); err == nil { + toDid, _ := s.dm.FindDirectory(toPath) + if toDid > 0 { + //move file under an existing dir + return s.dm.PutFile(filepath.Join(toPath, filepath.Base(fromPath)), fid) + } + //move to a folder with a new name + return s.dm.PutFile(toPath, fid) + } + return fmt.Errorf("File %s is not found!", fromPath) }