From 4106fc0436ce91591820a59a9715a06cacf06724 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 26 Nov 2025 15:54:26 -0800 Subject: [PATCH] fix(tikv): improve context propagation and refactor batch delete logic (#7558) * fix(tikv): improve context propagation and refactor batch delete logic Address review comments from PR #7557: 1. Replace context.TODO() with ctx in txn.Get calls - Fixes timeout/cancellation propagation in FindEntry - Fixes timeout/cancellation propagation in KvGet 2. Refactor DeleteFolderChildren to use flush helper - Eliminates code duplication - Cleaner and more maintainable These changes ensure proper context propagation throughout all TiKV operations and improve code maintainability. * error formatting --- weed/filer/tikv/tikv_store.go | 26 +++++++++++++++----------- weed/filer/tikv/tikv_store_kv.go | 2 +- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/weed/filer/tikv/tikv_store.go b/weed/filer/tikv/tikv_store.go index 6749e6bc2..a85c8119f 100644 --- a/weed/filer/tikv/tikv_store.go +++ b/weed/filer/tikv/tikv_store.go @@ -112,7 +112,7 @@ func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*fil } var value []byte = nil err = txn.RunInTxn(ctx, func(txn *txnkv.KVTxn) error { - val, err := txn.Get(context.TODO(), key) + val, err := txn.Get(ctx, key) if err == nil { value = val } @@ -180,6 +180,17 @@ func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.Full var keys [][]byte + flush := func() error { + if len(keys) == 0 { + return nil + } + if err := store.deleteBatch(ctx, keys); err != nil { + return fmt.Errorf("delete batch in %s, error: %w", path, err) + } + keys = keys[:0] + return nil + } + for iter.Valid() { key := iter.Key() if !bytes.HasPrefix(key, directoryPrefix) { @@ -189,10 +200,9 @@ func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.Full keys = append(keys, append([]byte(nil), key...)) if len(keys) >= store.batchCommitSize { - if err := store.deleteBatch(ctx, keys); err != nil { - return fmt.Errorf("delete batch in %s, error: %v", path, err) + if err := flush(); err != nil { + return err } - keys = keys[:0] } if err := iter.Next(); err != nil { @@ -200,13 +210,7 @@ func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.Full } } - if len(keys) > 0 { - if err := store.deleteBatch(ctx, keys); err != nil { - return fmt.Errorf("delete batch in %s, error: %v", path, err) - } - } - - return nil + return flush() } func (store *TikvStore) deleteBatch(ctx context.Context, keys [][]byte) error { diff --git a/weed/filer/tikv/tikv_store_kv.go b/weed/filer/tikv/tikv_store_kv.go index 0266dcfdb..1f32dfb96 100644 --- a/weed/filer/tikv/tikv_store_kv.go +++ b/weed/filer/tikv/tikv_store_kv.go @@ -27,7 +27,7 @@ func (store *TikvStore) KvGet(ctx context.Context, key []byte) ([]byte, error) { } var data []byte = nil err = tw.RunInTxn(ctx, func(txn *txnkv.KVTxn) error { - val, err := txn.Get(context.TODO(), key) + val, err := txn.Get(ctx, key) if err == nil { data = val }