You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

244 lines
6.4 KiB

7 years ago
4 years ago
7 years ago
4 years ago
6 years ago
6 years ago
6 years ago
5 years ago
5 years ago
7 years ago
  1. package leveldb
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "github.com/syndtr/goleveldb/leveldb"
  7. leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
  8. "github.com/syndtr/goleveldb/leveldb/filter"
  9. "github.com/syndtr/goleveldb/leveldb/opt"
  10. leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
  11. "os"
  12. "github.com/chrislusf/seaweedfs/weed/filer"
  13. "github.com/chrislusf/seaweedfs/weed/glog"
  14. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  15. weed_util "github.com/chrislusf/seaweedfs/weed/util"
  16. )
  17. const (
  18. DIR_FILE_SEPARATOR = byte(0x00)
  19. )
  20. func init() {
  21. filer.Stores = append(filer.Stores, &LevelDBStore{})
  22. }
  23. type LevelDBStore struct {
  24. db *leveldb.DB
  25. }
  26. func (store *LevelDBStore) GetName() string {
  27. return "leveldb"
  28. }
  29. func (store *LevelDBStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
  30. dir := configuration.GetString(prefix + "dir")
  31. return store.initialize(dir)
  32. }
  33. func (store *LevelDBStore) initialize(dir string) (err error) {
  34. glog.Infof("filer store dir: %s", dir)
  35. os.MkdirAll(dir, 0755)
  36. if err := weed_util.TestFolderWritable(dir); err != nil {
  37. return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
  38. }
  39. opts := &opt.Options{
  40. BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
  41. WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
  42. Filter: filter.NewBloomFilter(8), // false positive rate 0.02
  43. }
  44. if store.db, err = leveldb.OpenFile(dir, opts); err != nil {
  45. if leveldb_errors.IsCorrupted(err) {
  46. store.db, err = leveldb.RecoverFile(dir, opts)
  47. }
  48. if err != nil {
  49. glog.Infof("filer store open dir %s: %v", dir, err)
  50. return
  51. }
  52. }
  53. return
  54. }
  55. func (store *LevelDBStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  56. return ctx, nil
  57. }
  58. func (store *LevelDBStore) CommitTransaction(ctx context.Context) error {
  59. return nil
  60. }
  61. func (store *LevelDBStore) RollbackTransaction(ctx context.Context) error {
  62. return nil
  63. }
  64. func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  65. key := genKey(entry.DirAndName())
  66. value, err := entry.EncodeAttributesAndChunks()
  67. if err != nil {
  68. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
  69. }
  70. if len(entry.Chunks) > 50 {
  71. value = weed_util.MaybeGzipData(value)
  72. }
  73. err = store.db.Put(key, value, nil)
  74. if err != nil {
  75. return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
  76. }
  77. // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
  78. return nil
  79. }
  80. func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  81. return store.InsertEntry(ctx, entry)
  82. }
  83. func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
  84. key := genKey(fullpath.DirAndName())
  85. data, err := store.db.Get(key, nil)
  86. if err == leveldb.ErrNotFound {
  87. return nil, filer_pb.ErrNotFound
  88. }
  89. if err != nil {
  90. return nil, fmt.Errorf("get %s : %v", fullpath, err)
  91. }
  92. entry = &filer.Entry{
  93. FullPath: fullpath,
  94. }
  95. err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData((data)))
  96. if err != nil {
  97. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  98. }
  99. // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
  100. return entry, nil
  101. }
  102. func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  103. key := genKey(fullpath.DirAndName())
  104. err = store.db.Delete(key, nil)
  105. if err != nil {
  106. return fmt.Errorf("delete %s : %v", fullpath, err)
  107. }
  108. return nil
  109. }
  110. func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  111. batch := new(leveldb.Batch)
  112. directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
  113. iter := store.db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
  114. for iter.Next() {
  115. key := iter.Key()
  116. if !bytes.HasPrefix(key, directoryPrefix) {
  117. break
  118. }
  119. fileName := getNameFromKey(key)
  120. if fileName == "" {
  121. continue
  122. }
  123. batch.Delete([]byte(genKey(string(fullpath), fileName)))
  124. }
  125. iter.Release()
  126. err = store.db.Write(batch, nil)
  127. if err != nil {
  128. return fmt.Errorf("delete %s : %v", fullpath, err)
  129. }
  130. return nil
  131. }
  132. func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  133. return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
  134. }
  135. func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  136. directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
  137. lastFileStart := directoryPrefix
  138. if startFileName != "" {
  139. lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName)
  140. }
  141. iter := store.db.NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
  142. for iter.Next() {
  143. key := iter.Key()
  144. if !bytes.HasPrefix(key, directoryPrefix) {
  145. break
  146. }
  147. fileName := getNameFromKey(key)
  148. if fileName == "" {
  149. continue
  150. }
  151. if fileName == startFileName && !includeStartFile {
  152. continue
  153. }
  154. limit--
  155. if limit < 0 {
  156. break
  157. }
  158. lastFileName = fileName
  159. entry := &filer.Entry{
  160. FullPath: weed_util.NewFullPath(string(dirPath), fileName),
  161. }
  162. if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
  163. err = decodeErr
  164. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  165. break
  166. }
  167. if !eachEntryFunc(entry) {
  168. break
  169. }
  170. }
  171. iter.Release()
  172. return lastFileName, err
  173. }
  174. func genKey(dirPath, fileName string) (key []byte) {
  175. key = []byte(dirPath)
  176. key = append(key, DIR_FILE_SEPARATOR)
  177. key = append(key, []byte(fileName)...)
  178. return key
  179. }
  180. func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
  181. keyPrefix = []byte(string(fullpath))
  182. keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR)
  183. if len(startFileName) > 0 {
  184. keyPrefix = append(keyPrefix, []byte(startFileName)...)
  185. }
  186. return keyPrefix
  187. }
  188. func getNameFromKey(key []byte) string {
  189. sepIndex := len(key) - 1
  190. for sepIndex >= 0 && key[sepIndex] != DIR_FILE_SEPARATOR {
  191. sepIndex--
  192. }
  193. return string(key[sepIndex+1:])
  194. }
  195. func (store *LevelDBStore) Shutdown() {
  196. store.db.Close()
  197. }