Browse Source

Merge pull request #361 from hxiaodon/master

add mysql filer support
pull/366/head
Chris Lu 8 years ago
committed by GitHub
parent
commit
99e47fc5fc
  1. 3
      weed/command/filer.go
  2. 2
      weed/command/server.go
  3. 2
      weed/filer/cassandra_store/cassandra_store.go
  4. 4
      weed/filer/embedded_filer/files_in_leveldb.go
  5. 6
      weed/filer/filer.go
  6. 67
      weed/filer/mysql_store/README.md
  7. 270
      weed/filer/mysql_store/mysql_store.go
  8. 30
      weed/filer/mysql_store/mysql_store_test.go
  9. 4
      weed/filer/redis_store/redis_store.go
  10. 39
      weed/server/filer_server.go
  11. 3
      weed/server/filer_server_handlers_read.go
  12. 12
      weed/server/filer_server_handlers_write.go

3
weed/command/filer.go

@ -24,6 +24,7 @@ type FilerOptions struct {
dir *string
redirectOnRead *bool
disableDirListing *bool
confFile *string
maxMB *int
secretKey *string
cassandra_server *string
@ -43,6 +44,7 @@ func init() {
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing")
f.confFile = cmdFiler.Flag.String("confFile", "", "json encoded filer conf file")
f.maxMB = cmdFiler.Flag.Int("maxMB", 0, "split files larger than the limit")
f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server")
f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
@ -84,6 +86,7 @@ func runFiler(cmd *Command, args []string) bool {
r := http.NewServeMux()
_, nfs_err := weed_server.NewFilerServer(r, *f.ip, *f.port, *f.master, *f.dir, *f.collection,
*f.defaultReplicaPlacement, *f.redirectOnRead, *f.disableDirListing,
*f.confFile,
*f.maxMB,
*f.secretKey,
*f.cassandra_server, *f.cassandra_keyspace,

2
weed/command/server.go

@ -86,6 +86,7 @@ func init() {
filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.")
filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
filerOptions.confFile = cmdServer.Flag.String("filer.confFile", "", "json encoded filer conf file")
filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 0, "split files larger than the limit")
filerOptions.cassandra_server = cmdServer.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server")
filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
@ -170,6 +171,7 @@ func runServer(cmd *Command, args []string) bool {
_, nfs_err := weed_server.NewFilerServer(r, *serverBindIp, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
*filerOptions.defaultReplicaPlacement,
*filerOptions.redirectOnRead, *filerOptions.disableDirListing,
*filerOptions.confFile,
*filerOptions.maxMB,
*filerOptions.secretKey,
*filerOptions.cassandra_server, *filerOptions.cassandra_keyspace,

2
weed/filer/cassandra_store/cassandra_store.go

@ -3,6 +3,7 @@ package cassandra_store
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/gocql/gocql"
@ -59,6 +60,7 @@ func (c *CassandraStore) Get(fullFileName string) (fid string, err error) {
fullFileName).Consistency(gocql.One).Scan(&output); err != nil {
if err != gocql.ErrNotFound {
glog.V(0).Infof("Failed to find file %s: %v", fullFileName, fid, err)
return "", filer.ErrNotFound
}
}
if len(output) == 0 {

4
weed/filer/embedded_filer/files_in_leveldb.go

@ -53,7 +53,9 @@ func (fl *FileListInLevelDb) DeleteFile(dirId filer.DirectoryId, fileName string
}
func (fl *FileListInLevelDb) FindFile(dirId filer.DirectoryId, fileName string) (fid string, err error) {
data, e := fl.db.Get(genKey(dirId, fileName), nil)
if e != nil {
if e == leveldb.ErrNotFound {
return "", filer.ErrNotFound
} else if e != nil {
return "", e
}
return string(data), nil

6
weed/filer/filer.go

@ -1,5 +1,9 @@
package filer
import (
"errors"
)
type FileId string //file id in SeaweedFS
type FileEntry struct {
@ -26,3 +30,5 @@ type Filer interface {
DeleteDirectory(dirPath string, recursive bool) (err error)
Move(fromPath string, toPath string) (err error)
}
var ErrNotFound = errors.New("filer: no entry is found in filer store")

67
weed/filer/mysql_store/README.md

@ -0,0 +1,67 @@
#MySQL filer mapping store
## Schema format
Basically, uriPath and fid are the key elements stored in MySQL. In view of the optimization and user's usage,
adding primary key with integer type and involving createTime, updateTime, status fields should be somewhat meaningful.
Of course, you could customize the schema per your concretely circumstance freely.
<pre><code>
CREATE TABLE IF NOT EXISTS `filer_mapping` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`uriPath` char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath',
`fid` char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid',
`createTime` int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp',
`updateTime` int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp',
`remark` varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field',
`status` tinyint(2) DEFAULT '1' COMMENT 'resource status',
PRIMARY KEY (`id`),
UNIQUE KEY `index_uriPath` (`uriPath`)
) DEFAULT CHARSET=utf8;
</code></pre>
The MySQL 's config params is not added into the weed command option as other stores(redis,cassandra). Instead,
We created a config file(json format) for them. TOML,YAML or XML also should be OK. But TOML and YAML need import thirdparty package
while XML is a little bit complex.
The sample config file's content is below:
<pre><code>
{
"mysql": [
{
"User": "root",
"Password": "root",
"HostName": "127.0.0.1",
"Port": 3306,
"DataBase": "seaweedfs"
},
{
"User": "root",
"Password": "root",
"HostName": "127.0.0.2",
"Port": 3306,
"DataBase": "seaweedfs"
}
],
"IsSharding":true,
"ShardCount":1024
}
</code></pre>
The "mysql" field in above conf file is an array which include all mysql instances you prepared to store sharding data.
1. If one mysql instance is enough, just keep one instance in "mysql" field.
2. If table sharding at a specific mysql instance is needed , mark "IsSharding" field with true and specify total table sharding numbers using "ShardCount" field.
3. If the mysql service could be auto scaled transparently in your environment, just config one mysql instance(usually it's a frondend proxy or VIP),and mark "IsSharding" with false value
4. If you prepare more than one mysql instance and have no plan to use table sharding for any instance(mark isSharding with false), instance sharding will still be done implicitly

270
weed/filer/mysql_store/mysql_store.go

@ -0,0 +1,270 @@
package mysql_store
import (
"database/sql"
"fmt"
"hash/crc32"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
_ "github.com/go-sql-driver/mysql"
)
const (
sqlUrl = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
default_maxIdleConnections = 100
default_maxOpenConnections = 50
default_maxTableNums = 1024
tableName = "filer_mapping"
)
var (
_init_db sync.Once
_db_connections []*sql.DB
)
type MySqlConf struct {
User string
Password string
HostName string
Port int
DataBase string
MaxIdleConnections int
MaxOpenConnections int
}
type ShardingConf struct {
IsSharding bool `json:"isSharding"`
ShardCount int `json:"shardCount"`
}
type MySqlStore struct {
dbs []*sql.DB
isSharding bool
shardCount int
}
func getDbConnection(confs []MySqlConf) []*sql.DB {
_init_db.Do(func() {
for _, conf := range confs {
sqlUrl := fmt.Sprintf(sqlUrl, conf.User, conf.Password, conf.HostName, conf.Port, conf.DataBase)
var dbErr error
_db_connection, dbErr := sql.Open("mysql", sqlUrl)
if dbErr != nil {
_db_connection.Close()
_db_connection = nil
panic(dbErr)
}
var maxIdleConnections, maxOpenConnections int
if conf.MaxIdleConnections != 0 {
maxIdleConnections = conf.MaxIdleConnections
} else {
maxIdleConnections = default_maxIdleConnections
}
if conf.MaxOpenConnections != 0 {
maxOpenConnections = conf.MaxOpenConnections
} else {
maxOpenConnections = default_maxOpenConnections
}
_db_connection.SetMaxIdleConns(maxIdleConnections)
_db_connection.SetMaxOpenConns(maxOpenConnections)
_db_connections = append(_db_connections, _db_connection)
}
})
return _db_connections
}
func NewMysqlStore(confs []MySqlConf, isSharding bool, shardCount int) *MySqlStore {
ms := &MySqlStore{
dbs: getDbConnection(confs),
isSharding: isSharding,
shardCount: shardCount,
}
for _, db := range ms.dbs {
if !isSharding {
ms.shardCount = 1
} else {
if ms.shardCount == 0 {
ms.shardCount = default_maxTableNums
}
}
for i := 0; i < ms.shardCount; i++ {
if err := ms.createTables(db, tableName, i); err != nil {
fmt.Printf("create table failed %v", err)
}
}
}
return ms
}
func (s *MySqlStore) hash(fullFileName string) (instance_offset, table_postfix int) {
hash_value := crc32.ChecksumIEEE([]byte(fullFileName))
instance_offset = int(hash_value) % len(s.dbs)
table_postfix = int(hash_value) % s.shardCount
return
}
func (s *MySqlStore) parseFilerMappingInfo(path string) (instanceId int, tableFullName string, err error) {
instance_offset, table_postfix := s.hash(path)
instanceId = instance_offset
if s.isSharding {
tableFullName = fmt.Sprintf("%s_%04d", tableName, table_postfix)
} else {
tableFullName = tableName
}
return
}
func (s *MySqlStore) Get(fullFilePath string) (fid string, err error) {
instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
if err != nil {
return "", fmt.Errorf("MySqlStore Get operation can not parse file path %s: err is %v", fullFilePath, err)
}
fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName)
if err == sql.ErrNoRows {
//Could not found
err = filer.ErrNotFound
}
return fid, err
}
func (s *MySqlStore) Put(fullFilePath string, fid string) (err error) {
var tableFullName string
instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
if err != nil {
return fmt.Errorf("MySqlStore Put operation can not parse file path %s: err is %v", fullFilePath, err)
}
var old_fid string
if old_fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil && err != sql.ErrNoRows {
return fmt.Errorf("MySqlStore Put operation failed when querying path %s: err is %v", fullFilePath, err)
} else {
if len(old_fid) == 0 {
err = s.insert(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
err = fmt.Errorf("MySqlStore Put operation failed when inserting path %s with fid %s : err is %v", fullFilePath, fid, err)
} else {
err = s.update(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
err = fmt.Errorf("MySqlStore Put operation failed when updating path %s with fid %s : err is %v", fullFilePath, fid, err)
}
}
return
}
func (s *MySqlStore) Delete(fullFilePath string) (err error) {
var fid string
instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
if err != nil {
return fmt.Errorf("MySqlStore Delete operation can not parse file path %s: err is %v", fullFilePath, err)
}
if fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
return fmt.Errorf("MySqlStore Delete operation failed when querying path %s: err is %v", fullFilePath, err)
} else if fid == "" {
return nil
}
if err = s.delete(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
return fmt.Errorf("MySqlStore Delete operation failed when deleting path %s: err is %v", fullFilePath, err)
} else {
return nil
}
}
func (s *MySqlStore) Close() {
for _, db := range s.dbs {
db.Close()
}
}
var createTable = `
CREATE TABLE IF NOT EXISTS %s (
id bigint(20) NOT NULL AUTO_INCREMENT,
uriPath char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath',
fid char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid',
createTime int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp',
updateTime int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp',
remark varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field',
status tinyint(2) DEFAULT '1' COMMENT 'resource status',
PRIMARY KEY (id),
UNIQUE KEY index_uriPath (uriPath)
) DEFAULT CHARSET=utf8;
`
func (s *MySqlStore) createTables(db *sql.DB, tableName string, postfix int) error {
var realTableName string
if s.isSharding {
realTableName = fmt.Sprintf("%s_%4d", tableName, postfix)
} else {
realTableName = tableName
}
stmt, err := db.Prepare(fmt.Sprintf(createTable, realTableName))
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec()
if err != nil {
return err
}
return nil
}
func (s *MySqlStore) query(uriPath string, db *sql.DB, tableName string) (string, error) {
sqlStatement := "SELECT fid FROM %s WHERE uriPath=?"
row := db.QueryRow(fmt.Sprintf(sqlStatement, tableName), uriPath)
var fid string
err := row.Scan(&fid)
if err != nil {
return "", err
}
return fid, nil
}
func (s *MySqlStore) update(uriPath string, fid string, db *sql.DB, tableName string) error {
sqlStatement := "UPDATE %s SET fid=?, updateTime=? WHERE uriPath=?"
res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), fid, time.Now().Unix(), uriPath)
if err != nil {
return err
}
_, err = res.RowsAffected()
if err != nil {
return err
}
return nil
}
func (s *MySqlStore) insert(uriPath string, fid string, db *sql.DB, tableName string) error {
sqlStatement := "INSERT INTO %s (uriPath,fid,createTime) VALUES(?,?,?)"
res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath, fid, time.Now().Unix())
if err != nil {
return err
}
_, err = res.RowsAffected()
if err != nil {
return err
}
return nil
}
func (s *MySqlStore) delete(uriPath string, db *sql.DB, tableName string) error {
sqlStatement := "DELETE FROM %s WHERE uriPath=?"
res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath)
if err != nil {
return err
}
_, err = res.RowsAffected()
if err != nil {
return err
}
return nil
}

30
weed/filer/mysql_store/mysql_store_test.go

@ -0,0 +1,30 @@
package mysql_store
import (
"encoding/json"
"hash/crc32"
"testing"
)
func TestGenerateMysqlConf(t *testing.T) {
var conf []MySqlConf
conf = append(conf, MySqlConf{
User: "root",
Password: "root",
HostName: "localhost",
Port: 3306,
DataBase: "seaweedfs",
})
body, err := json.Marshal(conf)
if err != nil {
t.Errorf("json encoding err %s", err.Error())
}
t.Logf("json output is %s", string(body))
}
func TestCRC32FullPathName(t *testing.T) {
fullPathName := "/prod-bucket/law632191483895612493300-signed.pdf"
hash_value := crc32.ChecksumIEEE([]byte(fullPathName))
table_postfix := int(hash_value) % 1024
t.Logf("table postfix %d", table_postfix)
}

4
weed/filer/redis_store/redis_store.go

@ -1,6 +1,8 @@
package redis_store
import (
"github.com/chrislusf/seaweedfs/weed/filer"
redis "gopkg.in/redis.v2"
)
@ -20,7 +22,7 @@ func NewRedisStore(hostPort string, password string, database int) *RedisStore {
func (s *RedisStore) Get(fullFileName string) (fid string, err error) {
fid, err = s.Client.Get(fullFileName).Result()
if err == redis.Nil {
err = nil
err = filer.ErrNotFound
}
return fid, err
}

39
weed/server/filer_server.go

@ -1,8 +1,10 @@
package weed_server
import (
"encoding/json"
"math/rand"
"net/http"
"os"
"strconv"
"sync"
"time"
@ -11,6 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer/cassandra_store"
"github.com/chrislusf/seaweedfs/weed/filer/embedded_filer"
"github.com/chrislusf/seaweedfs/weed/filer/flat_namespace"
"github.com/chrislusf/seaweedfs/weed/filer/mysql_store"
"github.com/chrislusf/seaweedfs/weed/filer/redis_store"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
@ -18,6 +21,26 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
type filerConf struct {
MysqlConf []mysql_store.MySqlConf `json:"mysql"`
mysql_store.ShardingConf
}
func parseConfFile(confPath string) (*filerConf, error) {
var setting filerConf
configFile, err := os.Open(confPath)
defer configFile.Close()
if err != nil {
return nil, err
}
jsonParser := json.NewDecoder(configFile)
if err = jsonParser.Decode(&setting); err != nil {
return nil, err
}
return &setting, nil
}
type FilerServer struct {
port string
master string
@ -34,6 +57,7 @@ type FilerServer struct {
func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string,
replication string, redirectOnRead bool, disableDirListing bool,
confFile string,
maxMB int,
secret string,
cassandra_server string, cassandra_keyspace string,
@ -49,7 +73,20 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st
port: ip + ":" + strconv.Itoa(port),
}
if cassandra_server != "" {
var setting *filerConf
if confFile != "" {
setting, err = parseConfFile(confFile)
if err != nil {
return nil, err
}
} else {
setting = new(filerConf)
}
if setting.MysqlConf != nil && len(setting.MysqlConf) != 0 {
mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf, setting.IsSharding, setting.ShardCount)
fs.filer = flat_namespace.NewFlatNamespaceFiler(master, mysql_store)
} else if cassandra_server != "" {
cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server)
if err != nil {
glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err)

3
weed/server/filer_server_handlers_read.go

@ -7,6 +7,7 @@ import (
"strconv"
"strings"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
ui "github.com/chrislusf/seaweedfs/weed/server/filer_ui"
@ -87,7 +88,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
fileId, err := fs.filer.FindFile(r.URL.Path)
if err == leveldb.ErrNotFound {
if err == filer.ErrNotFound {
glog.V(3).Infoln("Not found in db", r.URL.Path)
w.WriteHeader(http.StatusNotFound)
return

12
weed/server/filer_server_handlers_write.go

@ -15,11 +15,11 @@ import (
"net/url"
"strings"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb"
"path"
"strconv"
)
@ -73,17 +73,17 @@ func makeFormData(filename, mimeType string, content io.Reader) (formData io.Rea
}
func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Request, path string) (fileId, urlLocation string, err error) {
if fileId, err = fs.filer.FindFile(path); err != nil && err != leveldb.ErrNotFound {
if fileId, err = fs.filer.FindFile(path); err != nil && err != filer.ErrNotFound {
glog.V(0).Infoln("failing to find path in filer store", path, err.Error())
writeJsonError(w, r, http.StatusInternalServerError, err)
return
} else if fileId != "" && err == nil {
urlLocation, err = operation.LookupFileId(fs.getMasterNode(), fileId)
if err != nil {
glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error())
w.WriteHeader(http.StatusNotFound)
return
}
} else if fileId == "" && err == filer.ErrNotFound {
w.WriteHeader(http.StatusNotFound)
}
return
}
@ -315,6 +315,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "PUT" {
if oldFid, err := fs.filer.FindFile(path); err == nil {
operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
} else if err != nil && err != filer.ErrNotFound {
glog.V(0).Infof("error %v occur when finding %s in filer store", err, path)
}
}
@ -498,6 +500,8 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
if r.Method != "PUT" {
if oldFid, err := fs.filer.FindFile(path); err == nil {
operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
} else if err != nil && err != filer.ErrNotFound {
glog.V(0).Infof("error %v occur when finding %s in filer store", err, path)
}
}

Loading…
Cancel
Save