Browse Source

finish redis_store's directory manager and its unit testing

pull/254/head
zhaozhi 10 years ago
parent
commit
91c2cc0dcd
  1. 224
      go/filer/redis_store/directory_manager.go
  2. 186
      go/filer/redis_store/directory_manager_test.go
  3. 44
      go/filer/redis_store/redis_store.go

224
go/filer/redis_store/directory_manager.go

@ -0,0 +1,224 @@
package redis_store
import (
"fmt"
"path/filepath"
"strconv"
"strings"
"github.com/chrislusf/seaweedfs/go/filer"
"github.com/chrislusf/seaweedfs/go/filer/embedded_filer"
"github.com/chrislusf/seaweedfs/go/glog"
redis "gopkg.in/redis.v2"
)
type DirectoryManager struct {
Client *redis.Client
dirMaxIdKey string
dirKeyPrefix string
dirFileKeyPrefix string
}
func InitDirectoryManger(client *redis.Client) *DirectoryManager {
dirMaxIdKey := "swfs:dir-max-id"
dm := &DirectoryManager{
Client: client,
dirMaxIdKey: dirMaxIdKey,
dirKeyPrefix: "d:",
dirFileKeyPrefix: "df:"}
dm.initDirectoryId()
return dm
}
//check whether directory id exists in redis, if not then init to zero.
func (dm *DirectoryManager) initDirectoryId() error {
err := dm.Client.Get(dm.dirMaxIdKey).Err()
if err == redis.Nil {
err = dm.Client.Set(dm.dirMaxIdKey, "1").Err()
if err != nil {
glog.Errorln("init dir max id error:", err)
}
} else if err != nil {
glog.Errorln("get dir max id error:", err)
}
return err
}
//use redis lua script to make all parent dirs in the dirPath, like linux command `mkdir -p`
func (dm *DirectoryManager) MakeDirectory(dirPath string) (filer.DirectoryId, error) {
dirPath = embedded_filer.CleanFilePath(dirPath)
if dirPath == "/" {
return 1, nil
}
parts := strings.Split(dirPath, "/")
//'d' stands for directory. root must end with a slash
root := dm.dirKeyPrefix + "/"
script := redis.NewScript(`
local root=KEYS[1]
local dirMaxIdKey=KEYS[2]
local did, exists
for i, v in ipairs(ARGV) do
exists=redis.call('hexists', root, v)
if exists == 0 then
did=redis.call('incr', dirMaxIdKey)
redis.call('hset', root, v, did)
end
if i==1 then
root=root .. v
else
root=root..'/'..v
end
end
redis.call('set', 'last-dir-id', did)
return did
`)
result, err := script.Run(dm.Client, []string{root, dm.dirMaxIdKey}, parts[1:]).Result()
if err != nil {
glog.Errorln("redis eval make directory script error:", err)
return 0, err
}
did, ok := result.(int64)
if !ok {
glog.Errorln("convert result:", result, " get:", did)
}
return filer.DirectoryId(did), err
}
//delete directory dirPath and its sub-directories recursively;
//it's not this function's responsibility to check whether dirPath is empty.
func (dm *DirectoryManager) DeleteDirectory(dirPath string) error {
dirPath = embedded_filer.CleanFilePath(dirPath)
if dirPath == "/" {
return fmt.Errorf("Can not delete %s", dirPath)
}
script := redis.NewScript(`
local delSubs
delSubs = function(dir)
local subs=redis.call('hgetall', dir);
for i, v in ipairs(subs) do
if i%2 ~= 0 then
local subd=dir..'/'..v;
delSubs(subd);
end
end
end
delSubs(KEYS[1])
`)
err := script.Run(dm.Client, []string{dm.dirKeyPrefix + dirPath}, nil).Err()
return err
}
func (dm *DirectoryManager) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) {
entry, err := dm.findDirectory(dirPath)
return entry.Id, err
}
func (dm *DirectoryManager) findDirectory(dirPath string) (filer.DirectoryEntry, error) {
dirPath = embedded_filer.CleanFilePath(dirPath)
if dirPath == "/" {
return filer.DirectoryEntry{Name: "/", Id: 1}, nil
}
basePart := filepath.Base(dirPath)
parentPart := filepath.Dir(dirPath)
did, err := dm.Client.HGet(dm.dirKeyPrefix+parentPart, basePart).Int64()
//adjust redis.Nil to golang nil
if err == redis.Nil {
err = nil
}
if err != nil {
return filer.DirectoryEntry{}, err
}
return filer.DirectoryEntry{Name: basePart, Id: filer.DirectoryId(did)}, nil
}
/*
moving is finished by following steps:
1.get old dir's directory id
2.delete old dir from its parent's hash table
3.add old dir's directory id into new parent dir's hash table, if newName is not empty, use it
4.if the full old dir path key exists(its a non-empty directory), rename it to full new dir path key
*/
func (dm *DirectoryManager) MoveUnderDirectory(oldDirPath string, newParentDirPath string, newName string) error {
oldDirPath = embedded_filer.CleanFilePath(oldDirPath)
oldDirName := filepath.Base(oldDirPath) //old dir's name without path
oldParentDir := filepath.Dir(oldDirPath)
newParentDirPath = embedded_filer.CleanFilePath(newParentDirPath)
newParentDirName := filepath.Base(newParentDirPath) // new parent dir's name without path
newParentParentDir := filepath.Dir(newParentDirPath) //newParentDirPath's parent directory
/*
lua script comments:
suppose oldDirPathKey is d:/a/b/c, newParentDirPathKey is d:/a/b/d, newName is "";
then oldParentDirKey is d:/a/b, oldDirName is c, and
newParentParentDirKey is d:/a/b, newParentDirName is d;
newTargetName is c
*/
script := redis.NewScript(`
local oldDirPathKey=KEYS[1]
local newParentDirPathKey=KEYS[2]
local oldParentDirKey=ARGV[1]
local oldDirName=ARGV[2]
local newParentParentDirKey=ARGV[3]
local newParentDirName=ARGV[4]
local newTargetName=ARGV[5]
local oldDirId=redis.call('hget', oldParentDirKey, oldDirName)
local newParentDirId=redis.call('hget', newParentParentDirKey, newParentDirName)
if oldDirId == false then
return -1
end
if newParentDirId == false then
return -2
end
redis.call('hdel', oldParentDirKey, oldDirName)
redis.call('hset', newParentDirPathKey, newTargetName, oldDirId)
local exists=redis.call('exists', oldDirPathKey)
if exists==1 then
redis.call('rename', oldDirPathKey, newParentDirPathKey..'/'..newTargetName)
end
return 0
`)
if newName == "" {
newName = oldDirName
}
result, err := script.Run(
dm.Client,
[]string{
dm.dirKeyPrefix + oldDirPath,
dm.dirKeyPrefix + newParentDirPath},
[]string{
dm.dirKeyPrefix + oldParentDir,
oldDirName,
dm.dirKeyPrefix + newParentParentDir,
newParentDirName, newName}).Result()
if err != nil {
glog.Errorln("redis eval move directory script error:", err)
} else {
ret, ok := result.(int64)
if !ok {
glog.Errorln("convert result:", result, " get:", ret)
}
if ret == -1 {
err = fmt.Errorf("src dir: %s not exists", oldDirPath)
} else if ret == -2 {
err = fmt.Errorf("dest dir: %s not exists", newParentDirPath)
}
}
return err
}
func (dm *DirectoryManager) ListDirectories(dirPath string) (dirNames []filer.DirectoryEntry, err error) {
dirPath = embedded_filer.CleanFilePath(dirPath)
result, le := dm.Client.HGetAllMap(dm.dirKeyPrefix + dirPath).Result()
if le != nil {
glog.Errorf("get sub-directories of %s error:%v", dirPath, err)
return dirNames, le
}
for k, v := range result {
did, _ := strconv.Atoi(v)
entry := filer.DirectoryEntry{Name: k, Id: filer.DirectoryId(did)}
dirNames = append(dirNames, entry)
}
return dirNames, nil
}

