You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

293 lines
6.8 KiB

  1. // +build rocksdb
  2. package rocksdb
  3. import (
  4. "bytes"
  5. "context"
  6. "crypto/md5"
  7. "fmt"
  8. "github.com/chrislusf/seaweedfs/weed/filer"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. weed_util "github.com/chrislusf/seaweedfs/weed/util"
  12. "github.com/tecbot/gorocksdb"
  13. "io"
  14. )
  15. func init() {
  16. filer.Stores = append(filer.Stores, &RocksDBStore{})
  17. }
  18. type RocksDBStore struct {
  19. path string
  20. db *gorocksdb.DB
  21. }
  22. func (store *RocksDBStore) GetName() string {
  23. return "rocksdb"
  24. }
  25. func (store *RocksDBStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
  26. dir := configuration.GetString(prefix + "dir")
  27. return store.initialize(dir)
  28. }
  29. func (store *RocksDBStore) initialize(dir string) (err error) {
  30. glog.Infof("filer store rocksdb dir: %s", dir)
  31. if err := weed_util.TestFolderWritable(dir); err != nil {
  32. return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
  33. }
  34. options := gorocksdb.NewDefaultOptions()
  35. options.SetCreateIfMissing(true)
  36. store.db, err = gorocksdb.OpenDb(options, dir)
  37. return
  38. }
  39. func (store *RocksDBStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  40. return ctx, nil
  41. }
  42. func (store *RocksDBStore) CommitTransaction(ctx context.Context) error {
  43. return nil
  44. }
  45. func (store *RocksDBStore) RollbackTransaction(ctx context.Context) error {
  46. return nil
  47. }
  48. func (store *RocksDBStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  49. dir, name := entry.DirAndName()
  50. key := genKey(dir, name)
  51. value, err := entry.EncodeAttributesAndChunks()
  52. if err != nil {
  53. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
  54. }
  55. wo := gorocksdb.NewDefaultWriteOptions()
  56. err = store.db.Put(wo, key, value)
  57. if err != nil {
  58. return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
  59. }
  60. // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
  61. return nil
  62. }
  63. func (store *RocksDBStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  64. return store.InsertEntry(ctx, entry)
  65. }
  66. func (store *RocksDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
  67. dir, name := fullpath.DirAndName()
  68. key := genKey(dir, name)
  69. ro := gorocksdb.NewDefaultReadOptions()
  70. data, err := store.db.GetBytes(ro, key)
  71. if data == nil {
  72. return nil, filer_pb.ErrNotFound
  73. }
  74. if err != nil {
  75. return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
  76. }
  77. entry = &filer.Entry{
  78. FullPath: fullpath,
  79. }
  80. err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data))
  81. if err != nil {
  82. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  83. }
  84. // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
  85. return entry, nil
  86. }
  87. func (store *RocksDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  88. dir, name := fullpath.DirAndName()
  89. key := genKey(dir, name)
  90. wo := gorocksdb.NewDefaultWriteOptions()
  91. err = store.db.Delete(wo, key)
  92. if err != nil {
  93. return fmt.Errorf("delete %s : %v", fullpath, err)
  94. }
  95. return nil
  96. }
  97. func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  98. directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
  99. batch := new(gorocksdb.WriteBatch)
  100. ro := gorocksdb.NewDefaultReadOptions()
  101. ro.SetFillCache(false)
  102. iter := store.db.NewIterator(ro)
  103. defer iter.Close()
  104. err = enumerate(iter, directoryPrefix, nil, false, -1, func(key, value []byte) bool {
  105. batch.Delete(key)
  106. return true
  107. })
  108. if err != nil {
  109. return fmt.Errorf("delete list %s : %v", fullpath, err)
  110. }
  111. wo := gorocksdb.NewDefaultWriteOptions()
  112. err = store.db.Write(wo, batch)
  113. if err != nil {
  114. return fmt.Errorf("delete %s : %v", fullpath, err)
  115. }
  116. return nil
  117. }
  118. func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int, fn func(key, value []byte) bool) error {
  119. if len(lastKey) == 0 {
  120. iter.Seek(prefix)
  121. } else {
  122. iter.Seek(lastKey)
  123. if !includeLastKey {
  124. k := iter.Key()
  125. v := iter.Value()
  126. key := k.Data()
  127. defer k.Free()
  128. defer v.Free()
  129. if !bytes.HasPrefix(key, prefix) {
  130. return nil
  131. }
  132. if bytes.Equal(key, lastKey) {
  133. iter.Next()
  134. }
  135. }
  136. }
  137. i := 0
  138. for ; iter.Valid(); iter.Next() {
  139. if limit > 0 {
  140. i++
  141. if i > limit {
  142. break
  143. }
  144. }
  145. k := iter.Key()
  146. v := iter.Value()
  147. key := k.Data()
  148. value := v.Data()
  149. if !bytes.HasPrefix(key, prefix) {
  150. k.Free()
  151. v.Free()
  152. break
  153. }
  154. ret := fn(key, value)
  155. k.Free()
  156. v.Free()
  157. if !ret {
  158. break
  159. }
  160. }
  161. if err := iter.Err(); err != nil {
  162. return fmt.Errorf("prefix scan iterator: %v", err)
  163. }
  164. return nil
  165. }
  166. func (store *RocksDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
  167. limit int) (entries []*filer.Entry, err error) {
  168. return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "")
  169. }
  170. func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
  171. directoryPrefix := genDirectoryKeyPrefix(fullpath, prefix)
  172. lastFileStart := directoryPrefix
  173. if startFileName != "" {
  174. lastFileStart = genDirectoryKeyPrefix(fullpath, startFileName)
  175. }
  176. ro := gorocksdb.NewDefaultReadOptions()
  177. ro.SetFillCache(false)
  178. iter := store.db.NewIterator(ro)
  179. defer iter.Close()
  180. err = enumerate(iter, directoryPrefix, lastFileStart, inclusive, limit, func(key, value []byte) bool {
  181. fileName := getNameFromKey(key)
  182. if fileName == "" {
  183. return true
  184. }
  185. limit--
  186. if limit < 0 {
  187. return false
  188. }
  189. entry := &filer.Entry{
  190. FullPath: weed_util.NewFullPath(string(fullpath), fileName),
  191. }
  192. // println("list", entry.FullPath, "chunks", len(entry.Chunks))
  193. if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(value)); decodeErr != nil {
  194. err = decodeErr
  195. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  196. return false
  197. }
  198. entries = append(entries, entry)
  199. return true
  200. })
  201. if err != nil {
  202. return entries, fmt.Errorf("prefix list %s : %v", fullpath, err)
  203. }
  204. return entries, err
  205. }
  206. func genKey(dirPath, fileName string) (key []byte) {
  207. key = hashToBytes(dirPath)
  208. key = append(key, []byte(fileName)...)
  209. return key
  210. }
  211. func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
  212. keyPrefix = hashToBytes(string(fullpath))
  213. if len(startFileName) > 0 {
  214. keyPrefix = append(keyPrefix, []byte(startFileName)...)
  215. }
  216. return keyPrefix
  217. }
  // getNameFromKey returns the file-name part of a storage key, i.e. everything
  // after the leading md5.Size bytes of directory hash. The key must be at least
  // md5.Size bytes long.
  func getNameFromKey(key []byte) string {
  	const dirHashLen = md5.Size
  	nameBytes := key[dirHashLen:]
  	return string(nameBytes)
  }
  // hashToBytes returns the MD5 digest of dir as a byte slice of length md5.Size.
  func hashToBytes(dir string) []byte {
  	hasher := md5.New()
  	_, _ = io.WriteString(hasher, dir)
  	return hasher.Sum(nil)
  }
  228. func (store *RocksDBStore) Shutdown() {
  229. store.db.Close()
  230. }