You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

252 lines
5.6 KiB

5 years ago
5 years ago
5 years ago
  1. // +build !386
  2. // +build !arm
  3. package tikv
  4. import (
  5. "bytes"
  6. "context"
  7. "crypto/md5"
  8. "fmt"
  9. "io"
  10. "github.com/chrislusf/seaweedfs/weed/filer2"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. weed_util "github.com/chrislusf/seaweedfs/weed/util"
  14. "github.com/pingcap/tidb/kv"
  15. "github.com/pingcap/tidb/store/tikv"
  16. )
  17. func init() {
  18. filer2.Stores = append(filer2.Stores, &TikvStore{})
  19. }
  // TikvStore is the filer store that reports its name as "tikv". It holds the
  // kv.Storage handle that initialize opens and that BeginTransaction uses to
  // start transactions.
  type TikvStore struct {
  	store kv.Storage // assigned by initialize from driver.Open
  }
  23. func (store *TikvStore) GetName() string {
  24. return "tikv"
  25. }
  26. func (store *TikvStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
  27. pdAddr := configuration.GetString(prefix + "pdAddress")
  28. return store.initialize(pdAddr)
  29. }
  30. func (store *TikvStore) initialize(pdAddr string) (err error) {
  31. glog.Infof("filer store tikv pd address: %s", pdAddr)
  32. driver := tikv.Driver{}
  33. store.store, err = driver.Open(fmt.Sprintf("tikv://%s", pdAddr))
  34. if err != nil {
  35. return fmt.Errorf("open tikv %s : %v", pdAddr, err)
  36. }
  37. return
  38. }
  39. func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  40. tx, err := store.store.Begin()
  41. if err != nil {
  42. return ctx, err
  43. }
  44. return context.WithValue(ctx, "tx", tx), nil
  45. }
  46. func (store *TikvStore) CommitTransaction(ctx context.Context) error {
  47. tx, ok := ctx.Value("tx").(kv.Transaction)
  48. if ok {
  49. return tx.Commit(ctx)
  50. }
  51. return nil
  52. }
  53. func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
  54. tx, ok := ctx.Value("tx").(kv.Transaction)
  55. if ok {
  56. return tx.Rollback()
  57. }
  58. return nil
  59. }
  60. func (store *TikvStore) getTx(ctx context.Context) kv.Transaction {
  61. if tx, ok := ctx.Value("tx").(kv.Transaction); ok {
  62. return tx
  63. }
  64. return nil
  65. }
  66. func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  67. dir, name := entry.DirAndName()
  68. key := genKey(dir, name)
  69. value, err := entry.EncodeAttributesAndChunks()
  70. if err != nil {
  71. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
  72. }
  73. err = store.getTx(ctx).Set(key, value)
  74. if err != nil {
  75. return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
  76. }
  77. // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
  78. return nil
  79. }
  80. func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  81. return store.InsertEntry(ctx, entry)
  82. }
  83. func (store *TikvStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
  84. dir, name := fullpath.DirAndName()
  85. key := genKey(dir, name)
  86. data, err := store.getTx(ctx).Get(ctx, key)
  87. if err == kv.ErrNotExist {
  88. return nil, filer_pb.ErrNotFound
  89. }
  90. if err != nil {
  91. return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
  92. }
  93. entry = &filer2.Entry{
  94. FullPath: fullpath,
  95. }
  96. err = entry.DecodeAttributesAndChunks(data)
  97. if err != nil {
  98. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  99. }
  100. // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
  101. return entry, nil
  102. }
  103. func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
  104. dir, name := fullpath.DirAndName()
  105. key := genKey(dir, name)
  106. err = store.getTx(ctx).Delete(key)
  107. if err != nil {
  108. return fmt.Errorf("delete %s : %v", fullpath, err)
  109. }
  110. return nil
  111. }
  112. func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
  113. directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
  114. tx := store.getTx(ctx)
  115. iter, err := tx.Iter(directoryPrefix, nil)
  116. if err != nil {
  117. return fmt.Errorf("deleteFolderChildren %s: %v", fullpath, err)
  118. }
  119. defer iter.Close()
  120. for iter.Valid() {
  121. key := iter.Key()
  122. if !bytes.HasPrefix(key, directoryPrefix) {
  123. break
  124. }
  125. fileName := getNameFromKey(key)
  126. if fileName == "" {
  127. iter.Next()
  128. continue
  129. }
  130. if err = tx.Delete(genKey(string(fullpath), fileName)); err != nil {
  131. return fmt.Errorf("delete %s : %v", fullpath, err)
  132. }
  133. iter.Next()
  134. }
  135. return nil
  136. }
  137. func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
  138. limit int) (entries []*filer2.Entry, err error) {
  139. directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
  140. lastFileStart := genDirectoryKeyPrefix(fullpath, startFileName)
  141. iter, err := store.getTx(ctx).Iter(lastFileStart, nil)
  142. if err != nil {
  143. return nil, fmt.Errorf("list %s: %v", fullpath, err)
  144. }
  145. defer iter.Close()
  146. for iter.Valid() {
  147. key := iter.Key()
  148. if !bytes.HasPrefix(key, directoryPrefix) {
  149. break
  150. }
  151. fileName := getNameFromKey(key)
  152. if fileName == "" {
  153. iter.Next()
  154. continue
  155. }
  156. if fileName == startFileName && !inclusive {
  157. iter.Next()
  158. continue
  159. }
  160. limit--
  161. if limit < 0 {
  162. break
  163. }
  164. entry := &filer2.Entry{
  165. FullPath: filer2.NewFullPath(string(fullpath), fileName),
  166. }
  167. // println("list", entry.FullPath, "chunks", len(entry.Chunks))
  168. if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
  169. err = decodeErr
  170. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  171. break
  172. }
  173. entries = append(entries, entry)
  174. iter.Next()
  175. }
  176. return entries, err
  177. }
  178. func genKey(dirPath, fileName string) (key []byte) {
  179. key = hashToBytes(dirPath)
  180. key = append(key, []byte(fileName)...)
  181. return key
  182. }
  183. func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
  184. keyPrefix = hashToBytes(string(fullpath))
  185. if len(startFileName) > 0 {
  186. keyPrefix = append(keyPrefix, []byte(startFileName)...)
  187. }
  188. return keyPrefix
  189. }
  // getNameFromKey returns the file-name part of a storage key, i.e. the bytes
  // that follow the md5.Size-byte directory hash. Keys that are too short to
  // contain a name yield "" instead of panicking on the slice expression;
  // callers already treat an empty name as "skip this key".
  func getNameFromKey(key []byte) string {
  	if len(key) <= md5.Size {
  		return ""
  	}
  	return string(key[md5.Size:])
  }
  // hashToBytes returns the MD5 digest (md5.Size bytes) of the directory path.
  // The digest is used as the fixed-length key prefix of every entry stored
  // under that directory.
  func hashToBytes(dir string) []byte {
  	hasher := md5.New()
  	// Writes to a hash.Hash never return an error, so the result is discarded.
  	_, _ = io.WriteString(hasher, dir)
  	return hasher.Sum(nil)
  }
  199. }