Browse Source

Merge 4561f97a93 into f7f9129b05

pull/254/merge
zhaozhi406 10 years ago
parent
commit
2fffb8a1ea
  1. 42
      go/filer/distributed_filer/distributed_filer.go
  2. 15
      go/filer/distributed_filer/distributed_store.go
  3. 395
      go/filer/redis_store/directory_manager.go
  4. 288
      go/filer/redis_store/directory_manager_test.go
  5. 87
      go/filer/redis_store/redis_store.go
  6. 3
      go/weed/weed_server/filer_server.go

42
go/filer/distributed_filer/distributed_filer.go

@ -0,0 +1,42 @@
package distributed_filer
import "github.com/chrislusf/seaweedfs/go/filer"
type DistributedFiler struct {
master string
store DistributedStore
}
func NewDistributedFiler(master string, store DistributedStore) *DistributedFiler {
return &DistributedFiler{
master: master,
store: store,
}
}
func (filer *DistributedFiler) CreateFile(fullFileName string, fid string) (err error) {
return filer.store.Put(fullFileName, fid)
}
func (filer *DistributedFiler) FindFile(fullFileName string) (fid string, err error) {
return filer.store.Get(fullFileName)
}
func (filer *DistributedFiler) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) {
return filer.store.FindDirectory(dirPath)
}
func (filer *DistributedFiler) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) {
return filer.store.ListDirectories(dirPath)
}
func (filer *DistributedFiler) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
return filer.store.ListFiles(dirPath, lastFileName, limit)
}
func (filer *DistributedFiler) DeleteDirectory(dirPath string, recursive bool) (err error) {
return filer.store.DeleteDirectory(dirPath, recursive)
}
func (filer *DistributedFiler) DeleteFile(fullFileName string) (fid string, err error) {
return filer.store.Delete(fullFileName)
}
func (filer *DistributedFiler) Move(fromPath string, toPath string) error {
return filer.store.Move(fromPath, toPath)
}

15
go/filer/distributed_filer/distributed_store.go

@ -0,0 +1,15 @@
package distributed_filer
import "github.com/chrislusf/seaweedfs/go/filer"
type DistributedStore interface {
Put(fullFileName string, fid string) (err error)
Get(fullFileName string) (fid string, err error)
Delete(fullFileName string) (fid string, err error)
FindDirectory(dirPath string) (dirId filer.DirectoryId, err error)
ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error)
ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error)
DeleteDirectory(dirPath string, recursive bool) (err error)
Move(fromPath string, toPath string) error
}

395
go/filer/redis_store/directory_manager.go

