You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

375 lines
9.3 KiB

  1. package leveldb
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/md5"
  6. "fmt"
  7. "github.com/syndtr/goleveldb/leveldb"
  8. leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
  9. "github.com/syndtr/goleveldb/leveldb/opt"
  10. leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
  11. "io"
  12. "os"
  13. "strings"
  14. "sync"
  15. "github.com/chrislusf/seaweedfs/weed/filer"
  16. "github.com/chrislusf/seaweedfs/weed/glog"
  17. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  18. weed_util "github.com/chrislusf/seaweedfs/weed/util"
  19. )
  20. const (
  21. DEFAULT = "_main"
  22. )
  23. func init() {
  24. filer.Stores = append(filer.Stores, &LevelDB3Store{})
  25. }
  26. type LevelDB3Store struct {
  27. dir string
  28. dbs map[string]*leveldb.DB
  29. dbsLock sync.RWMutex
  30. }
  31. func (store *LevelDB3Store) GetName() string {
  32. return "leveldb3"
  33. }
  34. func (store *LevelDB3Store) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
  35. dir := configuration.GetString(prefix + "dir")
  36. return store.initialize(dir)
  37. }
  38. func (store *LevelDB3Store) initialize(dir string) (err error) {
  39. glog.Infof("filer store leveldb3 dir: %s", dir)
  40. if err := weed_util.TestFolderWritable(dir); err != nil {
  41. return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
  42. }
  43. store.dir = dir
  44. db, loadDbErr := store.loadDB(DEFAULT)
  45. if loadDbErr != nil {
  46. return loadDbErr
  47. }
  48. store.dbs = make(map[string]*leveldb.DB)
  49. store.dbs[DEFAULT] = db
  50. return
  51. }
  52. func (store *LevelDB3Store) loadDB(name string) (*leveldb.DB, error) {
  53. opts := &opt.Options{
  54. BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
  55. WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
  56. CompactionTableSizeMultiplier: 4,
  57. }
  58. if name != DEFAULT {
  59. opts = &opt.Options{
  60. BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
  61. WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
  62. CompactionTableSizeMultiplier: 4,
  63. }
  64. }
  65. dbFolder := fmt.Sprintf("%s/%s", store.dir, name)
  66. os.MkdirAll(dbFolder, 0755)
  67. db, dbErr := leveldb.OpenFile(dbFolder, opts)
  68. if leveldb_errors.IsCorrupted(dbErr) {
  69. db, dbErr = leveldb.RecoverFile(dbFolder, opts)
  70. }
  71. if dbErr != nil {
  72. glog.Errorf("filer store open dir %s: %v", dbFolder, dbErr)
  73. return nil, dbErr
  74. }
  75. return db, nil
  76. }
  77. func (store *LevelDB3Store) findDB(fullpath weed_util.FullPath, isForChildren bool) (*leveldb.DB, string, weed_util.FullPath, error) {
  78. store.dbsLock.RLock()
  79. defaultDB := store.dbs[DEFAULT]
  80. if !strings.HasPrefix(string(fullpath), "/buckets/") {
  81. store.dbsLock.RUnlock()
  82. return defaultDB, DEFAULT, fullpath, nil
  83. }
  84. // detect bucket
  85. bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
  86. t := strings.Index(bucketAndObjectKey, "/")
  87. if t < 0 && !isForChildren {
  88. store.dbsLock.RUnlock()
  89. return defaultDB, DEFAULT, fullpath, nil
  90. }
  91. bucket := bucketAndObjectKey
  92. shortPath := weed_util.FullPath("/")
  93. if t > 0 {
  94. bucket = bucketAndObjectKey[:t]
  95. shortPath = weed_util.FullPath(bucketAndObjectKey[t:])
  96. }
  97. if db, found := store.dbs[bucket]; found {
  98. store.dbsLock.RUnlock()
  99. return db, bucket, shortPath, nil
  100. }
  101. store.dbsLock.RUnlock()
  102. // upgrade to write lock
  103. store.dbsLock.Lock()
  104. defer store.dbsLock.Unlock()
  105. // double check after getting the write lock
  106. if db, found := store.dbs[bucket]; found {
  107. return db, bucket, shortPath, nil
  108. }
  109. // create db
  110. db, err := store.loadDB(bucket)
  111. if err != nil {
  112. return nil, bucket, shortPath, err
  113. }
  114. store.dbs[bucket] = db
  115. return db, bucket, shortPath, nil
  116. }
  117. func (store *LevelDB3Store) closeDB(bucket string) {
  118. store.dbsLock.Lock()
  119. defer store.dbsLock.Unlock()
  120. if db, found := store.dbs[bucket]; found {
  121. db.Close()
  122. delete(store.dbs, bucket)
  123. }
  124. }
  125. func (store *LevelDB3Store) BeginTransaction(ctx context.Context) (context.Context, error) {
  126. return ctx, nil
  127. }
  128. func (store *LevelDB3Store) CommitTransaction(ctx context.Context) error {
  129. return nil
  130. }
  131. func (store *LevelDB3Store) RollbackTransaction(ctx context.Context) error {
  132. return nil
  133. }
  134. func (store *LevelDB3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  135. db, _, shortPath, err := store.findDB(entry.FullPath, false)
  136. if err != nil {
  137. return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
  138. }
  139. dir, name := shortPath.DirAndName()
  140. key := genKey(dir, name)
  141. value, err := entry.EncodeAttributesAndChunks()
  142. if err != nil {
  143. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
  144. }
  145. if len(entry.Chunks) > 50 {
  146. value = weed_util.MaybeGzipData(value)
  147. }
  148. err = db.Put(key, value, nil)
  149. if err != nil {
  150. return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
  151. }
  152. // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
  153. return nil
  154. }
  155. func (store *LevelDB3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  156. return store.InsertEntry(ctx, entry)
  157. }
  158. func (store *LevelDB3Store) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
  159. db, _, shortPath, err := store.findDB(fullpath, false)
  160. if err != nil {
  161. return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
  162. }
  163. dir, name := shortPath.DirAndName()
  164. key := genKey(dir, name)
  165. data, err := db.Get(key, nil)
  166. if err == leveldb.ErrNotFound {
  167. return nil, filer_pb.ErrNotFound
  168. }
  169. if err != nil {
  170. return nil, fmt.Errorf("get %s : %v", fullpath, err)
  171. }
  172. entry = &filer.Entry{
  173. FullPath: fullpath,
  174. }
  175. err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data))
  176. if err != nil {
  177. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  178. }
  179. // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
  180. return entry, nil
  181. }
  182. func (store *LevelDB3Store) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  183. db, _, shortPath, err := store.findDB(fullpath, false)
  184. if err != nil {
  185. return fmt.Errorf("findDB %s : %v", fullpath, err)
  186. }
  187. dir, name := shortPath.DirAndName()
  188. key := genKey(dir, name)
  189. err = db.Delete(key, nil)
  190. if err != nil {
  191. return fmt.Errorf("delete %s : %v", fullpath, err)
  192. }
  193. return nil
  194. }
  195. func (store *LevelDB3Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  196. db, bucket, shortPath, err := store.findDB(fullpath, true)
  197. if err != nil {
  198. return fmt.Errorf("findDB %s : %v", fullpath, err)
  199. }
  200. if bucket != DEFAULT && shortPath == "/" {
  201. store.closeDB(bucket)
  202. if bucket != "" { // just to make sure
  203. os.RemoveAll(store.dir + "/" + bucket)
  204. }
  205. return nil
  206. }
  207. directoryPrefix := genDirectoryKeyPrefix(shortPath, "")
  208. batch := new(leveldb.Batch)
  209. iter := db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
  210. for iter.Next() {
  211. key := iter.Key()
  212. if !bytes.HasPrefix(key, directoryPrefix) {
  213. break
  214. }
  215. fileName := getNameFromKey(key)
  216. if fileName == "" {
  217. continue
  218. }
  219. batch.Delete(append(directoryPrefix, []byte(fileName)...))
  220. }
  221. iter.Release()
  222. err = db.Write(batch, nil)
  223. if err != nil {
  224. return fmt.Errorf("delete %s : %v", fullpath, err)
  225. }
  226. return nil
  227. }
  228. func (store *LevelDB3Store) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  229. return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
  230. }
  231. func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  232. db, _, shortPath, err := store.findDB(dirPath, true)
  233. if err != nil {
  234. return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
  235. }
  236. directoryPrefix := genDirectoryKeyPrefix(shortPath, prefix)
  237. lastFileStart := directoryPrefix
  238. if startFileName != "" {
  239. lastFileStart = genDirectoryKeyPrefix(shortPath, startFileName)
  240. }
  241. iter := db.NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
  242. for iter.Next() {
  243. key := iter.Key()
  244. if !bytes.HasPrefix(key, directoryPrefix) {
  245. break
  246. }
  247. fileName := getNameFromKey(key)
  248. if fileName == "" {
  249. continue
  250. }
  251. if fileName == startFileName && !includeStartFile {
  252. continue
  253. }
  254. lastFileName = fileName
  255. limit--
  256. if limit < 0 {
  257. break
  258. }
  259. entry := &filer.Entry{
  260. FullPath: weed_util.NewFullPath(string(dirPath), fileName),
  261. }
  262. // println("list", entry.FullPath, "chunks", len(entry.Chunks))
  263. if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
  264. err = decodeErr
  265. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  266. break
  267. }
  268. if !eachEntryFunc(entry) {
  269. break
  270. }
  271. }
  272. iter.Release()
  273. return lastFileName, err
  274. }
  275. func genKey(dirPath, fileName string) (key []byte) {
  276. key = hashToBytes(dirPath)
  277. key = append(key, []byte(fileName)...)
  278. return key
  279. }
  280. func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
  281. keyPrefix = hashToBytes(string(fullpath))
  282. if len(startFileName) > 0 {
  283. keyPrefix = append(keyPrefix, []byte(startFileName)...)
  284. }
  285. return keyPrefix
  286. }
  287. func getNameFromKey(key []byte) string {
  288. return string(key[md5.Size:])
  289. }
  290. // hash directory
  291. func hashToBytes(dir string) []byte {
  292. h := md5.New()
  293. io.WriteString(h, dir)
  294. b := h.Sum(nil)
  295. return b
  296. }
  297. func (store *LevelDB3Store) Shutdown() {
  298. for _, db := range store.dbs {
  299. db.Close()
  300. }
  301. }