You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

363 lines
9.0 KiB

  1. package leveldb
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/md5"
  6. "fmt"
  7. "github.com/syndtr/goleveldb/leveldb"
  8. leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
  9. "github.com/syndtr/goleveldb/leveldb/opt"
  10. leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
  11. "io"
  12. "os"
  13. "strings"
  14. "sync"
  15. "github.com/chrislusf/seaweedfs/weed/filer"
  16. "github.com/chrislusf/seaweedfs/weed/glog"
  17. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  18. weed_util "github.com/chrislusf/seaweedfs/weed/util"
  19. )
  20. const (
  21. DEFAULT = "_main"
  22. )
  23. func init() {
  24. filer.Stores = append(filer.Stores, &LevelDB3Store{})
  25. }
  26. type LevelDB3Store struct {
  27. dir string
  28. dbs map[string]*leveldb.DB
  29. dbsLock sync.RWMutex
  30. }
  31. func (store *LevelDB3Store) GetName() string {
  32. return "leveldb3"
  33. }
  34. func (store *LevelDB3Store) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
  35. dir := configuration.GetString(prefix + "dir")
  36. return store.initialize(dir)
  37. }
  38. func (store *LevelDB3Store) initialize(dir string) (err error) {
  39. glog.Infof("filer store leveldb3 dir: %s", dir)
  40. if err := weed_util.TestFolderWritable(dir); err != nil {
  41. return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
  42. }
  43. store.dir = dir
  44. db, loadDbErr := store.loadDB(DEFAULT)
  45. if loadDbErr != nil {
  46. return loadDbErr
  47. }
  48. store.dbs = make(map[string]*leveldb.DB)
  49. store.dbs[DEFAULT] = db
  50. return
  51. }
  52. func (store *LevelDB3Store) loadDB(name string) (*leveldb.DB, error) {
  53. opts := &opt.Options{
  54. BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
  55. WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
  56. CompactionTableSizeMultiplier: 4,
  57. }
  58. if name != DEFAULT {
  59. opts = &opt.Options{
  60. BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
  61. WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
  62. CompactionTableSizeMultiplier: 4,
  63. }
  64. }
  65. dbFolder := fmt.Sprintf("%s/%s", store.dir, name)
  66. os.MkdirAll(dbFolder, 0755)
  67. db, dbErr := leveldb.OpenFile(dbFolder, opts)
  68. if leveldb_errors.IsCorrupted(dbErr) {
  69. db, dbErr = leveldb.RecoverFile(dbFolder, opts)
  70. }
  71. if dbErr != nil {
  72. glog.Errorf("filer store open dir %s: %v", dbFolder, dbErr)
  73. return nil, dbErr
  74. }
  75. return db, nil
  76. }
  77. func (store *LevelDB3Store) findDB(fullpath weed_util.FullPath, isForChildren bool) (*leveldb.DB, string, weed_util.FullPath, error) {
  78. store.dbsLock.RLock()
  79. defaultDB := store.dbs[DEFAULT]
  80. if !strings.HasPrefix(string(fullpath), "/buckets/") {
  81. store.dbsLock.RUnlock()
  82. return defaultDB, DEFAULT, fullpath, nil
  83. }
  84. // detect bucket
  85. bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
  86. t := strings.Index(bucketAndObjectKey, "/")
  87. if t < 0 && !isForChildren {
  88. store.dbsLock.RUnlock()
  89. return defaultDB, DEFAULT, fullpath, nil
  90. }
  91. bucket := bucketAndObjectKey
  92. shortPath := weed_util.FullPath("/")
  93. if t > 0 {
  94. bucket = bucketAndObjectKey[:t]
  95. shortPath = weed_util.FullPath(bucketAndObjectKey[t:])
  96. }
  97. println("bucket:", bucket, "shortPath", shortPath)
  98. if db, found := store.dbs[bucket]; found {
  99. store.dbsLock.RUnlock()
  100. return db, bucket, shortPath, nil
  101. }
  102. store.dbsLock.RUnlock()
  103. // upgrade to write lock
  104. store.dbsLock.Lock()
  105. defer store.dbsLock.Unlock()
  106. // double check after getting the write lock
  107. if db, found := store.dbs[bucket]; found {
  108. return db, bucket, shortPath, nil
  109. }
  110. // create db
  111. db, err := store.loadDB(bucket)
  112. if err != nil {
  113. return nil, bucket, shortPath, err
  114. }
  115. store.dbs[bucket] = db
  116. return db, bucket, shortPath, nil
  117. }
  118. func (store *LevelDB3Store) BeginTransaction(ctx context.Context) (context.Context, error) {
  119. return ctx, nil
  120. }
  121. func (store *LevelDB3Store) CommitTransaction(ctx context.Context) error {
  122. return nil
  123. }
  124. func (store *LevelDB3Store) RollbackTransaction(ctx context.Context) error {
  125. return nil
  126. }
  127. func (store *LevelDB3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  128. db, _, shortPath, err := store.findDB(entry.FullPath, false)
  129. if err != nil {
  130. return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
  131. }
  132. dir, name := shortPath.DirAndName()
  133. key := genKey(dir, name)
  134. value, err := entry.EncodeAttributesAndChunks()
  135. if err != nil {
  136. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
  137. }
  138. if len(entry.Chunks) > 50 {
  139. value = weed_util.MaybeGzipData(value)
  140. }
  141. err = db.Put(key, value, nil)
  142. if err != nil {
  143. return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
  144. }
  145. // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
  146. return nil
  147. }
  148. func (store *LevelDB3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  149. return store.InsertEntry(ctx, entry)
  150. }
  151. func (store *LevelDB3Store) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
  152. db, _, shortPath, err := store.findDB(fullpath, false)
  153. if err != nil {
  154. return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
  155. }
  156. dir, name := shortPath.DirAndName()
  157. key := genKey(dir, name)
  158. data, err := db.Get(key, nil)
  159. if err == leveldb.ErrNotFound {
  160. return nil, filer_pb.ErrNotFound
  161. }
  162. if err != nil {
  163. return nil, fmt.Errorf("get %s : %v", fullpath, err)
  164. }
  165. entry = &filer.Entry{
  166. FullPath: fullpath,
  167. }
  168. err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data))
  169. if err != nil {
  170. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  171. }
  172. // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
  173. return entry, nil
  174. }
  175. func (store *LevelDB3Store) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  176. db, _, shortPath, err := store.findDB(fullpath, false)
  177. if err != nil {
  178. return fmt.Errorf("findDB %s : %v", fullpath, err)
  179. }
  180. dir, name := shortPath.DirAndName()
  181. key := genKey(dir, name)
  182. err = db.Delete(key, nil)
  183. if err != nil {
  184. return fmt.Errorf("delete %s : %v", fullpath, err)
  185. }
  186. return nil
  187. }
  188. func (store *LevelDB3Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  189. db, bucket, shortPath, err := store.findDB(fullpath, true)
  190. if err != nil {
  191. return fmt.Errorf("findDB %s : %v", fullpath, err)
  192. }
  193. if bucket != DEFAULT && shortPath == "/" {
  194. db.Close()
  195. if bucket != "" { // just to make sure
  196. os.RemoveAll(store.dir + "/" + bucket)
  197. }
  198. return nil
  199. }
  200. directoryPrefix := genDirectoryKeyPrefix(shortPath, "")
  201. batch := new(leveldb.Batch)
  202. iter := db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
  203. for iter.Next() {
  204. key := iter.Key()
  205. if !bytes.HasPrefix(key, directoryPrefix) {
  206. break
  207. }
  208. fileName := getNameFromKey(key)
  209. if fileName == "" {
  210. continue
  211. }
  212. batch.Delete(append(directoryPrefix, []byte(fileName)...))
  213. }
  214. iter.Release()
  215. err = db.Write(batch, nil)
  216. if err != nil {
  217. return fmt.Errorf("delete %s : %v", fullpath, err)
  218. }
  219. return nil
  220. }
  221. func (store *LevelDB3Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
  222. limit int) (entries []*filer.Entry, err error) {
  223. return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "")
  224. }
  225. func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
  226. db, _, shortPath, err := store.findDB(fullpath, true)
  227. if err != nil {
  228. return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
  229. }
  230. directoryPrefix := genDirectoryKeyPrefix(shortPath, prefix)
  231. lastFileStart := directoryPrefix
  232. if startFileName != "" {
  233. lastFileStart = genDirectoryKeyPrefix(shortPath, startFileName)
  234. }
  235. iter := db.NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
  236. for iter.Next() {
  237. key := iter.Key()
  238. if !bytes.HasPrefix(key, directoryPrefix) {
  239. break
  240. }
  241. fileName := getNameFromKey(key)
  242. if fileName == "" {
  243. continue
  244. }
  245. if fileName == startFileName && !inclusive {
  246. continue
  247. }
  248. limit--
  249. if limit < 0 {
  250. break
  251. }
  252. entry := &filer.Entry{
  253. FullPath: weed_util.NewFullPath(string(fullpath), fileName),
  254. }
  255. // println("list", entry.FullPath, "chunks", len(entry.Chunks))
  256. if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
  257. err = decodeErr
  258. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  259. break
  260. }
  261. entries = append(entries, entry)
  262. }
  263. iter.Release()
  264. return entries, err
  265. }
  266. func genKey(dirPath, fileName string) (key []byte) {
  267. key = hashToBytes(dirPath)
  268. key = append(key, []byte(fileName)...)
  269. return key
  270. }
  271. func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
  272. keyPrefix = hashToBytes(string(fullpath))
  273. if len(startFileName) > 0 {
  274. keyPrefix = append(keyPrefix, []byte(startFileName)...)
  275. }
  276. return keyPrefix
  277. }
  278. func getNameFromKey(key []byte) string {
  279. return string(key[md5.Size:])
  280. }
  281. // hash directory
  282. func hashToBytes(dir string) []byte {
  283. h := md5.New()
  284. io.WriteString(h, dir)
  285. b := h.Sum(nil)
  286. return b
  287. }
  288. func (store *LevelDB3Store) Shutdown() {
  289. for _, db := range store.dbs {
  290. db.Close()
  291. }
  292. }