Browse Source

redis store now support directory manipulations

pull/254/head
zhaozhi 10 years ago
parent
commit
038a025c32
  1. 28
      go/filer/redis_store/directory_manager.go
  2. 111
      go/filer/redis_store/redis_store.go

28
go/filer/redis_store/directory_manager.go

@ -155,6 +155,8 @@ moving is finished by following steps:
2.delete old dir from its parent's hash table
3.add old dir's directory id into new parent dir's hash table, if newName is not empty, use it
4.if the full old dir path key exists(its a non-empty directory), rename it to full new dir path key
after moving is done, it returns 0, -1 indicates that source dir is not found, -2 indicates that destination dir is not found
*/
func (dm *DirectoryManager) MoveUnderDirectory(oldDirPath string, newParentDirPath string, newName string) error {
oldDirPath = embedded_filer.CleanFilePath(oldDirPath)
@ -250,6 +252,12 @@ func (dm *DirectoryManager) ListDirectories(dirPath string) (dirNames []filer.Di
return dirNames, nil
}
//get the amount of directories under a directory
func (dm *DirectoryManager) GetSubDirectoriesNum(dirPath string) (int64, error) {
dirPath = embedded_filer.CleanFilePath(dirPath)
return dm.getSubItemsNum(dm.dirKeyPrefix + dirPath)
}
//use lua script to make directories and then put file for atomic
func (dm *DirectoryManager) PutFile(fullFileName string, fid string) error {
fullFileName = embedded_filer.CleanFilePath(fullFileName)
@ -310,7 +318,9 @@ func (dm *DirectoryManager) DeleteFile(fullFileName string) (fid string, err err
local dirPathKey=KEYS[1]
local fname = ARGV[1]
local fid=redis.call('hget', dirPathKey, fname)
if fid != false then
redis.call('hdel', dirPathKey, fname)
end
return fid
`)
result, err := script.Run(dm.Client, []string{dm.dirFileKeyPrefix + dirPath}, []string{fname}).Result()
@ -356,3 +366,21 @@ func (dm *DirectoryManager) ListFiles(dirPath string, lastFileName string, limit
}
return files, le
}
//get the amount of files under a directory
func (dm *DirectoryManager) GetFilesNum(dirPath string) (int64, error) {
dirPath = embedded_filer.CleanFilePath(dirPath)
return dm.getSubItemsNum(dm.dirFileKeyPrefix + dirPath)
}
//get a hash key's fields number
func (dm *DirectoryManager) getSubItemsNum(key string) (int64, error) {
result, err := dm.Client.HLen(key).Result()
if err == redis.Nil {
err = nil
}
if err != nil {
glog.Errorf("hlen %s error:%v", key, err)
}
return result, err
}

111
go/filer/redis_store/redis_store.go

@ -1,8 +1,10 @@
package redis_store
import (
"fmt"
"path/filepath"
"github.com/chrislusf/seaweedfs/go/filer"
"github.com/chrislusf/seaweedfs/go/filer/flat_namespace"
redis "gopkg.in/redis.v2"
)
@ -21,53 +23,16 @@ func NewRedisStore(hostPort string, database int) *RedisStore {
return &RedisStore{Client: client, dm: dm}
}
/*
func (s *RedisStore) Get(fullFileName string) (fid string, err error) {
fid, err = s.Client.Get(fullFileName).Result()
if err == redis.Nil {
err = nil
}
return fid, err
return s.dm.FindFile(fullFileName)
}
func (s *RedisStore) Put(fullFileName string, fid string) (err error) {
_, err = s.Client.Set(fullFileName, fid).Result()
if err == redis.Nil {
err = nil
}
return err
return s.dm.PutFile(fullFileName, fid)
}
// Currently the fid is not returned
// Currently the fid is returned
func (s *RedisStore) Delete(fullFileName string) (fid string, err error) {
_, err = s.Client.Del(fullFileName).Result()
if err == redis.Nil {
err = nil
}
return "", err
}
*/
func (s *RedisStore) Get(fullFileName string) (fid string, err error) {
fid, err = s.Client.Get(fullFileName).Result()
if err == redis.Nil {
err = nil
}
return fid, err
}
func (s *RedisStore) Put(fullFileName string, fid string) (err error) {
_, err = s.Client.Set(fullFileName, fid).Result()
if err == redis.Nil {
err = nil
}
return err
}
// Currently the fid is not returned
func (s *RedisStore) Delete(fullFileName string) (fid string, err error) {
_, err = s.Client.Del(fullFileName).Result()
if err == redis.Nil {
err = nil
}
return "", err
return s.dm.DeleteFile(fullFileName)
}
func (s *RedisStore) Close() {
@ -76,19 +41,61 @@ func (s *RedisStore) Close() {
}
}
func (c *RedisStore) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) {
return c.dm.FindDirectory(dirPath)
func (s *RedisStore) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) {
return s.dm.FindDirectory(dirPath)
}
func (s *RedisStore) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) {
return s.dm.ListDirectories(dirPath)
}
func (c *RedisStore) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) {
return c.dm.ListDirectories(dirPath)
func (s *RedisStore) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
return s.dm.ListFiles(dirPath, lastFileName, limit)
}
func (c *RedisStore) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
return nil, flat_namespace.ErrNotImplemented
func (s *RedisStore) DeleteDirectory(dirPath string, recursive bool) (err error) {
if recursive {
nDirs, _ := s.dm.GetSubDirectoriesNum(dirPath)
if nDirs > 0 {
return fmt.Errorf("Fail to delete directory %s: %d sub directories found!", dirPath, nDirs)
}
func (c *RedisStore) DeleteDirectory(dirPath string, recursive bool) (err error) {
return flat_namespace.ErrNotImplemented
nFiles, _ := s.dm.GetFilesNum(dirPath)
if nFiles > 0 {
return fmt.Errorf("Fail to delete directory %s: %d files found!", dirPath, nFiles)
}
func (c *RedisStore) Move(fromPath string, toPath string) error {
return flat_namespace.ErrNotImplemented
} else {
err = s.dm.DeleteDirectory(dirPath)
}
return
}
/*
Move a folder or a file, with 4 Use cases:
mv fromDir toNewDir
mv fromDir toOldDir
mv fromFile toDir
mv fromFile toFile
*/
func (s *RedisStore) Move(fromPath string, toPath string) error {
//first check whether fromPath is a directory
fromDid, _ := s.dm.FindDirectory(fromPath)
if fromDid > 0 {
toDid, _ := s.dm.FindDirectory(toPath)
if toDid > 0 {
//move under an existing dir
return s.dm.MoveUnderDirectory(fromPath, toPath, "")
}
//move to a new dir
return s.dm.MoveUnderDirectory(fromPath, filepath.Dir(toPath), filepath.Base(toPath))
}
//whether fromPath is a file path
if fid, err := s.dm.DeleteFile(fromPath); err == nil {
toDid, _ := s.dm.FindDirectory(toPath)
if toDid > 0 {
//move file under an existing dir
return s.dm.PutFile(filepath.Join(toPath, filepath.Base(fromPath)), fid)
}
//move to a folder with a new name
return s.dm.PutFile(toPath, fid)
}
return fmt.Errorf("File %s is not found!", fromPath)
}
Loading…
Cancel
Save