From 6a052f6ff222032ca9c089c97f1bb1b50a2e6ac3 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Sun, 1 May 2022 20:33:03 +0500 Subject: [PATCH] ydb Sql interface --- go.mod | 2 + go.sum | 6 ++ weed/filer/ydb/ydb_queries.go | 72 ++++++++++++++++ weed/filer/ydb/ydb_store.go | 156 ++++++++++++++++++++-------------- 4 files changed, 171 insertions(+), 65 deletions(-) create mode 100644 weed/filer/ydb/ydb_queries.go diff --git a/go.mod b/go.mod index bdebd73a8..5dbd0c6d7 100644 --- a/go.mod +++ b/go.mod @@ -203,6 +203,8 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.2.0 // indirect github.com/tinylib/msgp v1.1.6 // indirect + github.com/yandex-cloud/ydb-go-sdk/v2 v2.12.0 // indirect + github.com/ydb-platform/ydb-go-genproto v0.0.0-20220203104745-929cf9c248bc // indirect go.etcd.io/etcd/api/v3 v3.5.4 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.4 // indirect go.uber.org/atomic v1.9.0 // indirect diff --git a/go.sum b/go.sum index 96d98d9f0..4f0276f94 100644 --- a/go.sum +++ b/go.sum @@ -375,6 +375,7 @@ github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRx github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= @@ -936,6 +937,11 @@ github.com/xdg-go/scram v1.1.0 h1:d70R37I0HrDLsafRrMBXyrD4lmQbCHE873t00Vr0gm0= github.com/xdg-go/scram v1.1.0/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= github.com/xdg-go/stringprep v1.0.2 h1:6iq84/ryjjeRmMJwxutI51F2GIPlP5BfTvXHeYjyhBc= github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= +github.com/yandex-cloud/go-genproto v0.0.0-20210809082946-a97da516c588/go.mod h1:HEUYX/p8966tMUHHT+TsS0hF/Ca/NYwqprC5WXSDMfE= +github.com/yandex-cloud/ydb-go-sdk/v2 v2.12.0 h1:TjMLCV3Poata+0Nw+Oa/ztEFagFrochAkGg91PG49jk= +github.com/yandex-cloud/ydb-go-sdk/v2 v2.12.0/go.mod h1:L09Rymwmcpp9QMCbfGAlMtJ4kLFFsbGjkP3qv58r2yY= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20220203104745-929cf9c248bc h1:xvTP0fhYNm+Ws+xC34jzF9EdorPUKkucJr0TyybqVSk= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20220203104745-929cf9c248bc/go.mod h1:cc138nptTn9eKptCQl/grxP6pBKpo/bnXDiOxuVZtps= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/weed/filer/ydb/ydb_queries.go b/weed/filer/ydb/ydb_queries.go new file mode 100644 index 000000000..57b282a7a --- /dev/null +++ b/weed/filer/ydb/ydb_queries.go @@ -0,0 +1,72 @@ +package ydb + +const ( + createQuery = ` + PRAGMA TablePathPrefix("%s"); + CREATE TABLE file_meta ( + dir_hash int64, + name Utf8, + directory Utf8, + meta String, + PRIMARY KEY (dir_hash, name) + );` + + insertQuery = ` + DECLARE $dir_hash int64; + DECLARE $name AS Utf8; + DECLARE $directory AS Utf8; + DECLARE $meta AS String; + + UPSERT INTO file_meta + (dir_hash, name, directory, meta) + VALUES + ($dir_hash, $name, $directory, $meta);` + + updateQuery = ` + DECLARE $dir_hash int64; + DECLARE $name AS Utf8; + DECLARE $directory AS Utf8; + DECLARE $meta AS String; + + REPLACE INTO file_meta + (dir_hash, name, directory, meta) + VALUES + ($dir_hash, $name, $directory, $meta) + COMMIT;` + + deleteQuery = ` + DECLARE $dir_hash int64; + DECLARE $name AS Utf8; + + DELETE FROM file_meta + WHERE dir_hash == $dir_hash AND name == $name; + COMMIT;` + + findQuery = ` + DECLARE $dir_hash int64; + DECLARE $name AS Utf8; + + SELECT meta + FROM file_meta + WHERE dir_hash == $dir_hash AND name == $name;` + + deleteFolderChildrenQuery = ` + DECLARE $dir_hash int64; + DECLARE $directory AS Utf8; + + DELETE FROM file_meta + WHERE dir_hash == $dir_hash AND directory == $directory; + COMMIT;` + + ListDirectoryQuery = ` + DECLARE $dir_hash int64; + DECLARE $directory AS Utf8; + DECLARE $start_name AS Utf8; + DECLARE $prefix AS Utf8; + DECLARE $limit AS int64; + + SELECT name, meta + FROM file_meta + WHERE dir_hash == $dir_hash AND directory == $directory and name %v $start_name and name LIKE '$prefix%' + ORDER BY name ASC LIMIT $limit;` +) diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go index 7278c6301..48017bf6f 100644 --- a/weed/filer/ydb/ydb_store.go +++ b/weed/filer/ydb/ydb_store.go @@ -4,11 +4,13 @@ import ( "context" "fmt" "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/yandex-cloud/ydb-go-sdk/v2" "github.com/yandex-cloud/ydb-go-sdk/v2/connect" "github.com/yandex-cloud/ydb-go-sdk/v2/table" + "path" "strings" "time" ) @@ -24,53 +26,6 @@ var ( ) ) -const ( - createQuery = ` - PRAGMA TablePathPrefix("%s"); - CREATE TABLE file_meta ( - dir_hash int64, - name Utf8, - directory Utf8, - meta String, - PRIMARY KEY (dir_hash, name) - );` - insertQuery = ` - DECLARE $dir_hash int64; - DECLARE $name AS Utf8; - DECLARE $directory AS Utf8; - DECLARE $meta AS String; - - UPSERT INTO file_meta - (dir_hash, name, directory, meta) - VALUES - ($dir_hash, $name, $directory, $meta);` - updateQuery = ` - DECLARE $dir_hash int64; - DECLARE $name AS Utf8; - DECLARE $directory AS Utf8; - DECLARE $meta AS String; - - REPLACE INTO file_meta - (dir_hash, name, directory, meta) - VALUES - ($dir_hash, $name, $directory, $meta) - COMMIT;` - deleteQuery = ` - DECLARE $dir_hash int64; - DECLARE $name AS Utf8; - - DELETE FROM file_meta - WHERE dir_hash == $dir_hash AND name == $name; - COMMIT;` - findQuery = ` - DECLARE $dir_hash int64; - DECLARE $name AS Utf8; - - SELECT meta - FROM file_meta - WHERE dir_hash == $dir_hash AND name == $name;` -) - type YdbStore struct { SupportBucketTable bool DB *connect.Connection @@ -148,7 +103,7 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e if err != nil { return err } - _, res, err = stmt.Execute(ctx, rwTX, table.NewQueryParameters( + _, res, err = stmt.Execute(ctx, roTX, table.NewQueryParameters( table.ValueParam("$dir_hash", ydb.Int64Value(util.HashStringToLong(dir))), table.ValueParam("$name", ydb.UTF8Value(name)))) return err @@ -157,6 +112,8 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e if err != nil { return nil, err } + defer res.Close() + for res.NextSet() { for res.NextRow() { res.SeekItem("meta") @@ -188,32 +145,105 @@ func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) } func (store *YdbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { - return nil + dir, _ := fullpath.DirAndName() + return table.Retry(ctx, store.DB.Table().Pool(), + table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) { + stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), deleteFolderChildrenQuery)) + if err != nil { + return err + } + _, _, err = stmt.Execute(ctx, rwTX, table.NewQueryParameters( + table.ValueParam("$dir_hash", ydb.Int64Value(util.HashStringToLong(dir))), + table.ValueParam("$directory", ydb.UTF8Value(dir)))) + return err + }), + ) } func (store *YdbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { - //TODO implement me - panic("implement me") + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil) } func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { - //TODO implement me - panic("implement me") + dir := string(dirPath) + var res *table.Result + startFileCompOp := ">" + if includeStartFile { + startFileCompOp = ">=" + } + err = table.Retry(ctx, store.DB.Table().Pool(), + table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) { + stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), fmt.Sprintf(ListDirectoryQuery, startFileCompOp))) + if err != nil { + return err + } + _, res, err = stmt.Execute(ctx, roTX, table.NewQueryParameters( + table.ValueParam("$dir_hash", ydb.Int64Value(util.HashStringToLong(dir))), + table.ValueParam("$directory", ydb.UTF8Value(dir)), + table.ValueParam("$start_name", ydb.UTF8Value(startFileName)), + table.ValueParam("$prefix", ydb.UTF8Value(prefix)), + table.ValueParam("$limit", ydb.Int64Value(limit)), + )) + return err + }), + ) + if err != nil { + return lastFileName, err + } + defer res.Close() + + for res.NextSet() { + for res.NextRow() { + res.SeekItem("name") + name := res.UTF8() + res.SeekItem("meta") + data := res.String() + if res.Err() != nil { + glog.V(0).Infof("scan %s : %v", dirPath, err) + return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err) + } + lastFileName = name + + entry := &filer.Entry{ + FullPath: util.NewFullPath(dir, name), + } + if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { + glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err) + return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err) + } + + if !eachEntryFunc(entry) { + break + } + } + } + return lastFileName, nil } func (store *YdbStore) BeginTransaction(ctx context.Context) (context.Context, error) { - //TODO implement me - panic("implement me") + session, err := store.DB.Table().Pool().Create(ctx) + if err != nil { + return ctx, err + } + tx, err := session.BeginTransaction(ctx, table.TxSettings(table.WithSerializableReadWrite())) + if err != nil { + return ctx, err + } + return context.WithValue(ctx, "tx", tx), nil } func (store *YdbStore) CommitTransaction(ctx context.Context) error { - //TODO implement me - panic("implement me") + if tx, ok := ctx.Value("tx").(*table.Transaction); ok { + return tx.Commit(ctx) + } + return nil } func (store *YdbStore) RollbackTransaction(ctx context.Context) error { - //TODO implement me - panic("implement me") + if tx, ok := ctx.Value("tx").(*table.Transaction); ok { + return tx.Rollback(ctx) + } + return nil } func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { @@ -232,8 +262,7 @@ func (store *YdbStore) KvDelete(ctx context.Context, key []byte) (err error) { } func (store *YdbStore) Shutdown() { - //TODO implement me - panic("implement me") + store.DB.Close() } func (store *YdbStore) getPrefix(dir string) string { @@ -249,8 +278,5 @@ func (store *YdbStore) getPrefix(dir string) string { } func (store *YdbStore) withPragma(prefix, query string) string { - if store.tablePathPrefix != "" && store.tablePathPrefix != "/" { - prefix = store.tablePathPrefix + "/" + prefix - } - return `PRAGMA TablePathPrefix("` + prefix + `");` + query + return `PRAGMA TablePathPrefix("` + path.Join(store.tablePathPrefix, prefix) + `");` + query }