You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

376 lines
9.3 KiB

  1. package leveldb
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/md5"
  6. "fmt"
  7. "github.com/syndtr/goleveldb/leveldb"
  8. leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
  9. "github.com/syndtr/goleveldb/leveldb/opt"
  10. leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
  11. "io"
  12. "os"
  13. "strings"
  14. "sync"
  15. "github.com/chrislusf/seaweedfs/weed/filer"
  16. "github.com/chrislusf/seaweedfs/weed/glog"
  17. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  18. weed_util "github.com/chrislusf/seaweedfs/weed/util"
  19. )
  20. const (
  21. DEFAULT = "_main"
  22. )
  23. func init() {
  24. filer.Stores = append(filer.Stores, &LevelDB3Store{})
  25. }
// LevelDB3Store is a filer store that keeps several LevelDB databases, keyed
// by name, under one root directory.
type LevelDB3Store struct {
	// dir is the root folder; each database is opened in dir/<name>.
	dir string
	// dbs maps a database name (DEFAULT or a bucket name) to its open handle.
	dbs map[string]*leveldb.DB
	// dbsLock guards access to dbs.
	dbsLock sync.RWMutex
}
// GetName returns the name of this store, "leveldb3".
func (store *LevelDB3Store) GetName() string {
	return "leveldb3"
}
  34. func (store *LevelDB3Store) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
  35. dir := configuration.GetString(prefix + "dir")
  36. return store.initialize(dir)
  37. }
  38. func (store *LevelDB3Store) initialize(dir string) (err error) {
  39. glog.Infof("filer store leveldb3 dir: %s", dir)
  40. os.MkdirAll(dir, 0755)
  41. if err := weed_util.TestFolderWritable(dir); err != nil {
  42. return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
  43. }
  44. store.dir = dir
  45. db, loadDbErr := store.loadDB(DEFAULT)
  46. if loadDbErr != nil {
  47. return loadDbErr
  48. }
  49. store.dbs = make(map[string]*leveldb.DB)
  50. store.dbs[DEFAULT] = db
  51. return
  52. }
  53. func (store *LevelDB3Store) loadDB(name string) (*leveldb.DB, error) {
  54. opts := &opt.Options{
  55. BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
  56. WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
  57. CompactionTableSizeMultiplier: 4,
  58. }
  59. if name != DEFAULT {
  60. opts = &opt.Options{
  61. BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
  62. WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
  63. CompactionTableSizeMultiplier: 4,
  64. }
  65. }
  66. dbFolder := fmt.Sprintf("%s/%s", store.dir, name)
  67. os.MkdirAll(dbFolder, 0755)
  68. db, dbErr := leveldb.OpenFile(dbFolder, opts)
  69. if leveldb_errors.IsCorrupted(dbErr) {
  70. db, dbErr = leveldb.RecoverFile(dbFolder, opts)
  71. }
  72. if dbErr != nil {
  73. glog.Errorf("filer store open dir %s: %v", dbFolder, dbErr)
  74. return nil, dbErr
  75. }
  76. return db, nil
  77. }
  78. func (store *LevelDB3Store) findDB(fullpath weed_util.FullPath, isForChildren bool) (*leveldb.DB, string, weed_util.FullPath, error) {
  79. store.dbsLock.RLock()
  80. defaultDB := store.dbs[DEFAULT]
  81. if !strings.HasPrefix(string(fullpath), "/buckets/") {
  82. store.dbsLock.RUnlock()
  83. return defaultDB, DEFAULT, fullpath, nil
  84. }
  85. // detect bucket
  86. bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
  87. t := strings.Index(bucketAndObjectKey, "/")
  88. if t < 0 && !isForChildren {
  89. store.dbsLock.RUnlock()
  90. return defaultDB, DEFAULT, fullpath, nil
  91. }
  92. bucket := bucketAndObjectKey
  93. shortPath := weed_util.FullPath("/")
  94. if t > 0 {
  95. bucket = bucketAndObjectKey[:t]
  96. shortPath = weed_util.FullPath(bucketAndObjectKey[t:])
  97. }
  98. if db, found := store.dbs[bucket]; found {
  99. store.dbsLock.RUnlock()
  100. return db, bucket, shortPath, nil
  101. }
  102. store.dbsLock.RUnlock()
  103. // upgrade to write lock
  104. store.dbsLock.Lock()
  105. defer store.dbsLock.Unlock()
  106. // double check after getting the write lock
  107. if db, found := store.dbs[bucket]; found {
  108. return db, bucket, shortPath, nil
  109. }
  110. // create db
  111. db, err := store.loadDB(bucket)
  112. if err != nil {
  113. return nil, bucket, shortPath, err
  114. }
  115. store.dbs[bucket] = db
  116. return db, bucket, shortPath, nil
  117. }
  118. func (store *LevelDB3Store) closeDB(bucket string) {
  119. store.dbsLock.Lock()
  120. defer store.dbsLock.Unlock()
  121. if db, found := store.dbs[bucket]; found {
  122. db.Close()
  123. delete(store.dbs, bucket)
  124. }
  125. }