186
go/filer/redis_store/directory_manager_test.go

@ -0,0 +1,186 @@
package redis_store
import (
"fmt"
"os"
"path/filepath"
"testing"
redis "gopkg.in/redis.v2"
)
var dm *DirectoryManager = nil
func TestDirectory(t *testing.T) {
//root dir
did, err := dm.MakeDirectory("/")
if err != nil {
t.Errorf("make root dir error:%v\n", err)
}
if did != 1 {
t.Errorf("root dir's id is not 1!")
}
//make some dirs
dir := "/a/b/c"
did, err = dm.MakeDirectory(dir)
if err != nil {
t.Errorf("make dirs:%s, error:%v\n", dir, err)
}
if did != 4 {
t.Errorf("dir:%s's id is %d, 4 is expected!", dir, did)
}
//check its parent dir
parentDir := filepath.Dir(dir)
did, err = dm.FindDirectory(parentDir)
if err != nil {
t.Errorf("get %s's parent dir %s, error:%v\n", dir, parentDir, err)
}
if did != 3 {
t.Errorf("dir:%s's id is %d, 3 is expected!", parentDir, did)
}
//make another dir with same parent
dir = "/a/b/d"
did, err = dm.MakeDirectory(dir)
if err != nil {
t.Errorf("make dirs:%s, error:%v\n", dir, err)
}
if did != 5 {
t.Errorf("dir:%s's id is %d, 5 is expected!", dir, did)
}
//check its parent dir
parentDir = filepath.Dir(dir)
did, err = dm.FindDirectory(parentDir)
if err != nil {
t.Errorf("get %s's parent dir %s, error:%v\n", dir, parentDir, err)
}
if did != 3 {
t.Errorf("dir:%s's id is %d, 3 is expected!", parentDir, did)
}
//find /a
dir = "/a"
did, err = dm.FindDirectory(dir)
if err != nil {
t.Errorf("find dir %s, error:%v\n", dir, err)
}
if did != 2 {
t.Errorf("dir:%s's id is %d, 2 is expected!", dir, did)
}
//make /a/b/c/e, so /a/b/c has a sub-directory
dir = "/a/b/c/e"
did, err = dm.MakeDirectory(dir)
if err != nil {
t.Errorf("make dirs:%s, error:%v\n", dir, err)
}
if did != 6 {
t.Errorf("dir:%s's id is %d, 6 is expected!", dir, did)
}
/*
*
* move to an existing dir
*
*
*/
//move /a/b/c under /a/b/d
from := "/a/b/c"
to := "/a/b/d"
err = dm.MoveUnderDirectory(from, to, "")
if err != nil {
t.Errorf("move %s to %s error:%v", from, to, err)
}
//now /a/b/c should not exist
did, err = dm.FindDirectory(from)
if err != nil {
t.Errorf("find from dir %s error:%v", from, err)
}
if did != 0 {
t.Errorf("%s still exists after moved under %s", from, to)
}
//now /a/b/d/c should exist
dir = filepath.Join(to, filepath.Base(from))
did, err = dm.FindDirectory(dir)
if did != 4 {
t.Errorf("new dir %s has a wrong dir id:%d, 4 is expected", dir, did)
}
//now /a/b/d/c/e also should exist, this indicates c is moved under /a/b/d entirely, include its sub-directories
dir = "/a/b/d/c/e"
did, err = dm.FindDirectory(dir)
if did != 6 {
t.Errorf("new dir %s has a wrong dir id:%d, 6 is expected", dir, did)
}
/*
*
* move to an new dir
*
*/
//now move /a/b/d/c to /a/b/f, f is a new dir, this means c will be changed to f, but dir id will not be changed
from = "/a/b/d/c"
to = "/a/b/f"
err = dm.MoveUnderDirectory(from, filepath.Dir(to), filepath.Base(to))
if err != nil {
t.Errorf("move %s to %s error:%v", from, to, err)
}
// /a/b/d/c should not exist
dir = from
did, err = dm.FindDirectory(dir)
if err != nil {
t.Errorf("find dir %s error:%v", dir, err)
}
if did != 0 {
t.Errorf("%s still exists after moved to %s", from, to)
}
// /a/b/f should exist
dir = to
did, err = dm.FindDirectory(dir)
if did != 4 {
t.Errorf("new dir %s has a wrong dir id:%d, 4 is expected", dir, did)
}
// /a/b/f/e also should exist
dir = to + "/e"
did, err = dm.FindDirectory(dir)
if did != 6 {
t.Errorf("new dir %s has a wrong dir id:%d, 6 is expected", dir, did)
}
}
func clearRedisKeys(client *redis.Client, dirKeyPrefix string, dirMaxIdKey string) error {
result, err := client.Keys(dirKeyPrefix + "*").Result()
if err != nil {
fmt.Println("get redis keys error:", err)
return err
}
if len(result) > 0 {
n, err := client.Del(result...).Result()
if err != nil {
fmt.Println("del keys error:", err)
} else {
fmt.Println("del", n, " keys.")
}
}
n, err := client.Del(dirMaxIdKey).Result()
if err != nil {
fmt.Printf("del dirMaxIdKey:%s, error:%v\n", dirMaxIdKey, err)
} else {
fmt.Println("del", n, " keys.")
}
return err
}
func TestMain(m *testing.M) {
redisClient := redis.NewTCPClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0,
})
err := clearRedisKeys(redisClient, "d:", "swfs:dir-max-id")
if err != nil {
os.Exit(-1)
}
dm = InitDirectoryManger(redisClient)
ret := m.Run()
redisClient.Close()
os.Exit(ret)
}

