Browse Source

support dameng as filer store

pull/6061/head
Vegetable540 4 months ago
parent
commit
f45fec6940
  1. 1
      go.mod
  2. 2
      go.sum
  3. 33
      weed/command/scaffold/filer.toml
  4. 14
      weed/filer/abstract_sql/abstract_sql_store.go
  5. 6
      weed/filer/abstract_sql/abstract_sql_store_kv.go
  6. 66
      weed/filer/dameng/dameng_sql_gen.go
  7. 129
      weed/filer/dameng/dameng_store.go
  8. 23
      weed/filer/dameng/dameng_store_test.go
  9. 2
      weed/filer/store_test/test_suite.go
  10. 1
      weed/server/filer_server.go

1
go.mod

@ -163,6 +163,7 @@ require (
cloud.google.com/go/compute/metadata v0.5.1 // indirect
cloud.google.com/go/iam v1.2.0 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
gitee.com/chunanyong/dm v1.8.16 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect

2
go.sum

@ -531,6 +531,8 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc=
gitee.com/chunanyong/dm v1.8.16 h1:D2c2M3r/hiBX0PNZiFtcawoomwL3xM0ITis7WRTykTM=
gitee.com/chunanyong/dm v1.8.16/go.mod h1:EPRJnuPFgbyOFgJ0TRYCTGzhq+ZT4wdyaj/GW/LLcNg=
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 h1:nyQWyZvwGTvunIMxi1Y9uXkcyr+I7TeNrr/foo4Kpk8=

33
weed/command/scaffold/filer.toml

@ -41,6 +41,39 @@ dir = "./filerrdb" # directory to store rocksdb files
enabled = false
dbFile = "./filer.db" # sqlite db file
[dameng] # or memsql, tidb
# CREATE TABLE SEAWEEDFS.FILEMETA (
# DIRHASH BIGINT NOT NULL,
# NAME VARCHAR(4000) NOT NULL,
# DIRECTORY TEXT NOT NULL,
# META LONGVARBINARY,
# PRIMARY KEY (DIRHASH,NAME)
# );
enabled = true
# dsn will take priority over "hostname, port, username, password, database".
# [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
dsn = ""
hostname = ""
port = 5236
username = "SYSDBA"
password = "SYSDBA001"
database = "seaweedfs" # create or use an existing database
connection_max_idle = 2
connection_max_open = 100
connection_max_lifetime_seconds = 0
interpolateParams = false
# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax:
enableUpsert = true
upsertQuery = """MERGE INTO %s AS target
USING (SELECT ? AS dirhash, ? AS name, ? AS directory, ? AS meta FROM dual) AS source
ON (target.dirhash = source.dirhash AND target.name = source.name)
WHEN MATCHED THEN
UPDATE SET target.meta = source.meta
WHEN NOT MATCHED THEN
INSERT (dirhash, name, directory, meta)
VALUES (source.dirhash, source.name, source.directory, source.meta);"""
[mysql] # or memsql, tidb
# CREATE TABLE IF NOT EXISTS `filemeta` (
# `dirhash` BIGINT NOT NULL COMMENT 'first 64 bits of MD5 hash value of directory field',

14
weed/filer/abstract_sql/abstract_sql_store.go

@ -95,7 +95,7 @@ func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error {
return nil
}
func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB TxOrDB, bucket string, shortPath util.FullPath, err error) {
func (store *AbstractSqlStore) GetTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB TxOrDB, bucket string, shortPath util.FullPath, err error) {
shortPath = fullpath
bucket = DEFAULT_TABLE
@ -150,7 +150,7 @@ func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.Full
func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
db, bucket, shortPath, err := store.GetTxOrDB(ctx, entry.FullPath, false)
if err != nil {
return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
}
@ -186,7 +186,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
db, bucket, shortPath, err := store.GetTxOrDB(ctx, entry.FullPath, false)
if err != nil {
return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
}
@ -211,7 +211,7 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Ent
func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) {
db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
db, bucket, shortPath, err := store.GetTxOrDB(ctx, fullpath, false)
if err != nil {
return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
}
@ -239,7 +239,7 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.Full
func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
db, bucket, shortPath, err := store.GetTxOrDB(ctx, fullpath, false)
if err != nil {
return fmt.Errorf("findDB %s : %v", fullpath, err)
}
@ -261,7 +261,7 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.Fu
func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true)
db, bucket, shortPath, err := store.GetTxOrDB(ctx, fullpath, true)
if err != nil {
return fmt.Errorf("findDB %s : %v", fullpath, err)
}
@ -292,7 +292,7 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
db, bucket, shortPath, err := store.GetTxOrDB(ctx, dirPath, true)
if err != nil {
return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
}

6
weed/filer/abstract_sql/abstract_sql_store_kv.go

