|
|
@ -28,6 +28,7 @@ import ( |
|
|
|
|
|
|
|
const ( |
|
|
|
defaultDialTimeOut = 10 |
|
|
|
maxRowsInQuery = 1000 // Limit number of rows in query results https://cloud.yandex.com/en-ru/docs/ydb/concepts/limits-ydb
|
|
|
|
) |
|
|
|
|
|
|
|
var ( |
|
|
@ -89,7 +90,7 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix |
|
|
|
dsn = os.Getenv("YDB_CONNECTION_STRING") |
|
|
|
} |
|
|
|
store.DB, err = ydb.Open(ctx, dsn, opts...) |
|
|
|
if err != nil || store.DB == nil { |
|
|
|
if err != nil { |
|
|
|
if store.DB != nil { |
|
|
|
_ = store.DB.Close(ctx) |
|
|
|
store.DB = nil |
|
|
@ -139,7 +140,7 @@ func (store *YdbStore) doTxOrDB(ctx context.Context, query *string, params *tabl |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Entry, isUpdate bool) (err error) { |
|
|
|
func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { |
|
|
|
dir, name := entry.FullPath.DirAndName() |
|
|
|
meta, err := entry.EncodeAttributesAndChunks() |
|
|
|
if err != nil { |
|
|
@ -151,21 +152,15 @@ func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Ent |
|
|
|
} |
|
|
|
tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) |
|
|
|
fileMeta := FileMeta{util.HashStringToLong(dir), name, *shortDir, meta} |
|
|
|
var query *string |
|
|
|
if isUpdate { |
|
|
|
query = withPragma(tablePathPrefix, updateQuery) |
|
|
|
} else { |
|
|
|
query = withPragma(tablePathPrefix, insertQuery) |
|
|
|
} |
|
|
|
return store.doTxOrDB(ctx, query, fileMeta.queryParameters(entry.TtlSec), rwTX, nil) |
|
|
|
return store.doTxOrDB(ctx, withPragma(tablePathPrefix, upsertQuery), fileMeta.queryParameters(entry.TtlSec), rwTX, nil) |
|
|
|
} |
|
|
|
|
|
|
|
func (store *YdbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { |
|
|
|
return store.insertOrUpdateEntry(ctx, entry, false) |
|
|
|
return store.insertOrUpdateEntry(ctx, entry) |
|
|
|
} |
|
|
|
|
|
|
|
func (store *YdbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { |
|
|
|
return store.insertOrUpdateEntry(ctx, entry, true) |
|
|
|
return store.insertOrUpdateEntry(ctx, entry) |
|
|
|
} |
|
|
|
|
|
|
|
func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { |
|
|
@ -179,14 +174,15 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e |
|
|
|
table.ValueParam("$name", types.UTF8Value(name))) |
|
|
|
|
|
|
|
err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error { |
|
|
|
for res.NextResultSet(ctx) { |
|
|
|
for res.NextRow() { |
|
|
|
if err = res.ScanNamed(named.OptionalWithDefault("meta", &data)); err != nil { |
|
|
|
return fmt.Errorf("scanNamed %s : %v", fullpath, err) |
|
|
|
} |
|
|
|
entryFound = true |
|
|
|
return nil |
|
|
|
if !res.NextResultSet(ctx) || !res.HasNextRow() { |
|
|
|
return nil |
|
|
|
} |
|
|
|
for res.NextRow() { |
|
|
|
if err = res.ScanNamed(named.OptionalWithDefault("meta", &data)); err != nil { |
|
|
|
return fmt.Errorf("scanNamed %s : %v", fullpath, err) |
|
|
|
} |
|
|
|
entryFound = true |
|
|
|
return nil |
|
|
|
} |
|
|
|
return res.Err() |
|
|
|
}) |
|
|
@ -230,7 +226,7 @@ func (store *YdbStore) DeleteFolderChildren(ctx context.Context, fullpath util.F |
|
|
|
} |
|
|
|
|
|
|
|
func (store *YdbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { |
|
|
|
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil) |
|
|
|
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) |
|
|
|
} |
|
|
|
|
|
|
|
func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { |
|
|
@ -242,17 +238,39 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath |
|
|
|
} else { |
|
|
|
query = withPragma(tablePathPrefix, listDirectoryQuery) |
|
|
|
} |
|
|
|
queryParams := table.NewQueryParameters( |
|
|
|
table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), |
|
|
|
table.ValueParam("$directory", types.UTF8Value(*shortDir)), |
|
|
|
table.ValueParam("$start_name", types.UTF8Value(startFileName)), |
|
|
|
table.ValueParam("$prefix", types.UTF8Value(prefix+"%")), |
|
|
|
table.ValueParam("$limit", types.Uint64Value(uint64(limit))), |
|
|
|
) |
|
|
|
err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error { |
|
|
|
var name string |
|
|
|
var data []byte |
|
|
|
for res.NextResultSet(ctx) { |
|
|
|
truncated := true |
|
|
|
eachEntryFuncIsNotBreake := true |
|
|
|
shortLimit := limit |
|
|
|
if limit > maxRowsInQuery { |
|
|
|
shortLimit = maxRowsInQuery * 2 |
|
|
|
} |
|
|
|
entryCount := int64(0) |
|
|
|
for truncated && eachEntryFuncIsNotBreake { |
|
|
|
if lastFileName != "" { |
|
|
|
startFileName = lastFileName |
|
|
|
if includeStartFile { |
|
|
|
query = withPragma(tablePathPrefix, listDirectoryQuery) |
|
|
|
} |
|
|
|
} |
|
|
|
restLimit := limit - entryCount |
|
|
|
if maxRowsInQuery > restLimit { |
|
|
|
shortLimit = restLimit |
|
|
|
} |
|
|
|
queryParams := table.NewQueryParameters( |
|
|
|
table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), |
|
|
|
table.ValueParam("$directory", types.UTF8Value(*shortDir)), |
|
|
|
table.ValueParam("$start_name", types.UTF8Value(startFileName)), |
|
|
|
table.ValueParam("$prefix", types.UTF8Value(prefix+"%")), |
|
|
|
table.ValueParam("$limit", types.Uint64Value(uint64(shortLimit))), |
|
|
|
) |
|
|
|
err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error { |
|
|
|
var name string |
|
|
|
var data []byte |
|
|
|
if !res.NextResultSet(ctx) || !res.HasNextRow() { |
|
|
|
truncated = false |
|
|
|
return nil |
|
|
|
} |
|
|
|
truncated = res.CurrentResultSet().Truncated() |
|
|
|
for res.NextRow() { |
|
|
|
if err := res.ScanNamed( |
|
|
|
named.OptionalWithDefault("name", &name), |
|
|
@ -267,12 +285,14 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath |
|
|
|
return fmt.Errorf("scan decode %s : %v", entry.FullPath, err) |
|
|
|
} |
|
|
|
if !eachEntryFunc(entry) { |
|
|
|
eachEntryFuncIsNotBreake = false |
|
|
|
break |
|
|
|
} |
|
|
|
entryCount += 1 |
|
|
|
} |
|
|
|
} |
|
|
|
return res.Err() |
|
|
|
}) |
|
|
|
return res.Err() |
|
|
|
}) |
|
|
|
} |
|
|
|
if err != nil { |
|
|
|
return lastFileName, err |
|
|
|
} |
|
|
|