Browse Source

restructuring sql stores

pull/1759/head
Chris Lu 4 years ago
parent
commit
4c5b752b04
  1. 109
      weed/filer/abstract_sql/abstract_sql_store.go
  2. 8
      weed/filer/abstract_sql/abstract_sql_store_kv.go
  3. 45
      weed/filer/mysql/mysql_store.go
  4. 43
      weed/filer/postgres/postgres_store.go

109
weed/filer/abstract_sql/abstract_sql_store.go

@ -9,21 +9,28 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"strings" "strings"
"sync"
) )
type SqlGenerator interface {
GetSqlInsert(bucket string) string
GetSqlUpdate(bucket string) string
GetSqlFind(bucket string) string
GetSqlDelete(bucket string) string
GetSqlDeleteFolderChildren(bucket string) string
GetSqlListExclusive(bucket string) string
GetSqlListInclusive(bucket string) string
}
type AbstractSqlStore struct { type AbstractSqlStore struct {
DB *sql.DB DB *sql.DB
SqlInsert string
SqlUpdate string
SqlFind string
SqlDelete string
SqlDeleteFolderChildren string
SqlListExclusive string
SqlListInclusive string
SqlGenerator
dbs map[string]bool
dbsLock sync.Mutex
} }
const ( const (
DEFAULT = "_main"
DEFAULT_TABLE = "filemeta"
) )
type TxOrDB interface { type TxOrDB interface {
@ -57,16 +64,53 @@ func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error {
} }
func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB TxOrDB, bucket string, shortPath util.FullPath, err error) { func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB TxOrDB, bucket string, shortPath util.FullPath, err error) {
shortPath = fullpath shortPath = fullpath
bucket = DEFAULT_TABLE
if tx, ok := ctx.Value("tx").(*sql.Tx); ok { if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
return tx, bucket, shortPath, err
txOrDB = tx
} else {
txOrDB = store.DB
}
if strings.HasPrefix(string(fullpath), "/buckets/") {
return
}
// detect bucket
bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
t := strings.Index(bucketAndObjectKey, "/")
if t < 0 && !isForChildren {
return
}
if t > 0 {
bucket = bucketAndObjectKey[:t]
shortPath = util.FullPath(bucketAndObjectKey[t:])
}
if isValidBucket(bucket) {
store.dbsLock.Lock()
defer store.dbsLock.Unlock()
if store.dbs == nil {
store.dbs = make(map[string]bool)
} }
return store.DB, bucket, shortPath, err
if _, found := store.dbs[bucket]; !found {
if err = store.createTable(bucket); err != nil {
store.dbs[bucket] = true
}
}
}
return
} }
func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
db, _, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
if err != nil { if err != nil {
return fmt.Errorf("findDB %s : %v", entry.FullPath, err) return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
} }
@ -81,7 +125,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
meta = util.MaybeGzipData(meta) meta = util.MaybeGzipData(meta)
} }
res, err := db.ExecContext(ctx, store.SqlInsert, util.HashStringToLong(dir), name, dir, meta)
res, err := db.ExecContext(ctx, store.GetSqlInsert(bucket), util.HashStringToLong(dir), name, dir, meta)
if err == nil { if err == nil {
return return
} }
@ -94,7 +138,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
// now the insert failed possibly due to duplication constraints // now the insert failed possibly due to duplication constraints
glog.V(1).Infof("insert %s falls back to update: %v", entry.FullPath, err) glog.V(1).Infof("insert %s falls back to update: %v", entry.FullPath, err)
res, err = db.ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir)
res, err = db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
if err != nil { if err != nil {
return fmt.Errorf("upsert %s: %s", entry.FullPath, err) return fmt.Errorf("upsert %s: %s", entry.FullPath, err)
} }
@ -109,7 +153,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
db, _, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
if err != nil { if err != nil {
return fmt.Errorf("findDB %s : %v", entry.FullPath, err) return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
} }
@ -120,7 +164,7 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Ent
return fmt.Errorf("encode %s: %s", entry.FullPath, err) return fmt.Errorf("encode %s: %s", entry.FullPath, err)
} }
res, err := db.ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir)
res, err := db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
if err != nil { if err != nil {
return fmt.Errorf("update %s: %s", entry.FullPath, err) return fmt.Errorf("update %s: %s", entry.FullPath, err)
} }
@ -134,13 +178,13 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Ent
func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) { func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) {
db, _, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
if err != nil { if err != nil {
return nil, fmt.Errorf("findDB %s : %v", fullpath, err) return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
} }
dir, name := shortPath.DirAndName() dir, name := shortPath.DirAndName()
row := db.QueryRowContext(ctx, store.SqlFind, util.HashStringToLong(dir), name, dir)
row := db.QueryRowContext(ctx, store.GetSqlFind(bucket), util.HashStringToLong(dir), name, dir)
var data []byte var data []byte
if err := row.Scan(&data); err != nil { if err := row.Scan(&data); err != nil {
@ -162,14 +206,14 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.Full
func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
db, _, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
if err != nil { if err != nil {
return fmt.Errorf("findDB %s : %v", fullpath, err) return fmt.Errorf("findDB %s : %v", fullpath, err)
} }
dir, name := shortPath.DirAndName() dir, name := shortPath.DirAndName()
res, err := db.ExecContext(ctx, store.SqlDelete, util.HashStringToLong(dir), name, dir)
res, err := db.ExecContext(ctx, store.GetSqlDelete(bucket), util.HashStringToLong(dir), name, dir)
if err != nil { if err != nil {
return fmt.Errorf("delete %s: %s", fullpath, err) return fmt.Errorf("delete %s: %s", fullpath, err)
} }
@ -189,12 +233,16 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
return fmt.Errorf("findDB %s : %v", fullpath, err) return fmt.Errorf("findDB %s : %v", fullpath, err)
} }
if bucket != DEFAULT && shortPath == "/" {
store.deleteTable(bucket)
if isValidBucket(bucket) && shortPath == "/" {
if store.deleteTable(bucket) {
store.dbsLock.Lock()
delete(store.dbs, bucket)
store.dbsLock.Unlock()
return nil return nil
} }
}
res, err := db.ExecContext(ctx, store.SqlDeleteFolderChildren, util.HashStringToLong(string(shortPath)), fullpath)
res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), fullpath)
if err != nil { if err != nil {
return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err) return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
} }
@ -209,14 +257,14 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
db, _, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
if err != nil { if err != nil {
return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err) return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
} }
sqlText := store.SqlListExclusive
sqlText := store.GetSqlListExclusive(bucket)
if includeStartFile { if includeStartFile {
sqlText = store.SqlListInclusive
sqlText = store.GetSqlListInclusive(bucket)
} }
rows, err := db.QueryContext(ctx, sqlText, util.HashStringToLong(string(shortPath)), startFileName, string(shortPath), prefix+"%", limit+1) rows, err := db.QueryContext(ctx, sqlText, util.HashStringToLong(string(shortPath)), startFileName, string(shortPath), prefix+"%", limit+1)
@ -259,5 +307,14 @@ func (store *AbstractSqlStore) Shutdown() {
store.DB.Close() store.DB.Close()
} }
func (store *AbstractSqlStore) deleteTable(bucket string) {
func isValidBucket(bucket string) bool {
return bucket != DEFAULT_TABLE && bucket != ""
}
func (store *AbstractSqlStore) createTable(bucket string) error {
return nil
}
func (store *AbstractSqlStore) deleteTable(bucket string) bool {
return false
} }

