You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

219 lines
4.9 KiB

5 years ago
5 years ago
5 years ago
  1. // +build !386
  2. // +build !arm
  3. package tikv
  4. import (
  5. "bytes"
  6. "context"
  7. "crypto/md5"
  8. "fmt"
  9. "io"
  10. "github.com/chrislusf/seaweedfs/weed/filer2"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. weed_util "github.com/chrislusf/seaweedfs/weed/util"
  13. "github.com/pingcap/tidb/kv"
  14. "github.com/pingcap/tidb/store/tikv"
  15. )
  16. func init() {
  17. filer2.Stores = append(filer2.Stores, &TikvStore{})
  18. }
  19. type TikvStore struct {
  20. store kv.Storage
  21. }
  22. func (store *TikvStore) GetName() string {
  23. return "tikv"
  24. }
  25. func (store *TikvStore) Initialize(configuration weed_util.Configuration) (err error) {
  26. pdAddr := configuration.GetString("pdAddress")
  27. return store.initialize(pdAddr)
  28. }
  29. func (store *TikvStore) initialize(pdAddr string) (err error) {
  30. glog.Infof("filer store tikv pd address: %s", pdAddr)
  31. driver := tikv.Driver{}
  32. store.store, err = driver.Open(fmt.Sprintf("tikv://%s", pdAddr))
  33. if err != nil {
  34. return fmt.Errorf("open tikv %s : %v", pdAddr, err)
  35. }
  36. return
  37. }
  38. func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  39. tx, err := store.store.Begin()
  40. if err != nil {
  41. return ctx, err
  42. }
  43. return context.WithValue(ctx, "tx", tx), nil
  44. }
  45. func (store *TikvStore) CommitTransaction(ctx context.Context) error {
  46. tx, ok := ctx.Value("tx").(kv.Transaction)
  47. if ok {
  48. return tx.Commit(ctx)
  49. }
  50. return nil
  51. }
  52. func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
  53. tx, ok := ctx.Value("tx").(kv.Transaction)
  54. if ok {
  55. return tx.Rollback()
  56. }
  57. return nil
  58. }
  59. func (store *TikvStore) getTx(ctx context.Context) kv.Transaction {
  60. if tx, ok := ctx.Value("tx").(kv.Transaction); ok {
  61. return tx
  62. }
  63. return nil
  64. }
  65. func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  66. dir, name := entry.DirAndName()
  67. key := genKey(dir, name)
  68. value, err := entry.EncodeAttributesAndChunks()
  69. if err != nil {
  70. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
  71. }
  72. err = store.getTx(ctx).Set(key, value)
  73. if err != nil {
  74. return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
  75. }
  76. // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
  77. return nil
  78. }
  79. func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  80. return store.InsertEntry(ctx, entry)
  81. }
  82. func (store *TikvStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
  83. dir, name := fullpath.DirAndName()
  84. key := genKey(dir, name)
  85. data, err := store.getTx(ctx).Get(ctx, key)
  86. if err == kv.ErrNotExist {
  87. return nil, filer2.ErrNotFound
  88. }
  89. if err != nil {
  90. return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
  91. }
  92. entry = &filer2.Entry{
  93. FullPath: fullpath,
  94. }
  95. err = entry.DecodeAttributesAndChunks(data)
  96. if err != nil {
  97. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  98. }
  99. // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
  100. return entry, nil
  101. }
  102. func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
  103. dir, name := fullpath.DirAndName()
  104. key := genKey(dir, name)
  105. err = store.getTx(ctx).Delete(key)
  106. if err != nil {
  107. return fmt.Errorf("delete %s : %v", fullpath, err)
  108. }
  109. return nil
  110. }
  111. func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
  112. limit int) (entries []*filer2.Entry, err error) {
  113. directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
  114. lastFileStart := genDirectoryKeyPrefix(fullpath, startFileName)
  115. iter, err := store.getTx(ctx).Iter(lastFileStart, nil)
  116. if err != nil {
  117. return nil, fmt.Errorf("list %s: %v", fullpath, err)
  118. }
  119. defer iter.Close()
  120. for iter.Valid() {
  121. key := iter.Key()
  122. if !bytes.HasPrefix(key, directoryPrefix) {
  123. break
  124. }
  125. fileName := getNameFromKey(key)
  126. if fileName == "" {
  127. iter.Next()
  128. continue
  129. }
  130. if fileName == startFileName && !inclusive {
  131. iter.Next()
  132. continue
  133. }
  134. limit--
  135. if limit < 0 {
  136. break
  137. }
  138. entry := &filer2.Entry{
  139. FullPath: filer2.NewFullPath(string(fullpath), fileName),
  140. }
  141. // println("list", entry.FullPath, "chunks", len(entry.Chunks))
  142. if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
  143. err = decodeErr
  144. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  145. break
  146. }
  147. entries = append(entries, entry)
  148. iter.Next()
  149. }
  150. return entries, err
  151. }
  152. func genKey(dirPath, fileName string) (key []byte) {
  153. key = hashToBytes(dirPath)
  154. key = append(key, []byte(fileName)...)
  155. return key
  156. }
  157. func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
  158. keyPrefix = hashToBytes(string(fullpath))
  159. if len(startFileName) > 0 {
  160. keyPrefix = append(keyPrefix, []byte(startFileName)...)
  161. }
  162. return keyPrefix
  163. }
  // getNameFromKey returns the name portion of key, i.e. everything after the
  // leading md5.Size bytes. A key that is not longer than md5.Size has no name
  // portion, so "" is returned instead of slicing out of range.
  func getNameFromKey(key []byte) string {
  	if len(key) <= md5.Size {
  		return ""
  	}
  	return string(key[md5.Size:])
  }
  // hashToBytes returns the MD5 digest (md5.Size bytes) of dir. It is used to
  // turn a directory path into a fixed-length key prefix.
  func hashToBytes(dir string) []byte {
  	h := md5.New()
  	// hash.Hash documents that Write never returns an error, so the result
  	// is discarded explicitly.
  	_, _ = io.WriteString(h, dir)
  	return h.Sum(nil)
  }
  173. }