diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index 0b27104cf..69ea8d2ab 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -28,10 +28,11 @@ type SqlGenerator interface { type AbstractSqlStore struct { SqlGenerator - DB *sql.DB - SupportBucketTable bool - dbs map[string]bool - dbsLock sync.Mutex + DB *sql.DB + SupportBucketTable bool + dbs map[string]bool + dbsLock sync.Mutex + RetryableErrorCallback func(err error) bool } var _ filer.BucketAware = (*AbstractSqlStore)(nil) @@ -151,63 +152,86 @@ func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.Full func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { - db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false) - if err != nil { - return fmt.Errorf("findDB %s : %v", entry.FullPath, err) - } + // define the work to be done + var doInsert func() error + doInsert = func() error { + db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false) + if err != nil { + return fmt.Errorf("findDB %s : %w", entry.FullPath, err) + } - dir, name := shortPath.DirAndName() - meta, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encode %s: %s", entry.FullPath, err) - } + dir, name := shortPath.DirAndName() + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %w", entry.FullPath, err) + } - if len(entry.GetChunks()) > filer.CountEntryChunksForGzip { - meta = util.MaybeGzipData(meta) - } - sqlInsert := "insert" - res, err := db.ExecContext(ctx, store.GetSqlInsert(bucket), util.HashStringToLong(dir), name, dir, meta) - if err != nil && strings.Contains(strings.ToLower(err.Error()), "duplicate entry") { - // now the insert failed possibly due to duplication constraints - sqlInsert = "falls back to update" - glog.V(1).InfofCtx(ctx, "insert %s %s: %v", entry.FullPath, sqlInsert, err) - res, err = db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir) - } - if err != nil { - return fmt.Errorf("%s %s: %s", sqlInsert, entry.FullPath, err) - } + if len(entry.GetChunks()) > filer.CountEntryChunksForGzip { + meta = util.MaybeGzipData(meta) + } + sqlInsert := "insert" + res, err := db.ExecContext(ctx, store.GetSqlInsert(bucket), util.HashStringToLong(dir), name, dir, meta) + if err != nil && strings.Contains(strings.ToLower(err.Error()), "duplicate entry") { + // now the insert failed possibly due to duplication constraints + sqlInsert = "falls back to update" + glog.V(1).InfofCtx(ctx, "insert %s %s: %v", entry.FullPath, sqlInsert, err) + res, err = db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir) + } + if err != nil { + return fmt.Errorf("%s %s: %w", sqlInsert, entry.FullPath, err) + } - _, err = res.RowsAffected() - if err != nil { - return fmt.Errorf("%s %s but no rows affected: %s", sqlInsert, entry.FullPath, err) + _, err = res.RowsAffected() + if err != nil { + return fmt.Errorf("%s %s but no rows affected: %w", sqlInsert, entry.FullPath, err) + } + + return nil } - return nil + if store.RetryableErrorCallback != nil { + if ctx.Value("tx") != nil { + return doInsert() + } + return util.RetryUntil("InsertEntry", doInsert, store.RetryableErrorCallback) + } + return doInsert() } func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { - db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false) - if err != nil { - return fmt.Errorf("findDB %s : %v", entry.FullPath, err) - } + var doUpdate func() error + doUpdate = func() error { + db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false) + if err != nil { + return fmt.Errorf("findDB %s : %w", entry.FullPath, err) + } - dir, name := shortPath.DirAndName() - meta, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encode %s: %s", entry.FullPath, err) - } + dir, name := shortPath.DirAndName() + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %w", entry.FullPath, err) + } - res, err := db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir) - if err != nil { - return fmt.Errorf("update %s: %s", entry.FullPath, err) + res, err := db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir) + if err != nil { + return fmt.Errorf("update %s: %w", entry.FullPath, err) + } + + _, err = res.RowsAffected() + if err != nil { + return fmt.Errorf("update %s but no rows affected: %w", entry.FullPath, err) + } + return nil } - _, err = res.RowsAffected() - if err != nil { - return fmt.Errorf("update %s but no rows affected: %s", entry.FullPath, err) + if store.RetryableErrorCallback != nil { + if ctx.Value("tx") != nil { + return doUpdate() + } + return util.RetryUntil("UpdateEntry", doUpdate, store.RetryableErrorCallback) } - return nil + return doUpdate() } func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) { @@ -240,55 +264,76 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.Full func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { - db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false) - if err != nil { - return fmt.Errorf("findDB %s : %v", fullpath, err) - } + var doDelete func() error + doDelete = func() error { + db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false) + if err != nil { + return fmt.Errorf("findDB %s : %w", fullpath, err) + } - dir, name := shortPath.DirAndName() + dir, name := shortPath.DirAndName() - res, err := db.ExecContext(ctx, store.GetSqlDelete(bucket), util.HashStringToLong(dir), name, dir) - if err != nil { - return fmt.Errorf("delete %s: %s", fullpath, err) - } + res, err := db.ExecContext(ctx, store.GetSqlDelete(bucket), util.HashStringToLong(dir), name, dir) + if err != nil { + return fmt.Errorf("delete %s: %w", fullpath, err) + } - _, err = res.RowsAffected() - if err != nil { - return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err) + _, err = res.RowsAffected() + if err != nil { + return fmt.Errorf("delete %s but no rows affected: %w", fullpath, err) + } + return nil } - return nil + if store.RetryableErrorCallback != nil { + if ctx.Value("tx") != nil { + return doDelete() + } + return util.RetryUntil("DeleteEntry", doDelete, store.RetryableErrorCallback) + } + return doDelete() } func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { - db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true) - if err != nil { - return fmt.Errorf("findDB %s : %v", fullpath, err) - } + var doDeleteFolderChildren func() error + doDeleteFolderChildren = func() error { + db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true) + if err != nil { + return fmt.Errorf("findDB %s : %w", fullpath, err) + } - if isValidBucket(bucket) && shortPath == "/" { - if err = store.deleteTable(ctx, bucket); err == nil { - store.dbsLock.Lock() - delete(store.dbs, bucket) - store.dbsLock.Unlock() - return nil - } else { - return err + if isValidBucket(bucket) && shortPath == "/" { + if err = store.deleteTable(ctx, bucket); err == nil { + store.dbsLock.Lock() + delete(store.dbs, bucket) + store.dbsLock.Unlock() + return nil + } else { + return err + } } - } - glog.V(4).InfofCtx(ctx, "delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath))) - res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath)) - if err != nil { - return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err) + glog.V(4).InfofCtx(ctx, "delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath))) + res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath)) + if err != nil { + return fmt.Errorf("deleteFolderChildren %s: %w", fullpath, err) + } + + _, err = res.RowsAffected() + if err != nil { + return fmt.Errorf("deleteFolderChildren %s but no rows affected: %w", fullpath, err) + } + return nil } - _, err = res.RowsAffected() - if err != nil { - return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err) + if store.RetryableErrorCallback != nil { + if ctx.Value("tx") != nil { + return doDeleteFolderChildren() + } + return util.RetryUntil("DeleteFolderChildren", doDeleteFolderChildren, store.RetryableErrorCallback) } - return nil + return doDeleteFolderChildren() } func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { diff --git a/weed/filer/mysql/mysql_store.go b/weed/filer/mysql/mysql_store.go index a37d4dbd3..a4f8a3ce2 100644 --- a/weed/filer/mysql/mysql_store.go +++ b/weed/filer/mysql/mysql_store.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "crypto/x509" "database/sql" + "errors" "fmt" "os" "strings" @@ -67,6 +68,19 @@ func (store *MysqlStore) initialize(dsn string, upsertQuery string, enableUpsert UpsertQueryTemplate: upsertQuery, } + store.RetryableErrorCallback = func(err error) bool { + var mysqlError *mysql.MySQLError + if errors.As(err, &mysqlError) { + if mysqlError.Number == 1213 { // ER_LOCK_DEADLOCK + return true + } + if mysqlError.Number == 1205 { // ER_LOCK_WAIT_TIMEOUT + return true + } + } + return false + } + if enableTls { rootCertPool := x509.NewCertPool() pem, err := os.ReadFile(caCrtDir) diff --git a/weed/util/retry.go b/weed/util/retry.go index 150c10af5..756d30d50 100644 --- a/weed/util/retry.go +++ b/weed/util/retry.go @@ -58,13 +58,13 @@ func MultiRetry(name string, errList []string, job func() error) (err error) { } // RetryUntil retries until the job returns no error or onErrFn returns false -func RetryUntil(name string, job func() error, onErrFn func(err error) (shouldContinue bool)) { +func RetryUntil(name string, job func() error, onErrFn func(err error) (shouldContinue bool)) error { waitTime := time.Second for { err := job() if err == nil { waitTime = time.Second - break + return nil } if onErrFn(err) { if strings.Contains(err.Error(), "transport") || strings.Contains(err.Error(), "ResourceExhausted") || strings.Contains(err.Error(), "Unavailable") { @@ -76,7 +76,7 @@ func RetryUntil(name string, job func() error, onErrFn func(err error) (shouldCo } continue } else { - break + return err } } } diff --git a/weed/util/retry_test.go b/weed/util/retry_test.go new file mode 100644 index 000000000..23c4132fe --- /dev/null +++ b/weed/util/retry_test.go @@ -0,0 +1,65 @@ +package util + +import ( + "errors" + "testing" +) + +func TestRetryUntil(t *testing.T) { + // Test case 1: Function succeeds immediately + t.Run("SucceedsImmediately", func(t *testing.T) { + callCount := 0 + err := RetryUntil("test", func() error { + callCount++ + return nil + }, func(err error) bool { + return false + }) + + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + if callCount != 1 { + t.Errorf("Expected 1 call, got %d", callCount) + } + }) + + // Test case 2: Function fails with retryable error, then succeeds + t.Run("SucceedsAfterRetry", func(t *testing.T) { + callCount := 0 + err := RetryUntil("test", func() error { + callCount++ + if callCount < 3 { + return errors.New("retryable error") + } + return nil + }, func(err error) bool { + return err.Error() == "retryable error" + }) + + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + if callCount != 3 { + t.Errorf("Expected 3 calls, got %d", callCount) + } + }) + + // Test case 3: Function fails with non-retryable error + t.Run("FailsNonRetryable", func(t *testing.T) { + callCount := 0 + err := RetryUntil("test", func() error { + callCount++ + return errors.New("fatal error") + }, func(err error) bool { + return err.Error() == "retryable error" + }) + + if err == nil || err.Error() != "fatal error" { + t.Errorf("Expected 'fatal error', got %v", err) + } + if callCount != 1 { + t.Errorf("Expected 1 call, got %d", callCount) + } + }) +}