// BeginTransaction is a no-op: it returns ctx unchanged and a nil error.
func (store *LevelDB3Store) BeginTransaction(ctx context.Context) (context.Context, error) {
	return ctx, nil
}
// CommitTransaction is a no-op and always returns nil.
func (store *LevelDB3Store) CommitTransaction(ctx context.Context) error {
	return nil
}
// RollbackTransaction is a no-op and always returns nil.
func (store *LevelDB3Store) RollbackTransaction(ctx context.Context) error {
	return nil
}
  135. func (store *LevelDB3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  136. db, _, shortPath, err := store.findDB(entry.FullPath, false)
  137. if err != nil {
  138. return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
  139. }
  140. dir, name := shortPath.DirAndName()
  141. key := genKey(dir, name)
  142. value, err := entry.EncodeAttributesAndChunks()
  143. if err != nil {
  144. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
  145. }
  146. if len(entry.Chunks) > 50 {
  147. value = weed_util.MaybeGzipData(value)
  148. }
  149. err = db.Put(key, value, nil)
  150. if err != nil {
  151. return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
  152. }
  153. // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
  154. return nil
  155. }
// UpdateEntry delegates to InsertEntry.
func (store *LevelDB3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
	return store.InsertEntry(ctx, entry)
}
  159. func (store *LevelDB3Store) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
  160. db, _, shortPath, err := store.findDB(fullpath, false)
  161. if err != nil {
  162. return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
  163. }
  164. dir, name := shortPath.DirAndName()
  165. key := genKey(dir, name)
  166. data, err := db.Get(key, nil)
  167. if err == leveldb.ErrNotFound {
  168. return nil, filer_pb.ErrNotFound
  169. }
  170. if err != nil {
  171. return nil, fmt.Errorf("get %s : %v", fullpath, err)
  172. }
  173. entry = &filer.Entry{
  174. FullPath: fullpath,
  175. }
  176. err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data))
  177. if err != nil {
  178. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  179. }
  180. // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
  181. return entry, nil
  182. }
  183. func (store *LevelDB3Store) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  184. db, _, shortPath, err := store.findDB(fullpath, false)
  185. if err != nil {
  186. return fmt.Errorf("findDB %s : %v", fullpath, err)
  187. }
  188. dir, name := shortPath.DirAndName()
  189. key := genKey(dir, name)
  190. err = db.Delete(key, nil)
  191. if err != nil {
  192. return fmt.Errorf("delete %s : %v", fullpath, err)
  193. }
  194. return nil
  195. }
  196. func (store *LevelDB3Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  197. db, bucket, shortPath, err := store.findDB(fullpath, true)
  198. if err != nil {
  199. return fmt.Errorf("findDB %s : %v", fullpath, err)
  200. }
  201. if bucket != DEFAULT && shortPath == "/" {
  202. store.closeDB(bucket)
  203. if bucket != "" { // just to make sure
  204. os.RemoveAll(store.dir + "/" + bucket)
  205. }
  206. return nil
  207. }
  208. directoryPrefix := genDirectoryKeyPrefix(shortPath, "")
  209. batch := new(leveldb.Batch)
  210. iter := db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
  211. for iter.Next() {
  212. key := iter.Key()
  213. if !bytes.HasPrefix(key, directoryPrefix) {
  214. break
  215. }
  216. fileName := getNameFromKey(key)
  217. if fileName == "" {
  218. continue
  219. }
  220. batch.Delete(append(directoryPrefix, []byte(fileName)...))
  221. }
  222. iter.Release()
  223. err = db.Write(batch, nil)
  224. if err != nil {
  225. return fmt.Errorf("delete %s : %v", fullpath, err)
  226. }
  227. return nil
  228. }
// ListDirectoryEntries calls ListDirectoryPrefixedEntries with an empty
// prefix and returns its results.
func (store *LevelDB3Store) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
	return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
  232. func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  233. db, _, shortPath, err := store.findDB(dirPath, true)
  234. if err != nil {
  235. return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
  236. }
  237. directoryPrefix := genDirectoryKeyPrefix(shortPath, prefix)
  238. lastFileStart := directoryPrefix
  239. if startFileName != "" {
  240. lastFileStart = genDirectoryKeyPrefix(shortPath, startFileName)
  241. }
  242. iter := db.NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
  243. for iter.Next() {
  244. key := iter.Key()
  245. if !bytes.HasPrefix(key, directoryPrefix) {
  246. break
  247. }
  248. fileName := getNameFromKey(key)
  249. if fileName == "" {
  250. continue
  251. }
  252. if fileName == startFileName && !includeStartFile {
  253. continue
  254. }
  255. limit--
  256. if limit < 0 {
  257. break
  258. }
  259. lastFileName = fileName
  260. entry := &filer.Entry{
  261. FullPath: weed_util.NewFullPath(string(dirPath), fileName),
  262. }
  263. // println("list", entry.FullPath, "chunks", len(entry.Chunks))
  264. if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
  265. err = decodeErr
  266. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  267. break
  268. }
  269. if !eachEntryFunc(entry) {
  270. break
  271. }
  272. }
  273. iter.Release()
  274. return lastFileName, err
  275. }
  276. func genKey(dirPath, fileName string) (key []byte) {
  277. key = hashToBytes(dirPath)
  278. key = append(key, []byte(fileName)...)
  279. return key
  280. }
  281. func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
  282. keyPrefix = hashToBytes(string(fullpath))
  283. if len(startFileName) > 0 {
  284. keyPrefix = append(keyPrefix, []byte(startFileName)...)
  285. }
  286. return keyPrefix
  287. }
  288. func getNameFromKey(key []byte) string {
  289. return string(key[md5.Size:])
  290. }
  291. // hash directory
  292. func hashToBytes(dir string) []byte {
  293. h := md5.New()
  294. io.WriteString(h, dir)
  295. b := h.Sum(nil)
  296. return b
  297. }
  298. func (store *LevelDB3Store) Shutdown() {
  299. for _, db := range store.dbs {
  300. db.Close()
  301. }
  302. }