diff --git a/go/filer/distributed_filer/distributed_filer.go b/go/filer/distributed_filer/distributed_filer.go new file mode 100644 index 000000000..7e3f9fcd4 --- /dev/null +++ b/go/filer/distributed_filer/distributed_filer.go @@ -0,0 +1,42 @@ +package distributed_filer + +import "github.com/chrislusf/seaweedfs/go/filer" + +type DistributedFiler struct { + master string + store DistributedStore +} + +func NewDistributedFiler(master string, store DistributedStore) *DistributedFiler { + return &DistributedFiler{ + master: master, + store: store, + } +} + +func (filer *DistributedFiler) CreateFile(fullFileName string, fid string) (err error) { + return filer.store.Put(fullFileName, fid) +} +func (filer *DistributedFiler) FindFile(fullFileName string) (fid string, err error) { + return filer.store.Get(fullFileName) +} +func (filer *DistributedFiler) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) { + return filer.store.FindDirectory(dirPath) +} +func (filer *DistributedFiler) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) { + return filer.store.ListDirectories(dirPath) +} +func (filer *DistributedFiler) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { + return filer.store.ListFiles(dirPath, lastFileName, limit) +} +func (filer *DistributedFiler) DeleteDirectory(dirPath string, recursive bool) (err error) { + return filer.store.DeleteDirectory(dirPath, recursive) +} + +func (filer *DistributedFiler) DeleteFile(fullFileName string) (fid string, err error) { + return filer.store.Delete(fullFileName) +} + +func (filer *DistributedFiler) Move(fromPath string, toPath string) error { + return filer.store.Move(fromPath, toPath) +} diff --git a/go/filer/distributed_filer/distributed_store.go b/go/filer/distributed_filer/distributed_store.go new file mode 100644 index 000000000..fe6d5a2b9 --- /dev/null +++ b/go/filer/distributed_filer/distributed_store.go @@ -0,0 +1,15 @@ +package distributed_filer + +import "github.com/chrislusf/seaweedfs/go/filer" + +type DistributedStore interface { + Put(fullFileName string, fid string) (err error) + Get(fullFileName string) (fid string, err error) + Delete(fullFileName string) (fid string, err error) + + FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) + ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) + ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) + DeleteDirectory(dirPath string, recursive bool) (err error) + Move(fromPath string, toPath string) error +} diff --git a/go/filer/redis_store/directory_manager.go b/go/filer/redis_store/directory_manager.go new file mode 100644 index 000000000..35d39c2ae --- /dev/null +++ b/go/filer/redis_store/directory_manager.go @@ -0,0 +1,395 @@ +package redis_store + +import ( + "fmt" + "path/filepath" + "sort" + "strconv" + "strings" + + "github.com/chrislusf/seaweedfs/go/filer" + "github.com/chrislusf/seaweedfs/go/filer/embedded_filer" + "github.com/chrislusf/seaweedfs/go/glog" + redis "gopkg.in/redis.v2" +) + +type DirectoryManager struct { + Client *redis.Client + dirMaxIdKey string + dirKeyPrefix string + dirFileKeyPrefix string +} + +func InitDirectoryManger(client *redis.Client) *DirectoryManager { + dirMaxIdKey := "swfs:dir-max-id" + dm := &DirectoryManager{ + Client: client, + dirMaxIdKey: dirMaxIdKey, + dirKeyPrefix: "d:", + dirFileKeyPrefix: "df:"} + + dm.initDirectoryId() + return dm +} + +//check whether directory id exists in redis, if not then init to zero. +func (dm *DirectoryManager) initDirectoryId() error { + err := dm.Client.Get(dm.dirMaxIdKey).Err() + if err == redis.Nil { + err = dm.Client.Set(dm.dirMaxIdKey, "1").Err() + if err != nil { + glog.Errorln("init dir max id error:", err) + } + } else if err != nil { + glog.Errorln("get dir max id error:", err) + } + return err +} + +//use redis lua script to make all parent dirs in the dirPath, like linux command `mkdir -p` +func (dm *DirectoryManager) MakeDirectory(dirPath string) (filer.DirectoryId, error) { + dirPath = embedded_filer.CleanFilePath(dirPath) + if dirPath == "/" { + return 1, nil + } + parts := strings.Split(dirPath, "/") + //'d' stands for directory. root must end with a slash + root := dm.dirKeyPrefix + "/" + script := redis.NewScript(` + local root=KEYS[1] + local dirMaxIdKey=KEYS[2] + local did + for i, v in ipairs(ARGV) do + did=redis.call('hget', root, v) + if did == false then + did=redis.call('incr', dirMaxIdKey) + redis.call('hset', root, v, did) + end + if i==1 then + root=root .. v + else + root=root..'/'..v + end + end + return did + `) + result, err := script.Run(dm.Client, []string{root, dm.dirMaxIdKey}, parts[1:]).Result() + if err != nil { + glog.Errorln("redis eval make directory script error:", err) + return 0, err + } + did, ok := result.(int64) + if !ok { + glog.Errorln("convert result:", result, " get:", did) + } + return filer.DirectoryId(did), err +} + +//delete directory dirPath and its sub-directories or files recursively ; +//it's not this function's responsibility to check whether dirPath is en empty directory. +func (dm *DirectoryManager) DeleteDirectory(dirPath string) error { + dirPath = embedded_filer.CleanFilePath(dirPath) + if dirPath == "/" { + return fmt.Errorf("Can not delete %s", dirPath) + } + dirPathParent := filepath.Dir(dirPath) + dirPathName := filepath.Base(dirPath) + /* + lua comments: + 1. delete dirPath's sub-directories and files recursively + 2. delete dirPath itself from its parent dir + + */ + script := redis.NewScript(` + local delSubs + local dirKeyPrefix=KEYS[1] + local dirFileKeyPrefix=KEYS[2] + local dirPathParent=ARGV[1] + local dirPathName=ARGV[2] + delSubs = function(dir) + local subs=redis.call('hgetall', dirKeyPrefix..dir); + for i, v in ipairs(subs) do + if i%2 ~= 0 then + redis.call('hdel', dirKeyPrefix..dir, v) + local subd=dir..'/'..v; + delSubs(subd); + end + end + redis.call('del', dirFileKeyPrefix..dir) + end + delSubs(dirPathParent..'/'..dirPathName) + redis.call('hdel', dirKeyPrefix..dirPathParent, dirPathName) + return 0 + `) + //we do not use lua to get dirPath's parent dir and its basename, so do it in golang + err := script.Run(dm.Client, []string{dm.dirKeyPrefix, dm.dirFileKeyPrefix}, []string{dirPathParent, dirPathName}).Err() + return err +} + +func (dm *DirectoryManager) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) { + entry, err := dm.findDirectory(dirPath) + return entry.Id, err +} + +func (dm *DirectoryManager) findDirectory(dirPath string) (filer.DirectoryEntry, error) { + dirPath = embedded_filer.CleanFilePath(dirPath) + if dirPath == "/" { + return filer.DirectoryEntry{Name: "/", Id: 1}, nil + } + basePart := filepath.Base(dirPath) + parentPart := filepath.Dir(dirPath) + did, err := dm.Client.HGet(dm.dirKeyPrefix+parentPart, basePart).Int64() + //adjust redis.Nil to golang nil + if err == redis.Nil { + err = nil + } + if err != nil { + return filer.DirectoryEntry{}, err + } + return filer.DirectoryEntry{Name: basePart, Id: filer.DirectoryId(did)}, nil +} + +/* +moving is finished by following steps: +1.get old dir's directory id +2.delete old dir from its parent's hash table +3.add old dir's directory id into new parent dir's hash table, if newName is not empty, use it +4.if the full old dir path key exists(its a non-empty directory), rename it to full new dir path key + +after moving is done, it returns 0, -1 indicates that source dir is not found, -2 indicates that destination dir is not found +*/ +func (dm *DirectoryManager) MoveUnderDirectory(oldDirPath string, newParentDirPath string, newName string) error { + oldDirPath = embedded_filer.CleanFilePath(oldDirPath) + oldDirName := filepath.Base(oldDirPath) //old dir's name without path + oldParentDir := filepath.Dir(oldDirPath) + + newParentDirPath = embedded_filer.CleanFilePath(newParentDirPath) + newParentDirName := filepath.Base(newParentDirPath) // new parent dir's name without path + newParentParentDir := filepath.Dir(newParentDirPath) //newParentDirPath's parent directory + + /* + lua script comments: + suppose oldDirPathKey is d:/a/b/c, newParentDirPathKey is d:/a/b/d, newName is ""; + then oldParentDirKey is d:/a/b, oldDirName is c, and + newParentParentDirKey is d:/a/b, newParentDirName is d; + newTargetName is c + */ + script := redis.NewScript(` + local oldDirPathKey=KEYS[1] + local newParentDirPathKey=KEYS[2] + local oldParentDirKey=ARGV[1] + local oldDirName=ARGV[2] + local newParentParentDirKey=ARGV[3] + local newParentDirName=ARGV[4] + local newTargetName=ARGV[5] + local oldDirId=redis.call('hget', oldParentDirKey, oldDirName) + local newParentDirId=redis.call('hget', newParentParentDirKey, newParentDirName) + if oldDirId == false then + return -1 + end + if newParentDirId == false then + return -2 + end + redis.call('hdel', oldParentDirKey, oldDirName) + redis.call('hset', newParentDirPathKey, newTargetName, oldDirId) + local exists=redis.call('exists', oldDirPathKey) + if exists==1 then + redis.call('rename', oldDirPathKey, newParentDirPathKey..'/'..newTargetName) + end + return 0 + `) + if newName == "" { + newName = oldDirName + } + result, err := script.Run( + dm.Client, + []string{ + dm.dirKeyPrefix + oldDirPath, + dm.dirKeyPrefix + newParentDirPath}, + []string{ + dm.dirKeyPrefix + oldParentDir, + oldDirName, + dm.dirKeyPrefix + newParentParentDir, + newParentDirName, newName}).Result() + if err != nil { + glog.Errorln("redis eval move directory script error:", err) + } else { + ret, ok := result.(int64) + if !ok { + glog.Errorln("convert result:", result, " get:", ret) + } + if ret == -1 { + err = fmt.Errorf("src dir: %s not exists", oldDirPath) + } else if ret == -2 { + err = fmt.Errorf("dest dir: %s not exists", newParentDirPath) + } + } + return err +} + +//return a dir's sub-directories, directories' name are sorted lexicographically +func (dm *DirectoryManager) ListDirectories(dirPath string) (dirNames []filer.DirectoryEntry, err error) { + dirPath = embedded_filer.CleanFilePath(dirPath) + result, le := dm.Client.HGetAllMap(dm.dirKeyPrefix + dirPath).Result() + if le != nil { + glog.Errorf("get sub-directories of %s error:%v", dirPath, err) + return dirNames, le + } + //sort entries by directories' name + keys := make(sort.StringSlice, len(result)) + i := 0 + for k, _ := range result { + keys[i] = k + i++ + } + keys.Sort() + for _, k := range keys { + v := result[k] + did, _ := strconv.Atoi(v) + entry := filer.DirectoryEntry{Name: k, Id: filer.DirectoryId(did)} + dirNames = append(dirNames, entry) + } + return dirNames, nil +} + +//get the amount of directories under a directory +func (dm *DirectoryManager) GetSubDirectoriesNum(dirPath string) (int64, error) { + dirPath = embedded_filer.CleanFilePath(dirPath) + return dm.getSubItemsNum(dm.dirKeyPrefix + dirPath) +} + +//use lua script to make directories and then put file for atomic +func (dm *DirectoryManager) PutFile(fullFileName string, fid string) error { + fullFileName = embedded_filer.CleanFilePath(fullFileName) + dirPath := filepath.Dir(fullFileName) + fname := filepath.Base(fullFileName) + parts := strings.Split(dirPath, "/") + //'d' stands for directory. root must end with a slash + root := dm.dirKeyPrefix + "/" + script := redis.NewScript(` + local root=KEYS[1] + local dirMaxIdKey=KEYS[2] + local dirFileKeyPrefix=KEYS[3] + local fname=KEYS[4] + local fid=KEYS[5] + local dirPath=table.concat(ARGV, '/') + local did + for i, v in ipairs(ARGV) do + did=redis.call('hget', root, v) + if did == false then + did=redis.call('incr', dirMaxIdKey) + redis.call('hset', root, v, did) + end + if i==1 then + root=root .. v + else + root=root..'/'..v + end + end + redis.call('hset', dirFileKeyPrefix..'/'..dirPath, fname, fid) + return did + `) + err := script.Run(dm.Client, []string{root, dm.dirMaxIdKey, dm.dirFileKeyPrefix, fname, fid}, parts[1:]).Err() + if err != nil { + glog.Errorln("redis eval put file script error:", err) + } + return err +} + +func (dm *DirectoryManager) FindFile(fullFileName string) (fid string, err error) { + fullFileName = embedded_filer.CleanFilePath(fullFileName) + dirPath := filepath.Dir(fullFileName) + fname := filepath.Base(fullFileName) + result, err := dm.Client.HGet(dm.dirFileKeyPrefix+dirPath, fname).Result() + if err == redis.Nil { + err = nil + } + if err != nil { + glog.Errorf("get file %s error:%v", fullFileName, err) + } + return result, err +} + +func (dm *DirectoryManager) DeleteFile(fullFileName string) (fid string, err error) { + fullFileName = embedded_filer.CleanFilePath(fullFileName) + dirPath := filepath.Dir(fullFileName) + fname := filepath.Base(fullFileName) + script := redis.NewScript(` + local dirPathKey=KEYS[1] + local fname = ARGV[1] + local fid=redis.call('hget', dirPathKey, fname) + if fid ~= false then + redis.call('hdel', dirPathKey, fname) + end + return fid + `) + result, err := script.Run(dm.Client, []string{dm.dirFileKeyPrefix + dirPath}, []string{fname}).Result() + if err != nil { + glog.Errorf("delete file %s error:%v\n", fullFileName, err) + } + fid, ok := result.(string) + if !ok { + glog.Errorf("convert result %v to string failed", result) + } + return fid, err +} + +//list files under dirPath, use lastFileName and limit for pagination +//in fact, this implemention has a bug, you can not get first file of the first page; +//the api needs to be modified +func (dm *DirectoryManager) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { + + dirPath = embedded_filer.CleanFilePath(dirPath) + result, le := dm.Client.HGetAllMap(dm.dirFileKeyPrefix + dirPath).Result() + if le != nil { + glog.Errorf("get files of %s error:%v", dirPath, err) + return files, le + } + //if the lastFileName argument is ok + //when list first page, pass lastFileName an empty string + if _, ok := result[lastFileName]; ok || lastFileName == "" { + //sort entries by file names + //when files amount is large, here may be slow + nResult := len(result) + keys := make(sort.StringSlice, nResult) + i := 0 + for k, _ := range result { + keys[i] = k + i++ + } + keys.Sort() + index := -1 + if ok { + index = keys.Search(lastFileName) + } + cnt := 0 + for _, k := range keys[index+1:] { + fid := result[k] + entry := filer.FileEntry{Name: k, Id: filer.FileId(fid)} + files = append(files, entry) + cnt++ + if cnt == limit { + break + } + } + } + return files, le +} + +//get the amount of files under a directory +func (dm *DirectoryManager) GetFilesNum(dirPath string) (int64, error) { + dirPath = embedded_filer.CleanFilePath(dirPath) + return dm.getSubItemsNum(dm.dirFileKeyPrefix + dirPath) +} + +//get a hash key's fields number +func (dm *DirectoryManager) getSubItemsNum(key string) (int64, error) { + result, err := dm.Client.HLen(key).Result() + if err == redis.Nil { + err = nil + } + if err != nil { + glog.Errorf("hlen %s error:%v", key, err) + } + return result, err +} diff --git a/go/filer/redis_store/directory_manager_test.go b/go/filer/redis_store/directory_manager_test.go new file mode 100644 index 000000000..1fc0eb146 --- /dev/null +++ b/go/filer/redis_store/directory_manager_test.go @@ -0,0 +1,288 @@ +package redis_store + +import ( + "flag" + "fmt" + "os" + "path/filepath" + "testing" + + redis "gopkg.in/redis.v2" +) + +var dm *DirectoryManager = nil + +func TestDirectory(t *testing.T) { + //root dir + did, err := dm.MakeDirectory("/") + if err != nil { + t.Errorf("make root dir error:%v\n", err) + } + if did != 1 { + t.Errorf("root dir's id is not 1!") + } + //make some dirs + dir := "/a/b/c" + did, err = dm.MakeDirectory(dir) + if err != nil { + t.Errorf("make dirs:%s, error:%v\n", dir, err) + } + if did != 4 { + t.Errorf("dir:%s's id is %d, 4 is expected!", dir, did) + } + //check its parent dir + parentDir := filepath.Dir(dir) + did, err = dm.FindDirectory(parentDir) + if err != nil { + t.Errorf("get %s's parent dir %s, error:%v\n", dir, parentDir, err) + } + if did != 3 { + t.Errorf("dir:%s's id is %d, 3 is expected!", parentDir, did) + } + //make another dir with same parent + dir = "/a/b/d" + did, err = dm.MakeDirectory(dir) + if err != nil { + t.Errorf("make dirs:%s, error:%v\n", dir, err) + } + if did != 5 { + t.Errorf("dir:%s's id is %d, 5 is expected!", dir, did) + } + //check its parent dir + parentDir = filepath.Dir(dir) + did, err = dm.FindDirectory(parentDir) + if err != nil { + t.Errorf("get %s's parent dir %s, error:%v\n", dir, parentDir, err) + } + if did != 3 { + t.Errorf("dir:%s's id is %d, 3 is expected!", parentDir, did) + } + //find /a + dir = "/a" + did, err = dm.FindDirectory(dir) + if err != nil { + t.Errorf("find dir %s, error:%v\n", dir, err) + } + if did != 2 { + t.Errorf("dir:%s's id is %d, 2 is expected!", dir, did) + } + //make /a/b/c/e, so /a/b/c has a sub-directory + dir = "/a/b/c/e" + did, err = dm.MakeDirectory(dir) + if err != nil { + t.Errorf("make dirs:%s, error:%v\n", dir, err) + } + if did != 6 { + t.Errorf("dir:%s's id is %d, 6 is expected!", dir, did) + } + + /* + * + * move to an existing dir + * + * + */ + + //move /a/b/c under /a/b/d + from := "/a/b/c" + to := "/a/b/d" + err = dm.MoveUnderDirectory(from, to, "") + if err != nil { + t.Errorf("move %s to %s error:%v", from, to, err) + } + //now /a/b/c should not exist + did, err = dm.FindDirectory(from) + if err != nil { + t.Errorf("find from dir %s error:%v", from, err) + } + if did != 0 { + t.Errorf("%s still exists after moved under %s", from, to) + } + //now /a/b/d/c should exist + dir = filepath.Join(to, filepath.Base(from)) + did, err = dm.FindDirectory(dir) + if did != 4 { + t.Errorf("new dir %s has a wrong dir id:%d, 4 is expected", dir, did) + } + //now /a/b/d/c/e also should exist, this indicates c is moved under /a/b/d entirely, include its sub-directories + dir = "/a/b/d/c/e" + did, err = dm.FindDirectory(dir) + if did != 6 { + t.Errorf("new dir %s has a wrong dir id:%d, 6 is expected", dir, did) + } + + /* + * + * move to a new dir + * + */ + + //now move /a/b/d/c to /a/b/f, f is a new dir, this means c will be changed to f, but dir id will not be changed + from = "/a/b/d/c" + to = "/a/b/f" + err = dm.MoveUnderDirectory(from, filepath.Dir(to), filepath.Base(to)) + if err != nil { + t.Errorf("move %s to %s error:%v", from, to, err) + } + // /a/b/d/c should not exist + dir = from + did, err = dm.FindDirectory(dir) + if err != nil { + t.Errorf("find dir %s error:%v", dir, err) + } + if did != 0 { + t.Errorf("%s still exists after moved to %s", from, to) + } + // /a/b/f should exist + dir = to + did, err = dm.FindDirectory(dir) + if did != 4 { + t.Errorf("new dir %s has a wrong dir id:%d, 4 is expected", dir, did) + } + // /a/b/f/e also should exist + dir = to + "/e" + did, err = dm.FindDirectory(dir) + if did != 6 { + t.Errorf("new dir %s has a wrong dir id:%d, 6 is expected", dir, did) + } + + /* + * list a dir's sub-directories + * + */ + dir = "/a/b" + entries, err := dm.ListDirectories(dir) + if !(entries[0].Name == "d" && entries[1].Name == "f") { + t.Errorf("get entries:%v, expect 'd', 'f'", entries) + } + /* + * delete a dir: /a/b/f/e + * + */ + dir = "/a/b/f" + err = dm.DeleteDirectory(dir) + if err != nil { + t.Errorf("delete dir:%s error:%v", dir, err) + } + // now the deleted dir should not exist + did, err = dm.FindDirectory(dir) + if err != nil { + t.Errorf("get deleted dir %s error:%v", dir, err) + } + if did != 0 { + t.Errorf("the dir %s is not deleted!", dir) + } + // now the deleted dir's sub-directory also should not exist + dir = "/a/b/f/e" + did, err = dm.FindDirectory(dir) + if err != nil { + t.Errorf("get deleted dir %s error:%v", dir, err) + } + if did != 0 { + t.Errorf("the dir %s is not deleted!", dir) + } +} + +func TestFiles(t *testing.T) { + // put one file + fname := "/a/b/c/test.txt" + fid := "1,23" + err := dm.PutFile(fname, fid) + if err != nil { + t.Errorf("put file %s error:%v", fname, err) + } + // put another file + fname = "/a/b/c/abc.txt" + fid = "1,234" + err = dm.PutFile(fname, fid) + if err != nil { + t.Errorf("put file %s error:%v", fname, err) + } + //get file + id, err := dm.FindFile(fname) + if err != nil { + t.Errorf("get file %s error:%v", fname, err) + } + if id != fid { + t.Errorf("get wrong fid %s, expect %s", id, fid) + } + //list files + lastFileName := "abc.txt" + dir := filepath.Dir(fname) + files, err := dm.ListFiles(dir, lastFileName, 10) + if err != nil { + t.Errorf("list files for %s error:%v", dir, err) + } + if files[0].Name != "test.txt" { + t.Errorf("files list order wrong, first file is %s, expect %s", files[0].Name, "test.txt") + } + //delete file + id, err = dm.DeleteFile(fname) + if err != nil { + t.Errorf("delete file get error:%v", err) + } + if id != fid { + t.Errorf("delete file return wrong fid %s, expect %s", id, fid) + } +} + +func clearRedisKeys(client *redis.Client, dirKeyPrefix string, dirMaxIdKey string) error { + result, err := client.Keys(dirKeyPrefix + "*").Result() + if err != nil { + fmt.Println("get redis keys error:", err) + return err + } + if len(result) > 0 { + n, err := client.Del(result...).Result() + if err != nil { + fmt.Println("del keys error:", err) + } else { + fmt.Println("del", n, " keys.") + } + } + n, err := client.Del(dirMaxIdKey).Result() + if err != nil { + fmt.Printf("del dirMaxIdKey:%s, error:%v\n", dirMaxIdKey, err) + } else { + fmt.Println("del", n, " keys.") + } + return err +} + +//don't use flag.PrintDefaults() for help, because it show a lot of flags which are used by go test command +func printUsage() { + fmt.Println("usage:\n\tgo test github.com/chrislusf/seaweedfs/go/filer/redis_store -redis_addr localhost:6379 [-redis_passwd \"\"] [-redis_db 0]\n") +} +func TestMain(m *testing.M) { + var ( + redisAddr string + redisPasswd string + redisDb int64 + dirKeyPrefix = "d:" + dirMaxIdKey = "swfs:dir-max-id" + ) + flag.StringVar(&redisAddr, "redis_addr", "", "A redis server to run this test, e.g. localhost:6379") + flag.StringVar(&redisPasswd, "redis_passwd", "", "The redis server's password if any") + flag.Int64Var(&redisDb, "redis_db", 0, "the redis DB to use") + flag.Parse() + if redisAddr == "" { + fmt.Println("[WARN] You need to specify a value for the redis_addr flag!\n") + printUsage() + os.Exit(1) + } + redisClient := redis.NewTCPClient(&redis.Options{ + Addr: redisAddr, + Password: redisPasswd, // no password set + DB: redisDb, + }) + err := clearRedisKeys(redisClient, dirKeyPrefix, dirMaxIdKey) + if err != nil { + os.Exit(-1) + } + dm = InitDirectoryManger(redisClient) + ret := m.Run() + //clean used keys + clearRedisKeys(redisClient, dirKeyPrefix, dirMaxIdKey) + redisClient.Close() + os.Exit(ret) +} diff --git a/go/filer/redis_store/redis_store.go b/go/filer/redis_store/redis_store.go index e71776845..ef6197144 100644 --- a/go/filer/redis_store/redis_store.go +++ b/go/filer/redis_store/redis_store.go @@ -1,11 +1,16 @@ package redis_store import ( + "fmt" + "path/filepath" + + "github.com/chrislusf/seaweedfs/go/filer" redis "gopkg.in/redis.v2" ) type RedisStore struct { Client *redis.Client + dm *DirectoryManager } func NewRedisStore(hostPort string, database int) *RedisStore { @@ -14,31 +19,20 @@ func NewRedisStore(hostPort string, database int) *RedisStore { Password: "", // no password set DB: int64(database), }) - return &RedisStore{Client: client} + dm := InitDirectoryManger(client) + return &RedisStore{Client: client, dm: dm} } func (s *RedisStore) Get(fullFileName string) (fid string, err error) { - fid, err = s.Client.Get(fullFileName).Result() - if err == redis.Nil { - err = nil - } - return fid, err + return s.dm.FindFile(fullFileName) } func (s *RedisStore) Put(fullFileName string, fid string) (err error) { - _, err = s.Client.Set(fullFileName, fid).Result() - if err == redis.Nil { - err = nil - } - return err + return s.dm.PutFile(fullFileName, fid) } -// Currently the fid is not returned +// Currently the fid is returned func (s *RedisStore) Delete(fullFileName string) (fid string, err error) { - _, err = s.Client.Del(fullFileName).Result() - if err == redis.Nil { - err = nil - } - return "", err + return s.dm.DeleteFile(fullFileName) } func (s *RedisStore) Close() { @@ -46,3 +40,62 @@ func (s *RedisStore) Close() { s.Client.Close() } } + +func (s *RedisStore) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) { + return s.dm.FindDirectory(dirPath) +} +func (s *RedisStore) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) { + return s.dm.ListDirectories(dirPath) +} +func (s *RedisStore) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { + return s.dm.ListFiles(dirPath, lastFileName, limit) +} +func (s *RedisStore) DeleteDirectory(dirPath string, recursive bool) (err error) { + if recursive { + nDirs, _ := s.dm.GetSubDirectoriesNum(dirPath) + if nDirs > 0 { + return fmt.Errorf("Fail to delete directory %s: %d sub directories found!", dirPath, nDirs) + } + nFiles, _ := s.dm.GetFilesNum(dirPath) + if nFiles > 0 { + return fmt.Errorf("Fail to delete directory %s: %d files found!", dirPath, nFiles) + + } + + } else { + err = s.dm.DeleteDirectory(dirPath) + } + return +} + +/* +Move a folder or a file, with 4 Use cases: +mv fromDir toNewDir +mv fromDir toOldDir +mv fromFile toDir +mv fromFile toFile +*/ +func (s *RedisStore) Move(fromPath string, toPath string) error { + //first check whether fromPath is a directory + fromDid, _ := s.dm.FindDirectory(fromPath) + if fromDid > 0 { + toDid, _ := s.dm.FindDirectory(toPath) + if toDid > 0 { + //move under an existing dir + return s.dm.MoveUnderDirectory(fromPath, toPath, "") + } + //move to a new dir + return s.dm.MoveUnderDirectory(fromPath, filepath.Dir(toPath), filepath.Base(toPath)) + } + //whether fromPath is a file path + if fid, err := s.dm.DeleteFile(fromPath); err == nil { + toDid, _ := s.dm.FindDirectory(toPath) + if toDid > 0 { + //move file under an existing dir + return s.dm.PutFile(filepath.Join(toPath, filepath.Base(fromPath)), fid) + } + //move to a folder with a new name + return s.dm.PutFile(toPath, fid) + } + return fmt.Errorf("File %s is not found!", fromPath) +} diff --git a/go/weed/weed_server/filer_server.go b/go/weed/weed_server/filer_server.go index 616f11022..af837c084 100644 --- a/go/weed/weed_server/filer_server.go +++ b/go/weed/weed_server/filer_server.go @@ -6,6 +6,7 @@ import ( "github.com/chrislusf/seaweedfs/go/filer" "github.com/chrislusf/seaweedfs/go/filer/cassandra_store" + "github.com/chrislusf/seaweedfs/go/filer/distributed_filer" "github.com/chrislusf/seaweedfs/go/filer/embedded_filer" "github.com/chrislusf/seaweedfs/go/filer/flat_namespace" "github.com/chrislusf/seaweedfs/go/filer/redis_store" @@ -47,7 +48,7 @@ func NewFilerServer(r *http.ServeMux, port int, master string, dir string, colle fs.filer = flat_namespace.NewFlatNamespaceFiler(master, cassandra_store) } else if redis_server != "" { redis_store := redis_store.NewRedisStore(redis_server, redis_database) - fs.filer = flat_namespace.NewFlatNamespaceFiler(master, redis_store) + fs.filer = distributed_filer.NewDistributedFiler(master, redis_store) } else { if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil { glog.Fatalf("Can not start filer in dir %s : %v", dir, err)