Browse Source

filer mysqlstore bug fix

pull/361/head
霍晓栋 9 years ago
parent
commit
78474409a5
  1. 2
      weed/filer/cassandra_store/cassandra_store.go
  2. 4
      weed/filer/embedded_filer/files_in_leveldb.go
  3. 6
      weed/filer/filer.go
  4. 15
      weed/filer/mysql_store/README.md
  5. 22
      weed/filer/mysql_store/mysql_store.go
  6. 4
      weed/filer/redis_store/redis_store.go
  7. 2
      weed/server/filer_server.go
  8. 3
      weed/server/filer_server_handlers_read.go
  9. 12
      weed/server/filer_server_handlers_write.go

2
weed/filer/cassandra_store/cassandra_store.go

@ -3,6 +3,7 @@ package cassandra_store
import ( import (
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/gocql/gocql" "github.com/gocql/gocql"
@ -59,6 +60,7 @@ func (c *CassandraStore) Get(fullFileName string) (fid string, err error) {
fullFileName).Consistency(gocql.One).Scan(&output); err != nil { fullFileName).Consistency(gocql.One).Scan(&output); err != nil {
if err != gocql.ErrNotFound { if err != gocql.ErrNotFound {
glog.V(0).Infof("Failed to find file %s: %v", fullFileName, fid, err) glog.V(0).Infof("Failed to find file %s: %v", fullFileName, fid, err)
return "", filer.ErrNotFound
} }
} }
if len(output) == 0 { if len(output) == 0 {

4
weed/filer/embedded_filer/files_in_leveldb.go

@ -53,7 +53,9 @@ func (fl *FileListInLevelDb) DeleteFile(dirId filer.DirectoryId, fileName string
} }
func (fl *FileListInLevelDb) FindFile(dirId filer.DirectoryId, fileName string) (fid string, err error) { func (fl *FileListInLevelDb) FindFile(dirId filer.DirectoryId, fileName string) (fid string, err error) {
data, e := fl.db.Get(genKey(dirId, fileName), nil) data, e := fl.db.Get(genKey(dirId, fileName), nil)
if e != nil {
if e == leveldb.ErrNotFound {
return "", filer.ErrNotFound
} else if e != nil {
return "", e return "", e
} }
return string(data), nil return string(data), nil

6
weed/filer/filer.go

@ -1,5 +1,9 @@
package filer package filer
import (
"errors"
)
type FileId string //file id in SeaweedFS type FileId string //file id in SeaweedFS
type FileEntry struct { type FileEntry struct {
@ -26,3 +30,5 @@ type Filer interface {
DeleteDirectory(dirPath string, recursive bool) (err error) DeleteDirectory(dirPath string, recursive bool) (err error)
Move(fromPath string, toPath string) (err error) Move(fromPath string, toPath string) (err error)
} }
var ErrNotFound = errors.New("filer: no entry is found in filer store")

15
weed/filer/mysql_store/README.md

@ -47,19 +47,20 @@ The sample config file's content is below:
} }
], ],
"IsSharding":true, "IsSharding":true,
"ShardingNum":1024
"ShardCount":1024
} }
</code></pre> </code></pre>
The "mysql" field in above conf file is an array which include all mysql instances you prepared to store sharding data. The "mysql" field in above conf file is an array which include all mysql instances you prepared to store sharding data.
1. If one mysql instance is enough, just keep one instance in "mysql" field. 1. If one mysql instance is enough, just keep one instance in "mysql" field.
2. If table sharding at a specific mysql instance is needed , mark "IsSharding" field with true and specify total table
sharding numbers using "ShardingNum" field.
3. If the mysql service could be auto scaled transparently in your environment, just config one mysql instance(usually it's a frondend proxy or VIP),
and mark "IsSharding" with false value
4. If your prepare more than one mysql instances and have no plan to use table sharding for any instance(mark isSharding with false), instance sharding
will still be done implicitly
2. If table sharding at a specific mysql instance is needed , mark "IsSharding" field with true and specify total table sharding numbers using "ShardCount" field.
3. If the mysql service could be auto scaled transparently in your environment, just config one mysql instance(usually it's a frondend proxy or VIP),and mark "IsSharding" with false value
4. If you prepare more than one mysql instance and have no plan to use table sharding for any instance(mark isSharding with false), instance sharding will still be done implicitly

22
weed/filer/mysql_store/mysql_store.go

@ -7,6 +7,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/chrislusf/seaweedfs/weed/filer"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
) )
@ -35,13 +37,13 @@ type MySqlConf struct {
type ShardingConf struct { type ShardingConf struct {
IsSharding bool `json:"isSharding"` IsSharding bool `json:"isSharding"`
ShardingNum int `json:"shardingNum"`
ShardCount int `json:"shardCount"`
} }
type MySqlStore struct { type MySqlStore struct {
dbs []*sql.DB dbs []*sql.DB
isSharding bool isSharding bool
shardingNum int
shardCount int
} }
func getDbConnection(confs []MySqlConf) []*sql.DB { func getDbConnection(confs []MySqlConf) []*sql.DB {
@ -77,22 +79,22 @@ func getDbConnection(confs []MySqlConf) []*sql.DB {
return _db_connections return _db_connections
} }
func NewMysqlStore(confs []MySqlConf, isSharding bool, shardingNum int) *MySqlStore {
func NewMysqlStore(confs []MySqlConf, isSharding bool, shardCount int) *MySqlStore {
ms := &MySqlStore{ ms := &MySqlStore{
dbs: getDbConnection(confs), dbs: getDbConnection(confs),
isSharding: isSharding, isSharding: isSharding,
shardingNum: shardingNum,
shardCount: shardCount,
} }
for _, db := range ms.dbs { for _, db := range ms.dbs {
if !isSharding { if !isSharding {
ms.shardingNum = 1
ms.shardCount = 1
} else { } else {
if ms.shardingNum == 0 {
ms.shardingNum = default_maxTableNums
if ms.shardCount == 0 {
ms.shardCount = default_maxTableNums
} }
} }
for i := 0; i < ms.shardingNum; i++ {
for i := 0; i < ms.shardCount; i++ {
if err := ms.createTables(db, tableName, i); err != nil { if err := ms.createTables(db, tableName, i); err != nil {
fmt.Printf("create table failed %v", err) fmt.Printf("create table failed %v", err)
} }
@ -105,7 +107,7 @@ func NewMysqlStore(confs []MySqlConf, isSharding bool, shardingNum int) *MySqlSt
func (s *MySqlStore) hash(fullFileName string) (instance_offset, table_postfix int) { func (s *MySqlStore) hash(fullFileName string) (instance_offset, table_postfix int) {
hash_value := crc32.ChecksumIEEE([]byte(fullFileName)) hash_value := crc32.ChecksumIEEE([]byte(fullFileName))
instance_offset = int(hash_value) % len(s.dbs) instance_offset = int(hash_value) % len(s.dbs)
table_postfix = int(hash_value) % s.shardingNum
table_postfix = int(hash_value) % s.shardCount
return return
} }
@ -128,7 +130,7 @@ func (s *MySqlStore) Get(fullFilePath string) (fid string, err error) {
fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName) fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
//Could not found //Could not found
err = nil
err = filer.ErrNotFound
} }
return fid, err return fid, err
} }

4
weed/filer/redis_store/redis_store.go

@ -1,6 +1,8 @@
package redis_store package redis_store
import ( import (
"github.com/chrislusf/seaweedfs/weed/filer"
redis "gopkg.in/redis.v2" redis "gopkg.in/redis.v2"
) )
@ -20,7 +22,7 @@ func NewRedisStore(hostPort string, password string, database int) *RedisStore {
func (s *RedisStore) Get(fullFileName string) (fid string, err error) { func (s *RedisStore) Get(fullFileName string) (fid string, err error) {
fid, err = s.Client.Get(fullFileName).Result() fid, err = s.Client.Get(fullFileName).Result()
if err == redis.Nil { if err == redis.Nil {
err = nil
err = filer.ErrNotFound
} }
return fid, err return fid, err
} }

2
weed/server/filer_server.go

@ -84,7 +84,7 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st
} }
if setting.MysqlConf != nil && len(setting.MysqlConf) != 0 { if setting.MysqlConf != nil && len(setting.MysqlConf) != 0 {
mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf, setting.IsSharding, setting.ShardingNum)
mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf, setting.IsSharding, setting.ShardCount)
fs.filer = flat_namespace.NewFlatNamespaceFiler(master, mysql_store) fs.filer = flat_namespace.NewFlatNamespaceFiler(master, mysql_store)
} else if cassandra_server != "" { } else if cassandra_server != "" {
cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server) cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server)

3
weed/server/filer_server_handlers_read.go

@ -7,6 +7,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
ui "github.com/chrislusf/seaweedfs/weed/server/filer_ui" ui "github.com/chrislusf/seaweedfs/weed/server/filer_ui"
@ -87,7 +88,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
} }
fileId, err := fs.filer.FindFile(r.URL.Path) fileId, err := fs.filer.FindFile(r.URL.Path)
if err == leveldb.ErrNotFound {
if err == filer.ErrNotFound {
glog.V(3).Infoln("Not found in db", r.URL.Path) glog.V(3).Infoln("Not found in db", r.URL.Path)
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
return return

12
weed/server/filer_server_handlers_write.go

@ -15,11 +15,11 @@ import (
"net/url" "net/url"
"strings" "strings"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb"
"path" "path"
"strconv" "strconv"
) )
@ -73,17 +73,17 @@ func makeFormData(filename, mimeType string, content io.Reader) (formData io.Rea
} }
func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Request, path string) (fileId, urlLocation string, err error) { func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Request, path string) (fileId, urlLocation string, err error) {
if fileId, err = fs.filer.FindFile(path); err != nil && err != leveldb.ErrNotFound {
if fileId, err = fs.filer.FindFile(path); err != nil && err != filer.ErrNotFound {
glog.V(0).Infoln("failing to find path in filer store", path, err.Error()) glog.V(0).Infoln("failing to find path in filer store", path, err.Error())
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)
return
} else if fileId != "" && err == nil { } else if fileId != "" && err == nil {
urlLocation, err = operation.LookupFileId(fs.getMasterNode(), fileId) urlLocation, err = operation.LookupFileId(fs.getMasterNode(), fileId)
if err != nil { if err != nil {
glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error()) glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error())
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
return
} }
} else if fileId == "" && err == filer.ErrNotFound {
w.WriteHeader(http.StatusNotFound)
} }
return return
} }
@ -315,6 +315,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "PUT" { if r.Method != "PUT" {
if oldFid, err := fs.filer.FindFile(path); err == nil { if oldFid, err := fs.filer.FindFile(path); err == nil {
operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
} else if err != nil && err != filer.ErrNotFound {
glog.V(0).Infof("error %v occur when finding %s in filer store", err, path)
} }
} }
@ -498,6 +500,8 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
if r.Method != "PUT" { if r.Method != "PUT" {
if oldFid, err := fs.filer.FindFile(path); err == nil { if oldFid, err := fs.filer.FindFile(path); err == nil {
operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
} else if err != nil && err != filer.ErrNotFound {
glog.V(0).Infof("error %v occur when finding %s in filer store", err, path)
} }
} }

Loading…
Cancel
Save