8
weed/filer/abstract_sql/abstract_sql_store_kv.go

@ -20,7 +20,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by
dirStr, dirHash, name := genDirAndName(key) dirStr, dirHash, name := genDirAndName(key)
res, err := db.ExecContext(ctx, store.SqlInsert, dirHash, name, dirStr, value)
res, err := db.ExecContext(ctx, store.GetSqlInsert(DEFAULT_TABLE), dirHash, name, dirStr, value)
if err == nil { if err == nil {
return return
} }
@ -33,7 +33,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by
// now the insert failed possibly due to duplication constraints // now the insert failed possibly due to duplication constraints
glog.V(1).Infof("kv insert falls back to update: %s", err) glog.V(1).Infof("kv insert falls back to update: %s", err)
res, err = db.ExecContext(ctx, store.SqlUpdate, value, dirHash, name, dirStr)
res, err = db.ExecContext(ctx, store.GetSqlUpdate(DEFAULT_TABLE), value, dirHash, name, dirStr)
if err != nil { if err != nil {
return fmt.Errorf("kv upsert: %s", err) return fmt.Errorf("kv upsert: %s", err)
} }
@ -54,7 +54,7 @@ func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []b
} }
dirStr, dirHash, name := genDirAndName(key) dirStr, dirHash, name := genDirAndName(key)
row := db.QueryRowContext(ctx, store.SqlFind, dirHash, name, dirStr)
row := db.QueryRowContext(ctx, store.GetSqlFind(DEFAULT_TABLE), dirHash, name, dirStr)
err = row.Scan(&value) err = row.Scan(&value)
@ -78,7 +78,7 @@ func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err er
dirStr, dirHash, name := genDirAndName(key) dirStr, dirHash, name := genDirAndName(key)
res, err := db.ExecContext(ctx, store.SqlDelete, dirHash, name, dirStr)
res, err := db.ExecContext(ctx, store.GetSqlDelete(DEFAULT_TABLE), dirHash, name, dirStr)
if err != nil { if err != nil {
return fmt.Errorf("kv delete: %s", err) return fmt.Errorf("kv delete: %s", err)
} }

