@ -6,6 +6,8 @@ package ydb
import (
import (
"context"
"context"
"fmt"
"fmt"
"github.com/ydb-platform/ydb-go-sdk/v3/query"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
"os"
"os"
"path"
"path"
"strings"
"strings"
@ -20,30 +22,37 @@ import (
environ "github.com/ydb-platform/ydb-go-sdk-auth-environ"
environ "github.com/ydb-platform/ydb-go-sdk-auth-environ"
"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
)
)
const (
const (
defaultDialTimeOut = 10
defaultDialTimeOut = 10
defaultPartitionBySizeEnabled = true
defaultPartitionSizeMb = 200
defaultPartitionByLoadEnabled = true
defaultMinPartitionsCount = 5
defaultMaxPartitionsCount = 1000
defaultMaxListChunk = 2000
)
)
var (
var (
roTX = table . TxControl (
table . BeginTx ( table . WithOnlineReadOnly ( ) ) ,
table . CommitTx ( ) ,
)
rwTX = table . DefaultTxControl ( )
roQC = query . WithTxControl ( query . OnlineReadOnlyTxControl ( ) )
rwQC = query . WithTxControl ( query . DefaultTxControl ( ) )
)
)
type YdbStore struct {
type YdbStore struct {
DB ydb . Connection
dirBuckets string
tablePathPrefix string
SupportBucketTable bool
dbs map [ string ] bool
dbsLock sync . Mutex
DB * ydb . Driver
dirBuckets string
tablePathPrefix string
SupportBucketTable bool
partitionBySizeEnabled options . FeatureFlag
partitionSizeMb uint64
partitionByLoadEnabled options . FeatureFlag
minPartitionsCount uint64
maxPartitionsCount uint64
maxListChunk int
dbs map [ string ] bool
dbsLock sync . Mutex
}
}
func init ( ) {
func init ( ) {
@ -55,6 +64,12 @@ func (store *YdbStore) GetName() string {
}
}
func ( store * YdbStore ) Initialize ( configuration util . Configuration , prefix string ) ( err error ) {
func ( store * YdbStore ) Initialize ( configuration util . Configuration , prefix string ) ( err error ) {
configuration . SetDefault ( prefix + "partitionBySizeEnabled" , defaultPartitionBySizeEnabled )
configuration . SetDefault ( prefix + "partitionSizeMb" , defaultPartitionSizeMb )
configuration . SetDefault ( prefix + "partitionByLoadEnabled" , defaultPartitionByLoadEnabled )
configuration . SetDefault ( prefix + "minPartitionsCount" , defaultMinPartitionsCount )
configuration . SetDefault ( prefix + "maxPartitionsCount" , defaultMaxPartitionsCount )
configuration . SetDefault ( prefix + "maxListChunk" , defaultMaxListChunk )
return store . initialize (
return store . initialize (
configuration . GetString ( "filer.options.buckets_folder" ) ,
configuration . GetString ( "filer.options.buckets_folder" ) ,
configuration . GetString ( prefix + "dsn" ) ,
configuration . GetString ( prefix + "dsn" ) ,
@ -62,18 +77,37 @@ func (store *YdbStore) Initialize(configuration util.Configuration, prefix strin
configuration . GetBool ( prefix + "useBucketPrefix" ) ,
configuration . GetBool ( prefix + "useBucketPrefix" ) ,
configuration . GetInt ( prefix + "dialTimeOut" ) ,
configuration . GetInt ( prefix + "dialTimeOut" ) ,
configuration . GetInt ( prefix + "poolSizeLimit" ) ,
configuration . GetInt ( prefix + "poolSizeLimit" ) ,
configuration . GetBool ( prefix + "partitionBySizeEnabled" ) ,
uint64 ( configuration . GetInt ( prefix + "partitionSizeMb" ) ) ,
configuration . GetBool ( prefix + "partitionByLoadEnabled" ) ,
uint64 ( configuration . GetInt ( prefix + "minPartitionsCount" ) ) ,
uint64 ( configuration . GetInt ( prefix + "maxPartitionsCount" ) ) ,
configuration . GetInt ( prefix + "maxListChunk" ) ,
)
)
}
}
func ( store * YdbStore ) initialize ( dirBuckets string , dsn string , tablePathPrefix string , useBucketPrefix bool , dialTimeOut int , poolSizeLimit int ) ( err error ) {
func ( store * YdbStore ) initialize ( dirBuckets string , dsn string , tablePathPrefix string , useBucketPrefix bool , dialTimeOut int , poolSizeLimit int , partitionBySizeEnabled bool , partitionSizeMb uint64 , partitionByLoadEnabled bool , minPartitionsCount uint64 , maxPartitionsCount uint64 , maxListChunk int ) ( err error ) {
store . dirBuckets = dirBuckets
store . dirBuckets = dirBuckets
store . SupportBucketTable = useBucketPrefix
store . SupportBucketTable = useBucketPrefix
if partitionBySizeEnabled {
store . partitionBySizeEnabled = options . FeatureEnabled
} else {
store . partitionBySizeEnabled = options . FeatureDisabled
}
if partitionByLoadEnabled {
store . partitionByLoadEnabled = options . FeatureEnabled
} else {
store . partitionByLoadEnabled = options . FeatureDisabled
}
store . partitionSizeMb = partitionSizeMb
store . minPartitionsCount = minPartitionsCount
store . maxPartitionsCount = maxPartitionsCount
store . maxListChunk = maxListChunk
if store . SupportBucketTable {
if store . SupportBucketTable {
glog . V ( 0 ) . Infof ( "enabled BucketPrefix" )
glog . V ( 0 ) . Infof ( "enabled BucketPrefix" )
}
}
store . dbs = make ( map [ string ] bool )
store . dbs = make ( map [ string ] bool )
ctx , cancel := context . WithCancel ( context . Background ( ) )
defer cancel ( )
ctx := context . Background ( )
if dialTimeOut == 0 {
if dialTimeOut == 0 {
dialTimeOut = defaultDialTimeOut
dialTimeOut = defaultDialTimeOut
}
}
@ -89,11 +123,7 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix
}
}
store . DB , err = ydb . Open ( ctx , dsn , opts ... )
store . DB , err = ydb . Open ( ctx , dsn , opts ... )
if err != nil {
if err != nil {
if store . DB != nil {
_ = store . DB . Close ( ctx )
store . DB = nil
}
return fmt . Errorf ( "can not connect to %s error: %v" , dsn , err )
return fmt . Errorf ( "can not connect to %s: %w" , dsn , err )
}
}
store . tablePathPrefix = path . Join ( store . DB . Name ( ) , tablePathPrefix )
store . tablePathPrefix = path . Join ( store . DB . Name ( ) , tablePathPrefix )
@ -104,29 +134,27 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix
return err
return err
}
}
func ( store * YdbStore ) doTxOrDB ( ctx context . Context , query * string , params * table . QueryParameters , tc * table . TransactionControl , processResultFunc func ( res result . Result ) error ) ( err error ) {
var res result . Result
if tx , ok := ctx . Value ( "tx" ) . ( table . Transaction ) ; ok {
res , err = tx . Execute ( ctx , * query , params )
func ( store * YdbStore ) doTxOrDB ( ctx context . Context , q * string , params * table . QueryParameters , ts query . ExecuteOption , processResultFunc func ( res query . Result ) error ) ( err error ) {
var res query . Result
if tx , ok := ctx . Value ( "tx" ) . ( query . Transaction ) ; ok {
res , err = tx . Query ( ctx , * q , query . WithParameters ( params ) )
if err != nil {
if err != nil {
return fmt . Errorf ( "execute transaction: %v" , err )
return fmt . Errorf ( "execute transaction: %v" , err )
}
}
} else {
} else {
err = store . DB . Table ( ) . Do ( ctx , func ( ctx context . Context , s table . Session ) ( err error ) {
_ , res , err = s . Execute ( ctx , tc , * query , param s)
err = store . DB . Query ( ) . Do ( ctx , func ( ctx context . Context , s query . Session ) ( err error ) {
res , err = s . Query ( ctx , * q , query . WithParameters ( params ) , t s)
if err != nil {
if err != nil {
return fmt . Errorf ( "execute statement: %v" , err )
return fmt . Errorf ( "execute statement: %v" , err )
}
}
return nil
return nil
} ,
table . WithIdempotent ( ) ,
)
} , query . WithIdempotent ( ) )
}
}
if err != nil {
if err != nil {
return err
return err
}
}
if res != nil {
if res != nil {
defer func ( ) { _ = res . Close ( ) } ( )
defer func ( ) { _ = res . Close ( ctx ) } ( )
if processResultFunc != nil {
if processResultFunc != nil {
if err = processResultFunc ( res ) ; err != nil {
if err = processResultFunc ( res ) ; err != nil {
return fmt . Errorf ( "process result: %v" , err )
return fmt . Errorf ( "process result: %v" , err )
@ -148,7 +176,7 @@ func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Ent
}
}
tablePathPrefix , shortDir := store . getPrefix ( ctx , & dir )
tablePathPrefix , shortDir := store . getPrefix ( ctx , & dir )
fileMeta := FileMeta { util . HashStringToLong ( dir ) , name , * shortDir , meta }
fileMeta := FileMeta { util . HashStringToLong ( dir ) , name , * shortDir , meta }
return store . doTxOrDB ( ctx , withPragma ( tablePathPrefix , upsertQuery ) , fileMeta . queryParameters ( entry . TtlSec ) , rwTX , nil )
return store . doTxOrDB ( ctx , withPragma ( tablePathPrefix , upsertQuery ) , fileMeta . queryParameters ( entry . TtlSec ) , rwQC , nil )
}
}
func ( store * YdbStore ) InsertEntry ( ctx context . Context , entry * filer . Entry ) ( err error ) {
func ( store * YdbStore ) InsertEntry ( ctx context . Context , entry * filer . Entry ) ( err error ) {
@ -164,23 +192,29 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e
var data [ ] byte
var data [ ] byte
entryFound := false
entryFound := false
tablePathPrefix , shortDir := store . getPrefix ( ctx , & dir )
tablePathPrefix , shortDir := store . getPrefix ( ctx , & dir )
query := withPragma ( tablePathPrefix , findQuery )
q := withPragma ( tablePathPrefix , findQuery )
queryParams := table . NewQueryParameters (
queryParams := table . NewQueryParameters (
table . ValueParam ( "$dir_hash" , types . Int64Value ( util . HashStringToLong ( * shortDir ) ) ) ,
table . ValueParam ( "$dir_hash" , types . Int64Value ( util . HashStringToLong ( * shortDir ) ) ) ,
table . ValueParam ( "$directory" , types . UTF8Value ( * shortDir ) ) ,
table . ValueParam ( "$name" , types . UTF8Value ( name ) ) )
table . ValueParam ( "$name" , types . UTF8Value ( name ) ) )
err = store . doTxOrDB ( ctx , query , queryParams , roTX , func ( res result . Result ) error {
if ! res . NextResultSet ( ctx ) || ! res . HasNextRow ( ) {
return nil
}
for res . NextRow ( ) {
if err = res . ScanNamed ( named . OptionalWithDefault ( "meta" , & data ) ) ; err != nil {
return fmt . Errorf ( "scanNamed %s : %v" , fullpath , err )
err = store . doTxOrDB ( ctx , q , queryParams , roQC , func ( res query . Result ) error {
for rs , err := range res . ResultSets ( ctx ) {
if err != nil {
return err
}
for row , err := range rs . Rows ( ctx ) {
if err != nil {
return err
}
if scanErr := row . Scan ( & data ) ; scanErr != nil {
return fmt . Errorf ( "scan %s: %v" , fullpath , scanErr )
}
entryFound = true
return nil
}
}
entryFound = true
return nil
}
}
return res . Err ( )
return nil
} )
} )
if err != nil {
if err != nil {
return nil , err
return nil , err
@ -189,37 +223,35 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e
return nil , filer_pb . ErrNotFound
return nil , filer_pb . ErrNotFound
}
}
entry = & filer . Entry {
FullPath : fullpath ,
entry = & filer . Entry { FullPath : fullpath }
if decodeErr := entry . DecodeAttributesAndChunks ( util . MaybeDecompressData ( data ) ) ; decodeErr != nil {
return nil , fmt . Errorf ( "decode %s: %v" , fullpath , decodeErr )
}
}
if err := entry . DecodeAttributesAndChunks ( util . MaybeDecompressData ( data ) ) ; err != nil {
return nil , fmt . Errorf ( "decode %s : %v" , fullpath , err )
}
return entry , nil
return entry , nil
}
}
func ( store * YdbStore ) DeleteEntry ( ctx context . Context , fullpath util . FullPath ) ( err error ) {
func ( store * YdbStore ) DeleteEntry ( ctx context . Context , fullpath util . FullPath ) ( err error ) {
dir , name := fullpath . DirAndName ( )
dir , name := fullpath . DirAndName ( )
tablePathPrefix , shortDir := store . getPrefix ( ctx , & dir )
tablePathPrefix , shortDir := store . getPrefix ( ctx , & dir )
query := withPragma ( tablePathPrefix , deleteQuery )
q := withPragma ( tablePathPrefix , deleteQuery )
glog . V ( 4 ) . Infof ( "DeleteEntry %s, tablePathPrefix %s, shortDir %s" , fullpath , * tablePathPrefix , * shortDir )
glog . V ( 4 ) . Infof ( "DeleteEntry %s, tablePathPrefix %s, shortDir %s" , fullpath , * tablePathPrefix , * shortDir )
queryParams := table . NewQueryParameters (
queryParams := table . NewQueryParameters (
table . ValueParam ( "$dir_hash" , types . Int64Value ( util . HashStringToLong ( * shortDir ) ) ) ,
table . ValueParam ( "$dir_hash" , types . Int64Value ( util . HashStringToLong ( * shortDir ) ) ) ,
table . ValueParam ( "$directory" , types . UTF8Value ( * shortDir ) ) ,
table . ValueParam ( "$name" , types . UTF8Value ( name ) ) )
table . ValueParam ( "$name" , types . UTF8Value ( name ) ) )
return store . doTxOrDB ( ctx , query , queryParams , rwTX , nil )
return store . doTxOrDB ( ctx , q , queryParams , rwQC , nil )
}
}
func ( store * YdbStore ) DeleteFolderChildren ( ctx context . Context , fullpath util . FullPath ) ( err error ) {
func ( store * YdbStore ) DeleteFolderChildren ( ctx context . Context , fullpath util . FullPath ) ( err error ) {
dir := string ( fullpath )
dir := string ( fullpath )
tablePathPrefix , shortDir := store . getPrefix ( ctx , & dir )
tablePathPrefix , shortDir := store . getPrefix ( ctx , & dir )
query := withPragma ( tablePathPrefix , deleteFolderChildrenQuery )
q := withPragma ( tablePathPrefix , deleteFolderChildrenQuery )
queryParams := table . NewQueryParameters (
queryParams := table . NewQueryParameters (
table . ValueParam ( "$dir_hash" , types . Int64Value ( util . HashStringToLong ( * shortDir ) ) ) ,
table . ValueParam ( "$dir_hash" , types . Int64Value ( util . HashStringToLong ( * shortDir ) ) ) ,
table . ValueParam ( "$directory" , types . UTF8Value ( * shortDir ) ) )
table . ValueParam ( "$directory" , types . UTF8Value ( * shortDir ) ) )
return store . doTxOrDB ( ctx , query , queryParams , rwTX , nil )
return store . doTxOrDB ( ctx , q , queryParams , rwQC , nil )
}
}
func ( store * YdbStore ) ListDirectoryEntries ( ctx context . Context , dirPath util . FullPath , startFileName string , includeStartFile bool , limit int64 , eachEntryFunc filer . ListEachEntryFunc ) ( lastFileName string , err error ) {
func ( store * YdbStore ) ListDirectoryEntries ( ctx context . Context , dirPath util . FullPath , startFileName string , includeStartFile bool , limit int64 , eachEntryFunc filer . ListEachEntryFunc ) ( lastFileName string , err error ) {
@ -229,71 +261,79 @@ func (store *YdbStore) ListDirectoryEntries(ctx context.Context, dirPath util.Fu
func ( store * YdbStore ) ListDirectoryPrefixedEntries ( ctx context . Context , dirPath util . FullPath , startFileName string , includeStartFile bool , limit int64 , prefix string , eachEntryFunc filer . ListEachEntryFunc ) ( lastFileName string , err error ) {
func ( store * YdbStore ) ListDirectoryPrefixedEntries ( ctx context . Context , dirPath util . FullPath , startFileName string , includeStartFile bool , limit int64 , prefix string , eachEntryFunc filer . ListEachEntryFunc ) ( lastFileName string , err error ) {
dir := string ( dirPath )
dir := string ( dirPath )
tablePathPrefix , shortDir := store . getPrefix ( ctx , & dir )
tablePathPrefix , shortDir := store . getPrefix ( ctx , & dir )
var query * string
if includeStartFile {
query = withPragma ( tablePathPrefix , listInclusiveDirectoryQuery )
} else {
query = withPragma ( tablePathPrefix , listDirectoryQuery )
}
truncated := true
eachEntryFuncIsNotBreake := true
entryCount := int64 ( 0 )
for truncated && eachEntryFuncIsNotBreake {
if lastFileName != "" {
startFileName = lastFileName
if includeStartFile {
query = withPragma ( tablePathPrefix , listDirectoryQuery )
}
baseInclusive := withPragma ( tablePathPrefix , listInclusiveDirectoryQuery )
baseExclusive := withPragma ( tablePathPrefix , listDirectoryQuery )
var entryCount int64
var prevFetchedLessThanChunk bool
for entryCount < limit {
if prevFetchedLessThanChunk {
break
}
var q * string
if entryCount == 0 && includeStartFile {
q = baseInclusive
} else {
q = baseExclusive
}
}
restLimit := limit - entryCount
const maxChunk = int64 ( 1000 )
chunkLimit := restLimit
if chunkLimit > maxChunk {
chunkLimit = maxChunk
rest := limit - entryCount
chunkLimit := rest
if chunkLimit > int64 ( store . maxListChunk ) {
chunkLimit = int64 ( store . maxListChunk )
}
}
glog . V ( 4 ) . Infof ( "startFileName %s, restLimit %d, chunkLimit %d" , startFileName , restLimit , chunkLimit )
var rowCount int64
queryP arams := table . NewQueryParameters (
p arams := table . NewQueryParameters (
table . ValueParam ( "$dir_hash" , types . Int64Value ( util . HashStringToLong ( * shortDir ) ) ) ,
table . ValueParam ( "$dir_hash" , types . Int64Value ( util . HashStringToLong ( * shortDir ) ) ) ,
table . ValueParam ( "$directory" , types . UTF8Value ( * shortDir ) ) ,
table . ValueParam ( "$directory" , types . UTF8Value ( * shortDir ) ) ,
table . ValueParam ( "$start_name" , types . UTF8Value ( startFileName ) ) ,
table . ValueParam ( "$start_name" , types . UTF8Value ( startFileName ) ) ,
table . ValueParam ( "$prefix" , types . UTF8Value ( prefix + "%" ) ) ,
table . ValueParam ( "$prefix" , types . UTF8Value ( prefix + "%" ) ) ,
table . ValueParam ( "$limit" , types . Uint64Value ( uint64 ( chunkLimit ) ) ) ,
table . ValueParam ( "$limit" , types . Uint64Value ( uint64 ( chunkLimit ) ) ) ,
)
)
err = store . doTxOrDB ( ctx , query , queryParams , roTX , func ( res result . Result ) error {
var name string
var data [ ] byte
if ! res . NextResultSet ( ctx ) || ! res . HasNextRow ( ) {
truncated = false
return nil
}
truncated = res . CurrentResultSet ( ) . Truncated ( )
glog . V ( 4 ) . Infof ( "truncated %v, entryCount %d" , truncated , entryCount )
for res . NextRow ( ) {
if err := res . ScanNamed (
named . OptionalWithDefault ( "name" , & name ) ,
named . OptionalWithDefault ( "meta" , & data ) ) ; err != nil {
return fmt . Errorf ( "list scanNamed %s : %v" , dir , err )
}
glog . V ( 8 ) . Infof ( "name %s, fullpath %s" , name , util . NewFullPath ( dir , name ) )
lastFileName = name
entry := & filer . Entry {
FullPath : util . NewFullPath ( dir , name ) ,
}
if err = entry . DecodeAttributesAndChunks ( util . MaybeDecompressData ( data ) ) ; err != nil {
return fmt . Errorf ( "scan decode %s : %v" , entry . FullPath , err )
err := store . doTxOrDB ( ctx , q , params , roQC , func ( res query . Result ) error {
for rs , err := range res . ResultSets ( ctx ) {
if err != nil {
return err
}
}
if ! eachEntryFunc ( entry ) {
eachEntryFuncIsNotBreake = false
break
for row , err := range rs . Rows ( ctx ) {
if err != nil {
return err
}
var name string
var data [ ] byte
if scanErr := row . Scan ( & name , & data ) ; scanErr != nil {
return fmt . Errorf ( "scan %s: %w" , dir , scanErr )
}
lastFileName = name
entry := & filer . Entry { FullPath : util . NewFullPath ( dir , name ) }
if decodeErr := entry . DecodeAttributesAndChunks ( util . MaybeDecompressData ( data ) ) ; decodeErr != nil {
return fmt . Errorf ( "decode entry %s: %w" , entry . FullPath , decodeErr )
}
if ! eachEntryFunc ( entry ) {
return nil
}
rowCount ++
entryCount ++
startFileName = lastFileName
if entryCount >= limit {
return nil
}
}
}
entryCount += 1
}
}
return res . Err ( )
return nil
} )
} )
}
if err != nil {
return lastFileName , err
if err != nil {
return lastFileName , err
}
if rowCount < chunkLimit {
prevFetchedLessThanChunk = true
}
}
}
return lastFileName , nil
return lastFileName , nil
}
}
@ -380,7 +420,7 @@ func (store *YdbStore) OnBucketDeletion(bucket string) {
func ( store * YdbStore ) createTable ( ctx context . Context , prefix string ) error {
func ( store * YdbStore ) createTable ( ctx context . Context , prefix string ) error {
return store . DB . Table ( ) . Do ( ctx , func ( ctx context . Context , s table . Session ) error {
return store . DB . Table ( ) . Do ( ctx , func ( ctx context . Context , s table . Session ) error {
return s . CreateTable ( ctx , path . Join ( prefix , abstract_sql . DEFAULT_TABLE ) , createTableOptions ( ) ... )
return s . CreateTable ( ctx , path . Join ( prefix , abstract_sql . DEFAULT_TABLE ) , store . createTableOptions ( ) ... )
} )
} )
}
}
@ -424,16 +464,22 @@ func (store *YdbStore) getPrefix(ctx context.Context, dir *string) (tablePathPre
store . dbsLock . Lock ( )
store . dbsLock . Lock ( )
defer store . dbsLock . Unlock ( )
defer store . dbsLock . Unlock ( )
tablePathPrefixWithBucket := path . Join ( store . tablePathPrefix , bucket )
if _ , found := store . dbs [ bucket ] ; ! found {
if _ , found := store . dbs [ bucket ] ; ! found {
if err := store . createTable ( ctx , tablePathPrefixWithBucket ) ; err == nil {
store . dbs [ bucket ] = true
glog . V ( 4 ) . Infof ( "created table %s" , tablePathPrefixWithBucket )
} else {
glog . Errorf ( "createTable %s: %v" , tablePathPrefixWithBucket , err )
glog . V ( 4 ) . Infof ( "bucket %q not in cache, verifying existence via DescribeTable" , bucket )
tablePath := path . Join ( store . tablePathPrefix , bucket , abstract_sql . DEFAULT_TABLE )
err2 := store . DB . Table ( ) . Do ( ctx , func ( ctx context . Context , s table . Session ) error {
_ , err3 := s . DescribeTable ( ctx , tablePath )
return err3
} )
if err2 != nil {
glog . V ( 4 ) . Infof ( "bucket %q not found (DescribeTable %s failed)" , bucket , tablePath )
return
}
}
glog . V ( 4 ) . Infof ( "bucket %q exists, adding to cache" , bucket )
store . dbs [ bucket ] = true
}
}
tablePathPrefix = & tablePathPrefixWithBucket
bucketPrefix := path . Join ( store . tablePathPrefix , bucket )
tablePathPrefix = & bucketPrefix
}
}
return
return
}
}
@ -444,7 +490,7 @@ func (store *YdbStore) ensureTables(ctx context.Context) error {
glog . V ( 4 ) . Infof ( "creating base table %s" , prefixFull )
glog . V ( 4 ) . Infof ( "creating base table %s" , prefixFull )
baseTable := path . Join ( prefixFull , abstract_sql . DEFAULT_TABLE )
baseTable := path . Join ( prefixFull , abstract_sql . DEFAULT_TABLE )
if err := store . DB . Table ( ) . Do ( ctx , func ( ctx context . Context , s table . Session ) error {
if err := store . DB . Table ( ) . Do ( ctx , func ( ctx context . Context , s table . Session ) error {
return s . CreateTable ( ctx , baseTable , createTableOptions ( ) ... )
return s . CreateTable ( ctx , baseTable , store . createTableOptions ( ) ... )
} ) ; err != nil {
} ) ; err != nil {
return fmt . Errorf ( "failed to create base table %s: %v" , baseTable , err )
return fmt . Errorf ( "failed to create base table %s: %v" , baseTable , err )
}
}
@ -457,7 +503,7 @@ func (store *YdbStore) ensureTables(ctx context.Context) error {
glog . V ( 4 ) . Infof ( "creating bucket table %s" , bucket )
glog . V ( 4 ) . Infof ( "creating bucket table %s" , bucket )
bucketTable := path . Join ( prefixFull , bucket , abstract_sql . DEFAULT_TABLE )
bucketTable := path . Join ( prefixFull , bucket , abstract_sql . DEFAULT_TABLE )
if err := store . DB . Table ( ) . Do ( ctx , func ( ctx context . Context , s table . Session ) error {
if err := store . DB . Table ( ) . Do ( ctx , func ( ctx context . Context , s table . Session ) error {
return s . CreateTable ( ctx , bucketTable , createTableOptions ( ) ... )
return s . CreateTable ( ctx , bucketTable , store . createTableOptions ( ) ... )
} ) ; err != nil {
} ) ; err != nil {
glog . Errorf ( "failed to create bucket table %s: %v" , bucketTable , err )
glog . Errorf ( "failed to create bucket table %s: %v" , bucketTable , err )
}
}