You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							368 lines
						
					
					
						
							8.7 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							368 lines
						
					
					
						
							8.7 KiB
						
					
					
				
								//go:build tikv
							 | 
						|
								// +build tikv
							 | 
						|
								
							 | 
						|
								package tikv
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"bytes"
							 | 
						|
									"context"
							 | 
						|
									"crypto/sha1"
							 | 
						|
									"fmt"
							 | 
						|
									"io"
							 | 
						|
									"strings"
							 | 
						|
								
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/filer"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/glog"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/util"
							 | 
						|
									"github.com/tikv/client-go/v2/config"
							 | 
						|
									"github.com/tikv/client-go/v2/txnkv"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								var (
							 | 
						|
									_ filer.FilerStore = ((*TikvStore)(nil))
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								func init() {
							 | 
						|
									filer.Stores = append(filer.Stores, &TikvStore{})
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								type TikvStore struct {
							 | 
						|
									client                 *txnkv.Client
							 | 
						|
									deleteRangeConcurrency int
							 | 
						|
									onePC                  bool
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// Basic APIs
							 | 
						|
								func (store *TikvStore) GetName() string {
							 | 
						|
									return "tikv"
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *TikvStore) Initialize(config util.Configuration, prefix string) error {
							 | 
						|
									ca := config.GetString(prefix + "ca_path")
							 | 
						|
									cert := config.GetString(prefix + "cert_path")
							 | 
						|
									key := config.GetString(prefix + "key_path")
							 | 
						|
									verify_cn := strings.Split(config.GetString(prefix+"verify_cn"), ",")
							 | 
						|
									pdAddrs := strings.Split(config.GetString(prefix+"pdaddrs"), ",")
							 | 
						|
								
							 | 
						|
									drc := config.GetInt(prefix + "deleterange_concurrency")
							 | 
						|
									if drc <= 0 {
							 | 
						|
										drc = 1
							 | 
						|
									}
							 | 
						|
									store.onePC = config.GetBool(prefix + "enable_1pc")
							 | 
						|
									store.deleteRangeConcurrency = drc
							 | 
						|
									return store.initialize(ca, cert, key, verify_cn, pdAddrs)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *TikvStore) initialize(ca, cert, key string, verify_cn, pdAddrs []string) error {
							 | 
						|
									config.UpdateGlobal(func(conf *config.Config) {
							 | 
						|
										conf.Security = config.NewSecurity(ca, cert, key, verify_cn)
							 | 
						|
									})
							 | 
						|
									client, err := txnkv.NewClient(pdAddrs)
							 | 
						|
									store.client = client
							 | 
						|
									return err
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *TikvStore) Shutdown() {
							 | 
						|
									err := store.client.Close()
							 | 
						|
									if err != nil {
							 | 
						|
										glog.V(0).Infof("Shutdown TiKV client got error: %v", err)
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// ~ Basic APIs
							 | 
						|
								
							 | 
						|
								// Entry APIs
							 | 
						|
								func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
							 | 
						|
									dir, name := entry.DirAndName()
							 | 
						|
									key := generateKey(dir, name)
							 | 
						|
								
							 | 
						|
									value, err := entry.EncodeAttributesAndChunks()
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
							 | 
						|
									}
							 | 
						|
									txn, err := store.getTxn(ctx)
							 | 
						|
									if err != nil {
							 | 
						|
										return err
							 | 
						|
									}
							 | 
						|
									err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
							 | 
						|
										return txn.Set(key, value)
							 | 
						|
									})
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
							 | 
						|
									}
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
							 | 
						|
									return store.InsertEntry(ctx, entry)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*filer.Entry, error) {
							 | 
						|
									dir, name := path.DirAndName()
							 | 
						|
									key := generateKey(dir, name)
							 | 
						|
								
							 | 
						|
									txn, err := store.getTxn(ctx)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
									var value []byte = nil
							 | 
						|
									err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
							 | 
						|
										val, err := txn.Get(context.TODO(), key)
							 | 
						|
										if err == nil {
							 | 
						|
											value = val
							 | 
						|
										}
							 | 
						|
										return err
							 | 
						|
									})
							 | 
						|
								
							 | 
						|
									if isNotExists(err) || value == nil {
							 | 
						|
										return nil, filer_pb.ErrNotFound
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, fmt.Errorf("get %s : %v", path, err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									entry := &filer.Entry{
							 | 
						|
										FullPath: path,
							 | 
						|
									}
							 | 
						|
									err = entry.DecodeAttributesAndChunks(value)
							 | 
						|
									if err != nil {
							 | 
						|
										return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
							 | 
						|
									}
							 | 
						|
									return entry, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) error {
							 | 
						|
									dir, name := path.DirAndName()
							 | 
						|
									key := generateKey(dir, name)
							 | 
						|
								
							 | 
						|
									txn, err := store.getTxn(ctx)
							 | 
						|
									if err != nil {
							 | 
						|
										return err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
							 | 
						|
										return txn.Delete(key)
							 | 
						|
									})
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("delete %s : %v", path, err)
							 | 
						|
									}
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// ~ Entry APIs
							 | 
						|
								
							 | 
						|
								// Directory APIs
							 | 
						|
								func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) error {
							 | 
						|
									directoryPrefix := genDirectoryKeyPrefix(path, "")
							 | 
						|
								
							 | 
						|
									txn, err := store.getTxn(ctx)
							 | 
						|
									if err != nil {
							 | 
						|
										return err
							 | 
						|
									}
							 | 
						|
									var (
							 | 
						|
										startKey []byte = nil
							 | 
						|
										endKey   []byte = nil
							 | 
						|
									)
							 | 
						|
									err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
							 | 
						|
										iter, err := txn.Iter(directoryPrefix, nil)
							 | 
						|
										if err != nil {
							 | 
						|
											return err
							 | 
						|
										}
							 | 
						|
										defer iter.Close()
							 | 
						|
										for iter.Valid() {
							 | 
						|
											key := iter.Key()
							 | 
						|
											endKey = key
							 | 
						|
											if !bytes.HasPrefix(key, directoryPrefix) {
							 | 
						|
												break
							 | 
						|
											}
							 | 
						|
											if startKey == nil {
							 | 
						|
												startKey = key
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											err = iter.Next()
							 | 
						|
											if err != nil {
							 | 
						|
												return err
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
										// Only one Key matched just delete it.
							 | 
						|
										if startKey != nil && bytes.Equal(startKey, endKey) {
							 | 
						|
											return txn.Delete(startKey)
							 | 
						|
										}
							 | 
						|
										return nil
							 | 
						|
									})
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("delete %s : %v", path, err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if startKey != nil && endKey != nil && !bytes.Equal(startKey, endKey) {
							 | 
						|
										// has startKey and endKey and they are not equals, so use delete range
							 | 
						|
										_, err = store.client.DeleteRange(context.Background(), startKey, endKey, store.deleteRangeConcurrency)
							 | 
						|
										if err != nil {
							 | 
						|
											return fmt.Errorf("delete %s : %v", path, err)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
									return err
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
							 | 
						|
									return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
							 | 
						|
									lastFileName := ""
							 | 
						|
									directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
							 | 
						|
									lastFileStart := directoryPrefix
							 | 
						|
									if startFileName != "" {
							 | 
						|
										lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									txn, err := store.getTxn(ctx)
							 | 
						|
									if err != nil {
							 | 
						|
										return lastFileName, err
							 | 
						|
									}
							 | 
						|
									err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
							 | 
						|
										iter, err := txn.Iter(lastFileStart, nil)
							 | 
						|
										if err != nil {
							 | 
						|
											return err
							 | 
						|
										}
							 | 
						|
										defer iter.Close()
							 | 
						|
										for i := int64(0); i < limit && iter.Valid(); i++ {
							 | 
						|
											key := iter.Key()
							 | 
						|
											if !bytes.HasPrefix(key, directoryPrefix) {
							 | 
						|
												break
							 | 
						|
											}
							 | 
						|
											fileName := getNameFromKey(key)
							 | 
						|
											if fileName == "" || fileName == startFileName && !includeStartFile {
							 | 
						|
												if err := iter.Next(); err != nil {
							 | 
						|
													break
							 | 
						|
												} else {
							 | 
						|
													continue
							 | 
						|
												}
							 | 
						|
											}
							 | 
						|
											lastFileName = fileName
							 | 
						|
											entry := &filer.Entry{
							 | 
						|
												FullPath: util.NewFullPath(string(dirPath), fileName),
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											// println("list", entry.FullPath, "chunks", len(entry.GetChunks()))
							 | 
						|
											if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(iter.Value())); decodeErr != nil {
							 | 
						|
												err = decodeErr
							 | 
						|
												glog.V(0).Infof("list %s : %v", entry.FullPath, err)
							 | 
						|
												break
							 | 
						|
											}
							 | 
						|
											if err := iter.Next(); !eachEntryFunc(entry) || err != nil {
							 | 
						|
												break
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
										return nil
							 | 
						|
									})
							 | 
						|
									if err != nil {
							 | 
						|
										return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
							 | 
						|
									}
							 | 
						|
									return lastFileName, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// ~ Directory APIs
							 | 
						|
								
							 | 
						|
								// Transaction Related APIs
							 | 
						|
								func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
							 | 
						|
									tx, err := store.client.Begin()
							 | 
						|
									if err != nil {
							 | 
						|
										return ctx, err
							 | 
						|
									}
							 | 
						|
									if store.onePC {
							 | 
						|
										tx.SetEnable1PC(store.onePC)
							 | 
						|
									}
							 | 
						|
									return context.WithValue(ctx, "tx", tx), nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *TikvStore) CommitTransaction(ctx context.Context) error {
							 | 
						|
									if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
							 | 
						|
										return tx.Commit(context.Background())
							 | 
						|
									}
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
							 | 
						|
									if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
							 | 
						|
										return tx.Rollback()
							 | 
						|
									}
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// ~ Transaction Related APIs
							 | 
						|
								
							 | 
						|
								// Transaction Wrapper
							 | 
						|
								type TxnWrapper struct {
							 | 
						|
									*txnkv.KVTxn
							 | 
						|
									inContext bool
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (w *TxnWrapper) RunInTxn(f func(txn *txnkv.KVTxn) error) error {
							 | 
						|
									err := f(w.KVTxn)
							 | 
						|
									if !w.inContext {
							 | 
						|
										if err != nil {
							 | 
						|
											w.KVTxn.Rollback()
							 | 
						|
											return err
							 | 
						|
										}
							 | 
						|
										w.KVTxn.Commit(context.Background())
							 | 
						|
										return nil
							 | 
						|
									}
							 | 
						|
									return err
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *TikvStore) getTxn(ctx context.Context) (*TxnWrapper, error) {
							 | 
						|
									if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
							 | 
						|
										return &TxnWrapper{tx, true}, nil
							 | 
						|
									}
							 | 
						|
									txn, err := store.client.Begin()
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
									if store.onePC {
							 | 
						|
										txn.SetEnable1PC(store.onePC)
							 | 
						|
									}
							 | 
						|
									return &TxnWrapper{txn, false}, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// ~ Transaction Wrapper
							 | 
						|
								
							 | 
						|
								// Encoding Functions
							 | 
						|
								func hashToBytes(dir string) []byte {
							 | 
						|
									h := sha1.New()
							 | 
						|
									io.WriteString(h, dir)
							 | 
						|
									b := h.Sum(nil)
							 | 
						|
									return b
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func generateKey(dirPath, fileName string) []byte {
							 | 
						|
									key := hashToBytes(dirPath)
							 | 
						|
									key = append(key, []byte(fileName)...)
							 | 
						|
									return key
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func getNameFromKey(key []byte) string {
							 | 
						|
									return string(key[sha1.Size:])
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func genDirectoryKeyPrefix(fullpath util.FullPath, startFileName string) (keyPrefix []byte) {
							 | 
						|
									keyPrefix = hashToBytes(string(fullpath))
							 | 
						|
									if len(startFileName) > 0 {
							 | 
						|
										keyPrefix = append(keyPrefix, []byte(startFileName)...)
							 | 
						|
									}
							 | 
						|
									return keyPrefix
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func isNotExists(err error) bool {
							 | 
						|
									if err == nil {
							 | 
						|
										return false
							 | 
						|
									}
							 | 
						|
									if err.Error() == "not exist" {
							 | 
						|
										return true
							 | 
						|
									}
							 | 
						|
									return false
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// ~ Encoding Functions
							 |