Browse Source

enable query cache policy instead of prepare

pull/2996/head
Konstantin Lebedev 3 years ago
parent
commit
cb3c7a3cdb
  1. 9
      weed/filer/ydb/ydb_queries.go
  2. 10
      weed/filer/ydb/ydb_store.go
  3. 33
      weed/filer/ydb/ydb_store_kv.go

9
weed/filer/ydb/ydb_queries.go

@ -23,16 +23,14 @@ const (
REPLACE INTO ` + asql.DEFAULT_TABLE + `
(dir_hash, name, directory, meta)
VALUES
($dir_hash, $name, $directory, $meta)
COMMIT;`
($dir_hash, $name, $directory, $meta);`
deleteQuery = `
DECLARE $dir_hash AS int64;
DECLARE $name AS Utf8;
DELETE FROM ` + asql.DEFAULT_TABLE + `
WHERE dir_hash = $dir_hash AND name = $name;
COMMIT;`
WHERE dir_hash = $dir_hash AND name = $name;`
findQuery = `
DECLARE $dir_hash AS int64;
@ -47,8 +45,7 @@ const (
DECLARE $directory AS Utf8;
DELETE FROM ` + asql.DEFAULT_TABLE + `
WHERE dir_hash = $dir_hash AND directory = $directory;
COMMIT;`
WHERE dir_hash = $dir_hash AND directory = $directory;`
listDirectoryQuery = `
DECLARE $dir_hash AS int64;

10
weed/filer/ydb/ydb_store.go

@ -12,6 +12,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/sugar"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
@ -104,17 +105,14 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix
func (store *YdbStore) doTxOrDB(ctx context.Context, query *string, params *table.QueryParameters, tc *table.TransactionControl, processResultFunc func(res result.Result) error) (err error) {
var res result.Result
if tx, ok := ctx.Value("tx").(table.Transaction); ok {
res, err = tx.Execute(ctx, *query, params)
res, err = tx.Execute(ctx, *query, params, options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache()))
if err != nil {
return fmt.Errorf("execute transaction: %v", err)
}
} else {
err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) {
stmt, err := s.Prepare(ctx, *query)
if err != nil {
return fmt.Errorf("prepare: %v", err)
}
_, res, err = stmt.Execute(ctx, tc, params)
_, res, err = s.Execute(ctx, tc, *query,
params, options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache()))
if err != nil {
return fmt.Errorf("execute statement: %v", err)
}

33
weed/filer/ydb/ydb_store_kv.go

@ -3,6 +3,7 @@ package ydb
import (
"context"
"fmt"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
@ -16,11 +17,9 @@ func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err
dirStr, dirHash, name := abstract_sql.GenDirAndName(key)
fileMeta := FileMeta{dirHash, name, dirStr, value}
return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) {
stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dirStr), insertQuery))
if err != nil {
return fmt.Errorf("kv put prepare %s: %v", util.NewFullPath(dirStr, name).Name(), err)
}
_, _, err = stmt.Execute(ctx, rwTX, fileMeta.queryParameters())
_, _, err = s.Execute(ctx, rwTX, withPragma(store.getPrefix(ctx, dirStr), insertQuery),
fileMeta.queryParameters(),
options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache()))
if err != nil {
return fmt.Errorf("kv put execute %s: %v", util.NewFullPath(dirStr, name).Name(), err)
}
@ -32,13 +31,11 @@ func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err
dirStr, dirHash, name := abstract_sql.GenDirAndName(key)
valueFound := false
err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dirStr), findQuery))
if err != nil {
return fmt.Errorf("kv get prepare %s: %v", util.NewFullPath(dirStr, name), err)
}
_, res, err := stmt.Execute(ctx, roTX, table.NewQueryParameters(
table.ValueParam("$dir_hash", types.Int64Value(dirHash)),
table.ValueParam("$name", types.UTF8Value(name))))
_, res, err := s.Execute(ctx, roTX, withPragma(store.getPrefix(ctx, dirStr), findQuery),
table.NewQueryParameters(
table.ValueParam("$dir_hash", types.Int64Value(dirHash)),
table.ValueParam("$name", types.UTF8Value(name))),
options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache()))
if err != nil {
return fmt.Errorf("kv get execute %s: %v", util.NewFullPath(dirStr, name).Name(), err)
}
@ -65,13 +62,11 @@ func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err
func (store *YdbStore) KvDelete(ctx context.Context, key []byte) (err error) {
dirStr, dirHash, name := abstract_sql.GenDirAndName(key)
return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) {
stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dirStr), insertQuery))
if err != nil {
return fmt.Errorf("Prepare %s: %v", util.NewFullPath(dirStr, name).Name(), err)
}
_, _, err = stmt.Execute(ctx, rwTX, table.NewQueryParameters(
table.ValueParam("$dir_hash", types.Int64Value(dirHash)),
table.ValueParam("$name", types.UTF8Value(name))))
_, _, err = s.Execute(ctx, rwTX, withPragma(store.getPrefix(ctx, dirStr), insertQuery),
table.NewQueryParameters(
table.ValueParam("$dir_hash", types.Int64Value(dirHash)),
table.ValueParam("$name", types.UTF8Value(name))),
options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache()))
if err != nil {
return fmt.Errorf("kv delete %s: %v", util.NewFullPath(dirStr, name).Name(), err)
}

Loading…
Cancel
Save