From abd51028199d250906004e3feacdbdc2c2bb66fa Mon Sep 17 00:00:00 2001 From: SmoothDenis Date: Fri, 20 Jun 2025 02:16:01 +0500 Subject: [PATCH] ydb filer improvements (#6890) * fix(ydb): table creation with params * fix(ydb): migrate to new query client & prevent creation table-bucket on get request * fix(ydb): use new query client with kv req * fix(ydb): use directory in every query * fix(ydb): del unused import * fix(ydb): tests & default const usage --- weed/filer/ydb/ydb_queries.go | 6 +- weed/filer/ydb/ydb_store.go | 278 ++++++++++++++++++------------- weed/filer/ydb/ydb_store_kv.go | 57 ++++--- weed/filer/ydb/ydb_store_test.go | 3 +- weed/filer/ydb/ydb_types.go | 23 ++- 5 files changed, 216 insertions(+), 151 deletions(-) diff --git a/weed/filer/ydb/ydb_queries.go b/weed/filer/ydb/ydb_queries.go index 73a8cfeba..baafc59a1 100644 --- a/weed/filer/ydb/ydb_queries.go +++ b/weed/filer/ydb/ydb_queries.go @@ -22,19 +22,21 @@ const ( deleteQuery = ` PRAGMA TablePathPrefix("%v"); DECLARE $dir_hash AS int64; + DECLARE $directory AS Utf8; DECLARE $name AS Utf8; DELETE FROM ` + asql.DEFAULT_TABLE + ` - WHERE dir_hash = $dir_hash AND name = $name;` + WHERE dir_hash = $dir_hash AND directory = $directory AND name = $name;` findQuery = ` PRAGMA TablePathPrefix("%v"); DECLARE $dir_hash AS int64; + DECLARE $directory AS Utf8; DECLARE $name AS Utf8; SELECT meta FROM ` + asql.DEFAULT_TABLE + ` - WHERE dir_hash = $dir_hash AND name = $name;` + WHERE dir_hash = $dir_hash AND directory = $directory AND name = $name;` deleteFolderChildrenQuery = ` PRAGMA TablePathPrefix("%v"); diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go index a9ad6666e..1af40b6ae 100644 --- a/weed/filer/ydb/ydb_store.go +++ b/weed/filer/ydb/ydb_store.go @@ -6,6 +6,8 @@ package ydb import ( "context" "fmt" + "github.com/ydb-platform/ydb-go-sdk/v3/query" + "github.com/ydb-platform/ydb-go-sdk/v3/table/options" "os" "path" "strings" @@ -20,30 +22,37 @@ import ( environ "github.com/ydb-platform/ydb-go-sdk-auth-environ" "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/table" - "github.com/ydb-platform/ydb-go-sdk/v3/table/result" - "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" "github.com/ydb-platform/ydb-go-sdk/v3/table/types" ) const ( - defaultDialTimeOut = 10 + defaultDialTimeOut = 10 + defaultPartitionBySizeEnabled = true + defaultPartitionSizeMb = 200 + defaultPartitionByLoadEnabled = true + defaultMinPartitionsCount = 5 + defaultMaxPartitionsCount = 1000 + defaultMaxListChunk = 2000 ) var ( - roTX = table.TxControl( - table.BeginTx(table.WithOnlineReadOnly()), - table.CommitTx(), - ) - rwTX = table.DefaultTxControl() + roQC = query.WithTxControl(query.OnlineReadOnlyTxControl()) + rwQC = query.WithTxControl(query.DefaultTxControl()) ) type YdbStore struct { - DB ydb.Connection - dirBuckets string - tablePathPrefix string - SupportBucketTable bool - dbs map[string]bool - dbsLock sync.Mutex + DB *ydb.Driver + dirBuckets string + tablePathPrefix string + SupportBucketTable bool + partitionBySizeEnabled options.FeatureFlag + partitionSizeMb uint64 + partitionByLoadEnabled options.FeatureFlag + minPartitionsCount uint64 + maxPartitionsCount uint64 + maxListChunk int + dbs map[string]bool + dbsLock sync.Mutex } func init() { @@ -55,6 +64,12 @@ func (store *YdbStore) GetName() string { } func (store *YdbStore) Initialize(configuration util.Configuration, prefix string) (err error) { + configuration.SetDefault(prefix+"partitionBySizeEnabled", defaultPartitionBySizeEnabled) + configuration.SetDefault(prefix+"partitionSizeMb", defaultPartitionSizeMb) + configuration.SetDefault(prefix+"partitionByLoadEnabled", defaultPartitionByLoadEnabled) + configuration.SetDefault(prefix+"minPartitionsCount", defaultMinPartitionsCount) + configuration.SetDefault(prefix+"maxPartitionsCount", defaultMaxPartitionsCount) + configuration.SetDefault(prefix+"maxListChunk", defaultMaxListChunk) return store.initialize( configuration.GetString("filer.options.buckets_folder"), configuration.GetString(prefix+"dsn"), @@ -62,18 +77,37 @@ func (store *YdbStore) Initialize(configuration util.Configuration, prefix strin configuration.GetBool(prefix+"useBucketPrefix"), configuration.GetInt(prefix+"dialTimeOut"), configuration.GetInt(prefix+"poolSizeLimit"), + configuration.GetBool(prefix+"partitionBySizeEnabled"), + uint64(configuration.GetInt(prefix+"partitionSizeMb")), + configuration.GetBool(prefix+"partitionByLoadEnabled"), + uint64(configuration.GetInt(prefix+"minPartitionsCount")), + uint64(configuration.GetInt(prefix+"maxPartitionsCount")), + configuration.GetInt(prefix+"maxListChunk"), ) } -func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix string, useBucketPrefix bool, dialTimeOut int, poolSizeLimit int) (err error) { +func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix string, useBucketPrefix bool, dialTimeOut int, poolSizeLimit int, partitionBySizeEnabled bool, partitionSizeMb uint64, partitionByLoadEnabled bool, minPartitionsCount uint64, maxPartitionsCount uint64, maxListChunk int) (err error) { store.dirBuckets = dirBuckets store.SupportBucketTable = useBucketPrefix + if partitionBySizeEnabled { + store.partitionBySizeEnabled = options.FeatureEnabled + } else { + store.partitionBySizeEnabled = options.FeatureDisabled + } + if partitionByLoadEnabled { + store.partitionByLoadEnabled = options.FeatureEnabled + } else { + store.partitionByLoadEnabled = options.FeatureDisabled + } + store.partitionSizeMb = partitionSizeMb + store.minPartitionsCount = minPartitionsCount + store.maxPartitionsCount = maxPartitionsCount + store.maxListChunk = maxListChunk if store.SupportBucketTable { glog.V(0).Infof("enabled BucketPrefix") } store.dbs = make(map[string]bool) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := context.Background() if dialTimeOut == 0 { dialTimeOut = defaultDialTimeOut } @@ -89,11 +123,7 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix } store.DB, err = ydb.Open(ctx, dsn, opts...) if err != nil { - if store.DB != nil { - _ = store.DB.Close(ctx) - store.DB = nil - } - return fmt.Errorf("can not connect to %s error: %v", dsn, err) + return fmt.Errorf("can not connect to %s: %w", dsn, err) } store.tablePathPrefix = path.Join(store.DB.Name(), tablePathPrefix) @@ -104,29 +134,27 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix return err } -func (store *YdbStore) doTxOrDB(ctx context.Context, query *string, params *table.QueryParameters, tc *table.TransactionControl, processResultFunc func(res result.Result) error) (err error) { - var res result.Result - if tx, ok := ctx.Value("tx").(table.Transaction); ok { - res, err = tx.Execute(ctx, *query, params) +func (store *YdbStore) doTxOrDB(ctx context.Context, q *string, params *table.QueryParameters, ts query.ExecuteOption, processResultFunc func(res query.Result) error) (err error) { + var res query.Result + if tx, ok := ctx.Value("tx").(query.Transaction); ok { + res, err = tx.Query(ctx, *q, query.WithParameters(params)) if err != nil { return fmt.Errorf("execute transaction: %v", err) } } else { - err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { - _, res, err = s.Execute(ctx, tc, *query, params) + err = store.DB.Query().Do(ctx, func(ctx context.Context, s query.Session) (err error) { + res, err = s.Query(ctx, *q, query.WithParameters(params), ts) if err != nil { return fmt.Errorf("execute statement: %v", err) } return nil - }, - table.WithIdempotent(), - ) + }, query.WithIdempotent()) } if err != nil { return err } if res != nil { - defer func() { _ = res.Close() }() + defer func() { _ = res.Close(ctx) }() if processResultFunc != nil { if err = processResultFunc(res); err != nil { return fmt.Errorf("process result: %v", err) @@ -148,7 +176,7 @@ func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Ent } tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) fileMeta := FileMeta{util.HashStringToLong(dir), name, *shortDir, meta} - return store.doTxOrDB(ctx, withPragma(tablePathPrefix, upsertQuery), fileMeta.queryParameters(entry.TtlSec), rwTX, nil) + return store.doTxOrDB(ctx, withPragma(tablePathPrefix, upsertQuery), fileMeta.queryParameters(entry.TtlSec), rwQC, nil) } func (store *YdbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { @@ -164,23 +192,29 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e var data []byte entryFound := false tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) - query := withPragma(tablePathPrefix, findQuery) + q := withPragma(tablePathPrefix, findQuery) queryParams := table.NewQueryParameters( table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), + table.ValueParam("$directory", types.UTF8Value(*shortDir)), table.ValueParam("$name", types.UTF8Value(name))) - err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error { - if !res.NextResultSet(ctx) || !res.HasNextRow() { - return nil - } - for res.NextRow() { - if err = res.ScanNamed(named.OptionalWithDefault("meta", &data)); err != nil { - return fmt.Errorf("scanNamed %s : %v", fullpath, err) + err = store.doTxOrDB(ctx, q, queryParams, roQC, func(res query.Result) error { + for rs, err := range res.ResultSets(ctx) { + if err != nil { + return err + } + for row, err := range rs.Rows(ctx) { + if err != nil { + return err + } + if scanErr := row.Scan(&data); scanErr != nil { + return fmt.Errorf("scan %s: %v", fullpath, scanErr) + } + entryFound = true + return nil } - entryFound = true - return nil } - return res.Err() + return nil }) if err != nil { return nil, err @@ -189,37 +223,35 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e return nil, filer_pb.ErrNotFound } - entry = &filer.Entry{ - FullPath: fullpath, + entry = &filer.Entry{FullPath: fullpath} + if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil { + return nil, fmt.Errorf("decode %s: %v", fullpath, decodeErr) } - if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { - return nil, fmt.Errorf("decode %s : %v", fullpath, err) - } - return entry, nil } func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { dir, name := fullpath.DirAndName() tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) - query := withPragma(tablePathPrefix, deleteQuery) + q := withPragma(tablePathPrefix, deleteQuery) glog.V(4).Infof("DeleteEntry %s, tablePathPrefix %s, shortDir %s", fullpath, *tablePathPrefix, *shortDir) queryParams := table.NewQueryParameters( table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), + table.ValueParam("$directory", types.UTF8Value(*shortDir)), table.ValueParam("$name", types.UTF8Value(name))) - return store.doTxOrDB(ctx, query, queryParams, rwTX, nil) + return store.doTxOrDB(ctx, q, queryParams, rwQC, nil) } func (store *YdbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { dir := string(fullpath) tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) - query := withPragma(tablePathPrefix, deleteFolderChildrenQuery) + q := withPragma(tablePathPrefix, deleteFolderChildrenQuery) queryParams := table.NewQueryParameters( table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), table.ValueParam("$directory", types.UTF8Value(*shortDir))) - return store.doTxOrDB(ctx, query, queryParams, rwTX, nil) + return store.doTxOrDB(ctx, q, queryParams, rwQC, nil) } func (store *YdbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { @@ -229,71 +261,79 @@ func (store *YdbStore) ListDirectoryEntries(ctx context.Context, dirPath util.Fu func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { dir := string(dirPath) tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) - var query *string - if includeStartFile { - query = withPragma(tablePathPrefix, listInclusiveDirectoryQuery) - } else { - query = withPragma(tablePathPrefix, listDirectoryQuery) - } - truncated := true - eachEntryFuncIsNotBreake := true - entryCount := int64(0) - for truncated && eachEntryFuncIsNotBreake { - if lastFileName != "" { - startFileName = lastFileName - if includeStartFile { - query = withPragma(tablePathPrefix, listDirectoryQuery) - } + baseInclusive := withPragma(tablePathPrefix, listInclusiveDirectoryQuery) + baseExclusive := withPragma(tablePathPrefix, listDirectoryQuery) + var entryCount int64 + var prevFetchedLessThanChunk bool + for entryCount < limit { + if prevFetchedLessThanChunk { + break + } + var q *string + if entryCount == 0 && includeStartFile { + q = baseInclusive + } else { + q = baseExclusive } - restLimit := limit - entryCount - const maxChunk = int64(1000) - chunkLimit := restLimit - if chunkLimit > maxChunk { - chunkLimit = maxChunk + rest := limit - entryCount + chunkLimit := rest + if chunkLimit > int64(store.maxListChunk) { + chunkLimit = int64(store.maxListChunk) } - glog.V(4).Infof("startFileName %s, restLimit %d, chunkLimit %d", startFileName, restLimit, chunkLimit) + var rowCount int64 - queryParams := table.NewQueryParameters( + params := table.NewQueryParameters( table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), table.ValueParam("$directory", types.UTF8Value(*shortDir)), table.ValueParam("$start_name", types.UTF8Value(startFileName)), table.ValueParam("$prefix", types.UTF8Value(prefix+"%")), table.ValueParam("$limit", types.Uint64Value(uint64(chunkLimit))), ) - err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error { - var name string - var data []byte - if !res.NextResultSet(ctx) || !res.HasNextRow() { - truncated = false - return nil - } - truncated = res.CurrentResultSet().Truncated() - glog.V(4).Infof("truncated %v, entryCount %d", truncated, entryCount) - for res.NextRow() { - if err := res.ScanNamed( - named.OptionalWithDefault("name", &name), - named.OptionalWithDefault("meta", &data)); err != nil { - return fmt.Errorf("list scanNamed %s : %v", dir, err) - } - glog.V(8).Infof("name %s, fullpath %s", name, util.NewFullPath(dir, name)) - lastFileName = name - entry := &filer.Entry{ - FullPath: util.NewFullPath(dir, name), - } - if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { - return fmt.Errorf("scan decode %s : %v", entry.FullPath, err) + + err := store.doTxOrDB(ctx, q, params, roQC, func(res query.Result) error { + for rs, err := range res.ResultSets(ctx) { + if err != nil { + return err } - if !eachEntryFunc(entry) { - eachEntryFuncIsNotBreake = false - break + for row, err := range rs.Rows(ctx) { + if err != nil { + return err + } + + var name string + var data []byte + if scanErr := row.Scan(&name, &data); scanErr != nil { + return fmt.Errorf("scan %s: %w", dir, scanErr) + } + + lastFileName = name + entry := &filer.Entry{FullPath: util.NewFullPath(dir, name)} + if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil { + return fmt.Errorf("decode entry %s: %w", entry.FullPath, decodeErr) + } + + if !eachEntryFunc(entry) { + return nil + } + + rowCount++ + entryCount++ + startFileName = lastFileName + + if entryCount >= limit { + return nil + } } - entryCount += 1 } - return res.Err() + return nil }) - } - if err != nil { - return lastFileName, err + if err != nil { + return lastFileName, err + } + + if rowCount < chunkLimit { + prevFetchedLessThanChunk = true + } } return lastFileName, nil } @@ -380,7 +420,7 @@ func (store *YdbStore) OnBucketDeletion(bucket string) { func (store *YdbStore) createTable(ctx context.Context, prefix string) error { return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - return s.CreateTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE), createTableOptions()...) + return s.CreateTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE), store.createTableOptions()...) }) } @@ -424,16 +464,22 @@ func (store *YdbStore) getPrefix(ctx context.Context, dir *string) (tablePathPre store.dbsLock.Lock() defer store.dbsLock.Unlock() - tablePathPrefixWithBucket := path.Join(store.tablePathPrefix, bucket) if _, found := store.dbs[bucket]; !found { - if err := store.createTable(ctx, tablePathPrefixWithBucket); err == nil { - store.dbs[bucket] = true - glog.V(4).Infof("created table %s", tablePathPrefixWithBucket) - } else { - glog.Errorf("createTable %s: %v", tablePathPrefixWithBucket, err) + glog.V(4).Infof("bucket %q not in cache, verifying existence via DescribeTable", bucket) + tablePath := path.Join(store.tablePathPrefix, bucket, abstract_sql.DEFAULT_TABLE) + err2 := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, err3 := s.DescribeTable(ctx, tablePath) + return err3 + }) + if err2 != nil { + glog.V(4).Infof("bucket %q not found (DescribeTable %s failed)", bucket, tablePath) + return } + glog.V(4).Infof("bucket %q exists, adding to cache", bucket) + store.dbs[bucket] = true } - tablePathPrefix = &tablePathPrefixWithBucket + bucketPrefix := path.Join(store.tablePathPrefix, bucket) + tablePathPrefix = &bucketPrefix } return } @@ -444,7 +490,7 @@ func (store *YdbStore) ensureTables(ctx context.Context) error { glog.V(4).Infof("creating base table %s", prefixFull) baseTable := path.Join(prefixFull, abstract_sql.DEFAULT_TABLE) if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - return s.CreateTable(ctx, baseTable, createTableOptions()...) + return s.CreateTable(ctx, baseTable, store.createTableOptions()...) }); err != nil { return fmt.Errorf("failed to create base table %s: %v", baseTable, err) } @@ -457,7 +503,7 @@ func (store *YdbStore) ensureTables(ctx context.Context) error { glog.V(4).Infof("creating bucket table %s", bucket) bucketTable := path.Join(prefixFull, bucket, abstract_sql.DEFAULT_TABLE) if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - return s.CreateTable(ctx, bucketTable, createTableOptions()...) + return s.CreateTable(ctx, bucketTable, store.createTableOptions()...) }); err != nil { glog.Errorf("failed to create bucket table %s: %v", bucketTable, err) } diff --git a/weed/filer/ydb/ydb_store_kv.go b/weed/filer/ydb/ydb_store_kv.go index b5d356b4c..070f17e23 100644 --- a/weed/filer/ydb/ydb_store_kv.go +++ b/weed/filer/ydb/ydb_store_kv.go @@ -9,48 +9,54 @@ import ( "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql" "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/ydb-platform/ydb-go-sdk/v3/query" "github.com/ydb-platform/ydb-go-sdk/v3/table" - "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" "github.com/ydb-platform/ydb-go-sdk/v3/table/types" ) func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { dirStr, dirHash, name := abstract_sql.GenDirAndName(key) fileMeta := FileMeta{dirHash, name, dirStr, value} - return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { - _, _, err = s.Execute(ctx, rwTX, *withPragma(&store.tablePathPrefix, upsertQuery), - fileMeta.queryParameters(0)) + return store.DB.Query().Do(ctx, func(ctx context.Context, s query.Session) (err error) { + _, err = s.Query(ctx, *withPragma(&store.tablePathPrefix, upsertQuery), + query.WithParameters(fileMeta.queryParameters(0)), rwQC) if err != nil { return fmt.Errorf("kv put execute %s: %v", util.NewFullPath(dirStr, name).Name(), err) } return nil - }) + }, query.WithIdempotent()) } func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { dirStr, dirHash, name := abstract_sql.GenDirAndName(key) valueFound := false - err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - _, res, err := s.Execute(ctx, roTX, *withPragma(&store.tablePathPrefix, findQuery), - table.NewQueryParameters( + err = store.DB.Query().Do(ctx, func(ctx context.Context, s query.Session) error { + res, err := s.Query(ctx, *withPragma(&store.tablePathPrefix, findQuery), + query.WithParameters(table.NewQueryParameters( table.ValueParam("$dir_hash", types.Int64Value(dirHash)), - table.ValueParam("$name", types.UTF8Value(name)))) + table.ValueParam("$directory", types.UTF8Value(dirStr)), + table.ValueParam("$name", types.UTF8Value(name)))), roQC) if err != nil { return fmt.Errorf("kv get execute %s: %v", util.NewFullPath(dirStr, name).Name(), err) } - defer func() { _ = res.Close() }() - if !res.NextResultSet(ctx) || !res.HasNextRow() { - return nil - } - for res.NextRow() { - if err := res.ScanNamed(named.OptionalWithDefault("meta", &value)); err != nil { - return fmt.Errorf("scanNamed %s : %v", util.NewFullPath(dirStr, name).Name(), err) + defer func() { _ = res.Close(ctx) }() + for rs, err := range res.ResultSets(ctx) { + if err != nil { + return err + } + for row, err := range rs.Rows(ctx) { + if err != nil { + return err + } + if err := row.Scan(&value); err != nil { + return fmt.Errorf("scan %s : %v", util.NewFullPath(dirStr, name).Name(), err) + } + valueFound = true + return nil } - valueFound = true - return nil } - return res.Err() - }) + return nil + }, query.WithIdempotent()) if !valueFound { return nil, filer.ErrKvNotFound @@ -61,15 +67,16 @@ func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err func (store *YdbStore) KvDelete(ctx context.Context, key []byte) (err error) { dirStr, dirHash, name := abstract_sql.GenDirAndName(key) - return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { - _, _, err = s.Execute(ctx, rwTX, *withPragma(&store.tablePathPrefix, deleteQuery), - table.NewQueryParameters( + return store.DB.Query().Do(ctx, func(ctx context.Context, s query.Session) (err error) { + _, err = s.Query(ctx, *withPragma(&store.tablePathPrefix, deleteQuery), + query.WithParameters(table.NewQueryParameters( table.ValueParam("$dir_hash", types.Int64Value(dirHash)), - table.ValueParam("$name", types.UTF8Value(name)))) + table.ValueParam("$directory", types.UTF8Value(dirStr)), + table.ValueParam("$name", types.UTF8Value(name)))), rwQC) if err != nil { return fmt.Errorf("kv delete %s: %v", util.NewFullPath(dirStr, name).Name(), err) } return nil - }) + }, query.WithIdempotent()) } diff --git a/weed/filer/ydb/ydb_store_test.go b/weed/filer/ydb/ydb_store_test.go index b692ba29f..1deef465c 100644 --- a/weed/filer/ydb/ydb_store_test.go +++ b/weed/filer/ydb/ydb_store_test.go @@ -13,7 +13,8 @@ func TestStore(t *testing.T) { // to set up local env if false { store := &YdbStore{} - store.initialize("/buckets", "grpc://localhost:2136/?database=local", "seaweedfs", true, 10, 50) + store.initialize("/buckets", "grpc://localhost:2136/?database=local", "seaweedfs", true, 10, 50, + true, 200, true, 5, 1000, 2000) store_test.TestFilerStore(t, store) } } diff --git a/weed/filer/ydb/ydb_types.go b/weed/filer/ydb/ydb_types.go index dd869cac6..05e0b7173 100644 --- a/weed/filer/ydb/ydb_types.go +++ b/weed/filer/ydb/ydb_types.go @@ -30,26 +30,35 @@ func (fm *FileMeta) queryParameters(ttlSec int32) *table.QueryParameters { table.ValueParam("$dir_hash", types.Int64Value(fm.DirHash)), table.ValueParam("$directory", types.UTF8Value(fm.Directory)), table.ValueParam("$name", types.UTF8Value(fm.Name)), - table.ValueParam("$meta", types.StringValue(fm.Meta)), + table.ValueParam("$meta", types.BytesValue(fm.Meta)), table.ValueParam("$expire_at", expireAtValue)) } -func createTableOptions() []options.CreateTableOption { +func (store *YdbStore) createTableOptions() []options.CreateTableOption { columnUnit := options.TimeToLiveUnitSeconds return []options.CreateTableOption{ - options.WithColumn("dir_hash", types.Optional(types.TypeInt64)), - options.WithColumn("directory", types.Optional(types.TypeUTF8)), - options.WithColumn("name", types.Optional(types.TypeUTF8)), - options.WithColumn("meta", types.Optional(types.TypeString)), + options.WithColumn("dir_hash", types.TypeInt64), + options.WithColumn("directory", types.TypeUTF8), + options.WithColumn("name", types.TypeUTF8), + options.WithColumn("meta", types.TypeString), options.WithColumn("expire_at", types.Optional(types.TypeUint32)), - options.WithPrimaryKeyColumn("dir_hash", "name"), + options.WithPrimaryKeyColumn("dir_hash", "directory", "name"), options.WithTimeToLiveSettings(options.TimeToLiveSettings{ ColumnName: "expire_at", ColumnUnit: &columnUnit, Mode: options.TimeToLiveModeValueSinceUnixEpoch}, ), + options.WithPartitioningSettings( + options.WithPartitioningBy([]string{"dir_hash", "name"}), + options.WithPartitioningBySize(store.partitionBySizeEnabled), + options.WithPartitionSizeMb(store.partitionSizeMb), + options.WithPartitioningByLoad(store.partitionByLoadEnabled), + options.WithMinPartitionsCount(store.minPartitionsCount), + options.WithMaxPartitionsCount(store.maxPartitionsCount), + ), } } + func withPragma(prefix *string, query string) *string { queryWithPragma := fmt.Sprintf(query, *prefix) return &queryWithPragma