Browse Source

add filer support

pull/361/head
霍晓栋 8 years ago
parent
commit
b0035747e3
  1. 3
      weed/command/filer.go
  2. 2
      weed/command/server.go
  3. 11
      weed/filer/mysql_store/filer_mapping.sql
  4. 224
      weed/filer/mysql_store/mysql_store.go
  5. 60
      weed/filer/mysql_store/mysql_store_test.go
  6. 38
      weed/server/filer_server.go

3
weed/command/filer.go

@ -24,6 +24,7 @@ type FilerOptions struct {
dir *string dir *string
redirectOnRead *bool redirectOnRead *bool
disableDirListing *bool disableDirListing *bool
confFile *string
maxMB *int maxMB *int
secretKey *string secretKey *string
cassandra_server *string cassandra_server *string
@ -43,6 +44,7 @@ func init() {
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified") f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing") f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing")
f.confFile = cmdFiler.Flag.String("confFile", "", "json encoded filer conf file")
f.maxMB = cmdFiler.Flag.Int("maxMB", 0, "split files larger than the limit") f.maxMB = cmdFiler.Flag.Int("maxMB", 0, "split files larger than the limit")
f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server") f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server")
f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server") f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
@ -84,6 +86,7 @@ func runFiler(cmd *Command, args []string) bool {
r := http.NewServeMux() r := http.NewServeMux()
_, nfs_err := weed_server.NewFilerServer(r, *f.ip, *f.port, *f.master, *f.dir, *f.collection, _, nfs_err := weed_server.NewFilerServer(r, *f.ip, *f.port, *f.master, *f.dir, *f.collection,
*f.defaultReplicaPlacement, *f.redirectOnRead, *f.disableDirListing, *f.defaultReplicaPlacement, *f.redirectOnRead, *f.disableDirListing,
*f.confFile,
*f.maxMB, *f.maxMB,
*f.secretKey, *f.secretKey,
*f.cassandra_server, *f.cassandra_keyspace, *f.cassandra_server, *f.cassandra_keyspace,

2
weed/command/server.go

@ -86,6 +86,7 @@ func init() {
filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.")
filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
filerOptions.confFile = cmdServer.Flag.String("filer.confFile", "", "json encoded filer conf file")
filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 0, "split files larger than the limit") filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 0, "split files larger than the limit")
filerOptions.cassandra_server = cmdServer.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server") filerOptions.cassandra_server = cmdServer.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server")
filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server") filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
@ -170,6 +171,7 @@ func runServer(cmd *Command, args []string) bool {
_, nfs_err := weed_server.NewFilerServer(r, *serverBindIp, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection, _, nfs_err := weed_server.NewFilerServer(r, *serverBindIp, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
*filerOptions.defaultReplicaPlacement, *filerOptions.defaultReplicaPlacement,
*filerOptions.redirectOnRead, *filerOptions.disableDirListing, *filerOptions.redirectOnRead, *filerOptions.disableDirListing,
*filerOptions.confFile,
*filerOptions.maxMB, *filerOptions.maxMB,
*filerOptions.secretKey, *filerOptions.secretKey,
*filerOptions.cassandra_server, *filerOptions.cassandra_keyspace, *filerOptions.cassandra_server, *filerOptions.cassandra_keyspace,

11
weed/filer/mysql_store/filer_mapping.sql

@ -0,0 +1,11 @@
CREATE TABLE IF NOT EXISTS `filer_mapping` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`uriPath` char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath',
`fid` char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid',
`createTime` int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp',
`updateTime` int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp',
`remark` varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field',
`status` tinyint(2) DEFAULT '1' COMMENT 'resource status',
PRIMARY KEY (`id`),
UNIQUE KEY `index_uriPath` (`uriPath`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

224
weed/filer/mysql_store/mysql_store.go

@ -0,0 +1,224 @@
package mysql_store
import (
"database/sql"
"fmt"
"hash/crc32"
"sync"
"time"
_ "github.com/go-sql-driver/mysql"
)
const (
sqlUrl = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
maxIdleConnections = 100
maxOpenConnections = 50
maxTableNums = 1024
tableName = "filer_mapping"
)
var (
_init_db sync.Once
_db_connections []*sql.DB
)
type MySqlConf struct {
User string
Password string
HostName string
Port int
DataBase string
}
type MySqlStore struct {
dbs []*sql.DB
}
func getDbConnection(confs []MySqlConf) []*sql.DB {
_init_db.Do(func() {
for _, conf := range confs {
sqlUrl := fmt.Sprintf(sqlUrl, conf.User, conf.Password, conf.HostName, conf.Port, conf.DataBase)
var dbErr error
_db_connection, dbErr := sql.Open("mysql", sqlUrl)
if dbErr != nil {
_db_connection.Close()
_db_connection = nil
panic(dbErr)
}
_db_connection.SetMaxIdleConns(maxIdleConnections)
_db_connection.SetMaxOpenConns(maxOpenConnections)
_db_connections = append(_db_connections, _db_connection)
}
})
return _db_connections
}
func NewMysqlStore(confs []MySqlConf) *MySqlStore {
ms := &MySqlStore{
dbs: getDbConnection(confs),
}
for _, db := range ms.dbs {
for i := 0; i < maxTableNums; i++ {
if err := ms.createTables(db, tableName, i); err != nil {
fmt.Printf("create table failed %s", err.Error())
}
}
}
return ms
}
func (s *MySqlStore) hash(fullFileName string) (instance_offset, table_postfix int) {
hash_value := crc32.ChecksumIEEE([]byte(fullFileName))
instance_offset = int(hash_value) % len(s.dbs)
table_postfix = int(hash_value) % maxTableNums
return
}
func (s *MySqlStore) parseFilerMappingInfo(path string) (instanceId int, tableFullName string, err error) {
instance_offset, table_postfix := s.hash(path)
instanceId = instance_offset
tableFullName = fmt.Sprintf("%s_%04d", tableName, table_postfix)
return
}
func (s *MySqlStore) Get(fullFilePath string) (fid string, err error) {
instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
if err != nil {
return "", err
}
fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName)
if err == sql.ErrNoRows {
//Could not found
err = nil
}
return fid, err
}
func (s *MySqlStore) Put(fullFilePath string, fid string) (err error) {
var tableFullName string
instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
if err != nil {
return err
}
if old_fid, localErr := s.query(fullFilePath, s.dbs[instance_offset], tableFullName); localErr != nil && localErr != sql.ErrNoRows {
err = localErr
return
} else {
if len(old_fid) == 0 {
err = s.insert(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
} else {
err = s.update(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
}
}
return
}
func (s *MySqlStore) Delete(fullFilePath string) (err error) {
var fid string
instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
if err != nil {
return err
}
if fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
return err
} else if fid == "" {
return nil
}
if err := s.delete(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
return err
} else {
return nil
}
}
func (s *MySqlStore) Close() {
for _, db := range s.dbs {
db.Close()
}
}
var createTable = `
CREATE TABLE IF NOT EXISTS %s_%04d (
id bigint(20) NOT NULL AUTO_INCREMENT,
uriPath char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath',
fid char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid',
createTime int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp',
updateTime int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp',
remark varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field',
status tinyint(2) DEFAULT '1' COMMENT 'resource status',
PRIMARY KEY (id),
UNIQUE KEY index_uriPath (uriPath)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`
func (s *MySqlStore) createTables(db *sql.DB, tableName string, postfix int) error {
stmt, err := db.Prepare(fmt.Sprintf(createTable, tableName, postfix))
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec()
if err != nil {
return err
}
return nil
}
func (s *MySqlStore) query(uriPath string, db *sql.DB, tableName string) (string, error) {
sqlStatement := "SELECT fid FROM %s WHERE uriPath=?"
row := db.QueryRow(fmt.Sprintf(sqlStatement, tableName), uriPath)
var fid string
err := row.Scan(&fid)
if err != nil {
return "", err
}
return fid, nil
}
func (s *MySqlStore) update(uriPath string, fid string, db *sql.DB, tableName string) error {
sqlStatement := "UPDATE %s SET fid=?, updateTime=? WHERE uriPath=?"
res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), fid, time.Now().Unix(), uriPath)
if err != nil {
return err
}
_, err = res.RowsAffected()
if err != nil {
return err
}
return nil
}
func (s *MySqlStore) insert(uriPath string, fid string, db *sql.DB, tableName string) error {
sqlStatement := "INSERT INTO %s (uriPath,fid,createTime) VALUES(?,?,?)"
res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath, fid, time.Now().Unix())
if err != nil {
return err
}
_, err = res.RowsAffected()
if err != nil {
return err
}
return nil
}
func (s *MySqlStore) delete(uriPath string, db *sql.DB, tableName string) error {
sqlStatement := "DELETE FROM %s WHERE uriPath=?"
res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath)
if err != nil {
return err
}
_, err = res.RowsAffected()
if err != nil {
return err
}
return nil
}

60
weed/filer/mysql_store/mysql_store_test.go

@ -0,0 +1,60 @@
package mysql_store
import (
"encoding/json"
"hash/crc32"
"testing"
)
/*
To improve performance when storing billion of files, you could shar
At each mysql instance, we will try to create 1024 tables if not exist, table name will be something like:
filer_mapping_0000
filer_mapping_0001
.....
filer_mapping_1023
sample conf should be
>$cat filer_conf.json
{
"mysql": [
{
"User": "root",
"Password": "root",
"HostName": "127.0.0.1",
"Port": 3306,
"DataBase": "seaweedfs"
},
{
"User": "root",
"Password": "root",
"HostName": "127.0.0.2",
"Port": 3306,
"DataBase": "seaweedfs"
}
]
}
*/
func TestGenerateMysqlConf(t *testing.T) {
var conf MySqlConf
conf = append(conf, MySqlInstConf{
User: "root",
Password: "root",
HostName: "localhost",
Port: 3306,
DataBase: "seaweedfs",
})
body, err := json.Marshal(conf)
if err != nil {
t.Errorf("json encoding err %s", err.Error())
}
t.Logf("json output is %s", string(body))
}
func TestCRC32FullPathName(t *testing.T) {
fullPathName := "/prod-bucket/law632191483895612493300-signed.pdf"
hash_value := crc32.ChecksumIEEE([]byte(fullPathName))
table_postfix := int(hash_value) % 1024
t.Logf("table postfix %d", table_postfix)
}

38
weed/server/filer_server.go

@ -1,8 +1,10 @@
package weed_server package weed_server
import ( import (
"encoding/json"
"math/rand" "math/rand"
"net/http" "net/http"
"os"
"strconv" "strconv"
"sync" "sync"
"time" "time"
@ -11,6 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer/cassandra_store" "github.com/chrislusf/seaweedfs/weed/filer/cassandra_store"
"github.com/chrislusf/seaweedfs/weed/filer/embedded_filer" "github.com/chrislusf/seaweedfs/weed/filer/embedded_filer"
"github.com/chrislusf/seaweedfs/weed/filer/flat_namespace" "github.com/chrislusf/seaweedfs/weed/filer/flat_namespace"
"github.com/chrislusf/seaweedfs/weed/filer/mysql_store"
"github.com/chrislusf/seaweedfs/weed/filer/redis_store" "github.com/chrislusf/seaweedfs/weed/filer/redis_store"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
@ -18,6 +21,25 @@ import (
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
type filerConf struct {
MysqlConf []mysql_store.MySqlConf `json:"mysql"`
}
func parseConfFile(confPath string) (*filerConf, error) {
var setting filerConf
configFile, err := os.Open(confPath)
defer configFile.Close()
if err != nil {
return nil, err
}
jsonParser := json.NewDecoder(configFile)
if err = jsonParser.Decode(&setting); err != nil {
return nil, err
}
return &setting, nil
}
type FilerServer struct { type FilerServer struct {
port string port string
master string master string
@ -34,6 +56,7 @@ type FilerServer struct {
func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string, func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string,
replication string, redirectOnRead bool, disableDirListing bool, replication string, redirectOnRead bool, disableDirListing bool,
confFile string,
maxMB int, maxMB int,
secret string, secret string,
cassandra_server string, cassandra_keyspace string, cassandra_server string, cassandra_keyspace string,
@ -49,7 +72,20 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st
port: ip + ":" + strconv.Itoa(port), port: ip + ":" + strconv.Itoa(port),
} }
if cassandra_server != "" {
var setting *filerConf
if confFile != "" {
setting, err = parseConfFile(confFile)
if err != nil {
return nil, err
}
} else {
setting = new(filerConf)
}
if setting.MysqlConf != nil && len(setting.MysqlConf) != 0 {
mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf)
fs.filer = flat_namespace.NewFlatNamespaceFiler(master, mysql_store)
} else if cassandra_server != "" {
cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server) cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server)
if err != nil { if err != nil {
glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err) glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err)

Loading…
Cancel
Save