@ -0,0 +1,395 @@
package redis_store
import (
"fmt"
"path/filepath"
"sort"
"strconv"
"strings"
"github.com/chrislusf/seaweedfs/go/filer"
"github.com/chrislusf/seaweedfs/go/filer/embedded_filer"
"github.com/chrislusf/seaweedfs/go/glog"
redis "gopkg.in/redis.v2"
)
type DirectoryManager struct {
Client *redis.Client
dirMaxIdKey string
dirKeyPrefix string
dirFileKeyPrefix string
}
func InitDirectoryManger(client *redis.Client) *DirectoryManager {
dirMaxIdKey := "swfs:dir-max-id"
dm := &DirectoryManager{
Client: client,
dirMaxIdKey: dirMaxIdKey,
dirKeyPrefix: "d:",
dirFileKeyPrefix: "df:"}
dm.initDirectoryId()
return dm
}
//check whether directory id exists in redis, if not then init to zero.
func (dm *DirectoryManager) initDirectoryId() error {
err := dm.Client.Get(dm.dirMaxIdKey).Err()
if err == redis.Nil {
err = dm.Client.Set(dm.dirMaxIdKey, "1").Err()
if err != nil {
glog.Errorln("init dir max id error:", err)
}
} else if err != nil {
glog.Errorln("get dir max id error:", err)
}
return err
}
//use redis lua script to make all parent dirs in the dirPath, like linux command `mkdir -p`
func (dm *DirectoryManager) MakeDirectory(dirPath string) (filer.DirectoryId, error) {
dirPath = embedded_filer.CleanFilePath(dirPath)
if dirPath == "/" {
return 1, nil
}
parts := strings.Split(dirPath, "/")
//'d' stands for directory. root must end with a slash
root := dm.dirKeyPrefix + "/"
script := redis.NewScript(`
local root=KEYS[1]
local dirMaxIdKey=KEYS[2]
local did
for i, v in ipairs(ARGV) do
did=redis.call('hget', root, v)
if did == false then
did=redis.call('incr', dirMaxIdKey)
redis.call('hset', root, v, did)
end
if i==1 then
root=root .. v
else
root=root..'/'..v
end
end
return did
`)
result, err := script.Run(dm.Client, []string{root, dm.dirMaxIdKey}, parts[1:]).Result()
if err != nil {
glog.Errorln("redis eval make directory script error:", err)
return 0, err
}
did, ok := result.(int64)
if !ok {
glog.Errorln("convert result:", result, " get:", did)
}
return filer.DirectoryId(did), err
}
//delete directory dirPath and its sub-directories or files recursively ;
//it's not this function's responsibility to check whether dirPath is en empty directory.
func (dm *DirectoryManager) DeleteDirectory(dirPath string) error {
dirPath = embedded_filer.CleanFilePath(dirPath)
if dirPath == "/" {
return fmt.Errorf("Can not delete %s", dirPath)
}
dirPathParent := filepath.Dir(dirPath)
dirPathName := filepath.Base(dirPath)
/*
lua comments:
1. delete dirPath's sub-directories and files recursively
2. delete dirPath itself from its parent dir
*/
script := redis.NewScript(`
local delSubs
local dirKeyPrefix=KEYS[1]
local dirFileKeyPrefix=KEYS[2]
local dirPathParent=ARGV[1]
local dirPathName=ARGV[2]
delSubs = function(dir)
local subs=redis.call('hgetall', dirKeyPrefix..dir);
for i, v in ipairs(subs) do
if i%2 ~= 0 then
redis.call('hdel', dirKeyPrefix..dir, v)
local subd=dir..'/'..v;
delSubs(subd);
end
end
redis.call('del', dirFileKeyPrefix..dir)
end
delSubs(dirPathParent..'/'..dirPathName)
redis.call('hdel', dirKeyPrefix..dirPathParent, dirPathName)
return 0
`)
//we do not use lua to get dirPath's parent dir and its basename, so do it in golang
err := script.Run(dm.Client, []string{dm.dirKeyPrefix, dm.dirFileKeyPrefix}, []string{dirPathParent, dirPathName}).Err()
return err
}
func (dm *DirectoryManager) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) {
entry, err := dm.findDirectory(dirPath)
return entry.Id, err
}
func (dm *DirectoryManager) findDirectory(dirPath string) (filer.DirectoryEntry, error) {
dirPath = embedded_filer.CleanFilePath(dirPath)
if dirPath == "/" {
return filer.DirectoryEntry{Name: "/", Id: 1}, nil
}
basePart := filepath.Base(dirPath)
parentPart := filepath.Dir(dirPath)
did, err := dm.Client.HGet(dm.dirKeyPrefix+parentPart, basePart).Int64()
//adjust redis.Nil to golang nil
if err == redis.Nil {
err = nil
}
if err != nil {
return filer.DirectoryEntry{}, err
}
return filer.DirectoryEntry{Name: basePart, Id: filer.DirectoryId(did)}, nil
}
/*
moving is finished by following steps:
1.get old dir's directory id
2.delete old dir from its parent's hash table
3.add old dir's directory id into new parent dir's hash table, if newName is not empty, use it
4.if the full old dir path key exists(its a non-empty directory), rename it to full new dir path key
after moving is done, it returns 0, -1 indicates that source dir is not found, -2 indicates that destination dir is not found
*/
func (dm *DirectoryManager) MoveUnderDirectory(oldDirPath string, newParentDirPath string, newName string) error {
oldDirPath = embedded_filer.CleanFilePath(oldDirPath)
oldDirName := filepath.Base(oldDirPath) //old dir's name without path
oldParentDir := filepath.Dir(oldDirPath)
newParentDirPath = embedded_filer.CleanFilePath(newParentDirPath)
newParentDirName := filepath.Base(newParentDirPath) // new parent dir's name without path
newParentParentDir := filepath.Dir(newParentDirPath) //newParentDirPath's parent directory
/*
lua script comments:
suppose oldDirPathKey is d:/a/b/c, newParentDirPathKey is d:/a/b/d, newName is "";
then oldParentDirKey is d:/a/b, oldDirName is c, and
newParentParentDirKey is d:/a/b, newParentDirName is d;
newTargetName is c
*/
script := redis.NewScript(`
local oldDirPathKey=KEYS[1]
local newParentDirPathKey=KEYS[2]
local oldParentDirKey=ARGV[1]
local oldDirName=ARGV[2]
local newParentParentDirKey=ARGV[3]
local newParentDirName=ARGV[4]
local newTargetName=ARGV[5]
local oldDirId=redis.call('hget', oldParentDirKey, oldDirName)
local newParentDirId=redis.call('hget', newParentParentDirKey, newParentDirName)
if oldDirId == false then
return -1
end
if newParentDirId == false then
return -2
end
redis.call('hdel', oldParentDirKey, oldDirName)
redis.call('hset', newParentDirPathKey, newTargetName, oldDirId)
local exists=redis.call('exists', oldDirPathKey)
if exists==1 then
redis.call('rename', oldDirPathKey, newParentDirPathKey..'/'..newTargetName)
end
return 0
`)
if newName == "" {
newName = oldDirName
}
result, err := script.Run(
dm.Client,
[]string{
dm.dirKeyPrefix + oldDirPath,
dm.dirKeyPrefix + newParentDirPath},
[]string{
dm.dirKeyPrefix + oldParentDir,
oldDirName,
dm.dirKeyPrefix + newParentParentDir,
newParentDirName, newName}).Result()
if err != nil {
glog.Errorln("redis eval move directory script error:", err)
} else {
ret, ok := result.(int64)
if !ok {
glog.Errorln("convert result:", result, " get:", ret)
}
if ret == -1 {
err = fmt.Errorf("src dir: %s not exists", oldDirPath)
} else if ret == -2 {
err = fmt.Errorf("dest dir: %s not exists", newParentDirPath)
}
}
return err
}
//return a dir's sub-directories, directories' name are sorted lexicographically
func (dm *DirectoryManager) ListDirectories(dirPath string) (dirNames []filer.DirectoryEntry, err error) {
dirPath = embedded_filer.CleanFilePath(dirPath)
result, le := dm.Client.HGetAllMap(dm.dirKeyPrefix + dirPath).Result()
if le != nil {
glog.Errorf("get sub-directories of %s error:%v", dirPath, err)
return dirNames, le
}
//sort entries by directories' name
keys := make(sort.StringSlice, len(result))
i := 0
for k, _ := range result {
keys[i] = k
i++
}
keys.Sort()
for _, k := range keys {
v := result[k]
did, _ := strconv.Atoi(v)
entry := filer.DirectoryEntry{Name: k, Id: filer.DirectoryId(did)}
dirNames = append(dirNames, entry)
}
return dirNames, nil
}
//get the amount of directories under a directory
func (dm *DirectoryManager) GetSubDirectoriesNum(dirPath string) (int64, error) {
dirPath = embedded_filer.CleanFilePath(dirPath)
return dm.getSubItemsNum(dm.dirKeyPrefix + dirPath)
}
//use lua script to make directories and then put file for atomic
func (dm *DirectoryManager) PutFile(fullFileName string, fid string) error {
fullFileName = embedded_filer.CleanFilePath(fullFileName)
dirPath := filepath.Dir(fullFileName)
fname := filepath.Base(fullFileName)
parts := strings.Split(dirPath, "/")
//'d' stands for directory. root must end with a slash
root := dm.dirKeyPrefix + "/"
script := redis.NewScript(`
local root=KEYS[1]
local dirMaxIdKey=KEYS[2]
local dirFileKeyPrefix=KEYS[3]
local fname=KEYS[4]
local fid=KEYS[5]
local dirPath=table.concat(ARGV, '/')
local did
for i, v in ipairs(ARGV) do
did=redis.call('hget', root, v)
if did == false then
did=redis.call('incr', dirMaxIdKey)
redis.call('hset', root, v, did)
end
if i==1 then
root=root .. v
else
root=root..'/'..v
end
end
redis.call('hset', dirFileKeyPrefix..'/'..dirPath, fname, fid)
return did
`)
err := script.Run(dm.Client, []string{root, dm.dirMaxIdKey, dm.dirFileKeyPrefix, fname, fid}, parts[1:]).Err()
if err != nil {
glog.Errorln("redis eval put file script error:", err)
}
return err
}
func (dm *DirectoryManager) FindFile(fullFileName string) (fid string, err error) {
fullFileName = embedded_filer.CleanFilePath(fullFileName)
dirPath := filepath.Dir(fullFileName)
fname := filepath.Base(fullFileName)
result, err := dm.Client.HGet(dm.dirFileKeyPrefix+dirPath, fname).Result()
if err == redis.Nil {
err = nil
}
if err != nil {
glog.Errorf("get file %s error:%v", fullFileName, err)
}
return result, err
}
func (dm *DirectoryManager) DeleteFile(fullFileName string) (fid string, err error) {
fullFileName = embedded_filer.CleanFilePath(fullFileName)
dirPath := filepath.Dir(fullFileName)
fname := filepath.Base(fullFileName)
script := redis.NewScript(`
local dirPathKey=KEYS[1]
local fname = ARGV[1]
local fid=redis.call('hget', dirPathKey, fname)
if fid ~= false then
redis.call('hdel', dirPathKey, fname)
end
return fid
`)
result, err := script.Run(dm.Client, []string{dm.dirFileKeyPrefix + dirPath}, []string{fname}).Result()
if err != nil {
glog.Errorf("delete file %s error:%v\n", fullFileName, err)
}
fid, ok := result.(string)
if !ok {
glog.Errorf("convert result %v to string failed", result)
}
return fid, err
}
//list files under dirPath, use lastFileName and limit for pagination
//in fact, this implemention has a bug, you can not get first file of the first page;
//the api needs to be modified
func (dm *DirectoryManager) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
dirPath = embedded_filer.CleanFilePath(dirPath)
result, le := dm.Client.HGetAllMap(dm.dirFileKeyPrefix + dirPath).Result()
if le != nil {
glog.Errorf("get files of %s error:%v", dirPath, err)
return files, le
}
//if the lastFileName argument is ok
//when list first page, pass lastFileName an empty string
if _, ok := result[lastFileName]; ok || lastFileName == "" {
//sort entries by file names
//when files amount is large, here may be slow
nResult := len(result)
keys := make(sort.StringSlice, nResult)
i := 0
for k, _ := range result {
keys[i] = k
i++
}
keys.Sort()
index := -1
if ok {
index = keys.Search(lastFileName)
}
cnt := 0
for _, k := range keys[index+1:] {
fid := result[k]
entry := filer.FileEntry{Name: k, Id: filer.FileId(fid)}
files = append(files, entry)
cnt++
if cnt == limit {
break
}
}
}
return files, le
}
//get the amount of files under a directory
func (dm *DirectoryManager) GetFilesNum(dirPath string) (int64, error) {
dirPath = embedded_filer.CleanFilePath(dirPath)
return dm.getSubItemsNum(dm.dirFileKeyPrefix + dirPath)
}
//get a hash key's fields number
func (dm *DirectoryManager) getSubItemsNum(key string) (int64, error) {
result, err := dm.Client.HLen(key).Result()
if err == redis.Nil {
err = nil
}
if err != nil {
glog.Errorf("hlen %s error:%v", key, err)
}
return result, err
}

