You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

361 lines
8.9 KiB

  1. package leveldb
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/md5"
  6. "fmt"
  7. "github.com/syndtr/goleveldb/leveldb"
  8. leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
  9. "github.com/syndtr/goleveldb/leveldb/opt"
  10. leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
  11. "io"
  12. "os"
  13. "strings"
  14. "sync"
  15. "github.com/chrislusf/seaweedfs/weed/filer"
  16. "github.com/chrislusf/seaweedfs/weed/glog"
  17. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  18. weed_util "github.com/chrislusf/seaweedfs/weed/util"
  19. )
  // DEFAULT is the name of the main database. It is the key of that database
  // in LevelDB3Store.dbs, the name of its sub-folder on disk, and the database
  // name reported for every path that is not routed to a per-bucket database.
  const DEFAULT = "_main"
  23. func init() {
  24. filer.Stores = append(filer.Stores, &LevelDB3Store{})
  25. }
  26. type LevelDB3Store struct {
  27. dir string
  28. dbs map[string]*leveldb.DB
  29. dbsLock sync.RWMutex
  30. }
  31. func (store *LevelDB3Store) GetName() string {
  32. return "leveldb3"
  33. }
  34. func (store *LevelDB3Store) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
  35. dir := configuration.GetString(prefix + "dir")
  36. return store.initialize(dir)
  37. }
  38. func (store *LevelDB3Store) initialize(dir string) (err error) {
  39. glog.Infof("filer store leveldb3 dir: %s", dir)
  40. if err := weed_util.TestFolderWritable(dir); err != nil {
  41. return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
  42. }
  43. store.dir = dir
  44. db, loadDbErr := store.loadDB(DEFAULT)
  45. if loadDbErr != nil {
  46. return loadDbErr
  47. }
  48. store.dbs = make(map[string]*leveldb.DB)
  49. store.dbs[DEFAULT] = db
  50. return
  51. }
  52. func (store *LevelDB3Store) loadDB(name string) (*leveldb.DB, error) {
  53. opts := &opt.Options{
  54. BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
  55. WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
  56. CompactionTableSizeMultiplier: 4,
  57. }
  58. if name != DEFAULT {
  59. opts = &opt.Options{
  60. BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
  61. WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
  62. CompactionTableSizeMultiplier: 4,
  63. }
  64. }
  65. dbFolder := fmt.Sprintf("%s/%s", store.dir, name)
  66. os.MkdirAll(dbFolder, 0755)
  67. db, dbErr := leveldb.OpenFile(dbFolder, opts)
  68. if leveldb_errors.IsCorrupted(dbErr) {
  69. db, dbErr = leveldb.RecoverFile(dbFolder, opts)
  70. }
  71. if dbErr != nil {
  72. glog.Errorf("filer store open dir %s: %v", dbFolder, dbErr)
  73. return nil, dbErr
  74. }
  75. return db, nil
  76. }
  77. func (store *LevelDB3Store) findDB(fullpath weed_util.FullPath, isForChildren bool) (*leveldb.DB, string, weed_util.FullPath, error) {
  78. store.dbsLock.RLock()
  79. defaultDB := store.dbs[DEFAULT]
  80. if !strings.HasPrefix(string(fullpath), "/buckets/") {
  81. store.dbsLock.RUnlock()
  82. return defaultDB, DEFAULT, fullpath, nil
  83. }
  84. // detect bucket
  85. bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
  86. t := strings.Index(bucketAndObjectKey, "/")
  87. if t < 0 && !isForChildren {
  88. store.dbsLock.RUnlock()
  89. return defaultDB, DEFAULT, fullpath, nil
  90. }
  91. bucket := bucketAndObjectKey
  92. shortPath := weed_util.FullPath("/")
  93. if t > 0 {
  94. bucket = bucketAndObjectKey[:t]
  95. shortPath = weed_util.FullPath(bucketAndObjectKey[t:])
  96. }
  97. if db, found := store.dbs[bucket]; found {
  98. store.dbsLock.RUnlock()
  99. return db, bucket, shortPath, nil
  100. }
  101. store.dbsLock.RUnlock()
  102. // upgrade to write lock
  103. store.dbsLock.Lock()
  104. defer store.dbsLock.Unlock()
  105. // double check after getting the write lock
  106. if db, found := store.dbs[bucket]; found {
  107. return db, bucket, shortPath, nil
  108. }
  109. // create db
  110. db, err := store.loadDB(bucket)
  111. if err != nil {
  112. return nil, bucket, shortPath, err
  113. }
  114. store.dbs[bucket] = db
  115. return db, bucket, shortPath, nil
  116. }
  117. func (store *LevelDB3Store) BeginTransaction(ctx context.Context) (context.Context, error) {
  118. return ctx, nil
  119. }
  120. func (store *LevelDB3Store) CommitTransaction(ctx context.Context) error {
  121. return nil
  122. }
  123. func (store *LevelDB3Store) RollbackTransaction(ctx context.Context) error {
  124. return nil
  125. }
  126. func (store *LevelDB3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  127. db, _, shortPath, err := store.findDB(entry.FullPath, false)
  128. if err != nil {
  129. return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
  130. }
  131. dir, name := shortPath.DirAndName()
  132. key := genKey(dir, name)
  133. value, err := entry.EncodeAttributesAndChunks()
  134. if err != nil {
  135. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
  136. }
  137. if len(entry.Chunks) > 50 {
  138. value = weed_util.MaybeGzipData(value)
  139. }
  140. err = db.Put(key, value, nil)
  141. if err != nil {
  142. return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
  143. }
  144. // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
  145. return nil
  146. }
  147. func (store *LevelDB3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  148. return store.InsertEntry(ctx, entry)
  149. }
  150. func (store *LevelDB3Store) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
  151. db, _, shortPath, err := store.findDB(fullpath, false)
  152. if err != nil {
  153. return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
  154. }
  155. dir, name := shortPath.DirAndName()
  156. key := genKey(dir, name)
  157. data, err := db.Get(key, nil)
  158. if err == leveldb.ErrNotFound {
  159. return nil, filer_pb.ErrNotFound
  160. }
  161. if err != nil {
  162. return nil, fmt.Errorf("get %s : %v", fullpath, err)
  163. }
  164. entry = &filer.Entry{
  165. FullPath: fullpath,
  166. }
  167. err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data))
  168. if err != nil {
  169. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  170. }
  171. // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
  172. return entry, nil
  173. }
  174. func (store *LevelDB3Store) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  175. db, _, shortPath, err := store.findDB(fullpath, false)
  176. if err != nil {
  177. return fmt.Errorf("findDB %s : %v", fullpath, err)
  178. }
  179. dir, name := shortPath.DirAndName()
  180. key := genKey(dir, name)
  181. err = db.Delete(key, nil)
  182. if err != nil {
  183. return fmt.Errorf("delete %s : %v", fullpath, err)
  184. }
  185. return nil
  186. }
  187. func (store *LevelDB3Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  188. db, bucket, shortPath, err := store.findDB(fullpath, true)
  189. if err != nil {
  190. return fmt.Errorf("findDB %s : %v", fullpath, err)
  191. }
  192. if bucket != DEFAULT && shortPath == "/" {
  193. db.Close()
  194. if bucket != "" { // just to make sure
  195. os.RemoveAll(store.dir + "/" + bucket)
  196. }
  197. return nil
  198. }
  199. directoryPrefix := genDirectoryKeyPrefix(shortPath, "")
  200. batch := new(leveldb.Batch)
  201. iter := db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
  202. for iter.Next() {
  203. key := iter.Key()
  204. if !bytes.HasPrefix(key, directoryPrefix) {
  205. break
  206. }
  207. fileName := getNameFromKey(key)
  208. if fileName == "" {
  209. continue
  210. }
  211. batch.Delete(append(directoryPrefix, []byte(fileName)...))
  212. }
  213. iter.Release()
  214. err = db.Write(batch, nil)
  215. if err != nil {
  216. return fmt.Errorf("delete %s : %v", fullpath, err)
  217. }
  218. return nil
  219. }
  220. func (store *LevelDB3Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
  221. limit int) (entries []*filer.Entry, err error) {
  222. return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "")
  223. }
  224. func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
  225. db, _, shortPath, err := store.findDB(fullpath, true)
  226. if err != nil {
  227. return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
  228. }
  229. directoryPrefix := genDirectoryKeyPrefix(shortPath, prefix)
  230. lastFileStart := directoryPrefix
  231. if startFileName != "" {
  232. lastFileStart = genDirectoryKeyPrefix(shortPath, startFileName)
  233. }
  234. iter := db.NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
  235. for iter.Next() {
  236. key := iter.Key()
  237. if !bytes.HasPrefix(key, directoryPrefix) {
  238. break
  239. }
  240. fileName := getNameFromKey(key)
  241. if fileName == "" {
  242. continue
  243. }
  244. if fileName == startFileName && !inclusive {
  245. continue
  246. }
  247. limit--
  248. if limit < 0 {
  249. break
  250. }
  251. entry := &filer.Entry{
  252. FullPath: weed_util.NewFullPath(string(fullpath), fileName),
  253. }
  254. // println("list", entry.FullPath, "chunks", len(entry.Chunks))
  255. if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
  256. err = decodeErr
  257. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  258. break
  259. }
  260. entries = append(entries, entry)
  261. }
  262. iter.Release()
  263. return entries, err
  264. }
  265. func genKey(dirPath, fileName string) (key []byte) {
  266. key = hashToBytes(dirPath)
  267. key = append(key, []byte(fileName)...)
  268. return key
  269. }
  270. func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
  271. keyPrefix = hashToBytes(string(fullpath))
  272. if len(startFileName) > 0 {
  273. keyPrefix = append(keyPrefix, []byte(startFileName)...)
  274. }
  275. return keyPrefix
  276. }
  // getNameFromKey returns the file name part of a storage key, i.e. everything
  // after the leading md5.Size bytes of directory hash.
  // A key too short to contain a name yields "" instead of panicking.
  func getNameFromKey(key []byte) string {
  	if len(key) <= md5.Size {
  		return ""
  	}
  	return string(key[md5.Size:])
  }
  // hashToBytes returns the MD5 digest (md5.Size bytes) of the directory path.
  func hashToBytes(dir string) []byte {
  	digest := md5.New()
  	io.WriteString(digest, dir)
  	return digest.Sum(nil)
  }
  287. func (store *LevelDB3Store) Shutdown() {
  288. for _, db := range store.dbs {
  289. db.Close()
  290. }
  291. }