You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

386 lines
8.7 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package tikv
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/sha1"
  6. "fmt"
  7. "io"
  8. "strings"
  9. "github.com/chrislusf/seaweedfs/weed/filer"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. "github.com/tikv/client-go/v2/tikv"
  14. "github.com/tikv/client-go/v2/txnkv"
  15. )
  16. var (
  17. _ filer.FilerStore = ((*TikvStore)(nil))
  18. )
  19. func init() {
  20. filer.Stores = append(filer.Stores, &TikvStore{})
  21. }
  22. type TikvStore struct {
  23. client *tikv.KVStore
  24. deleteRangeConcurrency int
  25. }
  26. // Basic APIs
  27. func (store *TikvStore) GetName() string {
  28. return "tikv"
  29. }
  30. func (store *TikvStore) Initialize(config util.Configuration, prefix string) error {
  31. pdAddrs := []string{}
  32. pdAddrsStr := config.GetString(prefix + "pdaddrs")
  33. for _, item := range strings.Split(pdAddrsStr, ",") {
  34. pdAddrs = append(pdAddrs, strings.TrimSpace(item))
  35. }
  36. drc := config.GetInt(prefix + "deleterange_concurrency")
  37. if drc <= 0 {
  38. drc = 1
  39. }
  40. store.deleteRangeConcurrency = drc
  41. return store.initialize(pdAddrs)
  42. }
  43. func (store *TikvStore) initialize(pdAddrs []string) error {
  44. client, err := tikv.NewTxnClient(pdAddrs)
  45. store.client = client
  46. return err
  47. }
  48. func (store *TikvStore) Shutdown() {
  49. err := store.client.Close()
  50. if err != nil {
  51. glog.V(0).Infof("Shutdown TiKV client got error: %v", err)
  52. }
  53. }
  54. // ~ Basic APIs
  55. // Entry APIs
  56. func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
  57. dir, name := entry.DirAndName()
  58. key := generateKey(dir, name)
  59. value, err := entry.EncodeAttributesAndChunks()
  60. if err != nil {
  61. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
  62. }
  63. txn, err := store.getTxn(ctx)
  64. if err != nil {
  65. return err
  66. }
  67. err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
  68. return txn.Set(key, value)
  69. })
  70. if err != nil {
  71. return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
  72. }
  73. return nil
  74. }
  75. func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
  76. return store.InsertEntry(ctx, entry)
  77. }
  78. func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*filer.Entry, error) {
  79. dir, name := path.DirAndName()
  80. key := generateKey(dir, name)
  81. txn, err := store.getTxn(ctx)
  82. if err != nil {
  83. return nil, err
  84. }
  85. var value []byte = nil
  86. err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
  87. val, err := txn.Get(context.TODO(), key)
  88. if err == nil {
  89. value = val
  90. }
  91. return err
  92. })
  93. if isNotExists(err) || value == nil {
  94. return nil, filer_pb.ErrNotFound
  95. }
  96. if err != nil {
  97. return nil, fmt.Errorf("get %s : %v", path, err)
  98. }
  99. entry := &filer.Entry{
  100. FullPath: path,
  101. }
  102. err = entry.DecodeAttributesAndChunks(value)
  103. if err != nil {
  104. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  105. }
  106. return entry, nil
  107. }
  108. func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) error {
  109. dir, name := path.DirAndName()
  110. key := generateKey(dir, name)
  111. txn, err := store.getTxn(ctx)
  112. if err != nil {
  113. return err
  114. }
  115. err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
  116. return txn.Delete(key)
  117. })
  118. if err != nil {
  119. return fmt.Errorf("delete %s : %v", path, err)
  120. }
  121. return nil
  122. }
  123. // ~ Entry APIs
  124. // Directory APIs
  125. func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) error {
  126. directoryPrefix := genDirectoryKeyPrefix(path, "")
  127. txn, err := store.getTxn(ctx)
  128. if err != nil {
  129. return err
  130. }
  131. var (
  132. startKey []byte = nil
  133. endKey []byte = nil
  134. )
  135. err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
  136. iter, err := txn.Iter(directoryPrefix, nil)
  137. if err != nil {
  138. return err
  139. }
  140. defer iter.Close()
  141. for iter.Valid() {
  142. key := iter.Key()
  143. endKey = key
  144. if !bytes.HasPrefix(key, directoryPrefix) {
  145. break
  146. }
  147. if startKey == nil {
  148. startKey = key
  149. }
  150. err = iter.Next()
  151. if err != nil {
  152. return err
  153. }
  154. }
  155. // Only one Key matched just delete it.
  156. if startKey != nil && bytes.Equal(startKey, endKey) {
  157. return txn.Delete(startKey)
  158. }
  159. return nil
  160. })
  161. if err != nil {
  162. return fmt.Errorf("delete %s : %v", path, err)
  163. }
  164. if startKey != nil && endKey != nil && !bytes.Equal(startKey, endKey) {
  165. // has startKey and endKey and they are not equals, so use delete range
  166. _, err = store.client.DeleteRange(context.Background(), startKey, endKey, store.deleteRangeConcurrency)
  167. if err != nil {
  168. return fmt.Errorf("delete %s : %v", path, err)
  169. }
  170. }
  171. return err
  172. }
  173. func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
  174. return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
  175. }
  176. func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
  177. lastFileName := ""
  178. directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
  179. lastFileStart := directoryPrefix
  180. if startFileName != "" {
  181. lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName)
  182. }
  183. txn, err := store.getTxn(ctx)
  184. if err != nil {
  185. return lastFileName, err
  186. }
  187. err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
  188. iter, err := txn.Iter(lastFileStart, nil)
  189. if err != nil {
  190. return err
  191. }
  192. defer iter.Close()
  193. i := int64(0)
  194. first := true
  195. for iter.Valid() {
  196. if first {
  197. first = false
  198. if !includeStartFile {
  199. if iter.Valid() {
  200. // Check first item is lastFileStart
  201. if bytes.Equal(iter.Key(), lastFileStart) {
  202. // Is lastFileStart and not include start file, just
  203. // ignore it.
  204. err = iter.Next()
  205. if err != nil {
  206. return err
  207. }
  208. continue
  209. }
  210. }
  211. }
  212. }
  213. // Check for limitation
  214. if limit > 0 {
  215. i++
  216. if i > limit {
  217. break
  218. }
  219. }
  220. // Validate key prefix
  221. key := iter.Key()
  222. if !bytes.HasPrefix(key, directoryPrefix) {
  223. break
  224. }
  225. value := iter.Value()
  226. // Start process
  227. fileName := getNameFromKey(key)
  228. if fileName != "" {
  229. // Got file name, then generate the Entry
  230. entry := &filer.Entry{
  231. FullPath: util.NewFullPath(string(dirPath), fileName),
  232. }
  233. // Update lastFileName
  234. lastFileName = fileName
  235. // Check for decode value.
  236. if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil {
  237. // Got error just return the error
  238. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  239. return err
  240. }
  241. // Run for each callback if return false just break the iteration
  242. if !eachEntryFunc(entry) {
  243. break
  244. }
  245. }
  246. // End process
  247. err = iter.Next()
  248. if err != nil {
  249. return err
  250. }
  251. }
  252. return nil
  253. })
  254. if err != nil {
  255. return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
  256. }
  257. return lastFileName, nil
  258. }
  259. // ~ Directory APIs
  260. // Transaction Related APIs
  261. func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  262. tx, err := store.client.Begin()
  263. if err != nil {
  264. return ctx, err
  265. }
  266. return context.WithValue(ctx, "tx", tx), nil
  267. }
  268. func (store *TikvStore) CommitTransaction(ctx context.Context) error {
  269. if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
  270. return tx.Commit(context.Background())
  271. }
  272. return nil
  273. }
  274. func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
  275. if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
  276. return tx.Rollback()
  277. }
  278. return nil
  279. }
  280. // ~ Transaction Related APIs
  281. // Transaction Wrapper
  282. type TxnWrapper struct {
  283. *txnkv.KVTxn
  284. inContext bool
  285. }
  286. func (w *TxnWrapper) RunInTxn(f func(txn *txnkv.KVTxn) error) error {
  287. err := f(w.KVTxn)
  288. if !w.inContext {
  289. if err != nil {
  290. w.KVTxn.Rollback()
  291. return err
  292. }
  293. w.KVTxn.Commit(context.Background())
  294. return nil
  295. }
  296. return err
  297. }
  298. func (store *TikvStore) getTxn(ctx context.Context) (*TxnWrapper, error) {
  299. if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
  300. return &TxnWrapper{tx, true}, nil
  301. }
  302. txn, err := store.client.Begin()
  303. if err != nil {
  304. return nil, err
  305. }
  306. return &TxnWrapper{txn, false}, nil
  307. }
  308. // ~ Transaction Wrapper
  309. // Encoding Functions
  // hashToBytes returns the SHA-1 digest of dir as a sha1.Size-byte slice.
  func hashToBytes(dir string) []byte {
  	digest := sha1.Sum([]byte(dir))
  	return digest[:]
  }
  316. func generateKey(dirPath, fileName string) []byte {
  317. key := hashToBytes(dirPath)
  318. key = append(key, []byte(fileName)...)
  319. return key
  320. }
  // getNameFromKey returns the file name part of a storage key, i.e. the
  // bytes following the sha1.Size-byte directory hash. A key too short to
  // contain anything after the hash yields "" instead of panicking on the
  // slice expression.
  func getNameFromKey(key []byte) string {
  	if len(key) <= sha1.Size {
  		return ""
  	}
  	return string(key[sha1.Size:])
  }
  324. func genDirectoryKeyPrefix(fullpath util.FullPath, startFileName string) (keyPrefix []byte) {
  325. keyPrefix = hashToBytes(string(fullpath))
  326. if len(startFileName) > 0 {
  327. keyPrefix = append(keyPrefix, []byte(startFileName)...)
  328. }
  329. return keyPrefix
  330. }
  // isNotExists reports whether err is non-nil and its message is exactly
  // "not exist".
  func isNotExists(err error) bool {
  	return err != nil && err.Error() == "not exist"
  }
  340. // ~ Encoding Functions