diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index a83b33341..0f7a8b06c 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -66,11 +66,27 @@ const ( DEFAULT_TABLE = "filemeta" ) -type TxOrDB interface { +type TxOrDBWithoutClose interface { ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) } +type TxOrDB interface { + TxOrDBWithoutClose + Close() error +} + +type TxOrDbWrapper struct { + TxOrDBWithoutClose + CloseFunc func() error +} + +func (t *TxOrDbWrapper) Close() error { + if t.CloseFunc == nil { + return nil + } + return t.CloseFunc() +} func (store *AbstractSqlStore) BeginTransaction(ctx context.Context) (context.Context, error) { tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{ @@ -102,9 +118,12 @@ func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.Full bucket = DEFAULT_TABLE if tx, ok := ctx.Value("tx").(*sql.Tx); ok { - txOrDB = tx + txOrDB = &TxOrDbWrapper{tx, nil} } else { - txOrDB = store.DB + txOrDB, err = store.DB.Conn(context.Background()) + if err != nil { + return + } } if !store.SupportBucketTable { @@ -155,6 +174,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent if err != nil { return fmt.Errorf("findDB %s : %v", entry.FullPath, err) } + defer db.Close() dir, name := shortPath.DirAndName() meta, err := entry.EncodeAttributesAndChunks() @@ -191,6 +211,7 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Ent if err != nil { return fmt.Errorf("findDB %s : %v", entry.FullPath, err) } + defer db.Close() dir, name := shortPath.DirAndName() meta, err := entry.EncodeAttributesAndChunks() @@ -216,6 +237,7 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.Full if err != nil { return nil, fmt.Errorf("findDB %s : %v", fullpath, err) } + defer db.Close() dir, name := shortPath.DirAndName() row := db.QueryRowContext(ctx, store.GetSqlFind(bucket), util.HashStringToLong(dir), name, dir) @@ -244,6 +266,7 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.Fu if err != nil { return fmt.Errorf("findDB %s : %v", fullpath, err) } + defer db.Close() dir, name := shortPath.DirAndName() @@ -266,6 +289,7 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat if err != nil { return fmt.Errorf("findDB %s : %v", fullpath, err) } + defer db.Close() if isValidBucket(bucket) && shortPath == "/" { if err = store.deleteTable(ctx, bucket); err == nil { @@ -297,6 +321,7 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, if err != nil { return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err) } + defer db.Close() sqlText := store.GetSqlListExclusive(bucket) if includeStartFile {