From d5add83e85da0c61fe107842e7dd82b52af2bcdb Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 19 Jan 2021 18:07:29 -0800 Subject: [PATCH] filer store: add postgres2 --- weed/command/scaffold.go | 20 +++++ weed/filer/abstract_sql/abstract_sql_store.go | 4 +- .../{mysql_store.go => mysql2_store.go} | 5 ++ weed/filer/postgres/postgres_sql_gen.go | 53 +++++++++++ weed/filer/postgres/postgres_store.go | 55 ++---------- weed/filer/postgres2/postgres2_store.go | 87 +++++++++++++++++++ weed/server/filer_server.go | 1 + 7 files changed, 175 insertions(+), 50 deletions(-) rename weed/filer/mysql2/{mysql_store.go => mysql2_store.go} (92%) create mode 100644 weed/filer/postgres/postgres_sql_gen.go create mode 100644 weed/filer/postgres2/postgres2_store.go diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index b44bc41c1..c42dce7be 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -160,6 +160,26 @@ sslmode = "disable" connection_max_idle = 100 connection_max_open = 100 +[postgres2] +enabled = false +createTable = """ + CREATE TABLE IF NOT EXISTS %s ( + dirhash BIGINT, + name VARCHAR(65535), + directory VARCHAR(65535), + meta bytea, + PRIMARY KEY (dirhash, name) + ); +""" +hostname = "localhost" +port = 5432 +username = "postgres" +password = "" +database = "" # create or use an existing database +sslmode = "disable" +connection_max_idle = 100 +connection_max_open = 100 + [cassandra] # CREATE TABLE filemeta ( # directory varchar, diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index 8345d7a7b..91b0bc98f 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -107,7 +107,7 @@ func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.Full } if _, found := store.dbs[bucket]; !found { - if err = store.createTable(ctx, bucket); err != nil { + if err = store.CreateTable(ctx, bucket); err != nil { store.dbs[bucket] = true } } @@ -322,7 +322,7 @@ func isValidBucket(bucket string) bool { return bucket != DEFAULT_TABLE && bucket != "" } -func (store *AbstractSqlStore) createTable(ctx context.Context, bucket string) error { +func (store *AbstractSqlStore) CreateTable(ctx context.Context, bucket string) error { if !store.SupportBucketTable { return nil } diff --git a/weed/filer/mysql2/mysql_store.go b/weed/filer/mysql2/mysql2_store.go similarity index 92% rename from weed/filer/mysql2/mysql_store.go rename to weed/filer/mysql2/mysql2_store.go index fb74b6f7f..15216b651 100644 --- a/weed/filer/mysql2/mysql_store.go +++ b/weed/filer/mysql2/mysql2_store.go @@ -1,6 +1,7 @@ package mysql2 import ( + "context" "database/sql" "fmt" "time" @@ -73,5 +74,9 @@ func (store *MysqlStore2) initialize(createTable, user, password, hostname strin return fmt.Errorf("connect to %s error:%v", sqlUrl, err) } + if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil { + return fmt.Errorf("init table %s: %v", abstract_sql.DEFAULT_TABLE, err) + } + return nil } diff --git a/weed/filer/postgres/postgres_sql_gen.go b/weed/filer/postgres/postgres_sql_gen.go new file mode 100644 index 000000000..284cf254b --- /dev/null +++ b/weed/filer/postgres/postgres_sql_gen.go @@ -0,0 +1,53 @@ +package postgres + +import ( + "fmt" + + "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" + _ "github.com/lib/pq" +) + +type SqlGenPostgres struct { + CreateTableSqlTemplate string + DropTableSqlTemplate string +} + +var ( + _ = abstract_sql.SqlGenerator(&SqlGenPostgres{}) +) + +func (gen *SqlGenPostgres) GetSqlInsert(bucket string) string { + return fmt.Sprintf("INSERT INTO %s (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)", bucket) +} + +func (gen *SqlGenPostgres) GetSqlUpdate(bucket string) string { + return fmt.Sprintf("UPDATE %s SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4", bucket) +} + +func (gen *SqlGenPostgres) GetSqlFind(bucket string) string { + return fmt.Sprintf("SELECT meta FROM %s WHERE dirhash=$1 AND name=$2 AND directory=$3", bucket) +} + +func (gen *SqlGenPostgres) GetSqlDelete(bucket string) string { + return fmt.Sprintf("DELETE FROM %s WHERE dirhash=$1 AND name=$2 AND directory=$3", bucket) +} + +func (gen *SqlGenPostgres) GetSqlDeleteFolderChildren(bucket string) string { + return fmt.Sprintf("DELETE FROM %s WHERE dirhash=$1 AND directory=$2", bucket) +} + +func (gen *SqlGenPostgres) GetSqlListExclusive(bucket string) string { + return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5", bucket) +} + +func (gen *SqlGenPostgres) GetSqlListInclusive(bucket string) string { + return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5", bucket) +} + +func (gen *SqlGenPostgres) GetSqlCreateTable(bucket string) string { + return fmt.Sprintf(gen.CreateTableSqlTemplate, bucket) +} + +func (gen *SqlGenPostgres) GetSqlDropTable(bucket string) string { + return fmt.Sprintf(gen.DropTableSqlTemplate, bucket) +} diff --git a/weed/filer/postgres/postgres_store.go b/weed/filer/postgres/postgres_store.go index 8c36b8672..27c6278c7 100644 --- a/weed/filer/postgres/postgres_store.go +++ b/weed/filer/postgres/postgres_store.go @@ -14,51 +14,6 @@ const ( CONNECTION_URL_PATTERN = "host=%s port=%d sslmode=%s connect_timeout=30" ) -type SqlGenPostgres struct { - createTableSqlTemplate string - dropTableSqlTemplate string -} - -var ( - _ = abstract_sql.SqlGenerator(&SqlGenPostgres{}) -) - -func (gen *SqlGenPostgres) GetSqlInsert(bucket string) string { - return "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)" -} - -func (gen *SqlGenPostgres) GetSqlUpdate(bucket string) string { - return "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4" -} - -func (gen *SqlGenPostgres) GetSqlFind(bucket string) string { - return "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" -} - -func (gen *SqlGenPostgres) GetSqlDelete(bucket string) string { - return "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" -} - -func (gen *SqlGenPostgres) GetSqlDeleteFolderChildren(bucket string) string { - return "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2" -} - -func (gen *SqlGenPostgres) GetSqlListExclusive(bucket string) string { - return "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5" -} - -func (gen *SqlGenPostgres) GetSqlListInclusive(bucket string) string { - return "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5" -} - -func (gen *SqlGenPostgres) GetSqlCreateTable(bucket string) string { - return fmt.Sprintf(gen.createTableSqlTemplate, bucket) -} - -func (gen *SqlGenPostgres) GetSqlDropTable(bucket string) string { - return fmt.Sprintf(gen.dropTableSqlTemplate, bucket) -} - func init() { filer.Stores = append(filer.Stores, &PostgresStore{}) } @@ -78,18 +33,19 @@ func (store *PostgresStore) Initialize(configuration util.Configuration, prefix configuration.GetString(prefix+"hostname"), configuration.GetInt(prefix+"port"), configuration.GetString(prefix+"database"), + configuration.GetString(prefix+"schema"), configuration.GetString(prefix+"sslmode"), configuration.GetInt(prefix+"connection_max_idle"), configuration.GetInt(prefix+"connection_max_open"), ) } -func (store *PostgresStore) initialize(user, password, hostname string, port int, database, sslmode string, maxIdle, maxOpen int) (err error) { +func (store *PostgresStore) initialize(user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen int) (err error) { store.SupportBucketTable = false store.SqlGenerator = &SqlGenPostgres{ - createTableSqlTemplate: "", - dropTableSqlTemplate: "drop table %s", + CreateTableSqlTemplate: "", + DropTableSqlTemplate: "drop table %s", } sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode) @@ -102,6 +58,9 @@ func (store *PostgresStore) initialize(user, password, hostname string, port int if database != "" { sqlUrl += " dbname=" + database } + if schema != "" { + sqlUrl += " search_path=" + schema + } var dbErr error store.DB, dbErr = sql.Open("postgres", sqlUrl) if dbErr != nil { diff --git a/weed/filer/postgres2/postgres2_store.go b/weed/filer/postgres2/postgres2_store.go new file mode 100644 index 000000000..82552376f --- /dev/null +++ b/weed/filer/postgres2/postgres2_store.go @@ -0,0 +1,87 @@ +package postgres2 + +import ( + "context" + "database/sql" + "fmt" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" + "github.com/chrislusf/seaweedfs/weed/filer/postgres" + "github.com/chrislusf/seaweedfs/weed/util" + _ "github.com/lib/pq" +) + +const ( + CONNECTION_URL_PATTERN = "host=%s port=%d sslmode=%s connect_timeout=30" +) + +func init() { + filer.Stores = append(filer.Stores, &PostgresStore2{}) +} + +type PostgresStore2 struct { + abstract_sql.AbstractSqlStore +} + +func (store *PostgresStore2) GetName() string { + return "postgres2" +} + +func (store *PostgresStore2) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetString(prefix+"createTable"), + configuration.GetString(prefix+"username"), + configuration.GetString(prefix+"password"), + configuration.GetString(prefix+"hostname"), + configuration.GetInt(prefix+"port"), + configuration.GetString(prefix+"database"), + configuration.GetString(prefix+"schema"), + configuration.GetString(prefix+"sslmode"), + configuration.GetInt(prefix+"connection_max_idle"), + configuration.GetInt(prefix+"connection_max_open"), + ) +} + +func (store *PostgresStore2) initialize(createTable, user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen int) (err error) { + + store.SupportBucketTable = true + store.SqlGenerator = &postgres.SqlGenPostgres{ + CreateTableSqlTemplate: createTable, + DropTableSqlTemplate: "drop table %s", + } + + sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode) + if user != "" { + sqlUrl += " user=" + user + } + if password != "" { + sqlUrl += " password=" + password + } + if database != "" { + sqlUrl += " dbname=" + database + } + if schema != "" { + sqlUrl += " search_path=" + schema + } + var dbErr error + store.DB, dbErr = sql.Open("postgres", sqlUrl) + if dbErr != nil { + store.DB.Close() + store.DB = nil + return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err) + } + + store.DB.SetMaxIdleConns(maxIdle) + store.DB.SetMaxOpenConns(maxOpen) + + if err = store.DB.Ping(); err != nil { + return fmt.Errorf("connect to %s error:%v", sqlUrl, err) + } + + if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil { + return fmt.Errorf("init table %s: %v", abstract_sql.DEFAULT_TABLE, err) + } + + return nil +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 8ea2b8b82..22474a5e2 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -31,6 +31,7 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer/mysql" _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2" _ "github.com/chrislusf/seaweedfs/weed/filer/postgres" + _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" _ "github.com/chrislusf/seaweedfs/weed/filer/redis" _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" "github.com/chrislusf/seaweedfs/weed/glog"