@ -13,7 +13,7 @@ import (
func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
db, _, _, err := store.getTxOrDB(ctx, "", false)
db, _, _, err := store.GetTxOrDB(ctx, "", false)
if err != nil {
return fmt.Errorf("findDB: %v", err)
}
@ -48,7 +48,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by
func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
db, _, _, err := store.getTxOrDB(ctx, "", false)
db, _, _, err := store.GetTxOrDB(ctx, "", false)
if err != nil {
return nil, fmt.Errorf("findDB: %v", err)
}
@ -71,7 +71,7 @@ func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []b
func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err error) {
db, _, _, err := store.getTxOrDB(ctx, "", false)
db, _, _, err := store.GetTxOrDB(ctx, "", false)
if err != nil {
return fmt.Errorf("findDB: %v", err)
}

66
weed/filer/dameng/dameng_sql_gen.go

@ -0,0 +1,66 @@
package dameng
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql"
)
type SqlGenDameng struct {
CreateTableSqlTemplate string
DropTableSqlTemplate string
UpsertQueryTemplate string
}
var (
_ = abstract_sql.SqlGenerator(&SqlGenDameng{})
)
func (gen *SqlGenDameng) GetSqlInsert(tableName string) string {
sql := ""
if gen.UpsertQueryTemplate != "" {
sql = fmt.Sprintf(`MERGE INTO %s AS target
USING (SELECT ? AS dirhash, ? AS name, ? AS directory, ? AS meta FROM dual) AS source
ON (target.dirhash = source.dirhash AND target.name = source.name)
WHEN MATCHED THEN
UPDATE SET target.meta = source.meta
WHEN NOT MATCHED THEN
INSERT (dirhash, name, directory, meta)
VALUES (source.dirhash, source.name, source.directory, source.meta);`, tableName)
} else {
sql = fmt.Sprintf("INSERT INTO %s (dirhash,name,directory,meta) VALUES(?,?,?,?)", tableName)
}
return sql
}
func (gen *SqlGenDameng) GetSqlUpdate(tableName string) string {
return fmt.Sprintf("UPDATE %s SET meta = ? WHERE dirhash = ? AND name = ? AND directory = ?", tableName)
}
func (gen *SqlGenDameng) GetSqlFind(tableName string) string {
return fmt.Sprintf("SELECT meta FROM %s WHERE dirhash = ? AND name = ? AND directory = ?", tableName)
}
func (gen *SqlGenDameng) GetSqlDelete(tableName string) string {
return fmt.Sprintf("DELETE FROM %s WHERE dirhash = ? AND name = ? AND directory = ?", tableName)
}
func (gen *SqlGenDameng) GetSqlDeleteFolderChildren(tableName string) string {
return fmt.Sprintf("DELETE FROM %s WHERE dirhash = ? AND directory = ?", tableName)
}
func (gen *SqlGenDameng) GetSqlListExclusive(tableName string) string {
return fmt.Sprintf("SELECT name, meta FROM %s WHERE dirhash = ? AND rowid > (SELECT IFNULL(MIN(rowid), 0) FROM %s WHERE directory = ? AND name = ?) AND directory = ? ORDER BY rowid ASC LIMIT ?", tableName, tableName)
}
func (gen *SqlGenDameng) GetSqlListInclusive(tableName string) string {
return fmt.Sprintf("SELECT name, meta FROM %s WHERE dirhash = ? AND rowid >= (SELECT IFNULL(MIN(rowid), 0) FROM %s WHERE directory = ? AND name = ?) AND directory = ? ORDER BY rowid ASC LIMIT ?", tableName, tableName)
}
func (gen *SqlGenDameng) GetSqlCreateTable(tableName string) string {
return fmt.Sprintf(gen.CreateTableSqlTemplate, tableName)
}
func (gen *SqlGenDameng) GetSqlDropTable(tableName string) string {
return fmt.Sprintf(gen.DropTableSqlTemplate, tableName)
}

129
weed/filer/dameng/dameng_store.go