288
go/filer/redis_store/directory_manager_test.go

@ -0,0 +1,288 @@
package redis_store
import (
"flag"
"fmt"
"os"
"path/filepath"
"testing"
redis "gopkg.in/redis.v2"
)
var dm *DirectoryManager = nil
func TestDirectory(t *testing.T) {
//root dir
did, err := dm.MakeDirectory("/")
if err != nil {
t.Errorf("make root dir error:%v\n", err)
}
if did != 1 {
t.Errorf("root dir's id is not 1!")
}
//make some dirs
dir := "/a/b/c"
did, err = dm.MakeDirectory(dir)
if err != nil {
t.Errorf("make dirs:%s, error:%v\n", dir, err)
}
if did != 4 {
t.Errorf("dir:%s's id is %d, 4 is expected!", dir, did)
}
//check its parent dir
parentDir := filepath.Dir(dir)
did, err = dm.FindDirectory(parentDir)
if err != nil {
t.Errorf("get %s's parent dir %s, error:%v\n", dir, parentDir, err)
}
if did != 3 {
t.Errorf("dir:%s's id is %d, 3 is expected!", parentDir, did)
}
//make another dir with same parent
dir = "/a/b/d"
did, err = dm.MakeDirectory(dir)
if err != nil {
t.Errorf("make dirs:%s, error:%v\n", dir, err)
}
if did != 5 {
t.Errorf("dir:%s's id is %d, 5 is expected!", dir, did)
}
//check its parent dir
parentDir = filepath.Dir(dir)
did, err = dm.FindDirectory(parentDir)
if err != nil {
t.Errorf("get %s's parent dir %s, error:%v\n", dir, parentDir, err)
}
if did != 3 {
t.Errorf("dir:%s's id is %d, 3 is expected!", parentDir, did)
}
//find /a
dir = "/a"
did, err = dm.FindDirectory(dir)
if err != nil {
t.Errorf("find dir %s, error:%v\n", dir, err)
}
if did != 2 {
t.Errorf("dir:%s's id is %d, 2 is expected!", dir, did)
}
//make /a/b/c/e, so /a/b/c has a sub-directory
dir = "/a/b/c/e"
did, err = dm.MakeDirectory(dir)
if err != nil {
t.Errorf("make dirs:%s, error:%v\n", dir, err)
}
if did != 6 {
t.Errorf("dir:%s's id is %d, 6 is expected!", dir, did)
}
/*
*
* move to an existing dir
*
*
*/
//move /a/b/c under /a/b/d
from := "/a/b/c"
to := "/a/b/d"
err = dm.MoveUnderDirectory(from, to, "")
if err != nil {
t.Errorf("move %s to %s error:%v", from, to, err)
}
//now /a/b/c should not exist
did, err = dm.FindDirectory(from)
if err != nil {
t.Errorf("find from dir %s error:%v", from, err)
}
if did != 0 {
t.Errorf("%s still exists after moved under %s", from, to)
}
//now /a/b/d/c should exist
dir = filepath.Join(to, filepath.Base(from))
did, err = dm.FindDirectory(dir)
if did != 4 {
t.Errorf("new dir %s has a wrong dir id:%d, 4 is expected", dir, did)
}
//now /a/b/d/c/e also should exist, this indicates c is moved under /a/b/d entirely, include its sub-directories
dir = "/a/b/d/c/e"
did, err = dm.FindDirectory(dir)
if did != 6 {
t.Errorf("new dir %s has a wrong dir id:%d, 6 is expected", dir, did)
}
/*
*
* move to a new dir
*
*/
//now move /a/b/d/c to /a/b/f, f is a new dir, this means c will be changed to f, but dir id will not be changed
from = "/a/b/d/c"
to = "/a/b/f"
err = dm.MoveUnderDirectory(from, filepath.Dir(to), filepath.Base(to))
if err != nil {
t.Errorf("move %s to %s error:%v", from, to, err)
}
// /a/b/d/c should not exist
dir = from
did, err = dm.FindDirectory(dir)
if err != nil {
t.Errorf("find dir %s error:%v", dir, err)
}
if did != 0 {
t.Errorf("%s still exists after moved to %s", from, to)
}
// /a/b/f should exist
dir = to
did, err = dm.FindDirectory(dir)
if did != 4 {
t.Errorf("new dir %s has a wrong dir id:%d, 4 is expected", dir, did)
}
// /a/b/f/e also should exist
dir = to + "/e"
did, err = dm.FindDirectory(dir)
if did != 6 {
t.Errorf("new dir %s has a wrong dir id:%d, 6 is expected", dir, did)
}
/*
* list a dir's sub-directories
*
*/
dir = "/a/b"
entries, err := dm.ListDirectories(dir)
if !(entries[0].Name == "d" && entries[1].Name == "f") {
t.Errorf("get entries:%v, expect 'd', 'f'", entries)
}
/*
* delete a dir: /a/b/f/e
*
*/
dir = "/a/b/f"
err = dm.DeleteDirectory(dir)
if err != nil {
t.Errorf("delete dir:%s error:%v", dir, err)
}
// now the deleted dir should not exist
did, err = dm.FindDirectory(dir)
if err != nil {
t.Errorf("get deleted dir %s error:%v", dir, err)
}
if did != 0 {
t.Errorf("the dir %s is not deleted!", dir)
}
// now the deleted dir's sub-directory also should not exist
dir = "/a/b/f/e"
did, err = dm.FindDirectory(dir)
if err != nil {
t.Errorf("get deleted dir %s error:%v", dir, err)
}
if did != 0 {
t.Errorf("the dir %s is not deleted!", dir)
}
}
func TestFiles(t *testing.T) {
// put one file
fname := "/a/b/c/test.txt"
fid := "1,23"
err := dm.PutFile(fname, fid)
if err != nil {
t.Errorf("put file %s error:%v", fname, err)
}
// put another file
fname = "/a/b/c/abc.txt"
fid = "1,234"
err = dm.PutFile(fname, fid)
if err != nil {
t.Errorf("put file %s error:%v", fname, err)
}
//get file
id, err := dm.FindFile(fname)
if err != nil {
t.Errorf("get file %s error:%v", fname, err)
}
if id != fid {
t.Errorf("get wrong fid %s, expect %s", id, fid)
}
//list files
lastFileName := "abc.txt"
dir := filepath.Dir(fname)
files, err := dm.ListFiles(dir, lastFileName, 10)
if err != nil {
t.Errorf("list files for %s error:%v", dir, err)
}
if files[0].Name != "test.txt" {
t.Errorf("files list order wrong, first file is %s, expect %s", files[0].Name, "test.txt")
}
//delete file
id, err = dm.DeleteFile(fname)
if err != nil {
t.Errorf("delete file get error:%v", err)
}
if id != fid {
t.Errorf("delete file return wrong fid %s, expect %s", id, fid)
}
}
func clearRedisKeys(client *redis.Client, dirKeyPrefix string, dirMaxIdKey string) error {
result, err := client.Keys(dirKeyPrefix + "*").Result()
if err != nil {
fmt.Println("get redis keys error:", err)
return err
}
if len(result) > 0 {
n, err := client.Del(result...).Result()
if err != nil {
fmt.Println("del keys error:", err)
} else {
fmt.Println("del", n, " keys.")
}
}
n, err := client.Del(dirMaxIdKey).Result()
if err != nil {
fmt.Printf("del dirMaxIdKey:%s, error:%v\n", dirMaxIdKey, err)
} else {
fmt.Println("del", n, " keys.")
}
return err
}
//don't use flag.PrintDefaults() for help, because it show a lot of flags which are used by go test command
func printUsage() {
fmt.Println("usage:\n\tgo test github.com/chrislusf/seaweedfs/go/filer/redis_store -redis_addr localhost:6379 [-redis_passwd \"\"] [-redis_db 0]\n")
}
func TestMain(m *testing.M) {
var (
redisAddr string
redisPasswd string
redisDb int64
dirKeyPrefix = "d:"
dirMaxIdKey = "swfs:dir-max-id"
)
flag.StringVar(&redisAddr, "redis_addr", "", "A redis server to run this test, e.g. localhost:6379")
flag.StringVar(&redisPasswd, "redis_passwd", "", "The redis server's password if any")
flag.Int64Var(&redisDb, "redis_db", 0, "the redis DB to use")
flag.Parse()
if redisAddr == "" {
fmt.Println("[WARN] You need to specify a value for the redis_addr flag!\n")
printUsage()
os.Exit(1)
}
redisClient := redis.NewTCPClient(&redis.Options{
Addr: redisAddr,
Password: redisPasswd, // no password set
DB: redisDb,
})
err := clearRedisKeys(redisClient, dirKeyPrefix, dirMaxIdKey)
if err != nil {
os.Exit(-1)
}
dm = InitDirectoryManger(redisClient)
ret := m.Run()
//clean used keys
clearRedisKeys(redisClient, dirKeyPrefix, dirMaxIdKey)
redisClient.Close()
os.Exit(ret)
}

