|
|
@ -15,7 +15,6 @@ import ( |
|
|
|
"github.com/ydb-platform/ydb-go-sdk/v3" |
|
|
|
"github.com/ydb-platform/ydb-go-sdk/v3/sugar" |
|
|
|
"github.com/ydb-platform/ydb-go-sdk/v3/table" |
|
|
|
"github.com/ydb-platform/ydb-go-sdk/v3/table/options" |
|
|
|
"github.com/ydb-platform/ydb-go-sdk/v3/table/result" |
|
|
|
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" |
|
|
|
"github.com/ydb-platform/ydb-go-sdk/v3/table/types" |
|
|
@ -28,7 +27,6 @@ import ( |
|
|
|
|
|
|
|
const ( |
|
|
|
defaultDialTimeOut = 10 |
|
|
|
maxRowsInQuery = 1000 // Limit number of rows in query results https://cloud.yandex.com/en-ru/docs/ydb/concepts/limits-ydb
|
|
|
|
) |
|
|
|
|
|
|
|
var ( |
|
|
@ -112,14 +110,13 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix |
|
|
|
func (store *YdbStore) doTxOrDB(ctx context.Context, query *string, params *table.QueryParameters, tc *table.TransactionControl, processResultFunc func(res result.Result) error) (err error) { |
|
|
|
var res result.Result |
|
|
|
if tx, ok := ctx.Value("tx").(table.Transaction); ok { |
|
|
|
res, err = tx.Execute(ctx, *query, params, options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) |
|
|
|
res, err = tx.Execute(ctx, *query, params) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("execute transaction: %v", err) |
|
|
|
} |
|
|
|
} else { |
|
|
|
err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { |
|
|
|
_, res, err = s.Execute(ctx, tc, *query, |
|
|
|
params, options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) |
|
|
|
_, res, err = s.Execute(ctx, tc, *query, params) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("execute statement: %v", err) |
|
|
|
} |
|
|
@ -240,10 +237,6 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath |
|
|
|
} |
|
|
|
truncated := true |
|
|
|
eachEntryFuncIsNotBreake := true |
|
|
|
shortLimit := limit |
|
|
|
if limit > maxRowsInQuery { |
|
|
|
shortLimit = maxRowsInQuery * 2 |
|
|
|
} |
|
|
|
entryCount := int64(0) |
|
|
|
for truncated && eachEntryFuncIsNotBreake { |
|
|
|
if lastFileName != "" { |
|
|
@ -253,15 +246,12 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath |
|
|
|
} |
|
|
|
} |
|
|
|
restLimit := limit - entryCount |
|
|
|
if maxRowsInQuery > restLimit { |
|
|
|
shortLimit = restLimit |
|
|
|
} |
|
|
|
queryParams := table.NewQueryParameters( |
|
|
|
table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), |
|
|
|
table.ValueParam("$directory", types.UTF8Value(*shortDir)), |
|
|
|
table.ValueParam("$start_name", types.UTF8Value(startFileName)), |
|
|
|
table.ValueParam("$prefix", types.UTF8Value(prefix+"%")), |
|
|
|
table.ValueParam("$limit", types.Uint64Value(uint64(shortLimit))), |
|
|
|
table.ValueParam("$limit", types.Uint64Value(uint64(restLimit))), |
|
|
|
) |
|
|
|
err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error { |
|
|
|
var name string |
|
|
|