@ -0,0 +1,129 @@
package dameng
import (
"context"
"database/sql"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"strings"
"time"
_ "gitee.com/chunanyong/dm"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql"
"github.com/seaweedfs/seaweedfs/weed/util"
)
const (
CONNECTION_URL_PATTERN = "dm://%s:%s@%s:%d?schema=%s"
)
func init() {
filer.Stores = append(filer.Stores, &DamengStore{})
}
type DamengStore struct {
abstract_sql.AbstractSqlStore
}
func (store *DamengStore) GetName() string {
return "dameng"
}
func (store *DamengStore) Initialize(configuration util.Configuration, prefix string) (err error) {
return store.initialize(
configuration.GetString(prefix+"dsn"),
configuration.GetString(prefix+"upsertQuery"),
configuration.GetBool(prefix+"enableUpsert"),
configuration.GetString(prefix+"username"),
configuration.GetString(prefix+"password"),
configuration.GetString(prefix+"hostname"),
configuration.GetInt(prefix+"port"),
configuration.GetString(prefix+"database"),
configuration.GetInt(prefix+"connection_max_idle"),
configuration.GetInt(prefix+"connection_max_open"),
configuration.GetInt(prefix+"connection_max_lifetime_seconds"),
configuration.GetBool(prefix+"interpolateParams"),
)
}
func (store *DamengStore) initialize(dsn string, upsertQuery string, enableUpsert bool, user, password, hostname string, port int, database string, maxIdle, maxOpen,
maxLifetimeSeconds int, interpolateParams bool) (err error) {
store.SupportBucketTable = false
if !enableUpsert {
upsertQuery = ""
}
store.SqlGenerator = &SqlGenDameng{
CreateTableSqlTemplate: "",
DropTableSqlTemplate: "DROP TABLE `%s`",
UpsertQueryTemplate: upsertQuery,
}
dsn = fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
var dbErr error
store.DB, dbErr = sql.Open("dm", dsn)
if dbErr != nil {
store.DB.Close()
store.DB = nil
return fmt.Errorf("can not connect to %s error:%v", strings.ReplaceAll(dsn, "", "<ADAPTED>"), err)
}
store.DB.SetMaxIdleConns(maxIdle)
store.DB.SetMaxOpenConns(maxOpen)
store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second)
if err = store.DB.Ping(); err != nil {
return fmt.Errorf("connect to %s error:%v", strings.ReplaceAll(dsn, "", "<ADAPTED>"), err)
}
return nil
}
func (store *DamengStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *DamengStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
db, bucket, shortPath, err := store.GetTxOrDB(ctx, dirPath, true)
if err != nil {
return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
}
sqlText := store.GetSqlListExclusive(bucket)
if includeStartFile {
sqlText = store.GetSqlListInclusive(bucket)
}
rows, err := db.QueryContext(ctx, sqlText, util.HashStringToLong(string(shortPath)), string(shortPath), startFileName, string(shortPath), limit)
if err != nil {
return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
defer rows.Close()
for rows.Next() {
var name string
var data []byte
if err = rows.Scan(&name, &data); err != nil {
glog.V(0).Infof("scan %s : %v", dirPath, err)
return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err)
}
lastFileName = name
entry := &filer.Entry{
FullPath: util.NewFullPath(string(dirPath), name),
}
if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
}
if !eachEntryFunc(entry) {
break
}
}
return lastFileName, nil
}

23
weed/filer/dameng/dameng_store_test.go

@ -0,0 +1,23 @@
package dameng
import (
"github.com/seaweedfs/seaweedfs/weed/filer/store_test"
"testing"
)
func TestStore(t *testing.T) {
// run "make test_ydb" under docker folder.
// to set up local env
store := &DamengStore{}
store.initialize("localhost", `MERGE INTO %s AS target
USING (SELECT ? AS dirhash, ? AS name, ? AS directory, ? AS meta FROM dual) AS source
ON (target.dirhash = source.dirhash AND target.name = source.name)
WHEN MATCHED THEN
UPDATE SET target.meta = source.meta
WHEN NOT MATCHED THEN
INSERT (dirhash, name, directory, meta)
VALUES (source.dirhash, source.name, source.directory, source.meta);`,
true, "SYSDBA", "SYSDBA001", "localhost", 5236,
"seaweedfs", 100, 2, 10, false)
store_test.TestFilerStore(t, store)
}

2
weed/filer/store_test/test_suite.go

@ -17,7 +17,7 @@ func TestFilerStore(t *testing.T, store filer.FilerStore) {
store.InsertEntry(ctx, makeEntry(util.FullPath("/a"), true))
store.InsertEntry(ctx, makeEntry(util.FullPath("/a/b"), true))
store.InsertEntry(ctx, makeEntry(util.FullPath("/a/b/c"), true))
for i := 0; i < 2000; i++ {
for i := 1; i <= 2000; i++ {
store.InsertEntry(ctx, makeEntry(util.FullPath(fmt.Sprintf("/a/b/c/f%05d", i)), false))
}

1
weed/server/filer_server.go

@ -25,6 +25,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/filer"
_ "github.com/seaweedfs/seaweedfs/weed/filer/arangodb"
_ "github.com/seaweedfs/seaweedfs/weed/filer/cassandra"
_ "github.com/seaweedfs/seaweedfs/weed/filer/dameng"
_ "github.com/seaweedfs/seaweedfs/weed/filer/elastic/v7"
_ "github.com/seaweedfs/seaweedfs/weed/filer/etcd"
_ "github.com/seaweedfs/seaweedfs/weed/filer/hbase"

Loading…
Cancel
Save