From 91c2cc0dcd42b651f3e7ab6060bea2d7a810b9f1 Mon Sep 17 00:00:00 2001 From: zhaozhi Date: Wed, 17 Feb 2016 17:49:24 +0800 Subject: [PATCH] finish redis_store's directory manager and its unit testing --- go/filer/redis_store/directory_manager.go | 224 ++++++++++++++++++ .../redis_store/directory_manager_test.go | 186 +++++++++++++++ go/filer/redis_store/redis_store.go | 44 ++++ 3 files changed, 454 insertions(+) create mode 100644 go/filer/redis_store/directory_manager.go create mode 100644 go/filer/redis_store/directory_manager_test.go diff --git a/go/filer/redis_store/directory_manager.go b/go/filer/redis_store/directory_manager.go new file mode 100644 index 000000000..138cd918e --- /dev/null +++ b/go/filer/redis_store/directory_manager.go @@ -0,0 +1,224 @@ +package redis_store + +import ( + "fmt" + "path/filepath" + "strconv" + "strings" + + "github.com/chrislusf/seaweedfs/go/filer" + "github.com/chrislusf/seaweedfs/go/filer/embedded_filer" + "github.com/chrislusf/seaweedfs/go/glog" + redis "gopkg.in/redis.v2" +) + +type DirectoryManager struct { + Client *redis.Client + dirMaxIdKey string + dirKeyPrefix string + dirFileKeyPrefix string +} + +func InitDirectoryManger(client *redis.Client) *DirectoryManager { + dirMaxIdKey := "swfs:dir-max-id" + dm := &DirectoryManager{ + Client: client, + dirMaxIdKey: dirMaxIdKey, + dirKeyPrefix: "d:", + dirFileKeyPrefix: "df:"} + + dm.initDirectoryId() + return dm +} + +//check whether directory id exists in redis, if not then init to zero. +func (dm *DirectoryManager) initDirectoryId() error { + err := dm.Client.Get(dm.dirMaxIdKey).Err() + if err == redis.Nil { + err = dm.Client.Set(dm.dirMaxIdKey, "1").Err() + if err != nil { + glog.Errorln("init dir max id error:", err) + } + } else if err != nil { + glog.Errorln("get dir max id error:", err) + } + return err +} + +//use redis lua script to make all parent dirs in the dirPath, like linux command `mkdir -p` +func (dm *DirectoryManager) MakeDirectory(dirPath string) (filer.DirectoryId, error) { + dirPath = embedded_filer.CleanFilePath(dirPath) + if dirPath == "/" { + return 1, nil + } + parts := strings.Split(dirPath, "/") + //'d' stands for directory. root must end with a slash + root := dm.dirKeyPrefix + "/" + script := redis.NewScript(` + local root=KEYS[1] + local dirMaxIdKey=KEYS[2] + local did, exists + for i, v in ipairs(ARGV) do + exists=redis.call('hexists', root, v) + if exists == 0 then + did=redis.call('incr', dirMaxIdKey) + redis.call('hset', root, v, did) + end + if i==1 then + root=root .. v + else + root=root..'/'..v + end + end + redis.call('set', 'last-dir-id', did) + return did + `) + result, err := script.Run(dm.Client, []string{root, dm.dirMaxIdKey}, parts[1:]).Result() + if err != nil { + glog.Errorln("redis eval make directory script error:", err) + return 0, err + } + did, ok := result.(int64) + if !ok { + glog.Errorln("convert result:", result, " get:", did) + } + return filer.DirectoryId(did), err +} + +//delete directory dirPath and its sub-directories recursively; +//it's not this function's responsibility to check whether dirPath is empty. +func (dm *DirectoryManager) DeleteDirectory(dirPath string) error { + dirPath = embedded_filer.CleanFilePath(dirPath) + if dirPath == "/" { + return fmt.Errorf("Can not delete %s", dirPath) + } + script := redis.NewScript(` + local delSubs + delSubs = function(dir) + local subs=redis.call('hgetall', dir); + for i, v in ipairs(subs) do + if i%2 ~= 0 then + local subd=dir..'/'..v; + delSubs(subd); + end + end + end + delSubs(KEYS[1]) + `) + err := script.Run(dm.Client, []string{dm.dirKeyPrefix + dirPath}, nil).Err() + return err +} + +func (dm *DirectoryManager) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) { + entry, err := dm.findDirectory(dirPath) + return entry.Id, err +} + +func (dm *DirectoryManager) findDirectory(dirPath string) (filer.DirectoryEntry, error) { + dirPath = embedded_filer.CleanFilePath(dirPath) + if dirPath == "/" { + return filer.DirectoryEntry{Name: "/", Id: 1}, nil + } + basePart := filepath.Base(dirPath) + parentPart := filepath.Dir(dirPath) + did, err := dm.Client.HGet(dm.dirKeyPrefix+parentPart, basePart).Int64() + //adjust redis.Nil to golang nil + if err == redis.Nil { + err = nil + } + if err != nil { + return filer.DirectoryEntry{}, err + } + return filer.DirectoryEntry{Name: basePart, Id: filer.DirectoryId(did)}, nil +} + +/* +moving is finished by following steps: +1.get old dir's directory id +2.delete old dir from its parent's hash table +3.add old dir's directory id into new parent dir's hash table, if newName is not empty, use it +4.if the full old dir path key exists(its a non-empty directory), rename it to full new dir path key +*/ +func (dm *DirectoryManager) MoveUnderDirectory(oldDirPath string, newParentDirPath string, newName string) error { + oldDirPath = embedded_filer.CleanFilePath(oldDirPath) + oldDirName := filepath.Base(oldDirPath) //old dir's name without path + oldParentDir := filepath.Dir(oldDirPath) + + newParentDirPath = embedded_filer.CleanFilePath(newParentDirPath) + newParentDirName := filepath.Base(newParentDirPath) // new parent dir's name without path + newParentParentDir := filepath.Dir(newParentDirPath) //newParentDirPath's parent directory + + /* + lua script comments: + suppose oldDirPathKey is d:/a/b/c, newParentDirPathKey is d:/a/b/d, newName is ""; + then oldParentDirKey is d:/a/b, oldDirName is c, and + newParentParentDirKey is d:/a/b, newParentDirName is d; + newTargetName is c + */ + script := redis.NewScript(` + local oldDirPathKey=KEYS[1] + local newParentDirPathKey=KEYS[2] + local oldParentDirKey=ARGV[1] + local oldDirName=ARGV[2] + local newParentParentDirKey=ARGV[3] + local newParentDirName=ARGV[4] + local newTargetName=ARGV[5] + local oldDirId=redis.call('hget', oldParentDirKey, oldDirName) + local newParentDirId=redis.call('hget', newParentParentDirKey, newParentDirName) + if oldDirId == false then + return -1 + end + if newParentDirId == false then + return -2 + end + redis.call('hdel', oldParentDirKey, oldDirName) + redis.call('hset', newParentDirPathKey, newTargetName, oldDirId) + local exists=redis.call('exists', oldDirPathKey) + if exists==1 then + redis.call('rename', oldDirPathKey, newParentDirPathKey..'/'..newTargetName) + end + return 0 + `) + if newName == "" { + newName = oldDirName + } + result, err := script.Run( + dm.Client, + []string{ + dm.dirKeyPrefix + oldDirPath, + dm.dirKeyPrefix + newParentDirPath}, + []string{ + dm.dirKeyPrefix + oldParentDir, + oldDirName, + dm.dirKeyPrefix + newParentParentDir, + newParentDirName, newName}).Result() + if err != nil { + glog.Errorln("redis eval move directory script error:", err) + } else { + ret, ok := result.(int64) + if !ok { + glog.Errorln("convert result:", result, " get:", ret) + } + if ret == -1 { + err = fmt.Errorf("src dir: %s not exists", oldDirPath) + } else if ret == -2 { + err = fmt.Errorf("dest dir: %s not exists", newParentDirPath) + } + } + return err +} + +func (dm *DirectoryManager) ListDirectories(dirPath string) (dirNames []filer.DirectoryEntry, err error) { + dirPath = embedded_filer.CleanFilePath(dirPath) + result, le := dm.Client.HGetAllMap(dm.dirKeyPrefix + dirPath).Result() + if le != nil { + glog.Errorf("get sub-directories of %s error:%v", dirPath, err) + return dirNames, le + } + for k, v := range result { + did, _ := strconv.Atoi(v) + entry := filer.DirectoryEntry{Name: k, Id: filer.DirectoryId(did)} + dirNames = append(dirNames, entry) + } + return dirNames, nil +} diff --git a/go/filer/redis_store/directory_manager_test.go b/go/filer/redis_store/directory_manager_test.go new file mode 100644 index 000000000..30325f3cf --- /dev/null +++ b/go/filer/redis_store/directory_manager_test.go @@ -0,0 +1,186 @@ +package redis_store + +import ( + "fmt" + "os" + "path/filepath" + "testing" + + redis "gopkg.in/redis.v2" +) + +var dm *DirectoryManager = nil + +func TestDirectory(t *testing.T) { + //root dir + did, err := dm.MakeDirectory("/") + if err != nil { + t.Errorf("make root dir error:%v\n", err) + } + if did != 1 { + t.Errorf("root dir's id is not 1!") + } + //make some dirs + dir := "/a/b/c" + did, err = dm.MakeDirectory(dir) + if err != nil { + t.Errorf("make dirs:%s, error:%v\n", dir, err) + } + if did != 4 { + t.Errorf("dir:%s's id is %d, 4 is expected!", dir, did) + } + //check its parent dir + parentDir := filepath.Dir(dir) + did, err = dm.FindDirectory(parentDir) + if err != nil { + t.Errorf("get %s's parent dir %s, error:%v\n", dir, parentDir, err) + } + if did != 3 { + t.Errorf("dir:%s's id is %d, 3 is expected!", parentDir, did) + } + //make another dir with same parent + dir = "/a/b/d" + did, err = dm.MakeDirectory(dir) + if err != nil { + t.Errorf("make dirs:%s, error:%v\n", dir, err) + } + if did != 5 { + t.Errorf("dir:%s's id is %d, 5 is expected!", dir, did) + } + //check its parent dir + parentDir = filepath.Dir(dir) + did, err = dm.FindDirectory(parentDir) + if err != nil { + t.Errorf("get %s's parent dir %s, error:%v\n", dir, parentDir, err) + } + if did != 3 { + t.Errorf("dir:%s's id is %d, 3 is expected!", parentDir, did) + } + //find /a + dir = "/a" + did, err = dm.FindDirectory(dir) + if err != nil { + t.Errorf("find dir %s, error:%v\n", dir, err) + } + if did != 2 { + t.Errorf("dir:%s's id is %d, 2 is expected!", dir, did) + } + //make /a/b/c/e, so /a/b/c has a sub-directory + dir = "/a/b/c/e" + did, err = dm.MakeDirectory(dir) + if err != nil { + t.Errorf("make dirs:%s, error:%v\n", dir, err) + } + if did != 6 { + t.Errorf("dir:%s's id is %d, 6 is expected!", dir, did) + } + + /* + * + * move to an existing dir + * + * + */ + + //move /a/b/c under /a/b/d + from := "/a/b/c" + to := "/a/b/d" + err = dm.MoveUnderDirectory(from, to, "") + if err != nil { + t.Errorf("move %s to %s error:%v", from, to, err) + } + //now /a/b/c should not exist + did, err = dm.FindDirectory(from) + if err != nil { + t.Errorf("find from dir %s error:%v", from, err) + } + if did != 0 { + t.Errorf("%s still exists after moved under %s", from, to) + } + //now /a/b/d/c should exist + dir = filepath.Join(to, filepath.Base(from)) + did, err = dm.FindDirectory(dir) + if did != 4 { + t.Errorf("new dir %s has a wrong dir id:%d, 4 is expected", dir, did) + } + //now /a/b/d/c/e also should exist, this indicates c is moved under /a/b/d entirely, include its sub-directories + dir = "/a/b/d/c/e" + did, err = dm.FindDirectory(dir) + if did != 6 { + t.Errorf("new dir %s has a wrong dir id:%d, 6 is expected", dir, did) + } + + /* + * + * move to an new dir + * + */ + + //now move /a/b/d/c to /a/b/f, f is a new dir, this means c will be changed to f, but dir id will not be changed + from = "/a/b/d/c" + to = "/a/b/f" + err = dm.MoveUnderDirectory(from, filepath.Dir(to), filepath.Base(to)) + if err != nil { + t.Errorf("move %s to %s error:%v", from, to, err) + } + // /a/b/d/c should not exist + dir = from + did, err = dm.FindDirectory(dir) + if err != nil { + t.Errorf("find dir %s error:%v", dir, err) + } + if did != 0 { + t.Errorf("%s still exists after moved to %s", from, to) + } + // /a/b/f should exist + dir = to + did, err = dm.FindDirectory(dir) + if did != 4 { + t.Errorf("new dir %s has a wrong dir id:%d, 4 is expected", dir, did) + } + // /a/b/f/e also should exist + dir = to + "/e" + did, err = dm.FindDirectory(dir) + if did != 6 { + t.Errorf("new dir %s has a wrong dir id:%d, 6 is expected", dir, did) + } + +} + +func clearRedisKeys(client *redis.Client, dirKeyPrefix string, dirMaxIdKey string) error { + result, err := client.Keys(dirKeyPrefix + "*").Result() + if err != nil { + fmt.Println("get redis keys error:", err) + return err + } + if len(result) > 0 { + n, err := client.Del(result...).Result() + if err != nil { + fmt.Println("del keys error:", err) + } else { + fmt.Println("del", n, " keys.") + } + } + n, err := client.Del(dirMaxIdKey).Result() + if err != nil { + fmt.Printf("del dirMaxIdKey:%s, error:%v\n", dirMaxIdKey, err) + } else { + fmt.Println("del", n, " keys.") + } + return err +} +func TestMain(m *testing.M) { + redisClient := redis.NewTCPClient(&redis.Options{ + Addr: "localhost:6379", + Password: "", // no password set + DB: 0, + }) + err := clearRedisKeys(redisClient, "d:", "swfs:dir-max-id") + if err != nil { + os.Exit(-1) + } + dm = InitDirectoryManger(redisClient) + ret := m.Run() + redisClient.Close() + os.Exit(ret) +} diff --git a/go/filer/redis_store/redis_store.go b/go/filer/redis_store/redis_store.go index e71776845..a31f98232 100644 --- a/go/filer/redis_store/redis_store.go +++ b/go/filer/redis_store/redis_store.go @@ -1,6 +1,8 @@ package redis_store import ( + "github.com/chrislusf/seaweedfs/go/filer" + "github.com/chrislusf/seaweedfs/go/filer/flat_namespace" redis "gopkg.in/redis.v2" ) @@ -17,6 +19,31 @@ func NewRedisStore(hostPort string, database int) *RedisStore { return &RedisStore{Client: client} } +/* +func (s *RedisStore) Get(fullFileName string) (fid string, err error) { + fid, err = s.Client.Get(fullFileName).Result() + if err == redis.Nil { + err = nil + } + return fid, err +} +func (s *RedisStore) Put(fullFileName string, fid string) (err error) { + _, err = s.Client.Set(fullFileName, fid).Result() + if err == redis.Nil { + err = nil + } + return err +} + +// Currently the fid is not returned +func (s *RedisStore) Delete(fullFileName string) (fid string, err error) { + _, err = s.Client.Del(fullFileName).Result() + if err == redis.Nil { + err = nil + } + return "", err +} +*/ func (s *RedisStore) Get(fullFileName string) (fid string, err error) { fid, err = s.Client.Get(fullFileName).Result() if err == redis.Nil { @@ -46,3 +73,20 @@ func (s *RedisStore) Close() { s.Client.Close() } } + +func (c *RedisStore) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) { + return 0, flat_namespace.ErrNotImplemented +} +func (c *RedisStore) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) { + return nil, flat_namespace.ErrNotImplemented +} +func (c *RedisStore) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { + return nil, flat_namespace.ErrNotImplemented +} +func (c *RedisStore) DeleteDirectory(dirPath string, recursive bool) (err error) { + return flat_namespace.ErrNotImplemented +} + +func (c *RedisStore) Move(fromPath string, toPath string) error { + return flat_namespace.ErrNotImplemented +}