From 0dc44dda63515d690161f3f60504733489189c66 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Mon, 2 May 2022 15:33:29 +0500 Subject: [PATCH] ydb do Tx or DB --- weed/filer/ydb/ydb_store.go | 114 ++++++++++++++++++------------------ weed/filer/ydb/ydb_types.go | 2 +- 2 files changed, 59 insertions(+), 57 deletions(-) diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go index 9f136703b..8271c6e18 100644 --- a/weed/filer/ydb/ydb_store.go +++ b/weed/filer/ydb/ydb_store.go @@ -12,6 +12,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/sugar" "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/table/result" "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" "github.com/ydb-platform/ydb-go-sdk/v3/table/types" "os" @@ -111,6 +112,34 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix return nil } +func (store *YdbStore) doTxOrDB(ctx context.Context, query *string, params *table.QueryParameters, tc *table.TransactionControl, processResultFunc func(res result.Result) error) (err error) { + var res result.Result + if tx, ok := ctx.Value("tx").(table.Transaction); ok { + res, err = tx.Execute(ctx, *query, params) + if err != nil { + return fmt.Errorf("execute transaction: %v", err) + } + } else { + err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { + stmt, err := s.Prepare(ctx, *query) + if err != nil { + return fmt.Errorf("prepare: %v", err) + } + _, res, err = stmt.Execute(ctx, tc, params) + if err != nil { + return fmt.Errorf("execute statement: %v", err) + } + return nil + }) + } + if err != nil && processResultFunc != nil && res != nil { + if err = processResultFunc(res); err != nil { + return fmt.Errorf("process resul: %v", err) + } + } + return err +} + func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Entry, query string) (err error) { dir, name := entry.FullPath.DirAndName() meta, err := entry.EncodeAttributesAndChunks() @@ -121,16 +150,9 @@ func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Ent if len(entry.Chunks) > filer.CountEntryChunksForGzip { meta = util.MaybeGzipData(meta) } - + queryWithPragma := withPragma(store.getPrefix(ctx, dir), query) fileMeta := FileMeta{util.HashStringToLong(dir), name, dir, meta} - return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { - stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dir), query)) - if err != nil { - return fmt.Errorf("Prepare %s : %v", dir, err) - } - _, _, err = stmt.Execute(ctx, rwTX, fileMeta.queryParameters()) - return err - }) + return store.doTxOrDB(ctx, &queryWithPragma, fileMeta.queryParameters(), rwTX, nil) } func (store *YdbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { @@ -145,17 +167,12 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e dir, name := fullpath.DirAndName() var data []byte entryFound := false - err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dir), findQuery)) - if err != nil { - return fmt.Errorf("Prepare %s : %v", entry.FullPath, err) - } - _, res, err := stmt.Execute(ctx, roTX, table.NewQueryParameters( - table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(dir))), - table.ValueParam("$name", types.UTF8Value(name)))) - if err != nil { - return fmt.Errorf("Execute %s : %v", entry.FullPath, err) - } + queryWithPragma := withPragma(store.getPrefix(ctx, dir), findQuery) + queryParams := table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(dir))), + table.ValueParam("$name", types.UTF8Value(name))) + + err = store.doTxOrDB(ctx, &queryWithPragma, queryParams, roTX, func(res result.Result) error { defer func() { _ = res.Close() }() @@ -184,30 +201,22 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { dir, name := fullpath.DirAndName() - return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { - stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dir), deleteQuery)) - if err != nil { - return fmt.Errorf("Prepare %s : %v", dir, err) - } - _, _, err = stmt.Execute(ctx, rwTX, table.NewQueryParameters( - table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(dir))), - table.ValueParam("$name", types.UTF8Value(name)))) - return err - }) + queryWithPragma := withPragma(store.getPrefix(ctx, dir), deleteQuery) + queryParams := table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(dir))), + table.ValueParam("$name", types.UTF8Value(name))) + + return store.doTxOrDB(ctx, &queryWithPragma, queryParams, rwTX, nil) } func (store *YdbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { dir, _ := fullpath.DirAndName() - return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { - stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dir), deleteFolderChildrenQuery)) - if err != nil { - return fmt.Errorf("Prepare %s : %v", dir, err) - } - _, _, err = stmt.Execute(ctx, rwTX, table.NewQueryParameters( - table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(dir))), - table.ValueParam("$directory", types.UTF8Value(dir)))) - return err - }) + queryWithPragma := withPragma(store.getPrefix(ctx, dir), deleteFolderChildrenQuery) + queryParams := table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(dir))), + table.ValueParam("$directory", types.UTF8Value(dir))) + + return store.doTxOrDB(ctx, &queryWithPragma, queryParams, rwTX, nil) } func (store *YdbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { @@ -220,21 +229,15 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath if includeStartFile { startFileCompOp = ">=" } - err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dir), fmt.Sprintf(listDirectoryQuery, startFileCompOp))) - if err != nil { - return fmt.Errorf("Prepare %s : %v", dir, err) - } - _, res, err := stmt.Execute(ctx, roTX, table.NewQueryParameters( - table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(dir))), - table.ValueParam("$directory", types.UTF8Value(dir)), - table.ValueParam("$start_name", types.UTF8Value(startFileName)), - table.ValueParam("$prefix", types.UTF8Value(prefix)), - table.ValueParam("$limit", types.Int64Value(limit)), - )) - if err != nil { - return fmt.Errorf("Execute %s : %v", dir, err) - } + queryWithPragma := withPragma(store.getPrefix(ctx, dir), fmt.Sprintf(listDirectoryQuery, startFileCompOp)) + queryParams := table.NewQueryParameters( + table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(dir))), + table.ValueParam("$directory", types.UTF8Value(dir)), + table.ValueParam("$start_name", types.UTF8Value(startFileName)), + table.ValueParam("$prefix", types.UTF8Value(prefix)), + table.ValueParam("$limit", types.Int64Value(limit)), + ) + err = store.doTxOrDB(ctx, &queryWithPragma, queryParams, roTX, func(res result.Result) error { defer func() { _ = res.Close() }() @@ -252,7 +255,6 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath FullPath: util.NewFullPath(dir, name), } if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { - glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err) return fmt.Errorf("scan decode %s : %v", entry.FullPath, err) } if !eachEntryFunc(entry) { diff --git a/weed/filer/ydb/ydb_types.go b/weed/filer/ydb/ydb_types.go index 3c381797a..8c24c4738 100644 --- a/weed/filer/ydb/ydb_types.go +++ b/weed/filer/ydb/ydb_types.go @@ -36,6 +36,6 @@ func createTableOptions() []options.CreateTableOption { options.WithPrimaryKeyColumn("dir_hash", "name"), } } -func withPragma(prefix, query string) string { +func withPragma(prefix string, query string) string { return `PRAGMA TablePathPrefix("` + prefix + `");` + query }