From b0035747e33913f4a6f998c410f3fee67d76f7d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9C=8D=E6=99=93=E6=A0=8B?= Date: Wed, 31 Aug 2016 11:32:30 +0800 Subject: [PATCH 1/4] add filer support --- weed/command/filer.go | 5 +- weed/command/server.go | 2 + weed/filer/mysql_store/filer_mapping.sql | 11 + weed/filer/mysql_store/mysql_store.go | 224 +++++++++++++++++++++ weed/filer/mysql_store/mysql_store_test.go | 60 ++++++ weed/server/filer_server.go | 42 +++- 6 files changed, 340 insertions(+), 4 deletions(-) create mode 100644 weed/filer/mysql_store/filer_mapping.sql create mode 100644 weed/filer/mysql_store/mysql_store.go create mode 100644 weed/filer/mysql_store/mysql_store_test.go diff --git a/weed/command/filer.go b/weed/command/filer.go index 0bd508e0b..7d90707a6 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -24,7 +24,8 @@ type FilerOptions struct { dir *string redirectOnRead *bool disableDirListing *bool - maxMB *int + confFile *string + maxMB *int secretKey *string cassandra_server *string cassandra_keyspace *string @@ -43,6 +44,7 @@ func init() { f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified") f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing") + f.confFile = cmdFiler.Flag.String("confFile", "", "json encoded filer conf file") f.maxMB = cmdFiler.Flag.Int("maxMB", 0, "split files larger than the limit") f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server") f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server") @@ -84,6 +86,7 @@ func runFiler(cmd *Command, args []string) bool { r := http.NewServeMux() _, nfs_err := weed_server.NewFilerServer(r, *f.ip, *f.port, *f.master, *f.dir, *f.collection, *f.defaultReplicaPlacement, *f.redirectOnRead, *f.disableDirListing, + *f.confFile, *f.maxMB, *f.secretKey, *f.cassandra_server, *f.cassandra_keyspace, diff --git a/weed/command/server.go b/weed/command/server.go index 7a6677a65..eed7dcae4 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -86,6 +86,7 @@ func init() { filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") + filerOptions.confFile = cmdServer.Flag.String("filer.confFile", "", "json encoded filer conf file") filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 0, "split files larger than the limit") filerOptions.cassandra_server = cmdServer.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server") filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server") @@ -170,6 +171,7 @@ func runServer(cmd *Command, args []string) bool { _, nfs_err := weed_server.NewFilerServer(r, *serverBindIp, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection, *filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead, *filerOptions.disableDirListing, + *filerOptions.confFile, *filerOptions.maxMB, *filerOptions.secretKey, *filerOptions.cassandra_server, *filerOptions.cassandra_keyspace, diff --git a/weed/filer/mysql_store/filer_mapping.sql b/weed/filer/mysql_store/filer_mapping.sql new file mode 100644 index 000000000..6bbe4e880 --- /dev/null +++ b/weed/filer/mysql_store/filer_mapping.sql @@ -0,0 +1,11 @@ +CREATE TABLE IF NOT EXISTS `filer_mapping` ( + `id` bigint(20) NOT NULL AUTO_INCREMENT, + `uriPath` char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath', + `fid` char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid', + `createTime` int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp', + `updateTime` int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp', + `remark` varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field', + `status` tinyint(2) DEFAULT '1' COMMENT 'resource status', + PRIMARY KEY (`id`), + UNIQUE KEY `index_uriPath` (`uriPath`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; \ No newline at end of file diff --git a/weed/filer/mysql_store/mysql_store.go b/weed/filer/mysql_store/mysql_store.go new file mode 100644 index 000000000..44d0d88a7 --- /dev/null +++ b/weed/filer/mysql_store/mysql_store.go @@ -0,0 +1,224 @@ +package mysql_store + +import ( + "database/sql" + "fmt" + "hash/crc32" + "sync" + "time" + + _ "github.com/go-sql-driver/mysql" +) + +const ( + sqlUrl = "%s:%s@tcp(%s:%d)/%s?charset=utf8" + maxIdleConnections = 100 + maxOpenConnections = 50 + maxTableNums = 1024 + tableName = "filer_mapping" +) + +var ( + _init_db sync.Once + _db_connections []*sql.DB +) + +type MySqlConf struct { + User string + Password string + HostName string + Port int + DataBase string +} + +type MySqlStore struct { + dbs []*sql.DB +} + +func getDbConnection(confs []MySqlConf) []*sql.DB { + _init_db.Do(func() { + for _, conf := range confs { + + sqlUrl := fmt.Sprintf(sqlUrl, conf.User, conf.Password, conf.HostName, conf.Port, conf.DataBase) + var dbErr error + _db_connection, dbErr := sql.Open("mysql", sqlUrl) + if dbErr != nil { + _db_connection.Close() + _db_connection = nil + panic(dbErr) + } + _db_connection.SetMaxIdleConns(maxIdleConnections) + _db_connection.SetMaxOpenConns(maxOpenConnections) + _db_connections = append(_db_connections, _db_connection) + } + }) + return _db_connections +} + +func NewMysqlStore(confs []MySqlConf) *MySqlStore { + ms := &MySqlStore{ + dbs: getDbConnection(confs), + } + + for _, db := range ms.dbs { + for i := 0; i < maxTableNums; i++ { + if err := ms.createTables(db, tableName, i); err != nil { + fmt.Printf("create table failed %s", err.Error()) + } + } + } + + return ms +} + +func (s *MySqlStore) hash(fullFileName string) (instance_offset, table_postfix int) { + hash_value := crc32.ChecksumIEEE([]byte(fullFileName)) + instance_offset = int(hash_value) % len(s.dbs) + table_postfix = int(hash_value) % maxTableNums + return +} + +func (s *MySqlStore) parseFilerMappingInfo(path string) (instanceId int, tableFullName string, err error) { + instance_offset, table_postfix := s.hash(path) + instanceId = instance_offset + tableFullName = fmt.Sprintf("%s_%04d", tableName, table_postfix) + return +} + +func (s *MySqlStore) Get(fullFilePath string) (fid string, err error) { + instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath) + if err != nil { + return "", err + } + fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName) + if err == sql.ErrNoRows { + //Could not found + err = nil + } + return fid, err +} + +func (s *MySqlStore) Put(fullFilePath string, fid string) (err error) { + var tableFullName string + + instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath) + if err != nil { + return err + } + if old_fid, localErr := s.query(fullFilePath, s.dbs[instance_offset], tableFullName); localErr != nil && localErr != sql.ErrNoRows { + err = localErr + return + } else { + if len(old_fid) == 0 { + err = s.insert(fullFilePath, fid, s.dbs[instance_offset], tableFullName) + } else { + err = s.update(fullFilePath, fid, s.dbs[instance_offset], tableFullName) + } + } + return +} + +func (s *MySqlStore) Delete(fullFilePath string) (err error) { + var fid string + instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath) + if err != nil { + return err + } + if fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil { + return err + } else if fid == "" { + return nil + } + if err := s.delete(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil { + return err + } else { + return nil + } +} + +func (s *MySqlStore) Close() { + for _, db := range s.dbs { + db.Close() + } +} + +var createTable = ` +CREATE TABLE IF NOT EXISTS %s_%04d ( + id bigint(20) NOT NULL AUTO_INCREMENT, + uriPath char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath', + fid char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid', + createTime int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp', + updateTime int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp', + remark varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field', + status tinyint(2) DEFAULT '1' COMMENT 'resource status', + PRIMARY KEY (id), + UNIQUE KEY index_uriPath (uriPath) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; +` + +func (s *MySqlStore) createTables(db *sql.DB, tableName string, postfix int) error { + stmt, err := db.Prepare(fmt.Sprintf(createTable, tableName, postfix)) + if err != nil { + return err + } + defer stmt.Close() + + _, err = stmt.Exec() + if err != nil { + return err + } + return nil +} + +func (s *MySqlStore) query(uriPath string, db *sql.DB, tableName string) (string, error) { + sqlStatement := "SELECT fid FROM %s WHERE uriPath=?" + row := db.QueryRow(fmt.Sprintf(sqlStatement, tableName), uriPath) + var fid string + err := row.Scan(&fid) + if err != nil { + return "", err + } + return fid, nil +} + +func (s *MySqlStore) update(uriPath string, fid string, db *sql.DB, tableName string) error { + sqlStatement := "UPDATE %s SET fid=?, updateTime=? WHERE uriPath=?" + res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), fid, time.Now().Unix(), uriPath) + if err != nil { + return err + } + + _, err = res.RowsAffected() + if err != nil { + return err + } + return nil +} + +func (s *MySqlStore) insert(uriPath string, fid string, db *sql.DB, tableName string) error { + sqlStatement := "INSERT INTO %s (uriPath,fid,createTime) VALUES(?,?,?)" + res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath, fid, time.Now().Unix()) + if err != nil { + return err + } + + _, err = res.RowsAffected() + if err != nil { + return err + } + return nil +} + +func (s *MySqlStore) delete(uriPath string, db *sql.DB, tableName string) error { + sqlStatement := "DELETE FROM %s WHERE uriPath=?" + res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath) + if err != nil { + return err + } + + _, err = res.RowsAffected() + if err != nil { + return err + } + return nil +} diff --git a/weed/filer/mysql_store/mysql_store_test.go b/weed/filer/mysql_store/mysql_store_test.go new file mode 100644 index 000000000..e89ca9020 --- /dev/null +++ b/weed/filer/mysql_store/mysql_store_test.go @@ -0,0 +1,60 @@ +package mysql_store + +import ( + "encoding/json" + "hash/crc32" + "testing" +) + +/* +To improve performance when storing billion of files, you could shar +At each mysql instance, we will try to create 1024 tables if not exist, table name will be something like: +filer_mapping_0000 +filer_mapping_0001 +..... +filer_mapping_1023 +sample conf should be + +>$cat filer_conf.json +{ + "mysql": [ + { + "User": "root", + "Password": "root", + "HostName": "127.0.0.1", + "Port": 3306, + "DataBase": "seaweedfs" + }, + { + "User": "root", + "Password": "root", + "HostName": "127.0.0.2", + "Port": 3306, + "DataBase": "seaweedfs" + } + ] +} +*/ + +func TestGenerateMysqlConf(t *testing.T) { + var conf MySqlConf + conf = append(conf, MySqlInstConf{ + User: "root", + Password: "root", + HostName: "localhost", + Port: 3306, + DataBase: "seaweedfs", + }) + body, err := json.Marshal(conf) + if err != nil { + t.Errorf("json encoding err %s", err.Error()) + } + t.Logf("json output is %s", string(body)) +} + +func TestCRC32FullPathName(t *testing.T) { + fullPathName := "/prod-bucket/law632191483895612493300-signed.pdf" + hash_value := crc32.ChecksumIEEE([]byte(fullPathName)) + table_postfix := int(hash_value) % 1024 + t.Logf("table postfix %d", table_postfix) +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 3c7c1fd9e..1da0d065d 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -1,8 +1,10 @@ package weed_server import ( + "encoding/json" "math/rand" "net/http" + "os" "strconv" "sync" "time" @@ -11,6 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer/cassandra_store" "github.com/chrislusf/seaweedfs/weed/filer/embedded_filer" "github.com/chrislusf/seaweedfs/weed/filer/flat_namespace" + "github.com/chrislusf/seaweedfs/weed/filer/mysql_store" "github.com/chrislusf/seaweedfs/weed/filer/redis_store" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" @@ -18,6 +21,25 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) +type filerConf struct { + MysqlConf []mysql_store.MySqlConf `json:"mysql"` +} + +func parseConfFile(confPath string) (*filerConf, error) { + var setting filerConf + configFile, err := os.Open(confPath) + defer configFile.Close() + if err != nil { + return nil, err + } + + jsonParser := json.NewDecoder(configFile) + if err = jsonParser.Decode(&setting); err != nil { + return nil, err + } + return &setting, nil +} + type FilerServer struct { port string master string @@ -28,12 +50,13 @@ type FilerServer struct { disableDirListing bool secret security.Secret filer filer.Filer - maxMB int + maxMB int masterNodes *storage.MasterNodes } func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string, replication string, redirectOnRead bool, disableDirListing bool, + confFile string, maxMB int, secret string, cassandra_server string, cassandra_keyspace string, @@ -45,11 +68,24 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st defaultReplication: replication, redirectOnRead: redirectOnRead, disableDirListing: disableDirListing, - maxMB: maxMB, + maxMB: maxMB, port: ip + ":" + strconv.Itoa(port), } - if cassandra_server != "" { + var setting *filerConf + if confFile != "" { + setting, err = parseConfFile(confFile) + if err != nil { + return nil, err + } + } else { + setting = new(filerConf) + } + + if setting.MysqlConf != nil && len(setting.MysqlConf) != 0 { + mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf) + fs.filer = flat_namespace.NewFlatNamespaceFiler(master, mysql_store) + } else if cassandra_server != "" { cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server) if err != nil { glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err) From e7b237c8dadc8f5d65fed8db2115c1d946d8c519 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9C=8D=E6=99=93=E6=A0=8B?= Date: Wed, 31 Aug 2016 11:55:02 +0800 Subject: [PATCH 2/4] UT case fix --- weed/filer/mysql_store/mysql_store_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/weed/filer/mysql_store/mysql_store_test.go b/weed/filer/mysql_store/mysql_store_test.go index e89ca9020..2bfe26dc8 100644 --- a/weed/filer/mysql_store/mysql_store_test.go +++ b/weed/filer/mysql_store/mysql_store_test.go @@ -37,8 +37,8 @@ sample conf should be */ func TestGenerateMysqlConf(t *testing.T) { - var conf MySqlConf - conf = append(conf, MySqlInstConf{ + var conf []MySqlConf + conf = append(conf, MySqlConf{ User: "root", Password: "root", HostName: "localhost", From 3aa021a812317fa640ebe2c7ec3b2a78492fa319 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9C=8D=E6=99=93=E6=A0=8B?= Date: Mon, 5 Sep 2016 14:10:22 +0800 Subject: [PATCH 3/4] refactoring mysql store code --- weed/filer/mysql_store/README.md | 66 +++++++++++++ weed/filer/mysql_store/filer_mapping.sql | 11 --- weed/filer/mysql_store/mysql_store.go | 102 +++++++++++++++------ weed/filer/mysql_store/mysql_store_test.go | 30 ------ weed/server/filer_server.go | 3 +- 5 files changed, 141 insertions(+), 71 deletions(-) create mode 100644 weed/filer/mysql_store/README.md delete mode 100644 weed/filer/mysql_store/filer_mapping.sql diff --git a/weed/filer/mysql_store/README.md b/weed/filer/mysql_store/README.md new file mode 100644 index 000000000..4ce6438da --- /dev/null +++ b/weed/filer/mysql_store/README.md @@ -0,0 +1,66 @@ +#MySQL filer mapping store + +## Schema format + + +Basically, uriPath and fid are the key elements stored in MySQL. In view of the optimization and user's usage, +adding primary key with integer type and involving createTime, updateTime, status fields should be somewhat meaningful. +Of course, you could customize the schema per your concretely circumstance freely. + +

+CREATE TABLE IF NOT EXISTS `filer_mapping` (
+  `id` bigint(20) NOT NULL AUTO_INCREMENT,
+  `uriPath` char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath',
+  `fid` char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid',
+  `createTime` int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp',
+  `updateTime` int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp',
+  `remark` varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field',
+  `status` tinyint(2) DEFAULT '1' COMMENT 'resource status',
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `index_uriPath` (`uriPath`)
+) DEFAULT CHARSET=utf8;
+
+ + +The MySQL 's config params is not added into the weed command option as other stores(redis,cassandra). Instead, +We created a config file(json format) for them. TOML,YAML or XML also should be OK. But TOML and YAML need import thirdparty package +while XML is a little bit complex. + +The sample config file's content is below: + +

+{
+    "mysql": [
+        {
+            "User": "root",
+            "Password": "root",
+            "HostName": "127.0.0.1",
+            "Port": 3306,
+            "DataBase": "seaweedfs"
+        },
+        {
+            "User": "root",
+            "Password": "root",
+            "HostName": "127.0.0.2",
+            "Port": 3306,
+            "DataBase": "seaweedfs"
+        }
+    ],
+    "IsSharding":true,
+    "ShardingNum":1024
+}
+
+ + +The "mysql" field in above conf file is an array which include all mysql instances you prepared to store sharding data. +1. If one mysql instance is enough, just keep one instance in "mysql" field. +2. If table sharding at a specific mysql instance is needed , mark "IsSharding" field with true and specify total table +sharding numbers using "ShardingNum" field. +3. If the mysql service could be auto scaled transparently in your environment, just config one mysql instance(usually it's a frondend proxy or VIP), +and mark "IsSharding" with false value +4. If your prepare more than one mysql instances and have no plan to use table sharding for any instance(mark isSharding with false), instance sharding +will still be done implicitly + + + + diff --git a/weed/filer/mysql_store/filer_mapping.sql b/weed/filer/mysql_store/filer_mapping.sql deleted file mode 100644 index 6bbe4e880..000000000 --- a/weed/filer/mysql_store/filer_mapping.sql +++ /dev/null @@ -1,11 +0,0 @@ -CREATE TABLE IF NOT EXISTS `filer_mapping` ( - `id` bigint(20) NOT NULL AUTO_INCREMENT, - `uriPath` char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath', - `fid` char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid', - `createTime` int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp', - `updateTime` int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp', - `remark` varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field', - `status` tinyint(2) DEFAULT '1' COMMENT 'resource status', - PRIMARY KEY (`id`), - UNIQUE KEY `index_uriPath` (`uriPath`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8; \ No newline at end of file diff --git a/weed/filer/mysql_store/mysql_store.go b/weed/filer/mysql_store/mysql_store.go index 44d0d88a7..439ed22f7 100644 --- a/weed/filer/mysql_store/mysql_store.go +++ b/weed/filer/mysql_store/mysql_store.go @@ -11,11 +11,11 @@ import ( ) const ( - sqlUrl = "%s:%s@tcp(%s:%d)/%s?charset=utf8" - maxIdleConnections = 100 - maxOpenConnections = 50 - maxTableNums = 1024 - tableName = "filer_mapping" + sqlUrl = "%s:%s@tcp(%s:%d)/%s?charset=utf8" + default_maxIdleConnections = 100 + default_maxOpenConnections = 50 + default_maxTableNums = 1024 + tableName = "filer_mapping" ) var ( @@ -24,15 +24,24 @@ var ( ) type MySqlConf struct { - User string - Password string - HostName string - Port int - DataBase string + User string + Password string + HostName string + Port int + DataBase string + MaxIdleConnections int + MaxOpenConnections int +} + +type ShardingConf struct { + IsSharding bool `json:"isSharding"` + ShardingNum int `json:"shardingNum"` } type MySqlStore struct { - dbs []*sql.DB + dbs []*sql.DB + isSharding bool + shardingNum int } func getDbConnection(confs []MySqlConf) []*sql.DB { @@ -47,6 +56,19 @@ func getDbConnection(confs []MySqlConf) []*sql.DB { _db_connection = nil panic(dbErr) } + var maxIdleConnections, maxOpenConnections int + + if conf.MaxIdleConnections != 0 { + maxIdleConnections = conf.MaxIdleConnections + } else { + maxIdleConnections = default_maxIdleConnections + } + if conf.MaxOpenConnections != 0 { + maxOpenConnections = conf.MaxOpenConnections + } else { + maxOpenConnections = default_maxOpenConnections + } + _db_connection.SetMaxIdleConns(maxIdleConnections) _db_connection.SetMaxOpenConns(maxOpenConnections) _db_connections = append(_db_connections, _db_connection) @@ -55,15 +77,24 @@ func getDbConnection(confs []MySqlConf) []*sql.DB { return _db_connections } -func NewMysqlStore(confs []MySqlConf) *MySqlStore { +func NewMysqlStore(confs []MySqlConf, isSharding bool, shardingNum int) *MySqlStore { ms := &MySqlStore{ - dbs: getDbConnection(confs), + dbs: getDbConnection(confs), + isSharding: isSharding, + shardingNum: shardingNum, } for _, db := range ms.dbs { - for i := 0; i < maxTableNums; i++ { + if !isSharding { + ms.shardingNum = 1 + } else { + if ms.shardingNum == 0 { + ms.shardingNum = default_maxTableNums + } + } + for i := 0; i < ms.shardingNum; i++ { if err := ms.createTables(db, tableName, i); err != nil { - fmt.Printf("create table failed %s", err.Error()) + fmt.Printf("create table failed %v", err) } } } @@ -74,21 +105,25 @@ func NewMysqlStore(confs []MySqlConf) *MySqlStore { func (s *MySqlStore) hash(fullFileName string) (instance_offset, table_postfix int) { hash_value := crc32.ChecksumIEEE([]byte(fullFileName)) instance_offset = int(hash_value) % len(s.dbs) - table_postfix = int(hash_value) % maxTableNums + table_postfix = int(hash_value) % s.shardingNum return } func (s *MySqlStore) parseFilerMappingInfo(path string) (instanceId int, tableFullName string, err error) { instance_offset, table_postfix := s.hash(path) instanceId = instance_offset - tableFullName = fmt.Sprintf("%s_%04d", tableName, table_postfix) + if s.isSharding { + tableFullName = fmt.Sprintf("%s_%04d", tableName, table_postfix) + } else { + tableFullName = tableName + } return } func (s *MySqlStore) Get(fullFilePath string) (fid string, err error) { instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath) if err != nil { - return "", err + return "", fmt.Errorf("MySqlStore Get operation can not parse file path %s: err is %v", fullFilePath, err) } fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName) if err == sql.ErrNoRows { @@ -103,16 +138,18 @@ func (s *MySqlStore) Put(fullFilePath string, fid string) (err error) { instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath) if err != nil { - return err + return fmt.Errorf("MySqlStore Put operation can not parse file path %s: err is %v", fullFilePath, err) } - if old_fid, localErr := s.query(fullFilePath, s.dbs[instance_offset], tableFullName); localErr != nil && localErr != sql.ErrNoRows { - err = localErr - return + var old_fid string + if old_fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil && err != sql.ErrNoRows { + return fmt.Errorf("MySqlStore Put operation failed when querying path %s: err is %v", fullFilePath, err) } else { if len(old_fid) == 0 { err = s.insert(fullFilePath, fid, s.dbs[instance_offset], tableFullName) + err = fmt.Errorf("MySqlStore Put operation failed when inserting path %s with fid %s : err is %v", fullFilePath, fid, err) } else { err = s.update(fullFilePath, fid, s.dbs[instance_offset], tableFullName) + err = fmt.Errorf("MySqlStore Put operation failed when updating path %s with fid %s : err is %v", fullFilePath, fid, err) } } return @@ -122,15 +159,15 @@ func (s *MySqlStore) Delete(fullFilePath string) (err error) { var fid string instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath) if err != nil { - return err + return fmt.Errorf("MySqlStore Delete operation can not parse file path %s: err is %v", fullFilePath, err) } if fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil { - return err + return fmt.Errorf("MySqlStore Delete operation failed when querying path %s: err is %v", fullFilePath, err) } else if fid == "" { return nil } - if err := s.delete(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil { - return err + if err = s.delete(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil { + return fmt.Errorf("MySqlStore Delete operation failed when deleting path %s: err is %v", fullFilePath, err) } else { return nil } @@ -143,7 +180,7 @@ func (s *MySqlStore) Close() { } var createTable = ` -CREATE TABLE IF NOT EXISTS %s_%04d ( +CREATE TABLE IF NOT EXISTS %s ( id bigint(20) NOT NULL AUTO_INCREMENT, uriPath char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath', fid char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid', @@ -153,11 +190,18 @@ CREATE TABLE IF NOT EXISTS %s_%04d ( status tinyint(2) DEFAULT '1' COMMENT 'resource status', PRIMARY KEY (id), UNIQUE KEY index_uriPath (uriPath) -) ENGINE=InnoDB DEFAULT CHARSET=utf8; +) DEFAULT CHARSET=utf8; ` func (s *MySqlStore) createTables(db *sql.DB, tableName string, postfix int) error { - stmt, err := db.Prepare(fmt.Sprintf(createTable, tableName, postfix)) + var realTableName string + if s.isSharding { + realTableName = fmt.Sprintf("%s_%4d", tableName, postfix) + } else { + realTableName = tableName + } + + stmt, err := db.Prepare(fmt.Sprintf(createTable, realTableName)) if err != nil { return err } diff --git a/weed/filer/mysql_store/mysql_store_test.go b/weed/filer/mysql_store/mysql_store_test.go index 2bfe26dc8..1c9765c59 100644 --- a/weed/filer/mysql_store/mysql_store_test.go +++ b/weed/filer/mysql_store/mysql_store_test.go @@ -6,36 +6,6 @@ import ( "testing" ) -/* -To improve performance when storing billion of files, you could shar -At each mysql instance, we will try to create 1024 tables if not exist, table name will be something like: -filer_mapping_0000 -filer_mapping_0001 -..... -filer_mapping_1023 -sample conf should be - ->$cat filer_conf.json -{ - "mysql": [ - { - "User": "root", - "Password": "root", - "HostName": "127.0.0.1", - "Port": 3306, - "DataBase": "seaweedfs" - }, - { - "User": "root", - "Password": "root", - "HostName": "127.0.0.2", - "Port": 3306, - "DataBase": "seaweedfs" - } - ] -} -*/ - func TestGenerateMysqlConf(t *testing.T) { var conf []MySqlConf conf = append(conf, MySqlConf{ diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 1da0d065d..1bcbd046f 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -23,6 +23,7 @@ import ( type filerConf struct { MysqlConf []mysql_store.MySqlConf `json:"mysql"` + mysql_store.ShardingConf } func parseConfFile(confPath string) (*filerConf, error) { @@ -83,7 +84,7 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st } if setting.MysqlConf != nil && len(setting.MysqlConf) != 0 { - mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf) + mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf, setting.IsSharding, setting.ShardingNum) fs.filer = flat_namespace.NewFlatNamespaceFiler(master, mysql_store) } else if cassandra_server != "" { cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server) From 78474409a596aabd8e579716ba1f4939e7d62579 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9C=8D=E6=99=93=E6=A0=8B?= Date: Thu, 8 Sep 2016 11:35:54 +0800 Subject: [PATCH 4/4] filer mysqlstore bug fix --- weed/filer/cassandra_store/cassandra_store.go | 2 ++ weed/filer/embedded_filer/files_in_leveldb.go | 4 ++- weed/filer/filer.go | 6 ++++ weed/filer/mysql_store/README.md | 15 +++++---- weed/filer/mysql_store/mysql_store.go | 32 ++++++++++--------- weed/filer/redis_store/redis_store.go | 4 ++- weed/server/filer_server.go | 2 +- weed/server/filer_server_handlers_read.go | 3 +- weed/server/filer_server_handlers_write.go | 12 ++++--- 9 files changed, 50 insertions(+), 30 deletions(-) diff --git a/weed/filer/cassandra_store/cassandra_store.go b/weed/filer/cassandra_store/cassandra_store.go index cdb9d3e3c..50a792a65 100644 --- a/weed/filer/cassandra_store/cassandra_store.go +++ b/weed/filer/cassandra_store/cassandra_store.go @@ -3,6 +3,7 @@ package cassandra_store import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/gocql/gocql" @@ -59,6 +60,7 @@ func (c *CassandraStore) Get(fullFileName string) (fid string, err error) { fullFileName).Consistency(gocql.One).Scan(&output); err != nil { if err != gocql.ErrNotFound { glog.V(0).Infof("Failed to find file %s: %v", fullFileName, fid, err) + return "", filer.ErrNotFound } } if len(output) == 0 { diff --git a/weed/filer/embedded_filer/files_in_leveldb.go b/weed/filer/embedded_filer/files_in_leveldb.go index 19f6dd7e8..c40d7adaf 100644 --- a/weed/filer/embedded_filer/files_in_leveldb.go +++ b/weed/filer/embedded_filer/files_in_leveldb.go @@ -53,7 +53,9 @@ func (fl *FileListInLevelDb) DeleteFile(dirId filer.DirectoryId, fileName string } func (fl *FileListInLevelDb) FindFile(dirId filer.DirectoryId, fileName string) (fid string, err error) { data, e := fl.db.Get(genKey(dirId, fileName), nil) - if e != nil { + if e == leveldb.ErrNotFound { + return "", filer.ErrNotFound + } else if e != nil { return "", e } return string(data), nil diff --git a/weed/filer/filer.go b/weed/filer/filer.go index fd23e119c..5d5acb68d 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -1,5 +1,9 @@ package filer +import ( + "errors" +) + type FileId string //file id in SeaweedFS type FileEntry struct { @@ -26,3 +30,5 @@ type Filer interface { DeleteDirectory(dirPath string, recursive bool) (err error) Move(fromPath string, toPath string) (err error) } + +var ErrNotFound = errors.New("filer: no entry is found in filer store") diff --git a/weed/filer/mysql_store/README.md b/weed/filer/mysql_store/README.md index 4ce6438da..6efeb1c54 100644 --- a/weed/filer/mysql_store/README.md +++ b/weed/filer/mysql_store/README.md @@ -47,19 +47,20 @@ The sample config file's content is below: } ], "IsSharding":true, - "ShardingNum":1024 + "ShardCount":1024 } The "mysql" field in above conf file is an array which include all mysql instances you prepared to store sharding data. + 1. If one mysql instance is enough, just keep one instance in "mysql" field. -2. If table sharding at a specific mysql instance is needed , mark "IsSharding" field with true and specify total table -sharding numbers using "ShardingNum" field. -3. If the mysql service could be auto scaled transparently in your environment, just config one mysql instance(usually it's a frondend proxy or VIP), -and mark "IsSharding" with false value -4. If your prepare more than one mysql instances and have no plan to use table sharding for any instance(mark isSharding with false), instance sharding -will still be done implicitly + +2. If table sharding at a specific mysql instance is needed , mark "IsSharding" field with true and specify total table sharding numbers using "ShardCount" field. + +3. If the mysql service could be auto scaled transparently in your environment, just config one mysql instance(usually it's a frondend proxy or VIP),and mark "IsSharding" with false value + +4. If you prepare more than one mysql instance and have no plan to use table sharding for any instance(mark isSharding with false), instance sharding will still be done implicitly diff --git a/weed/filer/mysql_store/mysql_store.go b/weed/filer/mysql_store/mysql_store.go index 439ed22f7..6910206ce 100644 --- a/weed/filer/mysql_store/mysql_store.go +++ b/weed/filer/mysql_store/mysql_store.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/chrislusf/seaweedfs/weed/filer" + _ "github.com/go-sql-driver/mysql" ) @@ -34,14 +36,14 @@ type MySqlConf struct { } type ShardingConf struct { - IsSharding bool `json:"isSharding"` - ShardingNum int `json:"shardingNum"` + IsSharding bool `json:"isSharding"` + ShardCount int `json:"shardCount"` } type MySqlStore struct { - dbs []*sql.DB - isSharding bool - shardingNum int + dbs []*sql.DB + isSharding bool + shardCount int } func getDbConnection(confs []MySqlConf) []*sql.DB { @@ -77,22 +79,22 @@ func getDbConnection(confs []MySqlConf) []*sql.DB { return _db_connections } -func NewMysqlStore(confs []MySqlConf, isSharding bool, shardingNum int) *MySqlStore { +func NewMysqlStore(confs []MySqlConf, isSharding bool, shardCount int) *MySqlStore { ms := &MySqlStore{ - dbs: getDbConnection(confs), - isSharding: isSharding, - shardingNum: shardingNum, + dbs: getDbConnection(confs), + isSharding: isSharding, + shardCount: shardCount, } for _, db := range ms.dbs { if !isSharding { - ms.shardingNum = 1 + ms.shardCount = 1 } else { - if ms.shardingNum == 0 { - ms.shardingNum = default_maxTableNums + if ms.shardCount == 0 { + ms.shardCount = default_maxTableNums } } - for i := 0; i < ms.shardingNum; i++ { + for i := 0; i < ms.shardCount; i++ { if err := ms.createTables(db, tableName, i); err != nil { fmt.Printf("create table failed %v", err) } @@ -105,7 +107,7 @@ func NewMysqlStore(confs []MySqlConf, isSharding bool, shardingNum int) *MySqlSt func (s *MySqlStore) hash(fullFileName string) (instance_offset, table_postfix int) { hash_value := crc32.ChecksumIEEE([]byte(fullFileName)) instance_offset = int(hash_value) % len(s.dbs) - table_postfix = int(hash_value) % s.shardingNum + table_postfix = int(hash_value) % s.shardCount return } @@ -128,7 +130,7 @@ func (s *MySqlStore) Get(fullFilePath string) (fid string, err error) { fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName) if err == sql.ErrNoRows { //Could not found - err = nil + err = filer.ErrNotFound } return fid, err } diff --git a/weed/filer/redis_store/redis_store.go b/weed/filer/redis_store/redis_store.go index 5e51b5455..2ad49a805 100644 --- a/weed/filer/redis_store/redis_store.go +++ b/weed/filer/redis_store/redis_store.go @@ -1,6 +1,8 @@ package redis_store import ( + "github.com/chrislusf/seaweedfs/weed/filer" + redis "gopkg.in/redis.v2" ) @@ -20,7 +22,7 @@ func NewRedisStore(hostPort string, password string, database int) *RedisStore { func (s *RedisStore) Get(fullFileName string) (fid string, err error) { fid, err = s.Client.Get(fullFileName).Result() if err == redis.Nil { - err = nil + err = filer.ErrNotFound } return fid, err } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 1bcbd046f..959bb92cb 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -84,7 +84,7 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st } if setting.MysqlConf != nil && len(setting.MysqlConf) != 0 { - mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf, setting.IsSharding, setting.ShardingNum) + mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf, setting.IsSharding, setting.ShardCount) fs.filer = flat_namespace.NewFlatNamespaceFiler(master, mysql_store) } else if cassandra_server != "" { cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server) diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 6b9505377..bf95e37b9 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" ui "github.com/chrislusf/seaweedfs/weed/server/filer_ui" @@ -87,7 +88,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, } fileId, err := fs.filer.FindFile(r.URL.Path) - if err == leveldb.ErrNotFound { + if err == filer.ErrNotFound { glog.V(3).Infoln("Not found in db", r.URL.Path) w.WriteHeader(http.StatusNotFound) return diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 872d8c4b9..464cb81ef 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -15,11 +15,11 @@ import ( "net/url" "strings" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/syndtr/goleveldb/leveldb" "path" "strconv" ) @@ -73,17 +73,17 @@ func makeFormData(filename, mimeType string, content io.Reader) (formData io.Rea } func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Request, path string) (fileId, urlLocation string, err error) { - if fileId, err = fs.filer.FindFile(path); err != nil && err != leveldb.ErrNotFound { + if fileId, err = fs.filer.FindFile(path); err != nil && err != filer.ErrNotFound { glog.V(0).Infoln("failing to find path in filer store", path, err.Error()) writeJsonError(w, r, http.StatusInternalServerError, err) - return } else if fileId != "" && err == nil { urlLocation, err = operation.LookupFileId(fs.getMasterNode(), fileId) if err != nil { glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error()) w.WriteHeader(http.StatusNotFound) - return } + } else if fileId == "" && err == filer.ErrNotFound { + w.WriteHeader(http.StatusNotFound) } return } @@ -315,6 +315,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "PUT" { if oldFid, err := fs.filer.FindFile(path); err == nil { operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) + } else if err != nil && err != filer.ErrNotFound { + glog.V(0).Infof("error %v occur when finding %s in filer store", err, path) } } @@ -498,6 +500,8 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte if r.Method != "PUT" { if oldFid, err := fs.filer.FindFile(path); err == nil { operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) + } else if err != nil && err != filer.ErrNotFound { + glog.V(0).Infof("error %v occur when finding %s in filer store", err, path) } }