From 4c5b752b040bbbee34fdc1db61fe6e210fb11961 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 19 Jan 2021 13:53:16 -0800 Subject: [PATCH] restructuring sql stores --- weed/filer/abstract_sql/abstract_sql_store.go | 113 +++++++++++++----- .../abstract_sql/abstract_sql_store_kv.go | 8 +- weed/filer/mysql/mysql_store.go | 45 +++++-- weed/filer/postgres/postgres_store.go | 43 +++++-- 4 files changed, 162 insertions(+), 47 deletions(-) diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index 9aafd448e..dd35112a2 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -9,21 +9,28 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "strings" + "sync" ) +type SqlGenerator interface { + GetSqlInsert(bucket string) string + GetSqlUpdate(bucket string) string + GetSqlFind(bucket string) string + GetSqlDelete(bucket string) string + GetSqlDeleteFolderChildren(bucket string) string + GetSqlListExclusive(bucket string) string + GetSqlListInclusive(bucket string) string +} + type AbstractSqlStore struct { - DB *sql.DB - SqlInsert string - SqlUpdate string - SqlFind string - SqlDelete string - SqlDeleteFolderChildren string - SqlListExclusive string - SqlListInclusive string + DB *sql.DB + SqlGenerator + dbs map[string]bool + dbsLock sync.Mutex } const ( - DEFAULT = "_main" + DEFAULT_TABLE = "filemeta" ) type TxOrDB interface { @@ -57,16 +64,53 @@ func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error { } func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB TxOrDB, bucket string, shortPath util.FullPath, err error) { + shortPath = fullpath + bucket = DEFAULT_TABLE + if tx, ok := ctx.Value("tx").(*sql.Tx); ok { - return tx, bucket, shortPath, err + txOrDB = tx + } else { + txOrDB = store.DB + } + + if strings.HasPrefix(string(fullpath), "/buckets/") { + return } - return store.DB, bucket, shortPath, err + + // detect bucket + bucketAndObjectKey := string(fullpath)[len("/buckets/"):] + t := strings.Index(bucketAndObjectKey, "/") + if t < 0 && !isForChildren { + return + } + if t > 0 { + bucket = bucketAndObjectKey[:t] + shortPath = util.FullPath(bucketAndObjectKey[t:]) + } + + if isValidBucket(bucket) { + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + if store.dbs == nil { + store.dbs = make(map[string]bool) + } + + if _, found := store.dbs[bucket]; !found { + if err = store.createTable(bucket); err != nil { + store.dbs[bucket] = true + } + } + + } + + return } func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { - db, _, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false) + db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false) if err != nil { return fmt.Errorf("findDB %s : %v", entry.FullPath, err) } @@ -81,7 +125,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent meta = util.MaybeGzipData(meta) } - res, err := db.ExecContext(ctx, store.SqlInsert, util.HashStringToLong(dir), name, dir, meta) + res, err := db.ExecContext(ctx, store.GetSqlInsert(bucket), util.HashStringToLong(dir), name, dir, meta) if err == nil { return } @@ -94,7 +138,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent // now the insert failed possibly due to duplication constraints glog.V(1).Infof("insert %s falls back to update: %v", entry.FullPath, err) - res, err = db.ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir) + res, err = db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir) if err != nil { return fmt.Errorf("upsert %s: %s", entry.FullPath, err) } @@ -109,7 +153,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { - db, _, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false) + db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false) if err != nil { return fmt.Errorf("findDB %s : %v", entry.FullPath, err) } @@ -120,7 +164,7 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Ent return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - res, err := db.ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir) + res, err := db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir) if err != nil { return fmt.Errorf("update %s: %s", entry.FullPath, err) } @@ -134,13 +178,13 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Ent func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) { - db, _, shortPath, err := store.getTxOrDB(ctx, fullpath, false) + db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false) if err != nil { return nil, fmt.Errorf("findDB %s : %v", fullpath, err) } dir, name := shortPath.DirAndName() - row := db.QueryRowContext(ctx, store.SqlFind, util.HashStringToLong(dir), name, dir) + row := db.QueryRowContext(ctx, store.GetSqlFind(bucket), util.HashStringToLong(dir), name, dir) var data []byte if err := row.Scan(&data); err != nil { @@ -162,14 +206,14 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.Full func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { - db, _, shortPath, err := store.getTxOrDB(ctx, fullpath, false) + db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false) if err != nil { return fmt.Errorf("findDB %s : %v", fullpath, err) } dir, name := shortPath.DirAndName() - res, err := db.ExecContext(ctx, store.SqlDelete, util.HashStringToLong(dir), name, dir) + res, err := db.ExecContext(ctx, store.GetSqlDelete(bucket), util.HashStringToLong(dir), name, dir) if err != nil { return fmt.Errorf("delete %s: %s", fullpath, err) } @@ -189,12 +233,16 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat return fmt.Errorf("findDB %s : %v", fullpath, err) } - if bucket != DEFAULT && shortPath == "/" { - store.deleteTable(bucket) - return nil + if isValidBucket(bucket) && shortPath == "/" { + if store.deleteTable(bucket) { + store.dbsLock.Lock() + delete(store.dbs, bucket) + store.dbsLock.Unlock() + return nil + } } - res, err := db.ExecContext(ctx, store.SqlDeleteFolderChildren, util.HashStringToLong(string(shortPath)), fullpath) + res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), fullpath) if err != nil { return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err) } @@ -209,14 +257,14 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { - db, _, shortPath, err := store.getTxOrDB(ctx, dirPath, true) + db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true) if err != nil { return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err) } - sqlText := store.SqlListExclusive + sqlText := store.GetSqlListExclusive(bucket) if includeStartFile { - sqlText = store.SqlListInclusive + sqlText = store.GetSqlListInclusive(bucket) } rows, err := db.QueryContext(ctx, sqlText, util.HashStringToLong(string(shortPath)), startFileName, string(shortPath), prefix+"%", limit+1) @@ -259,5 +307,14 @@ func (store *AbstractSqlStore) Shutdown() { store.DB.Close() } -func (store *AbstractSqlStore) deleteTable(bucket string) { +func isValidBucket(bucket string) bool { + return bucket != DEFAULT_TABLE && bucket != "" +} + +func (store *AbstractSqlStore) createTable(bucket string) error { + return nil +} + +func (store *AbstractSqlStore) deleteTable(bucket string) bool { + return false } diff --git a/weed/filer/abstract_sql/abstract_sql_store_kv.go b/weed/filer/abstract_sql/abstract_sql_store_kv.go index 4e56c5db2..03b016c76 100644 --- a/weed/filer/abstract_sql/abstract_sql_store_kv.go +++ b/weed/filer/abstract_sql/abstract_sql_store_kv.go @@ -20,7 +20,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by dirStr, dirHash, name := genDirAndName(key) - res, err := db.ExecContext(ctx, store.SqlInsert, dirHash, name, dirStr, value) + res, err := db.ExecContext(ctx, store.GetSqlInsert(DEFAULT_TABLE), dirHash, name, dirStr, value) if err == nil { return } @@ -33,7 +33,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by // now the insert failed possibly due to duplication constraints glog.V(1).Infof("kv insert falls back to update: %s", err) - res, err = db.ExecContext(ctx, store.SqlUpdate, value, dirHash, name, dirStr) + res, err = db.ExecContext(ctx, store.GetSqlUpdate(DEFAULT_TABLE), value, dirHash, name, dirStr) if err != nil { return fmt.Errorf("kv upsert: %s", err) } @@ -54,7 +54,7 @@ func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []b } dirStr, dirHash, name := genDirAndName(key) - row := db.QueryRowContext(ctx, store.SqlFind, dirHash, name, dirStr) + row := db.QueryRowContext(ctx, store.GetSqlFind(DEFAULT_TABLE), dirHash, name, dirStr) err = row.Scan(&value) @@ -78,7 +78,7 @@ func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err er dirStr, dirHash, name := genDirAndName(key) - res, err := db.ExecContext(ctx, store.SqlDelete, dirHash, name, dirStr) + res, err := db.ExecContext(ctx, store.GetSqlDelete(DEFAULT_TABLE), dirHash, name, dirStr) if err != nil { return fmt.Errorf("kv delete: %s", err) } diff --git a/weed/filer/mysql/mysql_store.go b/weed/filer/mysql/mysql_store.go index 70f729fc9..479d2eed1 100644 --- a/weed/filer/mysql/mysql_store.go +++ b/weed/filer/mysql/mysql_store.go @@ -15,6 +15,41 @@ const ( CONNECTION_URL_PATTERN = "%s:%s@tcp(%s:%d)/%s?charset=utf8" ) +type SqlGenMysql struct { +} + +var ( + _ = abstract_sql.SqlGenerator(&SqlGenMysql{}) +) + +func (gen *SqlGenMysql) GetSqlInsert(bucket string) string { + return "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)" +} + +func (gen *SqlGenMysql) GetSqlUpdate(bucket string) string { + return "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?" +} + +func (gen *SqlGenMysql) GetSqlFind(bucket string) string { + return "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?" +} + +func (gen *SqlGenMysql) GetSqlDelete(bucket string) string { + return "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?" +} + +func (gen *SqlGenMysql) GetSqlDeleteFolderChildren(bucket string) string { + return "DELETE FROM filemeta WHERE dirhash=? AND directory=?" +} + +func (gen *SqlGenMysql) GetSqlListExclusive(bucket string) string { + return "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?" +} + +func (gen *SqlGenMysql) GetSqlListInclusive(bucket string) string { + return "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?" +} + func init() { filer.Stores = append(filer.Stores, &MysqlStore{}) } @@ -43,14 +78,8 @@ func (store *MysqlStore) Initialize(configuration util.Configuration, prefix str func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen, maxLifetimeSeconds int, interpolateParams bool) (err error) { - // - store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)" - store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?" - store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?" - store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?" - store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?" - store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?" - store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?" + + store.SqlGenerator = &SqlGenMysql{} sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database) if interpolateParams { diff --git a/weed/filer/postgres/postgres_store.go b/weed/filer/postgres/postgres_store.go index 2325568fe..783f27c10 100644 --- a/weed/filer/postgres/postgres_store.go +++ b/weed/filer/postgres/postgres_store.go @@ -14,6 +14,41 @@ const ( CONNECTION_URL_PATTERN = "host=%s port=%d sslmode=%s connect_timeout=30" ) +type SqlGenPostgres struct { +} + +var ( + _ = abstract_sql.SqlGenerator(&SqlGenPostgres{}) +) + +func (gen *SqlGenPostgres) GetSqlInsert(bucket string) string { + return "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)" +} + +func (gen *SqlGenPostgres) GetSqlUpdate(bucket string) string { + return "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4" +} + +func (gen *SqlGenPostgres) GetSqlFind(bucket string) string { + return "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" +} + +func (gen *SqlGenPostgres) GetSqlDelete(bucket string) string { + return "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" +} + +func (gen *SqlGenPostgres) GetSqlDeleteFolderChildren(bucket string) string { + return "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2" +} + +func (gen *SqlGenPostgres) GetSqlListExclusive(bucket string) string { + return "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5" +} + +func (gen *SqlGenPostgres) GetSqlListInclusive(bucket string) string { + return "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5" +} + func init() { filer.Stores = append(filer.Stores, &PostgresStore{}) } @@ -41,13 +76,7 @@ func (store *PostgresStore) Initialize(configuration util.Configuration, prefix func (store *PostgresStore) initialize(user, password, hostname string, port int, database, sslmode string, maxIdle, maxOpen int) (err error) { - store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)" - store.SqlUpdate = "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4" - store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" - store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" - store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2" - store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5" - store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5" + store.SqlGenerator = &SqlGenPostgres{} sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode) if user != "" {