diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml index 1ec91404c..2892fa0d2 100644 --- a/weed/command/scaffold/filer.toml +++ b/weed/command/scaffold/filer.toml @@ -399,6 +399,8 @@ enabled = false # If you have many pd address, use ',' split then: # pdaddrs = "pdhost1:2379, pdhost2:2379, pdhost3:2379" pdaddrs = "localhost:2379" +# prefix for filer TiKV keys, useful for sharing a TiKV cluster with multiple seaweedfs clusters +keyPrefix = "" # Enable 1PC enable_1pc = false # batch delete count, default 10000 in code diff --git a/weed/filer/tikv/tikv_store.go b/weed/filer/tikv/tikv_store.go index a85c8119f..72f9461bf 100644 --- a/weed/filer/tikv/tikv_store.go +++ b/weed/filer/tikv/tikv_store.go @@ -34,6 +34,7 @@ type TikvStore struct { client *txnkv.Client onePC bool batchCommitSize int + keyPrefix []byte } // Basic APIs @@ -47,6 +48,7 @@ func (store *TikvStore) Initialize(config util.Configuration, prefix string) err key := config.GetString(prefix + "key_path") verify_cn := strings.Split(config.GetString(prefix+"verify_cn"), ",") pdAddrs := strings.Split(config.GetString(prefix+"pdaddrs"), ",") + keyPrefix := config.GetString(prefix + "keyPrefix") bdc := config.GetInt(prefix + "batchdelete_count") if bdc <= 0 { @@ -55,6 +57,7 @@ func (store *TikvStore) Initialize(config util.Configuration, prefix string) err store.onePC = config.GetBool(prefix + "enable_1pc") store.batchCommitSize = bdc + store.keyPrefix = []byte(keyPrefix) return store.initialize(ca, cert, key, verify_cn, pdAddrs) } @@ -79,7 +82,7 @@ func (store *TikvStore) Shutdown() { // Entry APIs func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer.Entry) error { dir, name := entry.DirAndName() - key := generateKey(dir, name) + key := store.generateKey(dir, name) value, err := entry.EncodeAttributesAndChunks() if err != nil { @@ -104,7 +107,7 @@ func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer.Entry) err func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*filer.Entry, error) { dir, name := path.DirAndName() - key := generateKey(dir, name) + key := store.generateKey(dir, name) txn, err := store.getTxn(ctx) if err != nil { @@ -139,7 +142,7 @@ func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*fil func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) error { dir, name := path.DirAndName() - key := generateKey(dir, name) + key := store.generateKey(dir, name) txn, err := store.getTxn(ctx) if err != nil { @@ -159,7 +162,7 @@ func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) err // Directory APIs func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) error { - directoryPrefix := genDirectoryKeyPrefix(path, "") + directoryPrefix := store.genDirectoryKeyPrefix(path, "") iterTxn, err := store.getTxn(ctx) if err != nil { @@ -242,10 +245,10 @@ func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.F func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) { lastFileName := "" - directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix) + directoryPrefix := store.genDirectoryKeyPrefix(dirPath, prefix) lastFileStart := directoryPrefix if startFileName != "" { - lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName) + lastFileStart = store.genDirectoryKeyPrefix(dirPath, startFileName) } txn, err := store.getTxn(ctx) @@ -265,7 +268,7 @@ func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat if !bytes.HasPrefix(key, directoryPrefix) { break } - fileName := getNameFromKey(key) + fileName := store.getNameFromKey(key) if fileName == "" { if err := iter.Next(); err != nil { break @@ -418,22 +421,37 @@ func hashToBytes(dir string) []byte { return b } -func generateKey(dirPath, fileName string) []byte { +func (store *TikvStore) getKey(key []byte) []byte { + if len(store.keyPrefix) == 0 { + return key + } + result := make([]byte, len(store.keyPrefix)+len(key)) + copy(result, store.keyPrefix) + copy(result[len(store.keyPrefix):], key) + return result +} + +func (store *TikvStore) generateKey(dirPath, fileName string) []byte { key := hashToBytes(dirPath) key = append(key, []byte(fileName)...) - return key + return store.getKey(key) } -func getNameFromKey(key []byte) string { - return string(key[sha1.Size:]) +func (store *TikvStore) getNameFromKey(key []byte) string { + minKeyLen := len(store.keyPrefix) + sha1.Size + if len(key) < minKeyLen { + glog.Warningf("malformed key: length %d is less than minimum %d", len(key), minKeyLen) + return "" + } + return string(key[minKeyLen:]) } -func genDirectoryKeyPrefix(fullpath util.FullPath, startFileName string) (keyPrefix []byte) { +func (store *TikvStore) genDirectoryKeyPrefix(fullpath util.FullPath, startFileName string) (keyPrefix []byte) { keyPrefix = hashToBytes(string(fullpath)) if len(startFileName) > 0 { keyPrefix = append(keyPrefix, []byte(startFileName)...) } - return keyPrefix + return store.getKey(keyPrefix) } func isNotExists(err error) bool {