diff --git a/go.mod b/go.mod index 958fa7554..6aab666d6 100644 --- a/go.mod +++ b/go.mod @@ -163,6 +163,7 @@ require ( cloud.google.com/go/compute/metadata v0.5.1 // indirect cloud.google.com/go/iam v1.2.0 // indirect filippo.io/edwards25519 v1.1.0 // indirect + gitee.com/chunanyong/dm v1.8.16 // indirect github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect diff --git a/go.sum b/go.sum index 9f1f17a02..64b4dc036 100644 --- a/go.sum +++ b/go.sum @@ -531,6 +531,8 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8= git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc= +gitee.com/chunanyong/dm v1.8.16 h1:D2c2M3r/hiBX0PNZiFtcawoomwL3xM0ITis7WRTykTM= +gitee.com/chunanyong/dm v1.8.16/go.mod h1:EPRJnuPFgbyOFgJ0TRYCTGzhq+ZT4wdyaj/GW/LLcNg= github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U= github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 h1:nyQWyZvwGTvunIMxi1Y9uXkcyr+I7TeNrr/foo4Kpk8= diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml index 728aecb53..d167a0b29 100644 --- a/weed/command/scaffold/filer.toml +++ b/weed/command/scaffold/filer.toml @@ -41,6 +41,39 @@ dir = "./filerrdb" # directory to store rocksdb files enabled = false dbFile = "./filer.db" # sqlite db file +[dameng] # or memsql, tidb +# CREATE TABLE SEAWEEDFS.FILEMETA ( +# DIRHASH BIGINT NOT NULL, +# NAME VARCHAR(4000) NOT NULL, +# DIRECTORY TEXT NOT NULL, +# META LONGVARBINARY, +# PRIMARY KEY (DIRHASH,NAME) +# ); + +enabled = true +# dsn will take priority over "hostname, port, username, password, database". +# [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...¶mN=valueN] +dsn = "" +hostname = "" +port = 5236 +username = "SYSDBA" +password = "SYSDBA001" +database = "seaweedfs" # create or use an existing database +connection_max_idle = 2 +connection_max_open = 100 +connection_max_lifetime_seconds = 0 +interpolateParams = false +# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax: +enableUpsert = true +upsertQuery = """MERGE INTO %s AS target +USING (SELECT ? AS dirhash, ? AS name, ? AS directory, ? AS meta FROM dual) AS source +ON (target.dirhash = source.dirhash AND target.name = source.name) +WHEN MATCHED THEN + UPDATE SET target.meta = source.meta +WHEN NOT MATCHED THEN + INSERT (dirhash, name, directory, meta) + VALUES (source.dirhash, source.name, source.directory, source.meta);""" + [mysql] # or memsql, tidb # CREATE TABLE IF NOT EXISTS `filemeta` ( # `dirhash` BIGINT NOT NULL COMMENT 'first 64 bits of MD5 hash value of directory field', diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index 1d175651d..2117e007f 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -95,7 +95,7 @@ func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error { return nil } -func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB TxOrDB, bucket string, shortPath util.FullPath, err error) { +func (store *AbstractSqlStore) GetTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB TxOrDB, bucket string, shortPath util.FullPath, err error) { shortPath = fullpath bucket = DEFAULT_TABLE @@ -150,7 +150,7 @@ func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.Full func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { - db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false) + db, bucket, shortPath, err := store.GetTxOrDB(ctx, entry.FullPath, false) if err != nil { return fmt.Errorf("findDB %s : %v", entry.FullPath, err) } @@ -186,7 +186,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { - db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false) + db, bucket, shortPath, err := store.GetTxOrDB(ctx, entry.FullPath, false) if err != nil { return fmt.Errorf("findDB %s : %v", entry.FullPath, err) } @@ -211,7 +211,7 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Ent func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) { - db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false) + db, bucket, shortPath, err := store.GetTxOrDB(ctx, fullpath, false) if err != nil { return nil, fmt.Errorf("findDB %s : %v", fullpath, err) } @@ -239,7 +239,7 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.Full func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { - db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false) + db, bucket, shortPath, err := store.GetTxOrDB(ctx, fullpath, false) if err != nil { return fmt.Errorf("findDB %s : %v", fullpath, err) } @@ -261,7 +261,7 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.Fu func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { - db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true) + db, bucket, shortPath, err := store.GetTxOrDB(ctx, fullpath, true) if err != nil { return fmt.Errorf("findDB %s : %v", fullpath, err) } @@ -292,7 +292,7 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { - db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true) + db, bucket, shortPath, err := store.GetTxOrDB(ctx, dirPath, true) if err != nil { return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err) } diff --git a/weed/filer/abstract_sql/abstract_sql_store_kv.go b/weed/filer/abstract_sql/abstract_sql_store_kv.go index 221902aaa..64bf3bc8f 100644 --- a/weed/filer/abstract_sql/abstract_sql_store_kv.go +++ b/weed/filer/abstract_sql/abstract_sql_store_kv.go @@ -13,7 +13,7 @@ import ( func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { - db, _, _, err := store.getTxOrDB(ctx, "", false) + db, _, _, err := store.GetTxOrDB(ctx, "", false) if err != nil { return fmt.Errorf("findDB: %v", err) } @@ -48,7 +48,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { - db, _, _, err := store.getTxOrDB(ctx, "", false) + db, _, _, err := store.GetTxOrDB(ctx, "", false) if err != nil { return nil, fmt.Errorf("findDB: %v", err) } @@ -71,7 +71,7 @@ func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []b func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err error) { - db, _, _, err := store.getTxOrDB(ctx, "", false) + db, _, _, err := store.GetTxOrDB(ctx, "", false) if err != nil { return fmt.Errorf("findDB: %v", err) } diff --git a/weed/filer/dameng/dameng_sql_gen.go b/weed/filer/dameng/dameng_sql_gen.go new file mode 100644 index 000000000..e98c594b1 --- /dev/null +++ b/weed/filer/dameng/dameng_sql_gen.go @@ -0,0 +1,66 @@ +package dameng + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql" +) + +type SqlGenDameng struct { + CreateTableSqlTemplate string + DropTableSqlTemplate string + UpsertQueryTemplate string +} + +var ( + _ = abstract_sql.SqlGenerator(&SqlGenDameng{}) +) + +func (gen *SqlGenDameng) GetSqlInsert(tableName string) string { + sql := "" + if gen.UpsertQueryTemplate != "" { + sql = fmt.Sprintf(`MERGE INTO %s AS target + USING (SELECT ? AS dirhash, ? AS name, ? AS directory, ? AS meta FROM dual) AS source + ON (target.dirhash = source.dirhash AND target.name = source.name) + WHEN MATCHED THEN + UPDATE SET target.meta = source.meta + WHEN NOT MATCHED THEN + INSERT (dirhash, name, directory, meta) + VALUES (source.dirhash, source.name, source.directory, source.meta);`, tableName) + } else { + sql = fmt.Sprintf("INSERT INTO %s (dirhash,name,directory,meta) VALUES(?,?,?,?)", tableName) + } + return sql +} + +func (gen *SqlGenDameng) GetSqlUpdate(tableName string) string { + return fmt.Sprintf("UPDATE %s SET meta = ? WHERE dirhash = ? AND name = ? AND directory = ?", tableName) +} + +func (gen *SqlGenDameng) GetSqlFind(tableName string) string { + return fmt.Sprintf("SELECT meta FROM %s WHERE dirhash = ? AND name = ? AND directory = ?", tableName) +} + +func (gen *SqlGenDameng) GetSqlDelete(tableName string) string { + return fmt.Sprintf("DELETE FROM %s WHERE dirhash = ? AND name = ? AND directory = ?", tableName) +} + +func (gen *SqlGenDameng) GetSqlDeleteFolderChildren(tableName string) string { + return fmt.Sprintf("DELETE FROM %s WHERE dirhash = ? AND directory = ?", tableName) +} + +func (gen *SqlGenDameng) GetSqlListExclusive(tableName string) string { + return fmt.Sprintf("SELECT name, meta FROM %s WHERE dirhash = ? AND rowid > (SELECT IFNULL(MIN(rowid), 0) FROM %s WHERE directory = ? AND name = ?) AND directory = ? ORDER BY rowid ASC LIMIT ?", tableName, tableName) +} + +func (gen *SqlGenDameng) GetSqlListInclusive(tableName string) string { + return fmt.Sprintf("SELECT name, meta FROM %s WHERE dirhash = ? AND rowid >= (SELECT IFNULL(MIN(rowid), 0) FROM %s WHERE directory = ? AND name = ?) AND directory = ? ORDER BY rowid ASC LIMIT ?", tableName, tableName) +} + +func (gen *SqlGenDameng) GetSqlCreateTable(tableName string) string { + return fmt.Sprintf(gen.CreateTableSqlTemplate, tableName) +} + +func (gen *SqlGenDameng) GetSqlDropTable(tableName string) string { + return fmt.Sprintf(gen.DropTableSqlTemplate, tableName) +} diff --git a/weed/filer/dameng/dameng_store.go b/weed/filer/dameng/dameng_store.go new file mode 100644 index 000000000..66f7740de --- /dev/null +++ b/weed/filer/dameng/dameng_store.go @@ -0,0 +1,129 @@ +package dameng + +import ( + "context" + "database/sql" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" + "strings" + "time" + + _ "gitee.com/chunanyong/dm" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +const ( + CONNECTION_URL_PATTERN = "dm://%s:%s@%s:%d?schema=%s" +) + +func init() { + filer.Stores = append(filer.Stores, &DamengStore{}) +} + +type DamengStore struct { + abstract_sql.AbstractSqlStore +} + +func (store *DamengStore) GetName() string { + return "dameng" +} + +func (store *DamengStore) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetString(prefix+"dsn"), + configuration.GetString(prefix+"upsertQuery"), + configuration.GetBool(prefix+"enableUpsert"), + configuration.GetString(prefix+"username"), + configuration.GetString(prefix+"password"), + configuration.GetString(prefix+"hostname"), + configuration.GetInt(prefix+"port"), + configuration.GetString(prefix+"database"), + configuration.GetInt(prefix+"connection_max_idle"), + configuration.GetInt(prefix+"connection_max_open"), + configuration.GetInt(prefix+"connection_max_lifetime_seconds"), + configuration.GetBool(prefix+"interpolateParams"), + ) +} + +func (store *DamengStore) initialize(dsn string, upsertQuery string, enableUpsert bool, user, password, hostname string, port int, database string, maxIdle, maxOpen, + maxLifetimeSeconds int, interpolateParams bool) (err error) { + + store.SupportBucketTable = false + if !enableUpsert { + upsertQuery = "" + } + store.SqlGenerator = &SqlGenDameng{ + CreateTableSqlTemplate: "", + DropTableSqlTemplate: "DROP TABLE `%s`", + UpsertQueryTemplate: upsertQuery, + } + + dsn = fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database) + + var dbErr error + store.DB, dbErr = sql.Open("dm", dsn) + if dbErr != nil { + store.DB.Close() + store.DB = nil + return fmt.Errorf("can not connect to %s error:%v", strings.ReplaceAll(dsn, "", ""), err) + } + + store.DB.SetMaxIdleConns(maxIdle) + store.DB.SetMaxOpenConns(maxOpen) + store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second) + + if err = store.DB.Ping(); err != nil { + return fmt.Errorf("connect to %s error:%v", strings.ReplaceAll(dsn, "", ""), err) + } + + return nil +} + +func (store *DamengStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) +} + +func (store *DamengStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + db, bucket, shortPath, err := store.GetTxOrDB(ctx, dirPath, true) + if err != nil { + return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err) + } + + sqlText := store.GetSqlListExclusive(bucket) + if includeStartFile { + sqlText = store.GetSqlListInclusive(bucket) + } + + rows, err := db.QueryContext(ctx, sqlText, util.HashStringToLong(string(shortPath)), string(shortPath), startFileName, string(shortPath), limit) + if err != nil { + return lastFileName, fmt.Errorf("list %s : %v", dirPath, err) + } + defer rows.Close() + + for rows.Next() { + var name string + var data []byte + if err = rows.Scan(&name, &data); err != nil { + glog.V(0).Infof("scan %s : %v", dirPath, err) + return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err) + } + lastFileName = name + + entry := &filer.Entry{ + FullPath: util.NewFullPath(string(dirPath), name), + } + if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { + glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err) + return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err) + } + + if !eachEntryFunc(entry) { + break + } + + } + + return lastFileName, nil +} diff --git a/weed/filer/dameng/dameng_store_test.go b/weed/filer/dameng/dameng_store_test.go new file mode 100644 index 000000000..0b0c9ebc3 --- /dev/null +++ b/weed/filer/dameng/dameng_store_test.go @@ -0,0 +1,23 @@ +package dameng + +import ( + "github.com/seaweedfs/seaweedfs/weed/filer/store_test" + "testing" +) + +func TestStore(t *testing.T) { + // run "make test_ydb" under docker folder. + // to set up local env + store := &DamengStore{} + store.initialize("localhost", `MERGE INTO %s AS target +USING (SELECT ? AS dirhash, ? AS name, ? AS directory, ? AS meta FROM dual) AS source +ON (target.dirhash = source.dirhash AND target.name = source.name) +WHEN MATCHED THEN + UPDATE SET target.meta = source.meta +WHEN NOT MATCHED THEN + INSERT (dirhash, name, directory, meta) + VALUES (source.dirhash, source.name, source.directory, source.meta);`, + true, "SYSDBA", "SYSDBA001", "localhost", 5236, + "seaweedfs", 100, 2, 10, false) + store_test.TestFilerStore(t, store) +} diff --git a/weed/filer/store_test/test_suite.go b/weed/filer/store_test/test_suite.go index 1e4149589..0dae7d7fa 100644 --- a/weed/filer/store_test/test_suite.go +++ b/weed/filer/store_test/test_suite.go @@ -17,7 +17,7 @@ func TestFilerStore(t *testing.T, store filer.FilerStore) { store.InsertEntry(ctx, makeEntry(util.FullPath("/a"), true)) store.InsertEntry(ctx, makeEntry(util.FullPath("/a/b"), true)) store.InsertEntry(ctx, makeEntry(util.FullPath("/a/b/c"), true)) - for i := 0; i < 2000; i++ { + for i := 1; i <= 2000; i++ { store.InsertEntry(ctx, makeEntry(util.FullPath(fmt.Sprintf("/a/b/c/f%05d", i)), false)) } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index ee052579c..f8524e6fd 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -25,6 +25,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/filer" _ "github.com/seaweedfs/seaweedfs/weed/filer/arangodb" _ "github.com/seaweedfs/seaweedfs/weed/filer/cassandra" + _ "github.com/seaweedfs/seaweedfs/weed/filer/dameng" _ "github.com/seaweedfs/seaweedfs/weed/filer/elastic/v7" _ "github.com/seaweedfs/seaweedfs/weed/filer/etcd" _ "github.com/seaweedfs/seaweedfs/weed/filer/hbase"