45
weed/filer/mysql/mysql_store.go

@ -15,6 +15,41 @@ const (
CONNECTION_URL_PATTERN = "%s:%s@tcp(%s:%d)/%s?charset=utf8" CONNECTION_URL_PATTERN = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
) )
type SqlGenMysql struct {
}
var (
_ = abstract_sql.SqlGenerator(&SqlGenMysql{})
)
func (gen *SqlGenMysql) GetSqlInsert(bucket string) string {
return "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)"
}
func (gen *SqlGenMysql) GetSqlUpdate(bucket string) string {
return "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?"
}
func (gen *SqlGenMysql) GetSqlFind(bucket string) string {
return "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
}
func (gen *SqlGenMysql) GetSqlDelete(bucket string) string {
return "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
}
func (gen *SqlGenMysql) GetSqlDeleteFolderChildren(bucket string) string {
return "DELETE FROM filemeta WHERE dirhash=? AND directory=?"
}
func (gen *SqlGenMysql) GetSqlListExclusive(bucket string) string {
return "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?"
}
func (gen *SqlGenMysql) GetSqlListInclusive(bucket string) string {
return "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?"
}
func init() { func init() {
filer.Stores = append(filer.Stores, &MysqlStore{}) filer.Stores = append(filer.Stores, &MysqlStore{})
} }
@ -43,14 +78,8 @@ func (store *MysqlStore) Initialize(configuration util.Configuration, prefix str
func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen, func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen,
maxLifetimeSeconds int, interpolateParams bool) (err error) { maxLifetimeSeconds int, interpolateParams bool) (err error) {
//
store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)"
store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?"
store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?"
store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?"
store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?"
store.SqlGenerator = &SqlGenMysql{}
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database) sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
if interpolateParams { if interpolateParams {

43
weed/filer/postgres/postgres_store.go

@ -14,6 +14,41 @@ const (
CONNECTION_URL_PATTERN = "host=%s port=%d sslmode=%s connect_timeout=30" CONNECTION_URL_PATTERN = "host=%s port=%d sslmode=%s connect_timeout=30"
) )
type SqlGenPostgres struct {
}
var (
_ = abstract_sql.SqlGenerator(&SqlGenPostgres{})
)
func (gen *SqlGenPostgres) GetSqlInsert(bucket string) string {
return "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)"
}
func (gen *SqlGenPostgres) GetSqlUpdate(bucket string) string {
return "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4"
}
func (gen *SqlGenPostgres) GetSqlFind(bucket string) string {
return "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
}
func (gen *SqlGenPostgres) GetSqlDelete(bucket string) string {
return "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
}
func (gen *SqlGenPostgres) GetSqlDeleteFolderChildren(bucket string) string {
return "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2"
}
func (gen *SqlGenPostgres) GetSqlListExclusive(bucket string) string {
return "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5"
}
func (gen *SqlGenPostgres) GetSqlListInclusive(bucket string) string {
return "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5"
}
func init() { func init() {
filer.Stores = append(filer.Stores, &PostgresStore{}) filer.Stores = append(filer.Stores, &PostgresStore{})
} }
@ -41,13 +76,7 @@ func (store *PostgresStore) Initialize(configuration util.Configuration, prefix
func (store *PostgresStore) initialize(user, password, hostname string, port int, database, sslmode string, maxIdle, maxOpen int) (err error) { func (store *PostgresStore) initialize(user, password, hostname string, port int, database, sslmode string, maxIdle, maxOpen int) (err error) {
store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)"
store.SqlUpdate = "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4"
store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2"
store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5"
store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5"
store.SqlGenerator = &SqlGenPostgres{}
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode) sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode)
if user != "" { if user != "" {

Loading…
Cancel
Save