diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml index 080d8f78b..61a7ced6d 100644 --- a/weed/command/scaffold/filer.toml +++ b/weed/command/scaffold/filer.toml @@ -381,10 +381,11 @@ enabled = false # If you have many pd address, use ',' split then: # pdaddrs = "pdhost1:2379, pdhost2:2379, pdhost3:2379" pdaddrs = "localhost:2379" -# Concurrency for TiKV delete range -deleterange_concurrency = 1 # Enable 1PC enable_1pc = false +# batch delete count, default 10000 in code +#batchdelete_count = 20000 + # Set the CA certificate path ca_path="" # Set the certificate path diff --git a/weed/filer/tikv/tikv_store.go b/weed/filer/tikv/tikv_store.go index 307d2b3fb..6749e6bc2 100644 --- a/weed/filer/tikv/tikv_store.go +++ b/weed/filer/tikv/tikv_store.go @@ -20,6 +20,8 @@ import ( "github.com/tikv/client-go/v2/txnkv" ) +const defaultBatchCommitSize = 10000 + var ( _ filer.FilerStore = ((*TikvStore)(nil)) ) @@ -29,9 +31,9 @@ func init() { } type TikvStore struct { - client *txnkv.Client - deleteRangeConcurrency int - onePC bool + client *txnkv.Client + onePC bool + batchCommitSize int } // Basic APIs @@ -46,12 +48,13 @@ func (store *TikvStore) Initialize(config util.Configuration, prefix string) err verify_cn := strings.Split(config.GetString(prefix+"verify_cn"), ",") pdAddrs := strings.Split(config.GetString(prefix+"pdaddrs"), ",") - drc := config.GetInt(prefix + "deleterange_concurrency") - if drc <= 0 { - drc = 1 + bdc := config.GetInt(prefix + "batchdelete_count") + if bdc <= 0 { + bdc = defaultBatchCommitSize } + store.onePC = config.GetBool(prefix + "enable_1pc") - store.deleteRangeConcurrency = drc + store.batchCommitSize = bdc return store.initialize(ca, cert, key, verify_cn, pdAddrs) } @@ -86,7 +89,7 @@ func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer.Entry) err if err != nil { return err } - err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + err = txn.RunInTxn(ctx, func(txn *txnkv.KVTxn) error { return txn.Set(key, value) }) if err != nil { @@ -108,7 +111,7 @@ func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*fil return nil, err } var value []byte = nil - err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + err = txn.RunInTxn(ctx, func(txn *txnkv.KVTxn) error { val, err := txn.Get(context.TODO(), key) if err == nil { value = val @@ -143,7 +146,7 @@ func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) err return err } - err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + err = txn.RunInTxn(ctx, func(txn *txnkv.KVTxn) error { return txn.Delete(key) }) if err != nil { @@ -158,53 +161,75 @@ func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) err func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) error { directoryPrefix := genDirectoryKeyPrefix(path, "") - txn, err := store.getTxn(ctx) + iterTxn, err := store.getTxn(ctx) if err != nil { return err } - var ( - startKey []byte = nil - endKey []byte = nil - ) - err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { - iter, err := txn.Iter(directoryPrefix, nil) - if err != nil { - return err + + if !iterTxn.inContext { + defer func() { + _ = iterTxn.Rollback() + }() + } + + iter, err := iterTxn.Iter(directoryPrefix, nil) + if err != nil { + return err + } + defer iter.Close() + + var keys [][]byte + + for iter.Valid() { + key := iter.Key() + if !bytes.HasPrefix(key, directoryPrefix) { + break } - defer iter.Close() - for iter.Valid() { - key := iter.Key() - endKey = key - if !bytes.HasPrefix(key, directoryPrefix) { - break - } - if startKey == nil { - startKey = key - } - err = iter.Next() - if err != nil { - return err + keys = append(keys, append([]byte(nil), key...)) + + if len(keys) >= store.batchCommitSize { + if err := store.deleteBatch(ctx, keys); err != nil { + return fmt.Errorf("delete batch in %s, error: %v", path, err) } + keys = keys[:0] } - // Only one Key matched just delete it. - if startKey != nil && bytes.Equal(startKey, endKey) { - return txn.Delete(startKey) + + if err := iter.Next(); err != nil { + return err } - return nil - }) + } + + if len(keys) > 0 { + if err := store.deleteBatch(ctx, keys); err != nil { + return fmt.Errorf("delete batch in %s, error: %v", path, err) + } + } + + return nil +} + +func (store *TikvStore) deleteBatch(ctx context.Context, keys [][]byte) error { + deleteTxn, err := store.getTxn(ctx) if err != nil { - return fmt.Errorf("delete %s : %v", path, err) + return err } - if startKey != nil && endKey != nil && !bytes.Equal(startKey, endKey) { - // has startKey and endKey and they are not equals, so use delete range - _, err = store.client.DeleteRange(context.Background(), startKey, endKey, store.deleteRangeConcurrency) - if err != nil { - return fmt.Errorf("delete %s : %v", path, err) + if !deleteTxn.inContext { + defer func() { _ = deleteTxn.Rollback() }() + } + + for _, key := range keys { + if err := deleteTxn.Delete(key); err != nil { + return err } } - return err + + if !deleteTxn.inContext { + return deleteTxn.Commit(ctx) + } + + return nil } func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) { @@ -224,7 +249,7 @@ func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat return lastFileName, err } var callbackErr error - err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + err = txn.RunInTxn(ctx, func(txn *txnkv.KVTxn) error { iter, err := txn.Iter(lastFileStart, nil) if err != nil { return err @@ -353,15 +378,14 @@ type TxnWrapper struct { inContext bool } -func (w *TxnWrapper) RunInTxn(f func(txn *txnkv.KVTxn) error) error { +func (w *TxnWrapper) RunInTxn(ctx context.Context, f func(txn *txnkv.KVTxn) error) error { err := f(w.KVTxn) if !w.inContext { if err != nil { w.KVTxn.Rollback() return err } - w.KVTxn.Commit(context.Background()) - return nil + return w.KVTxn.Commit(ctx) } return err } diff --git a/weed/filer/tikv/tikv_store_kv.go b/weed/filer/tikv/tikv_store_kv.go index a2aaafb7a..0266dcfdb 100644 --- a/weed/filer/tikv/tikv_store_kv.go +++ b/weed/filer/tikv/tikv_store_kv.go @@ -15,7 +15,7 @@ func (store *TikvStore) KvPut(ctx context.Context, key []byte, value []byte) err if err != nil { return err } - return tw.RunInTxn(func(txn *txnkv.KVTxn) error { + return tw.RunInTxn(ctx, func(txn *txnkv.KVTxn) error { return txn.Set(key, value) }) } @@ -26,7 +26,7 @@ func (store *TikvStore) KvGet(ctx context.Context, key []byte) ([]byte, error) { return nil, err } var data []byte = nil - err = tw.RunInTxn(func(txn *txnkv.KVTxn) error { + err = tw.RunInTxn(ctx, func(txn *txnkv.KVTxn) error { val, err := txn.Get(context.TODO(), key) if err == nil { data = val @@ -44,7 +44,7 @@ func (store *TikvStore) KvDelete(ctx context.Context, key []byte) error { if err != nil { return err } - return tw.RunInTxn(func(txn *txnkv.KVTxn) error { + return tw.RunInTxn(ctx, func(txn *txnkv.KVTxn) error { return txn.Delete(key) }) }