Chris Lu
7 years ago
19 changed files with 1 additions and 1972 deletions
-
96weed/filer/cassandra_store/cassandra_store.go
-
22weed/filer/cassandra_store/schema.cql
-
26weed/filer/embedded_filer/design.txt
-
15weed/filer/embedded_filer/directory.go
-
312weed/filer/embedded_filer/directory_in_map.go
-
86weed/filer/embedded_filer/directory_test.go
-
156weed/filer/embedded_filer/filer_embedded.go
-
87weed/filer/embedded_filer/files_in_leveldb.go
-
29weed/filer/filer.go
-
66weed/filer/flat_namespace/flat_namespace_filer.go
-
9weed/filer/flat_namespace/flat_namespace_store.go
-
67weed/filer/mysql_store/README.md
-
270weed/filer/mysql_store/mysql_store.go
-
30weed/filer/mysql_store/mysql_store_test.go
-
456weed/filer/postgres_store/postgres_native.go
-
149weed/filer/postgres_store/postgres_store.go
-
50weed/filer/redis_store/redis_store.go
-
45weed/filer/vasto_store/design.txt
-
2weed/server/filer_server_handlers_write.go
@ -1,96 +0,0 @@ |
|||
package cassandra_store |
|||
|
|||
import ( |
|||
"fmt" |
|||
"strings" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
|
|||
"github.com/gocql/gocql" |
|||
) |
|||
|
|||
/* |
|||
|
|||
Basically you need a table just like this: |
|||
|
|||
CREATE TABLE seaweed_files ( |
|||
path varchar, |
|||
fids list<varchar>, |
|||
PRIMARY KEY (path) |
|||
); |
|||
|
|||
Need to match flat_namespace.FlatNamespaceStore interface |
|||
Put(fullFileName string, fid string) (err error) |
|||
Get(fullFileName string) (fid string, err error) |
|||
Delete(fullFileName string) (fid string, err error) |
|||
|
|||
*/ |
|||
type CassandraStore struct { |
|||
cluster *gocql.ClusterConfig |
|||
session *gocql.Session |
|||
} |
|||
|
|||
func NewCassandraStore(keyspace string, hosts string) (c *CassandraStore, err error) { |
|||
c = &CassandraStore{} |
|||
s := strings.Split(hosts, ",") |
|||
if len(s) == 1 { |
|||
glog.V(2).Info("Only one cassandra node to connect! A cluster is Recommended! Now using:", string(hosts)) |
|||
c.cluster = gocql.NewCluster(hosts) |
|||
} else if len(s) > 1 { |
|||
c.cluster = gocql.NewCluster(s...) |
|||
} |
|||
c.cluster.Keyspace = keyspace |
|||
c.cluster.Consistency = gocql.LocalQuorum |
|||
c.session, err = c.cluster.CreateSession() |
|||
if err != nil { |
|||
glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace) |
|||
} |
|||
return |
|||
} |
|||
|
|||
func (c *CassandraStore) Put(fullFileName string, fid string) (err error) { |
|||
var input []string |
|||
input = append(input, fid) |
|||
if err := c.session.Query( |
|||
`INSERT INTO seaweed_files (path, fids) VALUES (?, ?)`, |
|||
fullFileName, input).Exec(); err != nil { |
|||
glog.V(0).Infof("Failed to save file %s with id %s: %v", fullFileName, fid, err) |
|||
return err |
|||
} |
|||
return nil |
|||
} |
|||
func (c *CassandraStore) Get(fullFileName string) (fid string, err error) { |
|||
var output []string |
|||
if err := c.session.Query( |
|||
`select fids FROM seaweed_files WHERE path = ? LIMIT 1`, |
|||
fullFileName).Consistency(gocql.One).Scan(&output); err != nil { |
|||
if err != gocql.ErrNotFound { |
|||
glog.V(0).Infof("Failed to find file %s: %v", fullFileName, fid, err) |
|||
return "", filer.ErrNotFound |
|||
} |
|||
} |
|||
if len(output) == 0 { |
|||
return "", fmt.Errorf("No file id found for %s", fullFileName) |
|||
} |
|||
return output[0], nil |
|||
} |
|||
|
|||
// Currently the fid is not returned
|
|||
func (c *CassandraStore) Delete(fullFileName string) (err error) { |
|||
if err := c.session.Query( |
|||
`DELETE FROM seaweed_files WHERE path = ?`, |
|||
fullFileName).Exec(); err != nil { |
|||
if err != gocql.ErrNotFound { |
|||
glog.V(0).Infof("Failed to delete file %s: %v", fullFileName, err) |
|||
} |
|||
return err |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (c *CassandraStore) Close() { |
|||
if c.session != nil { |
|||
c.session.Close() |
|||
} |
|||
} |
@ -1,22 +0,0 @@ |
|||
/* |
|||
|
|||
Here is the CQL to create the table.CassandraStore |
|||
|
|||
Optionally you can adjust the keyspace name and replication settings. |
|||
|
|||
For production server, very likely you want to set replication_factor to 3 |
|||
|
|||
*/ |
|||
|
|||
create keyspace seaweed WITH replication = { |
|||
'class':'SimpleStrategy', |
|||
'replication_factor':1 |
|||
}; |
|||
|
|||
use seaweed; |
|||
|
|||
CREATE TABLE seaweed_files ( |
|||
path varchar, |
|||
fids list<varchar>, |
|||
PRIMARY KEY (path) |
|||
); |
@ -1,26 +0,0 @@ |
|||
Design Assumptions: |
|||
1. the number of directories are magnitudely smaller than the number of files |
|||
2. unlimited number of files under any directories |
|||
Phylosophy: |
|||
metadata for directories and files should be separated |
|||
Design: |
|||
Store directories in normal map |
|||
all of directories hopefully all be in memory |
|||
efficient to move/rename/list_directories |
|||
Log directory changes to append only log file |
|||
Store files in sorted string table in <dir_id/filename> format |
|||
efficient to list_files, just simple iterator |
|||
efficient to locate files, binary search |
|||
|
|||
Testing: |
|||
1. starting server, "weed server -filer=true" |
|||
2. posting files to different folders |
|||
curl -F "filename=@design.txt" "http://localhost:8888/sources/" |
|||
curl -F "filename=@design.txt" "http://localhost:8888/design/" |
|||
curl -F "filename=@directory.go" "http://localhost:8888/sources/weed/go/" |
|||
curl -F "filename=@directory.go" "http://localhost:8888/sources/testing/go/" |
|||
curl -F "filename=@filer.go" "http://localhost:8888/sources/weed/go/" |
|||
curl -F "filename=@filer_in_leveldb.go" "http://localhost:8888/sources/weed/go/" |
|||
curl "http://localhost:8888/?pretty=y" |
|||
curl "http://localhost:8888/sources/weed/go/?pretty=y" |
|||
curl "http://localhost:8888/sources/weed/go/?pretty=y" |
@ -1,15 +0,0 @@ |
|||
package embedded_filer |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
) |
|||
|
|||
type DirectoryManager interface { |
|||
FindDirectory(dirPath string) (DirectoryId, error) |
|||
ListDirectories(dirPath string) (dirs []filer.DirectoryName, err error) |
|||
MakeDirectory(currentDirPath string, dirName string) (DirectoryId, error) |
|||
MoveUnderDirectory(oldDirPath string, newParentDirPath string) error |
|||
DeleteDirectory(dirPath string) error |
|||
//functions used by FUSE
|
|||
FindDirectoryById(DirectoryId, error) |
|||
} |
@ -1,312 +0,0 @@ |
|||
package embedded_filer |
|||
|
|||
import ( |
|||
"bufio" |
|||
"fmt" |
|||
"io" |
|||
"os" |
|||
"path/filepath" |
|||
"strconv" |
|||
"strings" |
|||
"sync" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
) |
|||
|
|||
var writeLock sync.Mutex //serialize changes to dir.log
|
|||
|
|||
type DirectoryId int32 |
|||
|
|||
type DirectoryEntryInMap struct { |
|||
sync.Mutex |
|||
Name string |
|||
Parent *DirectoryEntryInMap |
|||
subDirectories map[string]*DirectoryEntryInMap |
|||
Id DirectoryId |
|||
} |
|||
|
|||
func (de *DirectoryEntryInMap) getChild(dirName string) (*DirectoryEntryInMap, bool) { |
|||
de.Lock() |
|||
defer de.Unlock() |
|||
child, ok := de.subDirectories[dirName] |
|||
return child, ok |
|||
} |
|||
func (de *DirectoryEntryInMap) addChild(dirName string, child *DirectoryEntryInMap) { |
|||
de.Lock() |
|||
defer de.Unlock() |
|||
de.subDirectories[dirName] = child |
|||
} |
|||
func (de *DirectoryEntryInMap) removeChild(dirName string) { |
|||
de.Lock() |
|||
defer de.Unlock() |
|||
delete(de.subDirectories, dirName) |
|||
} |
|||
func (de *DirectoryEntryInMap) hasChildren() bool { |
|||
de.Lock() |
|||
defer de.Unlock() |
|||
return len(de.subDirectories) > 0 |
|||
} |
|||
func (de *DirectoryEntryInMap) children() (dirNames []filer.DirectoryName) { |
|||
de.Lock() |
|||
defer de.Unlock() |
|||
for k, _ := range de.subDirectories { |
|||
dirNames = append(dirNames, filer.DirectoryName(k)) |
|||
} |
|||
return dirNames |
|||
} |
|||
|
|||
type DirectoryManagerInMap struct { |
|||
Root *DirectoryEntryInMap |
|||
max DirectoryId |
|||
logFile *os.File |
|||
isLoading bool |
|||
} |
|||
|
|||
func (dm *DirectoryManagerInMap) newDirectoryEntryInMap(parent *DirectoryEntryInMap, name string) (d *DirectoryEntryInMap, err error) { |
|||
d = &DirectoryEntryInMap{Name: name, Parent: parent, subDirectories: make(map[string]*DirectoryEntryInMap)} |
|||
var parts []string |
|||
for p := d; p != nil && p.Name != ""; p = p.Parent { |
|||
parts = append(parts, p.Name) |
|||
} |
|||
n := len(parts) |
|||
if n <= 0 { |
|||
return nil, fmt.Errorf("Failed to create folder %s/%s", parent.Name, name) |
|||
} |
|||
for i := 0; i < n/2; i++ { |
|||
parts[i], parts[n-1-i] = parts[n-1-i], parts[i] |
|||
} |
|||
dm.max++ |
|||
d.Id = dm.max |
|||
dm.log("add", "/"+strings.Join(parts, "/"), strconv.Itoa(int(d.Id))) |
|||
return d, nil |
|||
} |
|||
|
|||
func (dm *DirectoryManagerInMap) log(words ...string) { |
|||
if !dm.isLoading { |
|||
dm.logFile.WriteString(strings.Join(words, "\t") + "\n") |
|||
} |
|||
} |
|||
|
|||
func NewDirectoryManagerInMap(dirLogFile string) (dm *DirectoryManagerInMap, err error) { |
|||
dm = &DirectoryManagerInMap{} |
|||
//dm.Root do not use newDirectoryEntryInMap, since dm.max will be changed
|
|||
dm.Root = &DirectoryEntryInMap{subDirectories: make(map[string]*DirectoryEntryInMap)} |
|||
if dm.logFile, err = os.OpenFile(dirLogFile, os.O_RDWR|os.O_CREATE, 0644); err != nil { |
|||
return nil, fmt.Errorf("cannot write directory log file %s: %v", dirLogFile, err) |
|||
} |
|||
return dm, dm.load() |
|||
} |
|||
|
|||
func (dm *DirectoryManagerInMap) processEachLine(line string) error { |
|||
if strings.HasPrefix(line, "#") { |
|||
return nil |
|||
} |
|||
if line == "" { |
|||
return nil |
|||
} |
|||
parts := strings.Split(line, "\t") |
|||
if len(parts) == 0 { |
|||
return nil |
|||
} |
|||
switch parts[0] { |
|||
case "add": |
|||
v, pe := strconv.Atoi(parts[2]) |
|||
if pe != nil { |
|||
return pe |
|||
} |
|||
if e := dm.loadDirectory(parts[1], DirectoryId(v)); e != nil { |
|||
return e |
|||
} |
|||
case "mov": |
|||
newName := "" |
|||
if len(parts) >= 4 { |
|||
newName = parts[3] |
|||
} |
|||
if e := dm.MoveUnderDirectory(parts[1], parts[2], newName); e != nil { |
|||
return e |
|||
} |
|||
case "del": |
|||
if e := dm.DeleteDirectory(parts[1]); e != nil { |
|||
return e |
|||
} |
|||
default: |
|||
fmt.Printf("line %s has %s!\n", line, parts[0]) |
|||
return nil |
|||
} |
|||
return nil |
|||
} |
|||
func (dm *DirectoryManagerInMap) load() error { |
|||
dm.max = 0 |
|||
lines := bufio.NewReader(dm.logFile) |
|||
dm.isLoading = true |
|||
defer func() { dm.isLoading = false }() |
|||
for { |
|||
line, err := util.Readln(lines) |
|||
if err != nil && err != io.EOF { |
|||
return err |
|||
} |
|||
if pe := dm.processEachLine(string(line)); pe != nil { |
|||
return pe |
|||
} |
|||
if err == io.EOF { |
|||
return nil |
|||
} |
|||
} |
|||
} |
|||
|
|||
func (dm *DirectoryManagerInMap) findDirectory(dirPath string) (*DirectoryEntryInMap, error) { |
|||
if dirPath == "" { |
|||
return dm.Root, nil |
|||
} |
|||
dirPath = CleanFilePath(dirPath) |
|||
if dirPath == "/" { |
|||
return dm.Root, nil |
|||
} |
|||
parts := strings.Split(dirPath, "/") |
|||
dir := dm.Root |
|||
for i := 1; i < len(parts); i++ { |
|||
if sub, ok := dir.getChild(parts[i]); ok { |
|||
dir = sub |
|||
} else { |
|||
return dm.Root, filer.ErrNotFound |
|||
} |
|||
} |
|||
return dir, nil |
|||
} |
|||
func (dm *DirectoryManagerInMap) findDirectoryId(dirPath string) (DirectoryId, error) { |
|||
d, e := dm.findDirectory(dirPath) |
|||
if e == nil { |
|||
return d.Id, nil |
|||
} |
|||
return dm.Root.Id, e |
|||
} |
|||
|
|||
func (dm *DirectoryManagerInMap) loadDirectory(dirPath string, dirId DirectoryId) error { |
|||
dirPath = CleanFilePath(dirPath) |
|||
if dirPath == "/" { |
|||
return nil |
|||
} |
|||
parts := strings.Split(dirPath, "/") |
|||
dir := dm.Root |
|||
for i := 1; i < len(parts); i++ { |
|||
sub, ok := dir.getChild(parts[i]) |
|||
if !ok { |
|||
writeLock.Lock() |
|||
if sub2, createdByOtherThread := dir.getChild(parts[i]); createdByOtherThread { |
|||
sub = sub2 |
|||
} else { |
|||
if i != len(parts)-1 { |
|||
writeLock.Unlock() |
|||
return fmt.Errorf("%s should be created after parent %s", dirPath, parts[i]) |
|||
} |
|||
var err error |
|||
sub, err = dm.newDirectoryEntryInMap(dir, parts[i]) |
|||
if err != nil { |
|||
writeLock.Unlock() |
|||
return err |
|||
} |
|||
if sub.Id != dirId { |
|||
writeLock.Unlock() |
|||
// the dir.log should be the same order as in-memory directory id
|
|||
return fmt.Errorf("%s should be have id %v instead of %v", dirPath, sub.Id, dirId) |
|||
} |
|||
dir.addChild(parts[i], sub) |
|||
} |
|||
writeLock.Unlock() |
|||
} |
|||
dir = sub |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (dm *DirectoryManagerInMap) makeDirectory(dirPath string) (dir *DirectoryEntryInMap, created bool) { |
|||
dirPath = CleanFilePath(dirPath) |
|||
if dirPath == "/" { |
|||
return dm.Root, false |
|||
} |
|||
parts := strings.Split(dirPath, "/") |
|||
dir = dm.Root |
|||
for i := 1; i < len(parts); i++ { |
|||
sub, ok := dir.getChild(parts[i]) |
|||
if !ok { |
|||
writeLock.Lock() |
|||
if sub2, createdByOtherThread := dir.getChild(parts[i]); createdByOtherThread { |
|||
sub = sub2 |
|||
} else { |
|||
var err error |
|||
sub, err = dm.newDirectoryEntryInMap(dir, parts[i]) |
|||
if err != nil { |
|||
writeLock.Unlock() |
|||
return nil, false |
|||
} |
|||
dir.addChild(parts[i], sub) |
|||
created = true |
|||
} |
|||
writeLock.Unlock() |
|||
} |
|||
dir = sub |
|||
} |
|||
return dir, created |
|||
} |
|||
|
|||
func (dm *DirectoryManagerInMap) MakeDirectory(dirPath string) (DirectoryId, error) { |
|||
dir, _ := dm.makeDirectory(dirPath) |
|||
return dir.Id, nil |
|||
} |
|||
|
|||
func (dm *DirectoryManagerInMap) MoveUnderDirectory(oldDirPath string, newParentDirPath string, newName string) error { |
|||
writeLock.Lock() |
|||
defer writeLock.Unlock() |
|||
oldDir, oe := dm.findDirectory(oldDirPath) |
|||
if oe != nil { |
|||
return oe |
|||
} |
|||
parentDir, pe := dm.findDirectory(newParentDirPath) |
|||
if pe != nil { |
|||
return pe |
|||
} |
|||
dm.log("mov", oldDirPath, newParentDirPath, newName) |
|||
oldDir.Parent.removeChild(oldDir.Name) |
|||
if newName == "" { |
|||
newName = oldDir.Name |
|||
} |
|||
parentDir.addChild(newName, oldDir) |
|||
oldDir.Name = newName |
|||
oldDir.Parent = parentDir |
|||
return nil |
|||
} |
|||
|
|||
func (dm *DirectoryManagerInMap) ListDirectories(dirPath string) (dirNames []filer.DirectoryName, err error) { |
|||
d, e := dm.findDirectory(dirPath) |
|||
if e != nil { |
|||
return dirNames, e |
|||
} |
|||
return d.children(), nil |
|||
} |
|||
func (dm *DirectoryManagerInMap) DeleteDirectory(dirPath string) error { |
|||
writeLock.Lock() |
|||
defer writeLock.Unlock() |
|||
if dirPath == "/" { |
|||
return fmt.Errorf("Can not delete %s", dirPath) |
|||
} |
|||
d, e := dm.findDirectory(dirPath) |
|||
if e != nil { |
|||
return e |
|||
} |
|||
if d.hasChildren() { |
|||
return fmt.Errorf("dir %s still has sub directories", dirPath) |
|||
} |
|||
d.Parent.removeChild(d.Name) |
|||
d.Parent = nil |
|||
dm.log("del", dirPath) |
|||
return nil |
|||
} |
|||
|
|||
func CleanFilePath(fp string) string { |
|||
ret := filepath.Clean(fp) |
|||
if os.PathSeparator == '\\' { |
|||
return strings.Replace(ret, "\\", "/", -1) |
|||
} |
|||
return ret |
|||
} |
@ -1,86 +0,0 @@ |
|||
package embedded_filer |
|||
|
|||
import ( |
|||
"os" |
|||
"strings" |
|||
"testing" |
|||
) |
|||
|
|||
func TestDirectory(t *testing.T) { |
|||
dm, _ := NewDirectoryManagerInMap("/tmp/dir.log") |
|||
defer func() { |
|||
if true { |
|||
os.Remove("/tmp/dir.log") |
|||
} |
|||
}() |
|||
dm.MakeDirectory("/a/b/c") |
|||
dm.MakeDirectory("/a/b/d") |
|||
dm.MakeDirectory("/a/b/e") |
|||
dm.MakeDirectory("/a/b/e/f") |
|||
dm.MakeDirectory("/a/b/e/f/g") |
|||
dm.MoveUnderDirectory("/a/b/e/f/g", "/a/b", "t") |
|||
if _, err := dm.findDirectoryId("/a/b/e/f/g"); err == nil { |
|||
t.Fatal("/a/b/e/f/g should not exist any more after moving") |
|||
} |
|||
if _, err := dm.findDirectoryId("/a/b/t"); err != nil { |
|||
t.Fatal("/a/b/t should exist after moving") |
|||
} |
|||
if _, err := dm.findDirectoryId("/a/b/g"); err == nil { |
|||
t.Fatal("/a/b/g should not exist after moving") |
|||
} |
|||
dm.MoveUnderDirectory("/a/b/e/f", "/a/b", "") |
|||
if _, err := dm.findDirectoryId("/a/b/f"); err != nil { |
|||
t.Fatal("/a/b/g should not exist after moving") |
|||
} |
|||
dm.MakeDirectory("/a/b/g/h/i") |
|||
dm.DeleteDirectory("/a/b/e/f") |
|||
dm.DeleteDirectory("/a/b/e") |
|||
dirNames, _ := dm.ListDirectories("/a/b/e") |
|||
for _, v := range dirNames { |
|||
println("sub1 dir:", v) |
|||
} |
|||
dm.logFile.Close() |
|||
|
|||
var path []string |
|||
printTree(dm.Root, path) |
|||
|
|||
dm2, e := NewDirectoryManagerInMap("/tmp/dir.log") |
|||
if e != nil { |
|||
println("load error", e.Error()) |
|||
} |
|||
if !compare(dm.Root, dm2.Root) { |
|||
t.Fatal("restored dir not the same!") |
|||
} |
|||
printTree(dm2.Root, path) |
|||
} |
|||
|
|||
func printTree(node *DirectoryEntryInMap, path []string) { |
|||
println(strings.Join(path, "/") + "/" + node.Name) |
|||
path = append(path, node.Name) |
|||
for _, v := range node.subDirectories { |
|||
printTree(v, path) |
|||
} |
|||
} |
|||
|
|||
func compare(root1 *DirectoryEntryInMap, root2 *DirectoryEntryInMap) bool { |
|||
if len(root1.subDirectories) != len(root2.subDirectories) { |
|||
return false |
|||
} |
|||
if root1.Name != root2.Name { |
|||
return false |
|||
} |
|||
if root1.Id != root2.Id { |
|||
return false |
|||
} |
|||
if !(root1.Parent == nil && root2.Parent == nil) { |
|||
if root1.Parent.Id != root2.Parent.Id { |
|||
return false |
|||
} |
|||
} |
|||
for k, v := range root1.subDirectories { |
|||
if !compare(v, root2.subDirectories[k]) { |
|||
return false |
|||
} |
|||
} |
|||
return true |
|||
} |
@ -1,156 +0,0 @@ |
|||
package embedded_filer |
|||
|
|||
import ( |
|||
"errors" |
|||
"fmt" |
|||
"path/filepath" |
|||
"strings" |
|||
"sync" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/operation" |
|||
) |
|||
|
|||
type FilerEmbedded struct { |
|||
master string |
|||
directories *DirectoryManagerInMap |
|||
files *FileListInLevelDb |
|||
mvMutex sync.Mutex |
|||
} |
|||
|
|||
func NewFilerEmbedded(master string, dir string) (filer *FilerEmbedded, err error) { |
|||
dm, de := NewDirectoryManagerInMap(filepath.Join(dir, "dir.log")) |
|||
if de != nil { |
|||
return nil, de |
|||
} |
|||
fl, fe := NewFileListInLevelDb(dir) |
|||
if fe != nil { |
|||
return nil, fe |
|||
} |
|||
filer = &FilerEmbedded{ |
|||
master: master, |
|||
directories: dm, |
|||
files: fl, |
|||
} |
|||
return |
|||
} |
|||
|
|||
func (filer *FilerEmbedded) CreateFile(filePath string, fid string) (err error) { |
|||
dir, file := filepath.Split(filePath) |
|||
dirId, e := filer.directories.MakeDirectory(dir) |
|||
if e != nil { |
|||
return e |
|||
} |
|||
return filer.files.CreateFile(dirId, file, fid) |
|||
} |
|||
func (filer *FilerEmbedded) FindFile(filePath string) (fid string, err error) { |
|||
dir, file := filepath.Split(filePath) |
|||
return filer.findFileEntry(dir, file) |
|||
} |
|||
func (filer *FilerEmbedded) findFileEntry(parentPath string, fileName string) (fid string, err error) { |
|||
dirId, e := filer.directories.findDirectoryId(parentPath) |
|||
if e != nil { |
|||
return "", e |
|||
} |
|||
return filer.files.FindFile(dirId, fileName) |
|||
} |
|||
|
|||
func (filer *FilerEmbedded) LookupDirectoryEntry(dirPath string, name string) (found bool, fileId string, err error) { |
|||
if _, err = filer.directories.findDirectory(filepath.Join(dirPath, name)); err == nil { |
|||
return true, "", nil |
|||
} |
|||
if fileId, err = filer.findFileEntry(dirPath, name); err == nil { |
|||
return true, fileId, nil |
|||
} |
|||
return false, "", err |
|||
} |
|||
func (filer *FilerEmbedded) ListDirectories(dirPath string) (dirs []filer.DirectoryName, err error) { |
|||
return filer.directories.ListDirectories(dirPath) |
|||
} |
|||
func (filer *FilerEmbedded) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { |
|||
dirId, e := filer.directories.findDirectoryId(dirPath) |
|||
if e != nil { |
|||
return nil, e |
|||
} |
|||
return filer.files.ListFiles(dirId, lastFileName, limit), nil |
|||
} |
|||
func (filer *FilerEmbedded) DeleteDirectory(dirPath string, recursive bool) (err error) { |
|||
dirId, e := filer.directories.findDirectoryId(dirPath) |
|||
if e != nil { |
|||
return e |
|||
} |
|||
if sub_dirs, sub_err := filer.directories.ListDirectories(dirPath); sub_err == nil { |
|||
if len(sub_dirs) > 0 && !recursive { |
|||
return fmt.Errorf("Fail to delete directory %s: %d sub directories found!", dirPath, len(sub_dirs)) |
|||
} |
|||
for _, sub := range sub_dirs { |
|||
if delete_sub_err := filer.DeleteDirectory(filepath.Join(dirPath, string(sub)), recursive); delete_sub_err != nil { |
|||
return delete_sub_err |
|||
} |
|||
} |
|||
} |
|||
list := filer.files.ListFiles(dirId, "", 100) |
|||
if len(list) != 0 && !recursive { |
|||
if !recursive { |
|||
return fmt.Errorf("Fail to delete non-empty directory %s!", dirPath) |
|||
} |
|||
} |
|||
for { |
|||
if len(list) == 0 { |
|||
return filer.directories.DeleteDirectory(dirPath) |
|||
} |
|||
var fids []string |
|||
for _, fileEntry := range list { |
|||
fids = append(fids, string(fileEntry.Id)) |
|||
} |
|||
if result_list, delete_file_err := operation.DeleteFiles(filer.master, fids); delete_file_err != nil { |
|||
return delete_file_err |
|||
} else { |
|||
if len(result_list.Errors) > 0 { |
|||
return errors.New(strings.Join(result_list.Errors, "\n")) |
|||
} |
|||
} |
|||
lastFile := list[len(list)-1] |
|||
list = filer.files.ListFiles(dirId, lastFile.Name, 100) |
|||
} |
|||
|
|||
} |
|||
|
|||
func (filer *FilerEmbedded) DeleteFile(filePath string) (fid string, err error) { |
|||
dir, file := filepath.Split(filePath) |
|||
dirId, e := filer.directories.findDirectoryId(dir) |
|||
if e != nil { |
|||
return "", e |
|||
} |
|||
return filer.files.DeleteFile(dirId, file) |
|||
} |
|||
|
|||
/* |
|||
Move a folder or a file, with 4 Use cases: |
|||
mv fromDir toNewDir |
|||
mv fromDir toOldDir |
|||
mv fromFile toDir |
|||
mv fromFile toFile |
|||
*/ |
|||
func (filer *FilerEmbedded) Move(fromPath string, toPath string) error { |
|||
filer.mvMutex.Lock() |
|||
defer filer.mvMutex.Unlock() |
|||
|
|||
if _, dir_err := filer.directories.findDirectoryId(fromPath); dir_err == nil { |
|||
if _, err := filer.directories.findDirectoryId(toPath); err == nil { |
|||
// move folder under an existing folder
|
|||
return filer.directories.MoveUnderDirectory(fromPath, toPath, "") |
|||
} |
|||
// move folder to a new folder
|
|||
return filer.directories.MoveUnderDirectory(fromPath, filepath.Dir(toPath), filepath.Base(toPath)) |
|||
} |
|||
if fid, file_err := filer.DeleteFile(fromPath); file_err == nil { |
|||
if _, err := filer.directories.findDirectoryId(toPath); err == nil { |
|||
// move file under an existing folder
|
|||
return filer.CreateFile(filepath.Join(toPath, filepath.Base(fromPath)), fid) |
|||
} |
|||
// move to a folder with new name
|
|||
return filer.CreateFile(toPath, fid) |
|||
} |
|||
return fmt.Errorf("File %s is not found!", fromPath) |
|||
} |
@ -1,87 +0,0 @@ |
|||
package embedded_filer |
|||
|
|||
import ( |
|||
"bytes" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/syndtr/goleveldb/leveldb" |
|||
"github.com/syndtr/goleveldb/leveldb/util" |
|||
) |
|||
|
|||
/* |
|||
The entry in level db has this format: |
|||
key: genKey(dirId, fileName) |
|||
value: []byte(fid) |
|||
And genKey(dirId, fileName) use first 4 bytes to store dirId, and rest for fileName |
|||
*/ |
|||
|
|||
type FileListInLevelDb struct { |
|||
db *leveldb.DB |
|||
} |
|||
|
|||
func NewFileListInLevelDb(dir string) (fl *FileListInLevelDb, err error) { |
|||
fl = &FileListInLevelDb{} |
|||
if fl.db, err = leveldb.OpenFile(dir, nil); err != nil { |
|||
return |
|||
} |
|||
return |
|||
} |
|||
|
|||
func genKey(dirId DirectoryId, fileName string) []byte { |
|||
ret := make([]byte, 0, 4+len(fileName)) |
|||
for i := 3; i >= 0; i-- { |
|||
ret = append(ret, byte(dirId>>(uint(i)*8))) |
|||
} |
|||
ret = append(ret, []byte(fileName)...) |
|||
return ret |
|||
} |
|||
|
|||
func (fl *FileListInLevelDb) CreateFile(dirId DirectoryId, fileName string, fid string) (err error) { |
|||
glog.V(4).Infoln("directory", dirId, "fileName", fileName, "fid", fid) |
|||
return fl.db.Put(genKey(dirId, fileName), []byte(fid), nil) |
|||
} |
|||
func (fl *FileListInLevelDb) DeleteFile(dirId DirectoryId, fileName string) (fid string, err error) { |
|||
if fid, err = fl.FindFile(dirId, fileName); err != nil { |
|||
if err == leveldb.ErrNotFound { |
|||
return "", nil |
|||
} |
|||
return |
|||
} |
|||
err = fl.db.Delete(genKey(dirId, fileName), nil) |
|||
return fid, err |
|||
} |
|||
func (fl *FileListInLevelDb) FindFile(dirId DirectoryId, fileName string) (fid string, err error) { |
|||
data, e := fl.db.Get(genKey(dirId, fileName), nil) |
|||
if e == leveldb.ErrNotFound { |
|||
return "", filer.ErrNotFound |
|||
} else if e != nil { |
|||
return "", e |
|||
} |
|||
return string(data), nil |
|||
} |
|||
func (fl *FileListInLevelDb) ListFiles(dirId DirectoryId, lastFileName string, limit int) (files []filer.FileEntry) { |
|||
glog.V(4).Infoln("directory", dirId, "lastFileName", lastFileName, "limit", limit) |
|||
dirKey := genKey(dirId, "") |
|||
iter := fl.db.NewIterator(&util.Range{Start: genKey(dirId, lastFileName)}, nil) |
|||
limitCounter := 0 |
|||
for iter.Next() { |
|||
key := iter.Key() |
|||
if !bytes.HasPrefix(key, dirKey) { |
|||
break |
|||
} |
|||
fileName := string(key[len(dirKey):]) |
|||
if fileName == lastFileName { |
|||
continue |
|||
} |
|||
limitCounter++ |
|||
if limit > 0 { |
|||
if limitCounter > limit { |
|||
break |
|||
} |
|||
} |
|||
files = append(files, filer.FileEntry{Name: fileName, Id: filer.FileId(string(iter.Value()))}) |
|||
} |
|||
iter.Release() |
|||
return |
|||
} |
@ -1,29 +0,0 @@ |
|||
package filer |
|||
|
|||
import ( |
|||
"errors" |
|||
) |
|||
|
|||
type FileId string //file id in SeaweedFS
|
|||
|
|||
type FileEntry struct { |
|||
Name string `json:"name,omitempty"` //file name without path
|
|||
Id FileId `json:"fid,omitempty"` |
|||
} |
|||
|
|||
type DirectoryName string |
|||
|
|||
type Filer interface { |
|||
CreateFile(fullFileName string, fid string) (err error) |
|||
FindFile(fullFileName string) (fid string, err error) |
|||
DeleteFile(fullFileName string) (fid string, err error) |
|||
|
|||
//Optional functions. embedded filer support these
|
|||
ListDirectories(dirPath string) (dirs []DirectoryName, err error) |
|||
ListFiles(dirPath string, lastFileName string, limit int) (files []FileEntry, err error) |
|||
DeleteDirectory(dirPath string, recursive bool) (err error) |
|||
Move(fromPath string, toPath string) (err error) |
|||
LookupDirectoryEntry(dirPath string, name string) (found bool, fileId string, err error) |
|||
} |
|||
|
|||
var ErrNotFound = errors.New("filer: no entry is found in filer store") |
@ -1,66 +0,0 @@ |
|||
package flat_namespace |
|||
|
|||
import ( |
|||
"errors" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"path/filepath" |
|||
) |
|||
|
|||
type FlatNamespaceFiler struct { |
|||
master string |
|||
store FlatNamespaceStore |
|||
} |
|||
|
|||
var ( |
|||
ErrNotImplemented = errors.New("Not Implemented for flat namespace meta data store") |
|||
) |
|||
|
|||
func NewFlatNamespaceFiler(master string, store FlatNamespaceStore) *FlatNamespaceFiler { |
|||
return &FlatNamespaceFiler{ |
|||
master: master, |
|||
store: store, |
|||
} |
|||
} |
|||
|
|||
func (filer *FlatNamespaceFiler) CreateFile(fullFileName string, fid string) (err error) { |
|||
return filer.store.Put(fullFileName, fid) |
|||
} |
|||
func (filer *FlatNamespaceFiler) FindFile(fullFileName string) (fid string, err error) { |
|||
return filer.store.Get(fullFileName) |
|||
} |
|||
func (filer *FlatNamespaceFiler) LookupDirectoryEntry(dirPath string, name string) (found bool, fileId string, err error) { |
|||
if fileId, err = filer.FindFile(filepath.Join(dirPath, name)); err == nil { |
|||
return true, fileId, nil |
|||
} |
|||
return false, "", err |
|||
} |
|||
func (filer *FlatNamespaceFiler) ListDirectories(dirPath string) (dirs []filer.DirectoryName, err error) { |
|||
return nil, ErrNotImplemented |
|||
} |
|||
func (filer *FlatNamespaceFiler) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { |
|||
return nil, ErrNotImplemented |
|||
} |
|||
func (filer *FlatNamespaceFiler) DeleteDirectory(dirPath string, recursive bool) (err error) { |
|||
return ErrNotImplemented |
|||
} |
|||
|
|||
func (filer *FlatNamespaceFiler) DeleteFile(fullFileName string) (fid string, err error) { |
|||
fid, err = filer.FindFile(fullFileName) |
|||
if err != nil { |
|||
return "", err |
|||
} |
|||
|
|||
err = filer.store.Delete(fullFileName) |
|||
if err != nil { |
|||
return "", err |
|||
} |
|||
|
|||
return fid, nil |
|||
//return filer.store.Delete(fullFileName)
|
|||
//are you kidding me!!!!
|
|||
} |
|||
|
|||
func (filer *FlatNamespaceFiler) Move(fromPath string, toPath string) error { |
|||
return ErrNotImplemented |
|||
} |
@ -1,9 +0,0 @@ |
|||
package flat_namespace |
|||
|
|||
import () |
|||
|
|||
type FlatNamespaceStore interface { |
|||
Put(fullFileName string, fid string) (err error) |
|||
Get(fullFileName string) (fid string, err error) |
|||
Delete(fullFileName string) (err error) |
|||
} |
@ -1,67 +0,0 @@ |
|||
#MySQL filer mapping store |
|||
|
|||
## Schema format |
|||
|
|||
|
|||
Basically, uriPath and fid are the key elements stored in MySQL. In view of the optimization and user's usage, |
|||
adding primary key with integer type and involving createTime, updateTime, status fields should be somewhat meaningful. |
|||
Of course, you could customize the schema per your concretely circumstance freely. |
|||
|
|||
<pre><code> |
|||
CREATE TABLE IF NOT EXISTS `filer_mapping` ( |
|||
`id` bigint(20) NOT NULL AUTO_INCREMENT, |
|||
`uriPath` char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath', |
|||
`fid` char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid', |
|||
`createTime` int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp', |
|||
`updateTime` int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp', |
|||
`remark` varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field', |
|||
`status` tinyint(2) DEFAULT '1' COMMENT 'resource status', |
|||
PRIMARY KEY (`id`), |
|||
UNIQUE KEY `index_uriPath` (`uriPath`) |
|||
) DEFAULT CHARSET=utf8; |
|||
</code></pre> |
|||
|
|||
|
|||
The MySQL 's config params is not added into the weed command option as other stores(redis,cassandra). Instead, |
|||
We created a config file(json format) for them. TOML,YAML or XML also should be OK. But TOML and YAML need import thirdparty package |
|||
while XML is a little bit complex. |
|||
|
|||
The sample config file's content is below: |
|||
|
|||
<pre><code> |
|||
{ |
|||
"mysql": [ |
|||
{ |
|||
"User": "root", |
|||
"Password": "root", |
|||
"HostName": "127.0.0.1", |
|||
"Port": 3306, |
|||
"DataBase": "seaweedfs" |
|||
}, |
|||
{ |
|||
"User": "root", |
|||
"Password": "root", |
|||
"HostName": "127.0.0.2", |
|||
"Port": 3306, |
|||
"DataBase": "seaweedfs" |
|||
} |
|||
], |
|||
"IsSharding":true, |
|||
"ShardCount":1024 |
|||
} |
|||
</code></pre> |
|||
|
|||
|
|||
The "mysql" field in above conf file is an array which include all mysql instances you prepared to store sharding data. |
|||
|
|||
1. If one mysql instance is enough, just keep one instance in "mysql" field. |
|||
|
|||
2. If table sharding at a specific mysql instance is needed , mark "IsSharding" field with true and specify total table sharding numbers using "ShardCount" field. |
|||
|
|||
3. If the mysql service could be auto scaled transparently in your environment, just config one mysql instance(usually it's a frondend proxy or VIP),and mark "IsSharding" with false value |
|||
|
|||
4. If you prepare more than one mysql instance and have no plan to use table sharding for any instance(mark isSharding with false), instance sharding will still be done implicitly |
|||
|
|||
|
|||
|
|||
|
@ -1,270 +0,0 @@ |
|||
package mysql_store |
|||
|
|||
import ( |
|||
"database/sql" |
|||
"fmt" |
|||
"hash/crc32" |
|||
"sync" |
|||
"time" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
|
|||
_ "github.com/go-sql-driver/mysql" |
|||
) |
|||
|
|||
const ( |
|||
sqlUrl = "%s:%s@tcp(%s:%d)/%s?charset=utf8" |
|||
default_maxIdleConnections = 100 |
|||
default_maxOpenConnections = 50 |
|||
default_maxTableNums = 1024 |
|||
tableName = "filer_mapping" |
|||
) |
|||
|
|||
var ( |
|||
_init_db sync.Once |
|||
_db_connections []*sql.DB |
|||
) |
|||
|
|||
type MySqlConf struct { |
|||
User string |
|||
Password string |
|||
HostName string |
|||
Port int |
|||
DataBase string |
|||
MaxIdleConnections int |
|||
MaxOpenConnections int |
|||
} |
|||
|
|||
type ShardingConf struct { |
|||
IsSharding bool `json:"isSharding"` |
|||
ShardCount int `json:"shardCount"` |
|||
} |
|||
|
|||
type MySqlStore struct { |
|||
dbs []*sql.DB |
|||
isSharding bool |
|||
shardCount int |
|||
} |
|||
|
|||
func getDbConnection(confs []MySqlConf) []*sql.DB { |
|||
_init_db.Do(func() { |
|||
for _, conf := range confs { |
|||
|
|||
sqlUrl := fmt.Sprintf(sqlUrl, conf.User, conf.Password, conf.HostName, conf.Port, conf.DataBase) |
|||
var dbErr error |
|||
_db_connection, dbErr := sql.Open("mysql", sqlUrl) |
|||
if dbErr != nil { |
|||
_db_connection.Close() |
|||
_db_connection = nil |
|||
panic(dbErr) |
|||
} |
|||
var maxIdleConnections, maxOpenConnections int |
|||
|
|||
if conf.MaxIdleConnections != 0 { |
|||
maxIdleConnections = conf.MaxIdleConnections |
|||
} else { |
|||
maxIdleConnections = default_maxIdleConnections |
|||
} |
|||
if conf.MaxOpenConnections != 0 { |
|||
maxOpenConnections = conf.MaxOpenConnections |
|||
} else { |
|||
maxOpenConnections = default_maxOpenConnections |
|||
} |
|||
|
|||
_db_connection.SetMaxIdleConns(maxIdleConnections) |
|||
_db_connection.SetMaxOpenConns(maxOpenConnections) |
|||
_db_connections = append(_db_connections, _db_connection) |
|||
} |
|||
}) |
|||
return _db_connections |
|||
} |
|||
|
|||
func NewMysqlStore(confs []MySqlConf, isSharding bool, shardCount int) *MySqlStore { |
|||
ms := &MySqlStore{ |
|||
dbs: getDbConnection(confs), |
|||
isSharding: isSharding, |
|||
shardCount: shardCount, |
|||
} |
|||
|
|||
for _, db := range ms.dbs { |
|||
if !isSharding { |
|||
ms.shardCount = 1 |
|||
} else { |
|||
if ms.shardCount == 0 { |
|||
ms.shardCount = default_maxTableNums |
|||
} |
|||
} |
|||
for i := 0; i < ms.shardCount; i++ { |
|||
if err := ms.createTables(db, tableName, i); err != nil { |
|||
fmt.Printf("create table failed %v", err) |
|||
} |
|||
} |
|||
} |
|||
|
|||
return ms |
|||
} |
|||
|
|||
func (s *MySqlStore) hash(fullFileName string) (instance_offset, table_postfix int) { |
|||
hash_value := crc32.ChecksumIEEE([]byte(fullFileName)) |
|||
instance_offset = int(hash_value) % len(s.dbs) |
|||
table_postfix = int(hash_value) % s.shardCount |
|||
return |
|||
} |
|||
|
|||
func (s *MySqlStore) parseFilerMappingInfo(path string) (instanceId int, tableFullName string, err error) { |
|||
instance_offset, table_postfix := s.hash(path) |
|||
instanceId = instance_offset |
|||
if s.isSharding { |
|||
tableFullName = fmt.Sprintf("%s_%04d", tableName, table_postfix) |
|||
} else { |
|||
tableFullName = tableName |
|||
} |
|||
return |
|||
} |
|||
|
|||
func (s *MySqlStore) Get(fullFilePath string) (fid string, err error) { |
|||
instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath) |
|||
if err != nil { |
|||
return "", fmt.Errorf("MySqlStore Get operation can not parse file path %s: err is %v", fullFilePath, err) |
|||
} |
|||
fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName) |
|||
if err == sql.ErrNoRows { |
|||
//Could not found
|
|||
err = filer.ErrNotFound |
|||
} |
|||
return fid, err |
|||
} |
|||
|
|||
func (s *MySqlStore) Put(fullFilePath string, fid string) (err error) { |
|||
var tableFullName string |
|||
|
|||
instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath) |
|||
if err != nil { |
|||
return fmt.Errorf("MySqlStore Put operation can not parse file path %s: err is %v", fullFilePath, err) |
|||
} |
|||
var old_fid string |
|||
if old_fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil && err != sql.ErrNoRows { |
|||
return fmt.Errorf("MySqlStore Put operation failed when querying path %s: err is %v", fullFilePath, err) |
|||
} else { |
|||
if len(old_fid) == 0 { |
|||
err = s.insert(fullFilePath, fid, s.dbs[instance_offset], tableFullName) |
|||
err = fmt.Errorf("MySqlStore Put operation failed when inserting path %s with fid %s : err is %v", fullFilePath, fid, err) |
|||
} else { |
|||
err = s.update(fullFilePath, fid, s.dbs[instance_offset], tableFullName) |
|||
err = fmt.Errorf("MySqlStore Put operation failed when updating path %s with fid %s : err is %v", fullFilePath, fid, err) |
|||
} |
|||
} |
|||
return |
|||
} |
|||
|
|||
func (s *MySqlStore) Delete(fullFilePath string) (err error) { |
|||
var fid string |
|||
instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath) |
|||
if err != nil { |
|||
return fmt.Errorf("MySqlStore Delete operation can not parse file path %s: err is %v", fullFilePath, err) |
|||
} |
|||
if fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil { |
|||
return fmt.Errorf("MySqlStore Delete operation failed when querying path %s: err is %v", fullFilePath, err) |
|||
} else if fid == "" { |
|||
return nil |
|||
} |
|||
if err = s.delete(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil { |
|||
return fmt.Errorf("MySqlStore Delete operation failed when deleting path %s: err is %v", fullFilePath, err) |
|||
} else { |
|||
return nil |
|||
} |
|||
} |
|||
|
|||
func (s *MySqlStore) Close() { |
|||
for _, db := range s.dbs { |
|||
db.Close() |
|||
} |
|||
} |
|||
|
|||
var createTable = ` |
|||
CREATE TABLE IF NOT EXISTS %s ( |
|||
id bigint(20) NOT NULL AUTO_INCREMENT, |
|||
uriPath char(255) NOT NULL DEFAULT "" COMMENT 'http uriPath', |
|||
fid char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid', |
|||
createTime int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp', |
|||
updateTime int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp', |
|||
remark varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field', |
|||
status tinyint(2) DEFAULT '1' COMMENT 'resource status', |
|||
PRIMARY KEY (id), |
|||
UNIQUE KEY index_uriPath (uriPath) |
|||
) DEFAULT CHARSET=utf8; |
|||
` |
|||
|
|||
func (s *MySqlStore) createTables(db *sql.DB, tableName string, postfix int) error { |
|||
var realTableName string |
|||
if s.isSharding { |
|||
realTableName = fmt.Sprintf("%s_%4d", tableName, postfix) |
|||
} else { |
|||
realTableName = tableName |
|||
} |
|||
|
|||
stmt, err := db.Prepare(fmt.Sprintf(createTable, realTableName)) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
defer stmt.Close() |
|||
|
|||
_, err = stmt.Exec() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (s *MySqlStore) query(uriPath string, db *sql.DB, tableName string) (string, error) { |
|||
sqlStatement := "SELECT fid FROM %s WHERE uriPath=?" |
|||
row := db.QueryRow(fmt.Sprintf(sqlStatement, tableName), uriPath) |
|||
var fid string |
|||
err := row.Scan(&fid) |
|||
if err != nil { |
|||
return "", err |
|||
} |
|||
return fid, nil |
|||
} |
|||
|
|||
func (s *MySqlStore) update(uriPath string, fid string, db *sql.DB, tableName string) error { |
|||
sqlStatement := "UPDATE %s SET fid=?, updateTime=? WHERE uriPath=?" |
|||
res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), fid, time.Now().Unix(), uriPath) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
_, err = res.RowsAffected() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (s *MySqlStore) insert(uriPath string, fid string, db *sql.DB, tableName string) error { |
|||
sqlStatement := "INSERT INTO %s (uriPath,fid,createTime) VALUES(?,?,?)" |
|||
res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath, fid, time.Now().Unix()) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
_, err = res.RowsAffected() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (s *MySqlStore) delete(uriPath string, db *sql.DB, tableName string) error { |
|||
sqlStatement := "DELETE FROM %s WHERE uriPath=?" |
|||
res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
_, err = res.RowsAffected() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
return nil |
|||
} |
@ -1,30 +0,0 @@ |
|||
package mysql_store |
|||
|
|||
import ( |
|||
"encoding/json" |
|||
"hash/crc32" |
|||
"testing" |
|||
) |
|||
|
|||
func TestGenerateMysqlConf(t *testing.T) { |
|||
var conf []MySqlConf |
|||
conf = append(conf, MySqlConf{ |
|||
User: "root", |
|||
Password: "root", |
|||
HostName: "localhost", |
|||
Port: 3306, |
|||
DataBase: "seaweedfs", |
|||
}) |
|||
body, err := json.Marshal(conf) |
|||
if err != nil { |
|||
t.Errorf("json encoding err %s", err.Error()) |
|||
} |
|||
t.Logf("json output is %s", string(body)) |
|||
} |
|||
|
|||
func TestCRC32FullPathName(t *testing.T) { |
|||
fullPathName := "/prod-bucket/law632191483895612493300-signed.pdf" |
|||
hash_value := crc32.ChecksumIEEE([]byte(fullPathName)) |
|||
table_postfix := int(hash_value) % 1024 |
|||
t.Logf("table postfix %d", table_postfix) |
|||
} |
@ -1,456 +0,0 @@ |
|||
package postgres_store |
|||
|
|||
import ( |
|||
"database/sql" |
|||
"fmt" |
|||
"path/filepath" |
|||
"time" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
|
|||
_ "github.com/lib/pq" |
|||
_ "path/filepath" |
|||
"strings" |
|||
) |
|||
|
|||
type DirectoryId int32 |
|||
|
|||
func databaseExists(db *sql.DB, databaseName string) (bool, error) { |
|||
sqlStatement := "SELECT datname from pg_database WHERE datname='%s'" |
|||
row := db.QueryRow(fmt.Sprintf(sqlStatement, databaseName)) |
|||
|
|||
var dbName string |
|||
err := row.Scan(&dbName) |
|||
if err != nil { |
|||
if err == sql.ErrNoRows { |
|||
return false, nil |
|||
} |
|||
return false, err |
|||
} |
|||
return true, nil |
|||
} |
|||
|
|||
func createDatabase(db *sql.DB, databaseName string) error { |
|||
sqlStatement := "CREATE DATABASE %s ENCODING='UTF8'" |
|||
_, err := db.Exec(fmt.Sprintf(sqlStatement, databaseName)) |
|||
return err |
|||
} |
|||
|
|||
func getDbConnection(conf PostgresConf) *sql.DB { |
|||
_init_db.Do(func() { |
|||
|
|||
sqlUrl := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s connect_timeout=30", conf.HostName, conf.Port, conf.User, conf.Password, "postgres", conf.SslMode) |
|||
glog.V(3).Infoln("Opening postgres master database") |
|||
|
|||
var dbErr error |
|||
_db_connection, dbErr := sql.Open("postgres", sqlUrl) |
|||
if dbErr != nil { |
|||
_db_connection.Close() |
|||
_db_connection = nil |
|||
panic(dbErr) |
|||
} |
|||
|
|||
pingErr := _db_connection.Ping() |
|||
if pingErr != nil { |
|||
_db_connection.Close() |
|||
_db_connection = nil |
|||
panic(pingErr) |
|||
} |
|||
|
|||
glog.V(3).Infoln("Checking to see if DB exists: ", conf.DataBase) |
|||
var existsErr error |
|||
dbExists, existsErr := databaseExists(_db_connection, conf.DataBase) |
|||
if existsErr != nil { |
|||
_db_connection.Close() |
|||
_db_connection = nil |
|||
panic(existsErr) |
|||
} |
|||
|
|||
if !dbExists { |
|||
glog.V(3).Infoln("Database doesn't exist. Attempting to create one: ", conf.DataBase) |
|||
createErr := createDatabase(_db_connection, conf.DataBase) |
|||
if createErr != nil { |
|||
_db_connection.Close() |
|||
_db_connection = nil |
|||
panic(createErr) |
|||
} |
|||
} |
|||
|
|||
glog.V(3).Infoln("Closing master postgres database and opening configured database: ", conf.DataBase) |
|||
_db_connection.Close() |
|||
_db_connection = nil |
|||
|
|||
sqlUrl = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s connect_timeout=30", conf.HostName, conf.Port, conf.User, conf.Password, conf.DataBase, conf.SslMode) |
|||
_db_connection, dbErr = sql.Open("postgres", sqlUrl) |
|||
if dbErr != nil { |
|||
_db_connection.Close() |
|||
_db_connection = nil |
|||
panic(dbErr) |
|||
} |
|||
|
|||
pingErr = _db_connection.Ping() |
|||
if pingErr != nil { |
|||
_db_connection.Close() |
|||
_db_connection = nil |
|||
panic(pingErr) |
|||
} |
|||
|
|||
maxIdleConnections, maxOpenConnections := default_maxIdleConnections, default_maxOpenConnections |
|||
if conf.MaxIdleConnections != 0 { |
|||
maxIdleConnections = conf.MaxIdleConnections |
|||
} |
|||
if conf.MaxOpenConnections != 0 { |
|||
maxOpenConnections = conf.MaxOpenConnections |
|||
} |
|||
|
|||
_db_connection.SetMaxIdleConns(maxIdleConnections) |
|||
_db_connection.SetMaxOpenConns(maxOpenConnections) |
|||
}) |
|||
return _db_connection |
|||
} |
|||
|
|||
var createDirectoryTable = ` |
|||
|
|||
CREATE TABLE IF NOT EXISTS %s ( |
|||
id BIGSERIAL NOT NULL, |
|||
directoryRoot VARCHAR(1024) NOT NULL DEFAULT '', |
|||
directoryName VARCHAR(1024) NOT NULL DEFAULT '', |
|||
CONSTRAINT unique_directory UNIQUE (directoryRoot, directoryName) |
|||
); |
|||
` |
|||
|
|||
var createFileTable = ` |
|||
|
|||
CREATE TABLE IF NOT EXISTS %s ( |
|||
id BIGSERIAL NOT NULL, |
|||
directoryPart VARCHAR(1024) NOT NULL DEFAULT '', |
|||
filePart VARCHAR(1024) NOT NULL DEFAULT '', |
|||
fid VARCHAR(36) NOT NULL DEFAULT '', |
|||
createTime BIGINT NOT NULL DEFAULT 0, |
|||
updateTime BIGINT NOT NULL DEFAULT 0, |
|||
remark VARCHAR(20) NOT NULL DEFAULT '', |
|||
status SMALLINT NOT NULL DEFAULT '1', |
|||
PRIMARY KEY (id), |
|||
CONSTRAINT %s_unique_file UNIQUE (directoryPart, filePart) |
|||
); |
|||
` |
|||
|
|||
func (s *PostgresStore) createDirectoriesTable() error { |
|||
glog.V(3).Infoln("Creating postgres table if it doesn't exist: ", directoriesTableName) |
|||
|
|||
sqlCreate := fmt.Sprintf(createDirectoryTable, directoriesTableName) |
|||
|
|||
stmt, err := s.db.Prepare(sqlCreate) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
defer stmt.Close() |
|||
|
|||
_, err = stmt.Exec() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (s *PostgresStore) createFilesTable() error { |
|||
|
|||
glog.V(3).Infoln("Creating postgres table if it doesn't exist: ", filesTableName) |
|||
|
|||
sqlCreate := fmt.Sprintf(createFileTable, filesTableName, filesTableName) |
|||
|
|||
stmt, err := s.db.Prepare(sqlCreate) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
defer stmt.Close() |
|||
|
|||
_, err = stmt.Exec() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (s *PostgresStore) query(uriPath string) (string, error) { |
|||
directoryPart, filePart := filepath.Split(uriPath) |
|||
sqlStatement := fmt.Sprintf("SELECT fid FROM %s WHERE directoryPart=$1 AND filePart=$2", filesTableName) |
|||
|
|||
row := s.db.QueryRow(sqlStatement, directoryPart, filePart) |
|||
var fid string |
|||
err := row.Scan(&fid) |
|||
|
|||
glog.V(3).Infof("Postgres query -- looking up path '%s' and found id '%s' ", uriPath, fid) |
|||
|
|||
if err != nil { |
|||
return "", err |
|||
} |
|||
return fid, nil |
|||
} |
|||
|
|||
func (s *PostgresStore) update(uriPath string, fid string) error { |
|||
directoryPart, filePart := filepath.Split(uriPath) |
|||
sqlStatement := fmt.Sprintf("UPDATE %s SET fid=$1, updateTime=$2 WHERE directoryPart=$3 AND filePart=$4", filesTableName) |
|||
|
|||
glog.V(3).Infof("Postgres query -- updating path '%s' with id '%s'", uriPath, fid) |
|||
|
|||
res, err := s.db.Exec(sqlStatement, fid, time.Now().Unix(), directoryPart, filePart) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
_, err = res.RowsAffected() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (s *PostgresStore) insert(uriPath string, fid string) error { |
|||
directoryPart, filePart := filepath.Split(uriPath) |
|||
|
|||
existingId, _, _ := s.lookupDirectory(directoryPart) |
|||
if existingId == 0 { |
|||
s.recursiveInsertDirectory(directoryPart) |
|||
} |
|||
|
|||
sqlStatement := fmt.Sprintf("INSERT INTO %s (directoryPart,filePart,fid,createTime) VALUES($1, $2, $3, $4)", filesTableName) |
|||
glog.V(3).Infof("Postgres query -- inserting path '%s' with id '%s'", uriPath, fid) |
|||
|
|||
res, err := s.db.Exec(sqlStatement, directoryPart, filePart, fid, time.Now().Unix()) |
|||
|
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
rows, err := res.RowsAffected() |
|||
if rows != 1 { |
|||
return fmt.Errorf("Postgres insert -- rows affected = %d. Expecting 1", rows) |
|||
} |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (s *PostgresStore) recursiveInsertDirectory(dirPath string) { |
|||
pathParts := strings.Split(dirPath, "/") |
|||
|
|||
var workingPath string = "/" |
|||
for _, part := range pathParts { |
|||
if part == "" { |
|||
continue |
|||
} |
|||
workingPath += (part + "/") |
|||
existingId, _, _ := s.lookupDirectory(workingPath) |
|||
if existingId == 0 { |
|||
s.insertDirectory(workingPath) |
|||
} |
|||
} |
|||
} |
|||
|
|||
func (s *PostgresStore) insertDirectory(dirPath string) { |
|||
pathParts := strings.Split(dirPath, "/") |
|||
|
|||
directoryRoot := "/" |
|||
directoryName := "" |
|||
if len(pathParts) > 1 { |
|||
directoryRoot = strings.Join(pathParts[0:len(pathParts)-2], "/") + "/" |
|||
directoryName = strings.Join(pathParts[len(pathParts)-2:], "/") |
|||
} else if len(pathParts) == 1 { |
|||
directoryRoot = "/" |
|||
directoryName = pathParts[0] + "/" |
|||
} |
|||
sqlInsertDirectoryStatement := fmt.Sprintf("INSERT INTO %s (directoryroot, directoryname) "+ |
|||
"SELECT $1, $2 WHERE NOT EXISTS ( SELECT id FROM %s WHERE directoryroot=$3 AND directoryname=$4 )", |
|||
directoriesTableName, directoriesTableName) |
|||
|
|||
glog.V(4).Infof("Postgres query -- Inserting directory (if it doesn't exist) - root = %s, name = %s", |
|||
directoryRoot, directoryName) |
|||
|
|||
_, err := s.db.Exec(sqlInsertDirectoryStatement, directoryRoot, directoryName, directoryRoot, directoryName) |
|||
if err != nil { |
|||
glog.V(0).Infof("Postgres query -- Error inserting directory - root = %s, name = %s: %s", |
|||
directoryRoot, directoryName, err) |
|||
} |
|||
} |
|||
|
|||
func (s *PostgresStore) delete(uriPath string) error { |
|||
directoryPart, filePart := filepath.Split(uriPath) |
|||
sqlStatement := fmt.Sprintf("DELETE FROM %s WHERE directoryPart=$1 AND filePart=$2", filesTableName) |
|||
|
|||
glog.V(3).Infof("Postgres query -- deleting path '%s'", uriPath) |
|||
|
|||
res, err := s.db.Exec(sqlStatement, directoryPart, filePart) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
_, err = res.RowsAffected() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (s *PostgresStore) lookupDirectory(dirPath string) (DirectoryId, string, error) { |
|||
directoryRoot, directoryName := s.mySplitPath(dirPath) |
|||
|
|||
sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName=$2", directoriesTableName) |
|||
|
|||
row := s.db.QueryRow(sqlStatement, directoryRoot, directoryName) |
|||
var id DirectoryId |
|||
var dirRoot string |
|||
var dirName string |
|||
err := row.Scan(&id, &dirRoot, &dirName) |
|||
|
|||
glog.V(3).Infof("Postgres lookupDirectory -- looking up directory '%s' and found id '%d', root '%s', name '%s' ", dirPath, id, dirRoot, dirName) |
|||
|
|||
if err != nil { |
|||
return 0, "", err |
|||
} |
|||
return id, filepath.Join(dirRoot, dirName), err |
|||
} |
|||
|
|||
func (s *PostgresStore) findDirectories(dirPath string, limit int) (dirs []filer.DirectoryName, err error) { |
|||
sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName != '' ORDER BY id LIMIT $2", directoriesTableName) |
|||
rows, err := s.db.Query(sqlStatement, dirPath, limit) |
|||
|
|||
if err != nil { |
|||
glog.V(0).Infof("Postgres findDirectories error: %s", err) |
|||
} |
|||
|
|||
if rows != nil { |
|||
defer rows.Close() |
|||
for rows.Next() { |
|||
var id DirectoryId |
|||
var directoryRoot string |
|||
var directoryName string |
|||
|
|||
scanErr := rows.Scan(&id, &directoryRoot, &directoryName) |
|||
if scanErr != nil { |
|||
err = scanErr |
|||
} |
|||
dirs = append(dirs, filer.DirectoryName(directoryName)) |
|||
} |
|||
} |
|||
return |
|||
} |
|||
|
|||
func (s *PostgresStore) safeToDeleteDirectory(dirPath string, recursive bool) bool { |
|||
if recursive { |
|||
return true |
|||
} |
|||
sqlStatement := fmt.Sprintf("SELECT id FROM %s WHERE directoryRoot LIKE $1 LIMIT 1", directoriesTableName) |
|||
row := s.db.QueryRow(sqlStatement, dirPath+"%") |
|||
|
|||
var id DirectoryId |
|||
err := row.Scan(&id) |
|||
if err != nil { |
|||
if err == sql.ErrNoRows { |
|||
return true |
|||
} |
|||
} |
|||
return false |
|||
} |
|||
|
|||
func (s *PostgresStore) mySplitPath(dirPath string) (directoryRoot string, directoryName string) { |
|||
pathParts := strings.Split(dirPath, "/") |
|||
directoryRoot = "/" |
|||
directoryName = "" |
|||
if len(pathParts) > 1 { |
|||
directoryRoot = strings.Join(pathParts[0:len(pathParts)-2], "/") + "/" |
|||
directoryName = strings.Join(pathParts[len(pathParts)-2:], "/") |
|||
} else if len(pathParts) == 1 { |
|||
directoryRoot = "/" |
|||
directoryName = pathParts[0] + "/" |
|||
} |
|||
return directoryRoot, directoryName |
|||
} |
|||
|
|||
func (s *PostgresStore) deleteDirectory(dirPath string, recursive bool) (err error) { |
|||
directoryRoot, directoryName := s.mySplitPath(dirPath) |
|||
|
|||
// delete files
|
|||
sqlStatement := fmt.Sprintf("DELETE FROM %s WHERE directorypart=$1", filesTableName) |
|||
_, err = s.db.Exec(sqlStatement, dirPath) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
// delete specific directory if it is empty or recursive delete was requested
|
|||
safeToDelete := s.safeToDeleteDirectory(dirPath, recursive) |
|||
if safeToDelete { |
|||
sqlStatement = fmt.Sprintf("DELETE FROM %s WHERE directoryRoot=$1 AND directoryName=$2", directoriesTableName) |
|||
_, err = s.db.Exec(sqlStatement, directoryRoot, directoryName) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
} |
|||
|
|||
if recursive { |
|||
// delete descendant files
|
|||
sqlStatement = fmt.Sprintf("DELETE FROM %s WHERE directorypart LIKE $1", filesTableName) |
|||
_, err = s.db.Exec(sqlStatement, dirPath+"%") |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
// delete descendant directories
|
|||
sqlStatement = fmt.Sprintf("DELETE FROM %s WHERE directoryRoot LIKE $1", directoriesTableName) |
|||
_, err = s.db.Exec(sqlStatement, dirPath+"%") |
|||
if err != nil { |
|||
return err |
|||
} |
|||
} |
|||
|
|||
return err |
|||
} |
|||
|
|||
func (s *PostgresStore) findFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { |
|||
var rows *sql.Rows = nil |
|||
|
|||
if lastFileName == "" { |
|||
sqlStatement := |
|||
fmt.Sprintf("SELECT fid, directorypart, filepart FROM %s WHERE directorypart=$1 ORDER BY id LIMIT $2", filesTableName) |
|||
rows, err = s.db.Query(sqlStatement, dirPath, limit) |
|||
} else { |
|||
sqlStatement := |
|||
fmt.Sprintf("SELECT fid, directorypart, filepart FROM %s WHERE directorypart=$1 "+ |
|||
"AND id > (SELECT id FROM %s WHERE directoryPart=$2 AND filepart=$3) ORDER BY id LIMIT $4", |
|||
filesTableName, filesTableName) |
|||
_, lastFileNameName := filepath.Split(lastFileName) |
|||
rows, err = s.db.Query(sqlStatement, dirPath, dirPath, lastFileNameName, limit) |
|||
} |
|||
|
|||
if err != nil { |
|||
glog.V(0).Infof("Postgres find files error: %s", err) |
|||
} |
|||
|
|||
if rows != nil { |
|||
defer rows.Close() |
|||
|
|||
for rows.Next() { |
|||
var fid filer.FileId |
|||
var directoryPart string |
|||
var filePart string |
|||
|
|||
scanErr := rows.Scan(&fid, &directoryPart, &filePart) |
|||
if scanErr != nil { |
|||
err = scanErr |
|||
} |
|||
|
|||
files = append(files, filer.FileEntry{Name: filepath.Join(directoryPart, filePart), Id: fid}) |
|||
if len(files) >= limit { |
|||
break |
|||
} |
|||
} |
|||
} |
|||
|
|||
glog.V(3).Infof("Postgres findFiles -- looking up files under '%s' and found %d files. Limit=%d, lastFileName=%s", |
|||
dirPath, len(files), limit, lastFileName) |
|||
|
|||
return files, err |
|||
} |
@ -1,149 +0,0 @@ |
|||
package postgres_store |
|||
|
|||
import ( |
|||
"database/sql" |
|||
"errors" |
|||
"fmt" |
|||
"sync" |
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
|
|||
_ "github.com/lib/pq" |
|||
_ "path/filepath" |
|||
"path/filepath" |
|||
) |
|||
|
|||
const ( |
|||
default_maxIdleConnections = 100 |
|||
default_maxOpenConnections = 50 |
|||
filesTableName = "files" |
|||
directoriesTableName = "directories" |
|||
) |
|||
|
|||
var ( |
|||
_init_db sync.Once |
|||
_db_connection *sql.DB |
|||
) |
|||
|
|||
type PostgresConf struct { |
|||
User string |
|||
Password string |
|||
HostName string |
|||
Port int |
|||
DataBase string |
|||
SslMode string |
|||
MaxIdleConnections int |
|||
MaxOpenConnections int |
|||
} |
|||
|
|||
type PostgresStore struct { |
|||
db *sql.DB |
|||
server string |
|||
user string |
|||
password string |
|||
} |
|||
|
|||
func (s *PostgresStore) CreateFile(fullFilePath string, fid string) (err error) { |
|||
|
|||
var old_fid string |
|||
if old_fid, err = s.query(fullFilePath); err != nil && err != sql.ErrNoRows { |
|||
return fmt.Errorf("PostgresStore Put operation failed when querying path %s: err is %v", fullFilePath, err) |
|||
} else { |
|||
if len(old_fid) == 0 { |
|||
err = s.insert(fullFilePath, fid) |
|||
if err != nil { |
|||
return fmt.Errorf("PostgresStore Put operation failed when inserting path %s with fid %s : err is %v", fullFilePath, fid, err) |
|||
} |
|||
} else { |
|||
err = s.update(fullFilePath, fid) |
|||
if err != nil { |
|||
return fmt.Errorf("PostgresStore Put operation failed when updating path %s with fid %s : err is %v", fullFilePath, fid, err) |
|||
} |
|||
} |
|||
} |
|||
return |
|||
|
|||
} |
|||
|
|||
func (s *PostgresStore) FindFile(fullFilePath string) (fid string, err error) { |
|||
|
|||
if err != nil { |
|||
return "", fmt.Errorf("PostgresStore Get operation can not parse file path %s: err is %v", fullFilePath, err) |
|||
} |
|||
fid, err = s.query(fullFilePath) |
|||
|
|||
return fid, err |
|||
} |
|||
|
|||
func (s *PostgresStore) LookupDirectoryEntry(dirPath string, name string) (found bool, fileId string, err error) { |
|||
fullPath := filepath.Join(dirPath, name) |
|||
if fileId, err = s.FindFile(fullPath); err == nil { |
|||
return true, fileId, nil |
|||
} |
|||
if _, _, err := s.lookupDirectory(fullPath); err == nil { |
|||
return true, "", err |
|||
} |
|||
return false, "", err |
|||
} |
|||
|
|||
func (s *PostgresStore) DeleteFile(fullFilePath string) (fid string, err error) { |
|||
if err != nil { |
|||
return "", fmt.Errorf("PostgresStore Delete operation can not parse file path %s: err is %v", fullFilePath, err) |
|||
} |
|||
if fid, err = s.query(fullFilePath); err != nil { |
|||
return "", fmt.Errorf("PostgresStore Delete operation failed when querying path %s: err is %v", fullFilePath, err) |
|||
} else if fid == "" { |
|||
return "", nil |
|||
} |
|||
if err = s.delete(fullFilePath); err != nil { |
|||
return "", fmt.Errorf("PostgresStore Delete operation failed when deleting path %s: err is %v", fullFilePath, err) |
|||
} else { |
|||
return "", nil |
|||
} |
|||
} |
|||
|
|||
func (s *PostgresStore) ListDirectories(dirPath string) (dirs []filer.DirectoryName, err error) { |
|||
|
|||
dirs, err = s.findDirectories(dirPath, 1000) |
|||
|
|||
glog.V(3).Infof("Postgres ListDirs = found %d directories under %s", len(dirs), dirPath) |
|||
|
|||
return dirs, err |
|||
} |
|||
|
|||
func (s *PostgresStore) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { |
|||
files, err = s.findFiles(dirPath, lastFileName, limit) |
|||
return files, err |
|||
} |
|||
|
|||
func (s *PostgresStore) DeleteDirectory(dirPath string, recursive bool) (err error) { |
|||
err = s.deleteDirectory(dirPath, recursive) |
|||
if err != nil { |
|||
glog.V(0).Infof("Error in Postgres DeleteDir '%s' (recursive = '%t'): %s", err) |
|||
} |
|||
return err |
|||
} |
|||
|
|||
func (s *PostgresStore) Move(fromPath string, toPath string) (err error) { |
|||
glog.V(3).Infoln("Calling posgres_store Move") |
|||
return errors.New("Move is not yet implemented for the PostgreSQL store.") |
|||
} |
|||
|
|||
//func NewPostgresStore(master string, confs []PostgresConf, isSharding bool, shardCount int) *PostgresStore {
|
|||
func NewPostgresStore(master string, conf PostgresConf) *PostgresStore { |
|||
pg := &PostgresStore{ |
|||
db: getDbConnection(conf), |
|||
} |
|||
|
|||
pg.createDirectoriesTable() |
|||
|
|||
if err := pg.createFilesTable(); err != nil { |
|||
fmt.Printf("create table failed %v", err) |
|||
} |
|||
|
|||
return pg |
|||
} |
|||
|
|||
func (s *PostgresStore) Close() { |
|||
s.db.Close() |
|||
} |
@ -1,50 +0,0 @@ |
|||
package redis_store |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
|
|||
"github.com/go-redis/redis" |
|||
) |
|||
|
|||
type RedisStore struct { |
|||
Client *redis.Client |
|||
} |
|||
|
|||
func NewRedisStore(hostPort string, password string, database int) *RedisStore { |
|||
client := redis.NewClient(&redis.Options{ |
|||
Addr: hostPort, |
|||
Password: password, |
|||
DB: database, |
|||
}) |
|||
return &RedisStore{Client: client} |
|||
} |
|||
|
|||
func (s *RedisStore) Get(fullFileName string) (fid string, err error) { |
|||
fid, err = s.Client.Get(fullFileName).Result() |
|||
if err == redis.Nil { |
|||
err = filer.ErrNotFound |
|||
} |
|||
return fid, err |
|||
} |
|||
func (s *RedisStore) Put(fullFileName string, fid string) (err error) { |
|||
_, err = s.Client.Set(fullFileName, fid, 0).Result() |
|||
if err == redis.Nil { |
|||
err = nil |
|||
} |
|||
return err |
|||
} |
|||
|
|||
// Currently the fid is not returned
|
|||
func (s *RedisStore) Delete(fullFileName string) (err error) { |
|||
_, err = s.Client.Del(fullFileName).Result() |
|||
if err == redis.Nil { |
|||
err = nil |
|||
} |
|||
return err |
|||
} |
|||
|
|||
func (s *RedisStore) Close() { |
|||
if s.Client != nil { |
|||
s.Client.Close() |
|||
} |
|||
} |
@ -1,45 +0,0 @@ |
|||
There are two main components of a filer: directories and files. |
|||
|
|||
My previous approach was to use some sequance number to generate directoryId. |
|||
However, this is not scalable. The id generation itself is a bottleneck. |
|||
It needs careful locking and deduplication checking to get a directoryId. |
|||
|
|||
In a second design, each directory is deterministically mapped to UUID version 3, |
|||
which uses MD5 to map a tuple of <uuid, name> to a version 3 UUID. |
|||
However, this UUID3 approach is logically the same as storing the full path. |
|||
|
|||
Storing the full path is the simplest design. |
|||
|
|||
separator is a special byte, 0x00. |
|||
|
|||
When writing a file: |
|||
<file parent full path, separator, file name> => fildId, file properties |
|||
For folders: |
|||
The filer breaks the directory path into folders. |
|||
for each folder: |
|||
if it is not in cache: |
|||
check whether the folder is created in the KVS, if not: |
|||
set <folder parent full path, separator, folder name> => directory properties |
|||
if no permission for the folder: |
|||
break |
|||
|
|||
|
|||
The filer caches the most recently used folder permissions with a TTL. |
|||
So any folder permission change needs to wait TTL interval to take effect. |
|||
|
|||
|
|||
|
|||
When listing the directory: |
|||
prefix scan of using (the folder full path + separator) as the prefix |
|||
|
|||
The downside: |
|||
1. Rename a folder will need to recursively process all sub folders and files. |
|||
2. Move a folder will need to recursively process all sub folders and files. |
|||
So these operations are not allowed if the folder is not empty. |
|||
|
|||
Allowing: |
|||
1. Rename a file |
|||
2. Move a file to a different folder |
|||
3. Delete an empty folder |
|||
|
|||
|
Write
Preview
Loading…
Cancel
Save
Reference in new issue