You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							368 lines
						
					
					
						
							8.7 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							368 lines
						
					
					
						
							8.7 KiB
						
					
					
				| //go:build tikv | |
| // +build tikv | |
|  | |
| package tikv | |
| 
 | |
| import ( | |
| 	"bytes" | |
| 	"context" | |
| 	"crypto/sha1" | |
| 	"fmt" | |
| 	"io" | |
| 	"strings" | |
| 
 | |
| 	"github.com/seaweedfs/seaweedfs/weed/filer" | |
| 	"github.com/seaweedfs/seaweedfs/weed/glog" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/util" | |
| 	"github.com/tikv/client-go/v2/config" | |
| 	"github.com/tikv/client-go/v2/txnkv" | |
| ) | |
| 
 | |
| var ( | |
| 	_ filer.FilerStore = ((*TikvStore)(nil)) | |
| ) | |
| 
 | |
| func init() { | |
| 	filer.Stores = append(filer.Stores, &TikvStore{}) | |
| } | |
| 
 | |
| type TikvStore struct { | |
| 	client                 *txnkv.Client | |
| 	deleteRangeConcurrency int | |
| 	onePC                  bool | |
| } | |
| 
 | |
| // Basic APIs | |
| func (store *TikvStore) GetName() string { | |
| 	return "tikv" | |
| } | |
| 
 | |
| func (store *TikvStore) Initialize(config util.Configuration, prefix string) error { | |
| 	ca := config.GetString(prefix + "ca_path") | |
| 	cert := config.GetString(prefix + "cert_path") | |
| 	key := config.GetString(prefix + "key_path") | |
| 	verify_cn := strings.Split(config.GetString(prefix+"verify_cn"), ",") | |
| 	pdAddrs := strings.Split(config.GetString(prefix+"pdaddrs"), ",") | |
| 
 | |
| 	drc := config.GetInt(prefix + "deleterange_concurrency") | |
| 	if drc <= 0 { | |
| 		drc = 1 | |
| 	} | |
| 	store.onePC = config.GetBool(prefix + "enable_1pc") | |
| 	store.deleteRangeConcurrency = drc | |
| 	return store.initialize(ca, cert, key, verify_cn, pdAddrs) | |
| } | |
| 
 | |
| func (store *TikvStore) initialize(ca, cert, key string, verify_cn, pdAddrs []string) error { | |
| 	config.UpdateGlobal(func(conf *config.Config) { | |
| 		conf.Security = config.NewSecurity(ca, cert, key, verify_cn) | |
| 	}) | |
| 	client, err := txnkv.NewClient(pdAddrs) | |
| 	store.client = client | |
| 	return err | |
| } | |
| 
 | |
| func (store *TikvStore) Shutdown() { | |
| 	err := store.client.Close() | |
| 	if err != nil { | |
| 		glog.V(0).Infof("Shutdown TiKV client got error: %v", err) | |
| 	} | |
| } | |
| 
 | |
| // ~ Basic APIs | |
|  | |
| // Entry APIs | |
| func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer.Entry) error { | |
| 	dir, name := entry.DirAndName() | |
| 	key := generateKey(dir, name) | |
| 
 | |
| 	value, err := entry.EncodeAttributesAndChunks() | |
| 	if err != nil { | |
| 		return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) | |
| 	} | |
| 	txn, err := store.getTxn(ctx) | |
| 	if err != nil { | |
| 		return err | |
| 	} | |
| 	err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { | |
| 		return txn.Set(key, value) | |
| 	}) | |
| 	if err != nil { | |
| 		return fmt.Errorf("persisting %s : %v", entry.FullPath, err) | |
| 	} | |
| 	return nil | |
| } | |
| 
 | |
| func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer.Entry) error { | |
| 	return store.InsertEntry(ctx, entry) | |
| } | |
| 
 | |
| func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*filer.Entry, error) { | |
| 	dir, name := path.DirAndName() | |
| 	key := generateKey(dir, name) | |
| 
 | |
| 	txn, err := store.getTxn(ctx) | |
| 	if err != nil { | |
| 		return nil, err | |
| 	} | |
| 	var value []byte = nil | |
| 	err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { | |
| 		val, err := txn.Get(context.TODO(), key) | |
| 		if err == nil { | |
| 			value = val | |
| 		} | |
| 		return err | |
| 	}) | |
| 
 | |
