From ec0ed41e375d99ffc7d6a4290e92470c7eabc8e7 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Sun, 1 May 2022 21:20:37 +0500 Subject: [PATCH] ydb kv interface --- .../abstract_sql/abstract_sql_store_kv.go | 8 +-- weed/filer/ydb/ydb_store.go | 15 ---- weed/filer/ydb/ydb_store_kv.go | 72 +++++++++++++++++++ 3 files changed, 76 insertions(+), 19 deletions(-) create mode 100644 weed/filer/ydb/ydb_store_kv.go diff --git a/weed/filer/abstract_sql/abstract_sql_store_kv.go b/weed/filer/abstract_sql/abstract_sql_store_kv.go index 03b016c76..aaf1c196c 100644 --- a/weed/filer/abstract_sql/abstract_sql_store_kv.go +++ b/weed/filer/abstract_sql/abstract_sql_store_kv.go @@ -18,7 +18,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by return fmt.Errorf("findDB: %v", err) } - dirStr, dirHash, name := genDirAndName(key) + dirStr, dirHash, name := GenDirAndName(key) res, err := db.ExecContext(ctx, store.GetSqlInsert(DEFAULT_TABLE), dirHash, name, dirStr, value) if err == nil { @@ -53,7 +53,7 @@ func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []b return nil, fmt.Errorf("findDB: %v", err) } - dirStr, dirHash, name := genDirAndName(key) + dirStr, dirHash, name := GenDirAndName(key) row := db.QueryRowContext(ctx, store.GetSqlFind(DEFAULT_TABLE), dirHash, name, dirStr) err = row.Scan(&value) @@ -76,7 +76,7 @@ func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err er return fmt.Errorf("findDB: %v", err) } - dirStr, dirHash, name := genDirAndName(key) + dirStr, dirHash, name := GenDirAndName(key) res, err := db.ExecContext(ctx, store.GetSqlDelete(DEFAULT_TABLE), dirHash, name, dirStr) if err != nil { @@ -92,7 +92,7 @@ func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err er } -func genDirAndName(key []byte) (dirStr string, dirHash int64, name string) { +func GenDirAndName(key []byte) (dirStr string, dirHash int64, name string) { for len(key) < 8 { key = append(key, 0) } diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go index 48017bf6f..aedc11ec5 100644 --- a/weed/filer/ydb/ydb_store.go +++ b/weed/filer/ydb/ydb_store.go @@ -246,21 +246,6 @@ func (store *YdbStore) RollbackTransaction(ctx context.Context) error { return nil } -func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { - //TODO implement me - panic("implement me") -} - -func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { - //TODO implement me - panic("implement me") -} - -func (store *YdbStore) KvDelete(ctx context.Context, key []byte) (err error) { - //TODO implement me - panic("implement me") -} - func (store *YdbStore) Shutdown() { store.DB.Close() } diff --git a/weed/filer/ydb/ydb_store_kv.go b/weed/filer/ydb/ydb_store_kv.go new file mode 100644 index 000000000..3473d756d --- /dev/null +++ b/weed/filer/ydb/ydb_store_kv.go @@ -0,0 +1,72 @@ +package ydb + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" + "github.com/yandex-cloud/ydb-go-sdk/v2" + "github.com/yandex-cloud/ydb-go-sdk/v2/table" +) + +func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + dirStr, dirHash, name := abstract_sql.GenDirAndName(key) + fileMeta := FileMeta{dirHash, name, dirStr, value} + return table.Retry(ctx, store.DB.Table().Pool(), + table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) { + stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dirStr), insertQuery)) + if err != nil { + return fmt.Errorf("kv put: %v", err) + } + _, _, err = stmt.Execute(ctx, rwTX, fileMeta.QueryParameters()) + return fmt.Errorf("kv put: %v", err) + }), + ) +} + +func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + dirStr, dirHash, name := abstract_sql.GenDirAndName(key) + var res *table.Result + err = table.Retry(ctx, store.DB.Table().Pool(), + table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) { + stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dirStr), findQuery)) + if err != nil { + return err + } + _, res, err = stmt.Execute(ctx, roTX, table.NewQueryParameters( + table.ValueParam("$dir_hash", ydb.Int64Value(dirHash)), + table.ValueParam("$name", ydb.UTF8Value(name)))) + return err + }), + ) + if err != nil { + return nil, fmt.Errorf("kv get: %v", err) + } + defer res.Close() + + for res.NextResultSet(ctx) { + for res.NextRow() { + if err = res.Scan(&value); err != nil { + return nil, fmt.Errorf("kv get: %v", err) + } + return + } + } + return nil, filer.ErrKvNotFound +} + +func (store *YdbStore) KvDelete(ctx context.Context, key []byte) (err error) { + dirStr, dirHash, name := abstract_sql.GenDirAndName(key) + return table.Retry(ctx, store.DB.Table().Pool(), + table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) { + stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dirStr), deleteQuery)) + if err != nil { + return fmt.Errorf("kv delete: %s", err) + } + _, _, err = stmt.Execute(ctx, rwTX, table.NewQueryParameters( + table.ValueParam("$dir_hash", ydb.Int64Value(dirHash)), + table.ValueParam("$name", ydb.UTF8Value(name)))) + return fmt.Errorf("kv delete: %s", err) + }), + ) +}