From 5287d9f3e32000c7a4b8c19ea7cd28bc5d1cd60e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 26 Nov 2025 14:45:56 -0800 Subject: [PATCH] fix(tikv): replace DeleteRange with transaction-based batch deletes (#7557) * fix(tikv): replace DeleteRange with transaction-based batch deletes Fixes #7187 Problem: TiKV's DeleteRange API is a RawKV operation that bypasses transaction isolation. When SeaweedFS filer uses TiKV with txn client and another service uses RawKV client on the same cluster, DeleteFolderChildren can accidentally delete KV pairs from the RawKV client because DeleteRange operates at the raw key level without respecting transaction boundaries. Reproduction: 1. SeaweedFS filer using TiKV txn client for metadata 2. Another service using rawkv client on same TiKV cluster 3. Filer performs batch file deletion via DeleteFolderChildren 4. Result: ~50% of rawkv client's KV pairs get deleted Solution: Replace client.DeleteRange() (RawKV API) with transactional batch deletes using txn.Delete() within transactions. This ensures: - Transaction isolation - operations respect TiKV's MVCC boundaries - Keyspace separation - txn client and RawKV client stay isolated - Proper key handling - keys are copied to avoid iterator reuse issues - Batch processing - deletes batched (10K default) to manage memory Changes: 1. Core data structure: - Removed deleteRangeConcurrency field - Added batchCommitSize field (configurable, default 10000) 2. DeleteFolderChildren rewrite: - Replaced DeleteRange with iterative batch deletes - Added proper transaction lifecycle management - Implemented key copying to avoid iterator buffer reuse - Added batching to prevent memory exhaustion 3. New deleteBatch helper: - Handles transaction creation and lifecycle - Batches deletes within single transaction - Properly commits/rolls back based on context 4. Context propagation: - Updated RunInTxn to accept context parameter - All RunInTxn call sites now pass context - Enables proper timeout/cancellation handling 5. Configuration: - Removed deleterange_concurrency setting - Added batchdelete_count setting (default 10000) All critical review comments from PR #7188 have been addressed: - Proper key copying with append([]byte(nil), key...) - Conditional transaction rollback based on inContext flag - Context propagation for commits - Proper transaction lifecycle management - Configurable batch size Co-authored-by: giftz * fix: remove extra closing brace causing syntax error in tikv_store.go --------- Co-authored-by: giftz --- weed/command/scaffold/filer.toml | 5 +- weed/filer/tikv/tikv_store.go | 120 ++++++++++++++++++------------- weed/filer/tikv/tikv_store_kv.go | 6 +- 3 files changed, 78 insertions(+), 53 deletions(-) diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml index 080d8f78b..61a7ced6d 100644 --- a/weed/command/scaffold/filer.toml +++ b/weed/command/scaffold/filer.toml @@ -381,10 +381,11 @@ enabled = false # If you have many pd address, use ',' split then: # pdaddrs = "pdhost1:2379, pdhost2:2379, pdhost3:2379" pdaddrs = "localhost:2379" -# Concurrency for TiKV delete range -deleterange_concurrency = 1 # Enable 1PC enable_1pc = false +# batch delete count, default 10000 in code +#batchdelete_count = 20000 + # Set the CA certificate path ca_path="" # Set the certificate path diff --git a/weed/filer/tikv/tikv_store.go b/weed/filer/tikv/tikv_store.go index 307d2b3fb..6749e6bc2 100644 --- a/weed/filer/tikv/tikv_store.go +++ b/weed/filer/tikv/tikv_store.go @@ -20,6 +20,8 @@ import ( "github.com/tikv/client-go/v2/txnkv" ) +const defaultBatchCommitSize = 10000 + var ( _ filer.FilerStore = ((*TikvStore)(nil)) ) @@ -29,9 +31,9 @@ func init() { } type TikvStore struct { - client *txnkv.Client - deleteRangeConcurrency int - onePC bool + client *txnkv.Client + onePC bool + batchCommitSize int } // Basic APIs @@ -46,12 +48,13 @@ func (store *TikvStore) Initialize(config util.Configuration, prefix string) err verify_cn := strings.Split(config.GetString(prefix+"verify_cn"), ",") pdAddrs := strings.Split(config.GetString(prefix+"pdaddrs"), ",") - drc := config.GetInt(prefix + "deleterange_concurrency") - if drc <= 0 { - drc = 1 + bdc := config.GetInt(prefix + "batchdelete_count") + if bdc <= 0 { + bdc = defaultBatchCommitSize } + store.onePC = config.GetBool(prefix + "enable_1pc") - store.deleteRangeConcurrency = drc + store.batchCommitSize = bdc return store.initialize(ca, cert, key, verify_cn, pdAddrs) } @@ -86,7 +89,7 @@ func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer.Entry) err if err != nil { return err } - err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + err = txn.RunInTxn(ctx, func(txn *txnkv.KVTxn) error { return txn.Set(key, value) }) if err != nil { @@ -108,7 +111,7 @@ func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*fil return nil, err } var value []byte = nil - err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + err = txn.RunInTxn(ctx, func(txn *txnkv.KVTxn) error { val, err := txn.Get(context.TODO(), key) if err == nil { value = val @@ -143,7 +146,7 @@ func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) err return err } - err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + err = txn.RunInTxn(ctx, func(txn *txnkv.KVTxn) error { return txn.Delete(key) }) if err != nil { @@ -158,53 +161,75 @@ func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) err func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) error { directoryPrefix := genDirectoryKeyPrefix(path, "") - txn, err := store.getTxn(ctx) + iterTxn, err := store.getTxn(ctx) if err != nil { return err } - var ( - startKey []byte = nil - endKey []byte = nil - ) - err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { - iter, err := txn.Iter(directoryPrefix, nil) - if err != nil { - return err + + if !iterTxn.inContext { + defer func() { + _ = iterTxn.Rollback() + }() + } + + iter, err := iterTxn.Iter(directoryPrefix, nil) + if err != nil { + return err + } + defer iter.Close() + + var keys [][]byte + + for iter.Valid() { + key := iter.Key() + if !bytes.HasPrefix(key, directoryPrefix) { + break } - defer iter.Close() - for iter.Valid() { - key := iter.Key() - endKey = key - if !bytes.HasPrefix(key, directoryPrefix) { - break - } - if startKey == nil { - startKey = key - } - err = iter.Next() - if err != nil { - return err + keys = append(keys, append([]byte(nil), key...)) + + if len(keys) >= store.batchCommitSize { + if err := store.deleteBatch(ctx, keys); err != nil { + return fmt.Errorf("delete batch in %s, error: %v", path, err) } + keys = keys[:0] } - // Only one Key matched just delete it. - if startKey != nil && bytes.Equal(startKey, endKey) { - return txn.Delete(startKey) + + if err := iter.Next(); err != nil { + return err } - return nil - }) + } + + if len(keys) > 0 { + if err := store.deleteBatch(ctx, keys); err != nil { + return fmt.Errorf("delete batch in %s, error: %v", path, err) + } + } + + return nil +} + +func (store *TikvStore) deleteBatch(ctx context.Context, keys [][]byte) error { + deleteTxn, err := store.getTxn(ctx) if err != nil { - return fmt.Errorf("delete %s : %v", path, err) + return err } - if startKey != nil && endKey != nil && !bytes.Equal(startKey, endKey) { - // has startKey and endKey and they are not equals, so use delete range - _, err = store.client.DeleteRange(context.Background(), startKey, endKey, store.deleteRangeConcurrency) - if err != nil { - return fmt.Errorf("delete %s : %v", path, err) + if !deleteTxn.inContext { + defer func() { _ = deleteTxn.Rollback() }() + } + + for _, key := range keys { + if err := deleteTxn.Delete(key); err != nil { + return err } } - return err + + if !deleteTxn.inContext { + return deleteTxn.Commit(ctx) + } + + return nil } func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) { @@ -224,7 +249,7 @@ func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat return lastFileName, err } var callbackErr error - err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + err = txn.RunInTxn(ctx, func(txn *txnkv.KVTxn) error { iter, err := txn.Iter(lastFileStart, nil) if err != nil { return err @@ -353,15 +378,14 @@ type TxnWrapper struct { inContext bool } -func (w *TxnWrapper) RunInTxn(f func(txn *txnkv.KVTxn) error) error { +func (w *TxnWrapper) RunInTxn(ctx context.Context, f func(txn *txnkv.KVTxn) error) error { err := f(w.KVTxn) if !w.inContext { if err != nil { w.KVTxn.Rollback() return err } - w.KVTxn.Commit(context.Background()) - return nil + return w.KVTxn.Commit(ctx) } return err } diff --git a/weed/filer/tikv/tikv_store_kv.go b/weed/filer/tikv/tikv_store_kv.go index a2aaafb7a..0266dcfdb 100644 --- a/weed/filer/tikv/tikv_store_kv.go +++ b/weed/filer/tikv/tikv_store_kv.go @@ -15,7 +15,7 @@ func (store *TikvStore) KvPut(ctx context.Context, key []byte, value []byte) err if err != nil { return err } - return tw.RunInTxn(func(txn *txnkv.KVTxn) error { + return tw.RunInTxn(ctx, func(txn *txnkv.KVTxn) error { return txn.Set(key, value) }) } @@ -26,7 +26,7 @@ func (store *TikvStore) KvGet(ctx context.Context, key []byte) ([]byte, error) { return nil, err } var data []byte = nil - err = tw.RunInTxn(func(txn *txnkv.KVTxn) error { + err = tw.RunInTxn(ctx, func(txn *txnkv.KVTxn) error { val, err := txn.Get(context.TODO(), key) if err == nil { data = val @@ -44,7 +44,7 @@ func (store *TikvStore) KvDelete(ctx context.Context, key []byte) error { if err != nil { return err } - return tw.RunInTxn(func(txn *txnkv.KVTxn) error { + return tw.RunInTxn(ctx, func(txn *txnkv.KVTxn) error { return txn.Delete(key) }) }