44
go/filer/redis_store/redis_store.go

@ -1,6 +1,8 @@
package redis_store package redis_store
import ( import (
"github.com/chrislusf/seaweedfs/go/filer"
"github.com/chrislusf/seaweedfs/go/filer/flat_namespace"
redis "gopkg.in/redis.v2" redis "gopkg.in/redis.v2"
) )
@ -17,6 +19,31 @@ func NewRedisStore(hostPort string, database int) *RedisStore {
return &RedisStore{Client: client} return &RedisStore{Client: client}
} }
/*
func (s *RedisStore) Get(fullFileName string) (fid string, err error) {
fid, err = s.Client.Get(fullFileName).Result()
if err == redis.Nil {
err = nil
}
return fid, err
}
func (s *RedisStore) Put(fullFileName string, fid string) (err error) {
_, err = s.Client.Set(fullFileName, fid).Result()
if err == redis.Nil {
err = nil
}
return err
}
// Currently the fid is not returned
func (s *RedisStore) Delete(fullFileName string) (fid string, err error) {
_, err = s.Client.Del(fullFileName).Result()
if err == redis.Nil {
err = nil
}
return "", err
}
*/
func (s *RedisStore) Get(fullFileName string) (fid string, err error) { func (s *RedisStore) Get(fullFileName string) (fid string, err error) {
fid, err = s.Client.Get(fullFileName).Result() fid, err = s.Client.Get(fullFileName).Result()
if err == redis.Nil { if err == redis.Nil {
@ -46,3 +73,20 @@ func (s *RedisStore) Close() {
s.Client.Close() s.Client.Close()
} }
} }
func (c *RedisStore) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) {
return 0, flat_namespace.ErrNotImplemented
}
func (c *RedisStore) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) {
return nil, flat_namespace.ErrNotImplemented
}
func (c *RedisStore) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
return nil, flat_namespace.ErrNotImplemented
}
func (c *RedisStore) DeleteDirectory(dirPath string, recursive bool) (err error) {
return flat_namespace.ErrNotImplemented
}
func (c *RedisStore) Move(fromPath string, toPath string) error {
return flat_namespace.ErrNotImplemented
}
Loading…
Cancel
Save