| 	if isNotExists(err) || value == nil { | |
| 		return nil, filer_pb.ErrNotFound | |
| 	} | |
| 
 | |
| 	if err != nil { | |
| 		return nil, fmt.Errorf("get %s : %v", path, err) | |
| 	} | |
| 
 | |
| 	entry := &filer.Entry{ | |
| 		FullPath: path, | |
| 	} | |
| 	err = entry.DecodeAttributesAndChunks(value) | |
| 	if err != nil { | |
| 		return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) | |
| 	} | |
| 	return entry, nil | |
| } | |
| 
 | |
| func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) error { | |
| 	dir, name := path.DirAndName() | |
| 	key := generateKey(dir, name) | |
| 
 | |
| 	txn, err := store.getTxn(ctx) | |
| 	if err != nil { | |
| 		return err | |
| 	} | |
| 
 | |
| 	err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { | |
| 		return txn.Delete(key) | |
| 	}) | |
| 	if err != nil { | |
| 		return fmt.Errorf("delete %s : %v", path, err) | |
| 	} | |
| 	return nil | |
| } | |
| 
 | |
| // ~ Entry APIs | |
|  | |
| // Directory APIs | |
| func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) error { | |
| 	directoryPrefix := genDirectoryKeyPrefix(path, "") | |
| 
 | |
| 	txn, err := store.getTxn(ctx) | |
| 	if err != nil { | |
| 		return err | |
| 	} | |
| 	var ( | |
| 		startKey []byte = nil | |
| 		endKey   []byte = nil | |
| 	) | |
| 	err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { | |
| 		iter, err := txn.Iter(directoryPrefix, nil) | |
| 		if err != nil { | |
| 			return err | |
| 		} | |
| 		defer iter.Close() | |
| 		for iter.Valid() { | |
| 			key := iter.Key() | |
| 			endKey = key | |
| 			if !bytes.HasPrefix(key, directoryPrefix) { | |
| 				break | |
| 			} | |
| 			if startKey == nil { | |
| 				startKey = key | |
| 			} | |
| 
 | |
| 			err = iter.Next() | |
| 			if err != nil { | |
| 				return err | |
| 			} | |
| 		} | |
| 		// Only one Key matched just delete it. | |
| 		if startKey != nil && bytes.Equal(startKey, endKey) { | |
| 			return txn.Delete(startKey) | |
| 		} | |
| 		return nil | |
| 	}) | |
| 	if err != nil { | |
| 		return fmt.Errorf("delete %s : %v", path, err) | |
| 	} | |
| 
 | |
| 	if startKey != nil && endKey != nil && !bytes.Equal(startKey, endKey) { | |
| 		// has startKey and endKey and they are not equals, so use delete range | |
| 		_, err = store.client.DeleteRange(context.Background(), startKey, endKey, store.deleteRangeConcurrency) | |
| 		if err != nil { | |
| 			return fmt.Errorf("delete %s : %v", path, err) | |
| 		} | |
| 	} | |
| 	return err | |
| } | |
| 
 | |
| func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) { | |
| 	return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) | |
| } | |
| 
 | |
| func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) { | |
| 	lastFileName := "" | |
| 	directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix) | |
| 	lastFileStart := directoryPrefix | |
| 	if startFileName != "" { | |
| 		lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName) | |
| 	} | |
| 
 | |
| 	txn, err := store.getTxn(ctx) | |
| 	if err != nil { | |
| 		return lastFileName, err | |
| 	} | |
| 	err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { | |
| 		iter, err := txn.Iter(lastFileStart, nil) | |
| 		if err != nil { | |
| 			return err | |
| 		} | |
| 		defer iter.Close() | |
| 		for i := int64(0); i < limit && iter.Valid(); i++ { | |
| 			key := iter.Key() | |
| 			if !bytes.HasPrefix(key, directoryPrefix) { | |
| 				break | |
| 			} | |
| 			fileName := getNameFromKey(key) | |
| 			if fileName == "" || fileName == startFileName && !includeStartFile { | |
| 				if err := iter.Next(); err != nil { | |
| 					break | |
| 				} else { | |
| 					continue | |
| 				} | |
| 			} | |
| 			lastFileName = fileName | |
| 			entry := &filer.Entry{ | |
| 				FullPath: util.NewFullPath(string(dirPath), fileName), | |
| 			} | |
| 
 | |
| 			// println("list", entry.FullPath, "chunks", len(entry.GetChunks())) | |
| 			if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(iter.Value())); decodeErr != nil { | |
| 				err = decodeErr | |
| 				glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) | |
| 				break | |
| 			} | |
| 			if err := iter.Next(); !eachEntryFunc(entry) || err != nil { | |
| 				break | |
| 			} | |
| 		} | |
| 		return nil | |
| 	}) | |
| 	if err != nil { | |
| 		return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err) | |
| 	} | |
| 	return lastFileName, nil | |
| } | |
| 
 | |