87
go/filer/redis_store/redis_store.go

@ -1,11 +1,16 @@
package redis_store
import (
"fmt"
"path/filepath"
"github.com/chrislusf/seaweedfs/go/filer"
redis "gopkg.in/redis.v2"
)
type RedisStore struct {
Client *redis.Client
dm *DirectoryManager
}
func NewRedisStore(hostPort string, database int) *RedisStore {
@ -14,31 +19,20 @@ func NewRedisStore(hostPort string, database int) *RedisStore {
Password: "", // no password set
DB: int64(database),
})
return &RedisStore{Client: client}
dm := InitDirectoryManger(client)
return &RedisStore{Client: client, dm: dm}
}
func (s *RedisStore) Get(fullFileName string) (fid string, err error) {
fid, err = s.Client.Get(fullFileName).Result()
if err == redis.Nil {
err = nil
}
return fid, err
return s.dm.FindFile(fullFileName)
}
func (s *RedisStore) Put(fullFileName string, fid string) (err error) {
_, err = s.Client.Set(fullFileName, fid).Result()
if err == redis.Nil {
err = nil
}
return err
return s.dm.PutFile(fullFileName, fid)
}
// Currently the fid is not returned
// Currently the fid is returned
func (s *RedisStore) Delete(fullFileName string) (fid string, err error) {
_, err = s.Client.Del(fullFileName).Result()
if err == redis.Nil {
err = nil
}
return "", err
return s.dm.DeleteFile(fullFileName)
}
func (s *RedisStore) Close() {
@ -46,3 +40,62 @@ func (s *RedisStore) Close() {
s.Client.Close()
}
}
func (s *RedisStore) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) {
return s.dm.FindDirectory(dirPath)
}
func (s *RedisStore) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) {
return s.dm.ListDirectories(dirPath)
}
func (s *RedisStore) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
return s.dm.ListFiles(dirPath, lastFileName, limit)
}
func (s *RedisStore) DeleteDirectory(dirPath string, recursive bool) (err error) {
if recursive {
nDirs, _ := s.dm.GetSubDirectoriesNum(dirPath)
if nDirs > 0 {
return fmt.Errorf("Fail to delete directory %s: %d sub directories found!", dirPath, nDirs)
}
nFiles, _ := s.dm.GetFilesNum(dirPath)
if nFiles > 0 {
return fmt.Errorf("Fail to delete directory %s: %d files found!", dirPath, nFiles)
}
} else {
err = s.dm.DeleteDirectory(dirPath)
}
return
}
/*
Move a folder or a file, with 4 Use cases:
mv fromDir toNewDir
mv fromDir toOldDir
mv fromFile toDir
mv fromFile toFile
*/
func (s *RedisStore) Move(fromPath string, toPath string) error {
//first check whether fromPath is a directory
fromDid, _ := s.dm.FindDirectory(fromPath)
if fromDid > 0 {
toDid, _ := s.dm.FindDirectory(toPath)
if toDid > 0 {
//move under an existing dir
return s.dm.MoveUnderDirectory(fromPath, toPath, "")
}
//move to a new dir
return s.dm.MoveUnderDirectory(fromPath, filepath.Dir(toPath), filepath.Base(toPath))
}
//whether fromPath is a file path
if fid, err := s.dm.DeleteFile(fromPath); err == nil {
toDid, _ := s.dm.FindDirectory(toPath)
if toDid > 0 {
//move file under an existing dir
return s.dm.PutFile(filepath.Join(toPath, filepath.Base(fromPath)), fid)
}
//move to a folder with a new name
return s.dm.PutFile(toPath, fid)
}
return fmt.Errorf("File %s is not found!", fromPath)
}

3
go/weed/weed_server/filer_server.go

@ -6,6 +6,7 @@ import (
"github.com/chrislusf/seaweedfs/go/filer"
"github.com/chrislusf/seaweedfs/go/filer/cassandra_store"
"github.com/chrislusf/seaweedfs/go/filer/distributed_filer"
"github.com/chrislusf/seaweedfs/go/filer/embedded_filer"
"github.com/chrislusf/seaweedfs/go/filer/flat_namespace"
"github.com/chrislusf/seaweedfs/go/filer/redis_store"
@ -47,7 +48,7 @@ func NewFilerServer(r *http.ServeMux, port int, master string, dir string, colle
fs.filer = flat_namespace.NewFlatNamespaceFiler(master, cassandra_store)
} else if redis_server != "" {
redis_store := redis_store.NewRedisStore(redis_server, redis_database)
fs.filer = flat_namespace.NewFlatNamespaceFiler(master, redis_store)
fs.filer = distributed_filer.NewDistributedFiler(master, redis_store)
} else {
if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil {
glog.Fatalf("Can not start filer in dir %s : %v", dir, err)

Loading…
Cancel
Save