@ -7,9 +7,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/yandex-cloud/ydb-go-sdk/v2"
"github.com/yandex-cloud/ydb-go-sdk/v2/connect"
"github.com/yandex-cloud/ydb-go-sdk/v2/table"
environ "github.com/ydb-platform/ydb-go-sdk-auth-environ"
ydb "github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/sugar"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"path"
"strings"
"time"
@ -24,15 +27,12 @@ var (
table . BeginTx ( table . WithOnlineReadOnly ( ) ) ,
table . CommitTx ( ) ,
)
rwTX = table . TxControl (
table . BeginTx ( table . WithSerializableReadWrite ( ) ) ,
table . CommitTx ( ) ,
)
rwTX = table . DefaultTxControl ( )
)
type YdbStore struct {
SupportBucketTable bool
DB * connect . Connection
DB ydb . Connection
dirBuckets string
tablePathPrefix string
}
@ -48,34 +48,43 @@ func (store *YdbStore) GetName() string {
func ( store * YdbStore ) Initialize ( configuration util . Configuration , prefix string ) ( err error ) {
return store . initialize (
configuration . GetString ( "filer.options.buckets_folder" ) ,
configuration . GetString ( prefix + "coonectionUrl " ) ,
configuration . GetString ( prefix + "dsn " ) ,
configuration . GetString ( prefix + "tablePathPrefix" ) ,
configuration . GetBool ( prefix + "useBucketPrefix" ) ,
configuration . GetInt ( prefix + "connectionTimeOut" ) ,
configuration . GetInt ( prefix + "poolSizeLimit" ) ,
)
}
func ( store * YdbStore ) initialize ( dirBuckets string , sqlUrl string , tablePathPrefix string , useBucketPrefix bool , connectionTimeOut int ) ( err error ) {
func ( store * YdbStore ) initialize ( dirBuckets string , dsn string , tablePathPrefix string , useBucketPrefix bool , connectionTimeOut int , poolSizeLimi t int ) ( err error ) {
store . dirBuckets = dirBuckets
store . tablePathPrefix = tablePathPrefix
store . SupportBucketTable = useBucketPrefix
ctx , cancel := context . WithCancel ( context . Background ( ) )
defer cancel ( )
if connectionTimeOut == 0 {
connectionTimeOut = defaultConnectionTimeOut
}
var cancel context . CancelFunc
connCtx , cancel := context . WithTimeout ( context . Background ( ) , time . Duration ( connectionTimeOut ) * time . Second )
defer cancel ( )
connParams := connect . MustConnectionString ( sqlUrl )
store . DB , err = connect . New ( connCtx , connParams )
opts := [ ] ydb . Option {
environ . WithEnvironCredentials ( ctx ) ,
ydb . WithDialTimeout ( time . Duration ( connectionTimeOut ) * time . Second ) ,
}
if poolSizeLimit > 0 {
opts = append ( opts , ydb . WithSessionPoolSizeLimit ( poolSizeLimit ) )
}
store . DB , err = ydb . Open ( ctx , dsn , opts ... )
if err != nil {
store . DB . Close ( )
_ = store . DB . Close ( ctx )
store . DB = nil
return fmt . Errorf ( "can not connect to %s error:%v" , sqlUrl , err )
return fmt . Errorf ( "can not connect to %s error:%v" , dsn , err )
}
defer store . DB . Close ( )
if err = store . DB . EnsurePathExists ( connCtx , connParams . Database ( ) ) ; err != nil {
return fmt . Errorf ( "connect to %s error:%v" , sqlUrl , err )
defer func ( ) { _ = store . DB . Close ( ctx ) } ( )
store . tablePathPrefix = path . Join ( store . DB . Name ( ) , tablePathPrefix )
if err = sugar . RemoveRecursive ( ctx , store . DB , store . tablePathPrefix ) ; err != nil {
return fmt . Errorf ( "RemoveRecursive %s : %v" , store . tablePathPrefix , err )
}
if err = sugar . MakeRecursive ( ctx , store . DB , store . tablePathPrefix ) ; err != nil {
return fmt . Errorf ( "MakeRecursive %s : %v" , store . tablePathPrefix , err )
}
return nil
}
@ -92,16 +101,14 @@ func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Ent
}
fileMeta := FileMeta { util . HashStringToLong ( dir ) , name , dir , meta }
return table . Retry ( ctx , store . DB . Table ( ) . Pool ( ) ,
table . OperationFunc ( func ( ctx context . Context , s * table . Session ) ( err error ) {
stmt , err := s . Prepare ( ctx , store . withPragma ( store . getPrefix ( dir ) , query ) )
if err != nil {
return err
}
_ , _ , err = stmt . Execute ( ctx , rwTX , fileMeta . QueryParameters ( ) )
return err
} ) ,
)
return store . DB . Table ( ) . Do ( ctx , func ( ctx context . Context , s table . Session ) ( err error ) {
stmt , err := s . Prepare ( ctx , store . withPragma ( store . getPrefix ( dir ) , query ) )
if err != nil {
return fmt . Errorf ( "Prepare %s : %v" , dir , err )
}
_ , _ , err = stmt . Execute ( ctx , rwTX , fileMeta . QueryParameters ( ) )
return err
} )
}
func ( store * YdbStore ) InsertEntry ( ctx context . Context , entry * filer . Entry ) ( err error ) {
@ -114,68 +121,71 @@ func (store *YdbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err
func ( store * YdbStore ) FindEntry ( ctx context . Context , fullpath util . FullPath ) ( entry * filer . Entry , err error ) {
dir , name := fullpath . DirAndName ( )
var res * table . Result
err = table . Retry ( ctx , store . DB . Table ( ) . Pool ( ) ,
table . OperationFunc ( func ( ctx context . Context , s * table . Session ) ( err error ) {
stmt , err := s . Prepare ( ctx , store . withPragma ( store . getPrefix ( dir ) , findQuery ) )
if err != nil {
return err
var data [ ] byte
entryFound := false
err = store . DB . Table ( ) . Do ( ctx , func ( ctx context . Context , s table . Session ) error {
stmt , err := s . Prepare ( ctx , store . withPragma ( store . getPrefix ( dir ) , findQuery ) )
if err != nil {
return fmt . Errorf ( "Prepare %s : %v" , entry . FullPath , err )
}
_ , res , err := stmt . Execute ( ctx , roTX , table . NewQueryParameters (
table . ValueParam ( "$dir_hash" , types . Int64Value ( util . HashStringToLong ( dir ) ) ) ,
table . ValueParam ( "$name" , types . UTF8Value ( name ) ) ) )
if err != nil {
return fmt . Errorf ( "Execute %s : %v" , entry . FullPath , err )
}
defer func ( ) {
_ = res . Close ( )
} ( )
for res . NextRow ( ) {
if err := res . ScanNamed ( named . Required ( "meta" , & data ) ) ; err != nil {
return fmt . Errorf ( "scanNamed %s : %v" , entry . FullPath , err )
}
_ , res , err = stmt . Execute ( ctx , roTX , table . NewQueryParameters (
table . ValueParam ( "$dir_hash" , ydb . Int64Value ( util . HashStringToLong ( dir ) ) ) ,
table . ValueParam ( "$name" , ydb . UTF8Value ( name ) ) ) )
return err
} ) ,
)
entryFound = true
return nil
}
return res . Err ( )
} )
if err != nil {
return nil , err
}
defer res . Close ( )
for res . NextResultSet ( ctx ) {
for res . NextRow ( ) {
res . SeekItem ( "meta" )
entry . FullPath = fullpath
if err := entry . DecodeAttributesAndChunks ( util . MaybeDecompressData ( res . String ( ) ) ) ; err != nil {
return entry , fmt . Errorf ( "decode %s : %v" , entry . FullPath , err )
}
return entry , nil
}
if ! entryFound {
return nil , filer_pb . ErrNotFound
}
entry . FullPath = fullpath
if err := entry . DecodeAttributesAndChunks ( util . MaybeDecompressData ( data ) ) ; err != nil {
return nil , fmt . Errorf ( "decode %s : %v" , entry . FullPath , err )
}
return nil , filer_pb . ErrNotFound
return entry , nil
}
func ( store * YdbStore ) DeleteEntry ( ctx context . Context , fullpath util . FullPath ) ( err error ) {
dir , name := fullpath . DirAndName ( )
return table . Retry ( ctx , store . DB . Table ( ) . Pool ( ) ,
table . OperationFunc ( func ( ctx context . Context , s * table . Session ) ( err error ) {
stmt , err := s . Prepare ( ctx , store . withPragma ( store . getPrefix ( dir ) , deleteQuery ) )
if err != nil {
return err
}
_ , _ , err = stmt . Execute ( ctx , rwTX , table . NewQueryParameters (
table . ValueParam ( "$dir_hash" , ydb . Int64Value ( util . HashStringToLong ( dir ) ) ) ,
table . ValueParam ( "$name" , ydb . UTF8Value ( name ) ) ) )
return err
} ) ,
)
return store . DB . Table ( ) . Do ( ctx , func ( ctx context . Context , s table . Session ) ( err error ) {
stmt , err := s . Prepare ( ctx , store . withPragma ( store . getPrefix ( dir ) , deleteQuery ) )
if err != nil {
return fmt . Errorf ( "Prepare %s : %v" , dir , err )
}
_ , _ , err = stmt . Execute ( ctx , rwTX , table . NewQueryParameters (
table . ValueParam ( "$dir_hash" , types . Int64Value ( util . HashStringToLong ( dir ) ) ) ,
table . ValueParam ( "$name" , types . UTF8Value ( name ) ) ) )
return err
} )
}
func ( store * YdbStore ) DeleteFolderChildren ( ctx context . Context , fullpath util . FullPath ) ( err error ) {
dir , _ := fullpath . DirAndName ( )
return table . Retry ( ctx , store . DB . Table ( ) . Pool ( ) ,
table . OperationFunc ( func ( ctx context . Context , s * table . Session ) ( err error ) {
stmt , err := s . Prepare ( ctx , store . withPragma ( store . getPrefix ( dir ) , deleteFolderChildrenQuery ) )
if err != nil {
return err
}
_ , _ , err = stmt . Execute ( ctx , rwTX , table . NewQueryParameters (
table . ValueParam ( "$dir_hash" , ydb . Int64Value ( util . HashStringToLong ( dir ) ) ) ,
table . ValueParam ( "$directory" , ydb . UTF8Value ( dir ) ) ) )
return err
} ) ,
)
return store . DB . Table ( ) . Do ( ctx , func ( ctx context . Context , s table . Session ) ( err error ) {
stmt , err := s . Prepare ( ctx , store . withPragma ( store . getPrefix ( dir ) , deleteFolderChildrenQuery ) )
if err != nil {
return fmt . Errorf ( "Prepare %s : %v" , dir , err )
}
_ , _ , err = stmt . Execute ( ctx , rwTX , table . NewQueryParameters (
table . ValueParam ( "$dir_hash" , types . Int64Value ( util . HashStringToLong ( dir ) ) ) ,
table . ValueParam ( "$directory" , types . UTF8Value ( dir ) ) ) )
return err
} )
}
func ( store * YdbStore ) ListDirectoryEntries ( ctx context . Context , dirPath util . FullPath , startFileName string , includeStartFile bool , limit int64 , eachEntryFunc filer . ListEachEntryFunc ) ( lastFileName string , err error ) {
@ -184,62 +194,60 @@ func (store *YdbStore) ListDirectoryEntries(ctx context.Context, dirPath util.Fu
func ( store * YdbStore ) ListDirectoryPrefixedEntries ( ctx context . Context , dirPath util . FullPath , startFileName string , includeStartFile bool , limit int64 , prefix string , eachEntryFunc filer . ListEachEntryFunc ) ( lastFileName string , err error ) {
dir := string ( dirPath )
var res * table . Result
startFileCompOp := ">"
if includeStartFile {
startFileCompOp = ">="
}
err = table . Retry ( ctx , store . DB . Table ( ) . Pool ( ) ,
table . OperationFunc ( func ( ctx context . Context , s * table . Session ) ( err error ) {
stmt , err := s . Prepare ( ctx , store . withPragma ( store . getPrefix ( dir ) , fmt . Sprintf ( ListDirectoryQuery , startFileCompOp ) ) )
if err != nil {
return err
err = store . DB . Table ( ) . Do ( ctx , func ( ctx context . Context , s table . Session ) error {
stmt , err := s . Prepare ( ctx , store . withPragma ( store . getPrefix ( dir ) , fmt . Sprintf ( ListDirectoryQuery , startFileCompOp ) ) )
if err != nil {
return fmt . Errorf ( "Prepare %s : %v" , dir , err )
}
_ , res , err := stmt . Execute ( ctx , roTX , table . NewQueryParameters (
table . ValueParam ( "$dir_hash" , types . Int64Value ( util . HashStringToLong ( dir ) ) ) ,
table . ValueParam ( "$directory" , types . UTF8Value ( dir ) ) ,
table . ValueParam ( "$start_name" , types . UTF8Value ( startFileName ) ) ,
table . ValueParam ( "$prefix" , types . UTF8Value ( prefix ) ) ,
table . ValueParam ( "$limit" , types . Int64Value ( limit ) ) ,
) )
if err != nil {
return fmt . Errorf ( "Execute %s : %v" , dir , err )
}
defer func ( ) {
_ = res . Close ( )
} ( )
for res . NextResultSet ( ctx ) {
for res . NextRow ( ) {
var name string
var data [ ] byte
if err := res . ScanNamed (
named . Required ( "name" , & name ) ,
named . Required ( "meta" , & data ) ) ; err != nil {
return fmt . Errorf ( "scanNamed %s : %v" , dir , err )
}
lastFileName = name
entry := & filer . Entry {
FullPath : util . NewFullPath ( dir , name ) ,
}
if err = entry . DecodeAttributesAndChunks ( util . MaybeDecompressData ( data ) ) ; err != nil {
glog . V ( 0 ) . Infof ( "scan decode %s : %v" , entry . FullPath , err )
return fmt . Errorf ( "scan decode %s : %v" , entry . FullPath , err )
}
if ! eachEntryFunc ( entry ) {
break
}
}
_ , res , err = stmt . Execute ( ctx , roTX , table . NewQueryParameters (
table . ValueParam ( "$dir_hash" , ydb . Int64Value ( util . HashStringToLong ( dir ) ) ) ,
table . ValueParam ( "$directory" , ydb . UTF8Value ( dir ) ) ,
table . ValueParam ( "$start_name" , ydb . UTF8Value ( startFileName ) ) ,
table . ValueParam ( "$prefix" , ydb . UTF8Value ( prefix ) ) ,
table . ValueParam ( "$limit" , ydb . Int64Value ( limit ) ) ,
) )
return err
} ) ,
)
}
return res . Err ( )
} )
if err != nil {
return lastFileName , err
}
defer res . Close ( )
for res . NextSet ( ) {
for res . NextRow ( ) {
res . SeekItem ( "name" )
name := res . UTF8 ( )
res . SeekItem ( "meta" )
data := res . String ( )
if res . Err ( ) != nil {
glog . V ( 0 ) . Infof ( "scan %s : %v" , dirPath , err )
return lastFileName , fmt . Errorf ( "scan %s: %v" , dirPath , err )
}
lastFileName = name
entry := & filer . Entry {
FullPath : util . NewFullPath ( dir , name ) ,
}
if err = entry . DecodeAttributesAndChunks ( util . MaybeDecompressData ( data ) ) ; err != nil {
glog . V ( 0 ) . Infof ( "scan decode %s : %v" , entry . FullPath , err )
return lastFileName , fmt . Errorf ( "scan decode %s : %v" , entry . FullPath , err )
}
if ! eachEntryFunc ( entry ) {
break
}
}
}
return lastFileName , nil
}
func ( store * YdbStore ) BeginTransaction ( ctx context . Context ) ( context . Context , error ) {
session , err := store . DB . Table ( ) . Pool ( ) . Create ( ctx )
session , err := store . DB . Table ( ) . CreateSession ( ctx )
if err != nil {
return ctx , err
}
@ -251,21 +259,22 @@ func (store *YdbStore) BeginTransaction(ctx context.Context) (context.Context, e
}
func ( store * YdbStore ) CommitTransaction ( ctx context . Context ) error {
if tx , ok := ctx . Value ( "tx" ) . ( * table . Transaction ) ; ok {
return tx . Commit ( ctx )
if tx , ok := ctx . Value ( "tx" ) . ( table . Transaction ) ; ok {
_ , err := tx . CommitTx ( ctx )
return err
}
return nil
}
func ( store * YdbStore ) RollbackTransaction ( ctx context . Context ) error {
if tx , ok := ctx . Value ( "tx" ) . ( * table . Transaction ) ; ok {
if tx , ok := ctx . Value ( "tx" ) . ( table . Transaction ) ; ok {
return tx . Rollback ( ctx )
}
return nil
}
func ( store * YdbStore ) Shutdown ( ) {
store . DB . Close ( )
_ = store . DB . Close ( context . Background ( ) )
}
func ( store * YdbStore ) getPrefix ( dir string ) string {