| // ~ Directory APIs | |
|  | |
| // Transaction Related APIs | |
| func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) { | |
| 	tx, err := store.client.Begin() | |
| 	if err != nil { | |
| 		return ctx, err | |
| 	} | |
| 	if store.onePC { | |
| 		tx.SetEnable1PC(store.onePC) | |
| 	} | |
| 	return context.WithValue(ctx, "tx", tx), nil | |
| } | |
| 
 | |
| func (store *TikvStore) CommitTransaction(ctx context.Context) error { | |
| 	if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok { | |
| 		return tx.Commit(context.Background()) | |
| 	} | |
| 	return nil | |
| } | |
| 
 | |
| func (store *TikvStore) RollbackTransaction(ctx context.Context) error { | |
| 	if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok { | |
| 		return tx.Rollback() | |
| 	} | |
| 	return nil | |
| } | |
| 
 | |
| // ~ Transaction Related APIs | |
|  | |
| // Transaction Wrapper | |
| type TxnWrapper struct { | |
| 	*txnkv.KVTxn | |
| 	inContext bool | |
| } | |
| 
 | |
| func (w *TxnWrapper) RunInTxn(f func(txn *txnkv.KVTxn) error) error { | |
| 	err := f(w.KVTxn) | |
| 	if !w.inContext { | |
| 		if err != nil { | |
| 			w.KVTxn.Rollback() | |
| 			return err | |
| 		} | |
| 		w.KVTxn.Commit(context.Background()) | |
| 		return nil | |
| 	} | |
| 	return err | |
| } | |
| 
 | |
| func (store *TikvStore) getTxn(ctx context.Context) (*TxnWrapper, error) { | |
| 	if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok { | |
| 		return &TxnWrapper{tx, true}, nil | |
| 	} | |
| 	txn, err := store.client.Begin() | |
| 	if err != nil { | |
| 		return nil, err | |
| 	} | |
| 	if store.onePC { | |
| 		txn.SetEnable1PC(store.onePC) | |
| 	} | |
| 	return &TxnWrapper{txn, false}, nil | |
| } | |
| 
 | |
| // ~ Transaction Wrapper | |
|  | |
| // Encoding Functions | |
| func hashToBytes(dir string) []byte { | |
| 	h := sha1.New() | |
| 	io.WriteString(h, dir) | |
| 	b := h.Sum(nil) | |
| 	return b | |
| } | |
| 
 | |
| func generateKey(dirPath, fileName string) []byte { | |
| 	key := hashToBytes(dirPath) | |
| 	key = append(key, []byte(fileName)...) | |
| 	return key | |
| } | |
| 
 | |
| func getNameFromKey(key []byte) string { | |
| 	return string(key[sha1.Size:]) | |
| } | |
| 
 | |
| func genDirectoryKeyPrefix(fullpath util.FullPath, startFileName string) (keyPrefix []byte) { | |
| 	keyPrefix = hashToBytes(string(fullpath)) | |
| 	if len(startFileName) > 0 { | |
| 		keyPrefix = append(keyPrefix, []byte(startFileName)...) | |
| 	} | |
| 	return keyPrefix | |
| } | |
| 
 | |
| func isNotExists(err error) bool { | |
| 	if err == nil { | |
| 		return false | |
| 	} | |
| 	if err.Error() == "not exist" { | |
| 		return true | |
| 	} | |
| 	return false | |
| } | |
| 
 | |
| // ~ Encoding Functions
 |