@ -28,10 +28,11 @@ type SqlGenerator interface {
type AbstractSqlStore struct {
SqlGenerator
DB * sql . DB
SupportBucketTable bool
dbs map [ string ] bool
dbsLock sync . Mutex
DB * sql . DB
SupportBucketTable bool
dbs map [ string ] bool
dbsLock sync . Mutex
RetryableErrorCallback func ( err error ) bool
}
var _ filer . BucketAware = ( * AbstractSqlStore ) ( nil )
@ -151,63 +152,86 @@ func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.Full
func ( store * AbstractSqlStore ) InsertEntry ( ctx context . Context , entry * filer . Entry ) ( err error ) {
db , bucket , shortPath , err := store . getTxOrDB ( ctx , entry . FullPath , false )
if err != nil {
return fmt . Errorf ( "findDB %s : %v" , entry . FullPath , err )
}
// define the work to be done
var doInsert func ( ) error
doInsert = func ( ) error {
db , bucket , shortPath , err := store . getTxOrDB ( ctx , entry . FullPath , false )
if err != nil {
return fmt . Errorf ( "findDB %s : %w" , entry . FullPath , err )
}
dir , name := shortPath . DirAndName ( )
meta , err := entry . EncodeAttributesAndChunks ( )
if err != nil {
return fmt . Errorf ( "encode %s: %s " , entry . FullPath , err )
}
dir , name := shortPath . DirAndName ( )
meta , err := entry . EncodeAttributesAndChunks ( )
if err != nil {
return fmt . Errorf ( "encode %s: %w " , entry . FullPath , err )
}
if len ( entry . GetChunks ( ) ) > filer . CountEntryChunksForGzip {
meta = util . MaybeGzipData ( meta )
}
sqlInsert := "insert"
res , err := db . ExecContext ( ctx , store . GetSqlInsert ( bucket ) , util . HashStringToLong ( dir ) , name , dir , meta )
if err != nil && strings . Contains ( strings . ToLower ( err . Error ( ) ) , "duplicate entry" ) {
// now the insert failed possibly due to duplication constraints
sqlInsert = "falls back to update"
glog . V ( 1 ) . InfofCtx ( ctx , "insert %s %s: %v" , entry . FullPath , sqlInsert , err )
res , err = db . ExecContext ( ctx , store . GetSqlUpdate ( bucket ) , meta , util . HashStringToLong ( dir ) , name , dir )
}
if err != nil {
return fmt . Errorf ( "%s %s: %s " , sqlInsert , entry . FullPath , err )
}
if len ( entry . GetChunks ( ) ) > filer . CountEntryChunksForGzip {
meta = util . MaybeGzipData ( meta )
}
sqlInsert := "insert"
res , err := db . ExecContext ( ctx , store . GetSqlInsert ( bucket ) , util . HashStringToLong ( dir ) , name , dir , meta )
if err != nil && strings . Contains ( strings . ToLower ( err . Error ( ) ) , "duplicate entry" ) {
// now the insert failed possibly due to duplication constraints
sqlInsert = "falls back to update"
glog . V ( 1 ) . InfofCtx ( ctx , "insert %s %s: %v" , entry . FullPath , sqlInsert , err )
res , err = db . ExecContext ( ctx , store . GetSqlUpdate ( bucket ) , meta , util . HashStringToLong ( dir ) , name , dir )
}
if err != nil {
return fmt . Errorf ( "%s %s: %w " , sqlInsert , entry . FullPath , err )
}
_ , err = res . RowsAffected ( )
if err != nil {
return fmt . Errorf ( "%s %s but no rows affected: %s" , sqlInsert , entry . FullPath , err )
_ , err = res . RowsAffected ( )
if err != nil {
return fmt . Errorf ( "%s %s but no rows affected: %w" , sqlInsert , entry . FullPath , err )
}
return nil
}
return nil
if store . RetryableErrorCallback != nil {
if ctx . Value ( "tx" ) != nil {
return doInsert ( )
}
return util . RetryUntil ( "InsertEntry" , doInsert , store . RetryableErrorCallback )
}
return doInsert ( )
}
func ( store * AbstractSqlStore ) UpdateEntry ( ctx context . Context , entry * filer . Entry ) ( err error ) {
db , bucket , shortPath , err := store . getTxOrDB ( ctx , entry . FullPath , false )
if err != nil {
return fmt . Errorf ( "findDB %s : %v" , entry . FullPath , err )
}
var doUpdate func ( ) error
doUpdate = func ( ) error {
db , bucket , shortPath , err := store . getTxOrDB ( ctx , entry . FullPath , false )
if err != nil {
return fmt . Errorf ( "findDB %s : %w" , entry . FullPath , err )
}
dir , name := shortPath . DirAndName ( )
meta , err := entry . EncodeAttributesAndChunks ( )
if err != nil {
return fmt . Errorf ( "encode %s: %s " , entry . FullPath , err )
}
dir , name := shortPath . DirAndName ( )
meta , err := entry . EncodeAttributesAndChunks ( )
if err != nil {
return fmt . Errorf ( "encode %s: %w " , entry . FullPath , err )
}
res , err := db . ExecContext ( ctx , store . GetSqlUpdate ( bucket ) , meta , util . HashStringToLong ( dir ) , name , dir )
if err != nil {
return fmt . Errorf ( "update %s: %s" , entry . FullPath , err )
res , err := db . ExecContext ( ctx , store . GetSqlUpdate ( bucket ) , meta , util . HashStringToLong ( dir ) , name , dir )
if err != nil {
return fmt . Errorf ( "update %s: %w" , entry . FullPath , err )
}
_ , err = res . RowsAffected ( )
if err != nil {
return fmt . Errorf ( "update %s but no rows affected: %w" , entry . FullPath , err )
}
return nil
}
_ , err = res . RowsAffected ( )
if err != nil {
return fmt . Errorf ( "update %s but no rows affected: %s" , entry . FullPath , err )
if store . RetryableErrorCallback != nil {
if ctx . Value ( "tx" ) != nil {
return doUpdate ( )
}
return util . RetryUntil ( "UpdateEntry" , doUpdate , store . RetryableErrorCallback )
}
return nil
return doUpdate ( )
}
func ( store * AbstractSqlStore ) FindEntry ( ctx context . Context , fullpath util . FullPath ) ( * filer . Entry , error ) {
@ -240,55 +264,76 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.Full
func ( store * AbstractSqlStore ) DeleteEntry ( ctx context . Context , fullpath util . FullPath ) error {
db , bucket , shortPath , err := store . getTxOrDB ( ctx , fullpath , false )
if err != nil {
return fmt . Errorf ( "findDB %s : %v" , fullpath , err )
}
var doDelete func ( ) error
doDelete = func ( ) error {
db , bucket , shortPath , err := store . getTxOrDB ( ctx , fullpath , false )
if err != nil {
return fmt . Errorf ( "findDB %s : %w" , fullpath , err )
}
dir , name := shortPath . DirAndName ( )
dir , name := shortPath . DirAndName ( )
res , err := db . ExecContext ( ctx , store . GetSqlDelete ( bucket ) , util . HashStringToLong ( dir ) , name , dir )
if err != nil {
return fmt . Errorf ( "delete %s: %s " , fullpath , err )
}
res , err := db . ExecContext ( ctx , store . GetSqlDelete ( bucket ) , util . HashStringToLong ( dir ) , name , dir )
if err != nil {
return fmt . Errorf ( "delete %s: %w " , fullpath , err )
}
_ , err = res . RowsAffected ( )
if err != nil {
return fmt . Errorf ( "delete %s but no rows affected: %s" , fullpath , err )
_ , err = res . RowsAffected ( )
if err != nil {
return fmt . Errorf ( "delete %s but no rows affected: %w" , fullpath , err )
}
return nil
}
return nil
if store . RetryableErrorCallback != nil {
if ctx . Value ( "tx" ) != nil {
return doDelete ( )
}
return util . RetryUntil ( "DeleteEntry" , doDelete , store . RetryableErrorCallback )
}
return doDelete ( )
}
func ( store * AbstractSqlStore ) DeleteFolderChildren ( ctx context . Context , fullpath util . FullPath ) error {
db , bucket , shortPath , err := store . getTxOrDB ( ctx , fullpath , true )
if err != nil {
return fmt . Errorf ( "findDB %s : %v" , fullpath , err )
}
var doDeleteFolderChildren func ( ) error
doDeleteFolderChildren = func ( ) error {
db , bucket , shortPath , err := store . getTxOrDB ( ctx , fullpath , true )
if err != nil {
return fmt . Errorf ( "findDB %s : %w" , fullpath , err )
}
if isValidBucket ( bucket ) && shortPath == "/" {
if err = store . deleteTable ( ctx , bucket ) ; err == nil {
store . dbsLock . Lock ( )
delete ( store . dbs , bucket )
store . dbsLock . Unlock ( )
return nil
} else {
return err
if isValidBucket ( bucket ) && shortPath == "/" {
if err = store . deleteTable ( ctx , bucket ) ; err == nil {
store . dbsLock . Lock ( )
delete ( store . dbs , bucket )
store . dbsLock . Unlock ( )
return nil
} else {
return err
}
}
}
glog . V ( 4 ) . InfofCtx ( ctx , "delete %s SQL %s %d" , string ( shortPath ) , store . GetSqlDeleteFolderChildren ( bucket ) , util . HashStringToLong ( string ( shortPath ) ) )
res , err := db . ExecContext ( ctx , store . GetSqlDeleteFolderChildren ( bucket ) , util . HashStringToLong ( string ( shortPath ) ) , string ( shortPath ) )
if err != nil {
return fmt . Errorf ( "deleteFolderChildren %s: %s" , fullpath , err )
glog . V ( 4 ) . InfofCtx ( ctx , "delete %s SQL %s %d" , string ( shortPath ) , store . GetSqlDeleteFolderChildren ( bucket ) , util . HashStringToLong ( string ( shortPath ) ) )
res , err := db . ExecContext ( ctx , store . GetSqlDeleteFolderChildren ( bucket ) , util . HashStringToLong ( string ( shortPath ) ) , string ( shortPath ) )
if err != nil {
return fmt . Errorf ( "deleteFolderChildren %s: %w" , fullpath , err )
}
_ , err = res . RowsAffected ( )
if err != nil {
return fmt . Errorf ( "deleteFolderChildren %s but no rows affected: %w" , fullpath , err )
}
return nil
}
_ , err = res . RowsAffected ( )
if err != nil {
return fmt . Errorf ( "deleteFolderChildren %s but no rows affected: %s" , fullpath , err )
if store . RetryableErrorCallback != nil {
if ctx . Value ( "tx" ) != nil {
return doDeleteFolderChildren ( )
}
return util . RetryUntil ( "DeleteFolderChildren" , doDeleteFolderChildren , store . RetryableErrorCallback )
}
return nil
return doDeleteFolderChildren ( )
}
func ( store * AbstractSqlStore ) ListDirectoryPrefixedEntries ( ctx context . Context , dirPath util . FullPath , startFileName string , includeStartFile bool , limit int64 , prefix string , eachEntryFunc filer . ListEachEntryFunc ) ( lastFileName string , err error ) {