You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

389 lines
8.8 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. //go:build tikv
  2. // +build tikv
  3. package tikv
  4. import (
  5. "bytes"
  6. "context"
  7. "crypto/sha1"
  8. "fmt"
  9. "io"
  10. "strings"
  11. "github.com/chrislusf/seaweedfs/weed/filer"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. "github.com/tikv/client-go/v2/tikv"
  16. "github.com/tikv/client-go/v2/txnkv"
  17. )
  18. var (
  19. _ filer.FilerStore = ((*TikvStore)(nil))
  20. )
  21. func init() {
  22. filer.Stores = append(filer.Stores, &TikvStore{})
  23. }
  24. type TikvStore struct {
  25. client *tikv.KVStore
  26. deleteRangeConcurrency int
  27. }
  28. // Basic APIs
  29. func (store *TikvStore) GetName() string {
  30. return "tikv"
  31. }
  32. func (store *TikvStore) Initialize(config util.Configuration, prefix string) error {
  33. pdAddrs := []string{}
  34. pdAddrsStr := config.GetString(prefix + "pdaddrs")
  35. for _, item := range strings.Split(pdAddrsStr, ",") {
  36. pdAddrs = append(pdAddrs, strings.TrimSpace(item))
  37. }
  38. drc := config.GetInt(prefix + "deleterange_concurrency")
  39. if drc <= 0 {
  40. drc = 1
  41. }
  42. store.deleteRangeConcurrency = drc
  43. return store.initialize(pdAddrs)
  44. }
  45. func (store *TikvStore) initialize(pdAddrs []string) error {
  46. client, err := tikv.NewTxnClient(pdAddrs)
  47. store.client = client
  48. return err
  49. }
  50. func (store *TikvStore) Shutdown() {
  51. err := store.client.Close()
  52. if err != nil {
  53. glog.V(0).Infof("Shutdown TiKV client got error: %v", err)
  54. }
  55. }
  56. // ~ Basic APIs
  57. // Entry APIs
  58. func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
  59. dir, name := entry.DirAndName()
  60. key := generateKey(dir, name)
  61. value, err := entry.EncodeAttributesAndChunks()
  62. if err != nil {
  63. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
  64. }
  65. txn, err := store.getTxn(ctx)
  66. if err != nil {
  67. return err
  68. }
  69. err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
  70. return txn.Set(key, value)
  71. })
  72. if err != nil {
  73. return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
  74. }
  75. return nil
  76. }
  77. func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
  78. return store.InsertEntry(ctx, entry)
  79. }
  80. func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*filer.Entry, error) {
  81. dir, name := path.DirAndName()
  82. key := generateKey(dir, name)
  83. txn, err := store.getTxn(ctx)
  84. if err != nil {
  85. return nil, err
  86. }
  87. var value []byte = nil
  88. err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
  89. val, err := txn.Get(context.TODO(), key)
  90. if err == nil {
  91. value = val
  92. }
  93. return err
  94. })
  95. if isNotExists(err) || value == nil {
  96. return nil, filer_pb.ErrNotFound
  97. }
  98. if err != nil {
  99. return nil, fmt.Errorf("get %s : %v", path, err)
  100. }
  101. entry := &filer.Entry{
  102. FullPath: path,
  103. }
  104. err = entry.DecodeAttributesAndChunks(value)
  105. if err != nil {
  106. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  107. }
  108. return entry, nil
  109. }
  110. func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) error {
  111. dir, name := path.DirAndName()
  112. key := generateKey(dir, name)
  113. txn, err := store.getTxn(ctx)
  114. if err != nil {
  115. return err
  116. }
  117. err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
  118. return txn.Delete(key)
  119. })
  120. if err != nil {
  121. return fmt.Errorf("delete %s : %v", path, err)
  122. }
  123. return nil
  124. }
  125. // ~ Entry APIs
  126. // Directory APIs
  127. func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) error {
  128. directoryPrefix := genDirectoryKeyPrefix(path, "")
  129. txn, err := store.getTxn(ctx)
  130. if err != nil {
  131. return err
  132. }
  133. var (
  134. startKey []byte = nil
  135. endKey []byte = nil
  136. )
  137. err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
  138. iter, err := txn.Iter(directoryPrefix, nil)
  139. if err != nil {
  140. return err
  141. }
  142. defer iter.Close()
  143. for iter.Valid() {
  144. key := iter.Key()
  145. endKey = key
  146. if !bytes.HasPrefix(key, directoryPrefix) {
  147. break
  148. }
  149. if startKey == nil {
  150. startKey = key
  151. }
  152. err = iter.Next()
  153. if err != nil {
  154. return err
  155. }
  156. }
  157. // Only one Key matched just delete it.
  158. if startKey != nil && bytes.Equal(startKey, endKey) {
  159. return txn.Delete(startKey)
  160. }
  161. return nil
  162. })
  163. if err != nil {
  164. return fmt.Errorf("delete %s : %v", path, err)
  165. }
  166. if startKey != nil && endKey != nil && !bytes.Equal(startKey, endKey) {
  167. // has startKey and endKey and they are not equals, so use delete range
  168. _, err = store.client.DeleteRange(context.Background(), startKey, endKey, store.deleteRangeConcurrency)
  169. if err != nil {
  170. return fmt.Errorf("delete %s : %v", path, err)
  171. }
  172. }
  173. return err
  174. }
  175. func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
  176. return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
  177. }
  178. func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
  179. lastFileName := ""
  180. directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
  181. lastFileStart := directoryPrefix
  182. if startFileName != "" {
  183. lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName)
  184. }
  185. txn, err := store.getTxn(ctx)
  186. if err != nil {
  187. return lastFileName, err
  188. }
  189. err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
  190. iter, err := txn.Iter(lastFileStart, nil)
  191. if err != nil {
  192. return err
  193. }
  194. defer iter.Close()
  195. i := int64(0)
  196. first := true
  197. for iter.Valid() {
  198. if first {
  199. first = false
  200. if !includeStartFile {
  201. if iter.Valid() {
  202. // Check first item is lastFileStart
  203. if bytes.Equal(iter.Key(), lastFileStart) {
  204. // Is lastFileStart and not include start file, just
  205. // ignore it.
  206. err = iter.Next()
  207. if err != nil {
  208. return err
  209. }
  210. continue
  211. }
  212. }
  213. }
  214. }
  215. // Check for limitation
  216. if limit > 0 {
  217. i++
  218. if i > limit {
  219. break
  220. }
  221. }
  222. // Validate key prefix
  223. key := iter.Key()
  224. if !bytes.HasPrefix(key, directoryPrefix) {
  225. break
  226. }
  227. value := iter.Value()
  228. // Start process
  229. fileName := getNameFromKey(key)
  230. if fileName != "" {
  231. // Got file name, then generate the Entry
  232. entry := &filer.Entry{
  233. FullPath: util.NewFullPath(string(dirPath), fileName),
  234. }
  235. // Update lastFileName
  236. lastFileName = fileName
  237. // Check for decode value.
  238. if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil {
  239. // Got error just return the error
  240. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  241. return err
  242. }
  243. // Run for each callback if return false just break the iteration
  244. if !eachEntryFunc(entry) {
  245. break
  246. }
  247. }
  248. // End process
  249. err = iter.Next()
  250. if err != nil {
  251. return err
  252. }
  253. }
  254. return nil
  255. })
  256. if err != nil {
  257. return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
  258. }
  259. return lastFileName, nil
  260. }
  261. // ~ Directory APIs
  262. // Transaction Related APIs
  263. func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  264. tx, err := store.client.Begin()
  265. if err != nil {
  266. return ctx, err
  267. }
  268. return context.WithValue(ctx, "tx", tx), nil
  269. }
  270. func (store *TikvStore) CommitTransaction(ctx context.Context) error {
  271. if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
  272. return tx.Commit(context.Background())
  273. }
  274. return nil
  275. }
  276. func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
  277. if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
  278. return tx.Rollback()
  279. }
  280. return nil
  281. }
  282. // ~ Transaction Related APIs
  283. // Transaction Wrapper
  284. type TxnWrapper struct {
  285. *txnkv.KVTxn
  286. inContext bool
  287. }
  288. func (w *TxnWrapper) RunInTxn(f func(txn *txnkv.KVTxn) error) error {
  289. err := f(w.KVTxn)
  290. if !w.inContext {
  291. if err != nil {
  292. w.KVTxn.Rollback()
  293. return err
  294. }
  295. w.KVTxn.Commit(context.Background())
  296. return nil
  297. }
  298. return err
  299. }
  300. func (store *TikvStore) getTxn(ctx context.Context) (*TxnWrapper, error) {
  301. if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
  302. return &TxnWrapper{tx, true}, nil
  303. }
  304. txn, err := store.client.Begin()
  305. if err != nil {
  306. return nil, err
  307. }
  308. return &TxnWrapper{txn, false}, nil
  309. }
  310. // ~ Transaction Wrapper
  311. // Encoding Functions
  312. func hashToBytes(dir string) []byte {
  313. h := sha1.New()
  314. io.WriteString(h, dir)
  315. b := h.Sum(nil)
  316. return b
  317. }
  318. func generateKey(dirPath, fileName string) []byte {
  319. key := hashToBytes(dirPath)
  320. key = append(key, []byte(fileName)...)
  321. return key
  322. }
  323. func getNameFromKey(key []byte) string {
  324. return string(key[sha1.Size:])
  325. }
  326. func genDirectoryKeyPrefix(fullpath util.FullPath, startFileName string) (keyPrefix []byte) {
  327. keyPrefix = hashToBytes(string(fullpath))
  328. if len(startFileName) > 0 {
  329. keyPrefix = append(keyPrefix, []byte(startFileName)...)
  330. }
  331. return keyPrefix
  332. }
  333. func isNotExists(err error) bool {
  334. if err == nil {
  335. return false
  336. }
  337. if err.Error() == "not exist" {
  338. return true
  339. }
  340. return false
  341. }
  342. // ~ Encoding Functions