From 40b3207f18d7b1a5f04c0333c970a42527d16df0 Mon Sep 17 00:00:00 2001 From: chenqieqie Date: Mon, 4 Jan 2021 18:48:55 +0800 Subject: [PATCH] fix #1726 --- weed/filer/rocksdb/rocksdb_store.go | 81 ++++++++++++++++------------- 1 file changed, 45 insertions(+), 36 deletions(-) diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go index a8992cf03..ca6391386 100644 --- a/weed/filer/rocksdb/rocksdb_store.go +++ b/weed/filer/rocksdb/rocksdb_store.go @@ -11,7 +11,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" weed_util "github.com/chrislusf/seaweedfs/weed/util" - "github.com/tecbot/gorocksdb" + rocksdb "github.com/tecbot/gorocksdb" "io" ) @@ -19,9 +19,28 @@ func init() { filer.Stores = append(filer.Stores, &RocksDBStore{}) } +type options struct { + opt *rocksdb.Options + ro *rocksdb.ReadOptions + wo *rocksdb.WriteOptions +} + +func (opt *options) init() { + opt.opt = rocksdb.NewDefaultOptions() + opt.ro = rocksdb.NewDefaultReadOptions() + opt.wo = rocksdb.NewDefaultWriteOptions() +} + +func (opt *options) close() { + opt.opt.Destroy() + opt.ro.Destroy() + opt.wo.Destroy() +} + type RocksDBStore struct { path string - db *gorocksdb.DB + db *rocksdb.DB + options } func (store *RocksDBStore) GetName() string { @@ -38,10 +57,9 @@ func (store *RocksDBStore) initialize(dir string) (err error) { if err := weed_util.TestFolderWritable(dir); err != nil { return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err) } - - options := gorocksdb.NewDefaultOptions() - options.SetCreateIfMissing(true) - store.db, err = gorocksdb.OpenDb(options, dir) + store.options.init() + store.opt.SetCreateIfMissing(true) + store.db, err = rocksdb.OpenDb(store.opt, dir) return } @@ -65,8 +83,7 @@ func (store *RocksDBStore) InsertEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - wo := gorocksdb.NewDefaultWriteOptions() - err = store.db.Put(wo, key, value) + err = store.db.Put(store.wo, key, value) if err != nil { return fmt.Errorf("persisting %s : %v", entry.FullPath, err) @@ -85,21 +102,21 @@ func (store *RocksDBStore) UpdateEntry(ctx context.Context, entry *filer.Entry) func (store *RocksDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) { dir, name := fullpath.DirAndName() key := genKey(dir, name) - - ro := gorocksdb.NewDefaultReadOptions() - data, err := store.db.GetBytes(ro, key) + data, err := store.db.Get(store.ro, key) if data == nil { return nil, filer_pb.ErrNotFound } + defer data.Free() + if err != nil { - return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) + return nil, fmt.Errorf("get %s : %v", fullpath, err) } entry = &filer.Entry{ FullPath: fullpath, } - err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data)) + err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data.Data())) if err != nil { return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } @@ -113,8 +130,7 @@ func (store *RocksDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.F dir, name := fullpath.DirAndName() key := genKey(dir, name) - wo := gorocksdb.NewDefaultWriteOptions() - err = store.db.Delete(wo, key) + err = store.db.Delete(store.wo, key) if err != nil { return fmt.Errorf("delete %s : %v", fullpath, err) } @@ -125,10 +141,13 @@ func (store *RocksDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.F func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { directoryPrefix := genDirectoryKeyPrefix(fullpath, "") - batch := new(gorocksdb.WriteBatch) + batch := rocksdb.NewWriteBatch() + defer batch.Destroy() - ro := gorocksdb.NewDefaultReadOptions() + ro := rocksdb.NewDefaultReadOptions() + defer ro.Destroy() ro.SetFillCache(false) + iter := store.db.NewIterator(ro) defer iter.Close() err = enumerate(iter, directoryPrefix, nil, false, -1, func(key, value []byte) bool { @@ -139,8 +158,7 @@ func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath we return fmt.Errorf("delete list %s : %v", fullpath, err) } - wo := gorocksdb.NewDefaultWriteOptions() - err = store.db.Write(wo, batch) + err = store.db.Write(store.wo, batch) if err != nil { return fmt.Errorf("delete %s : %v", fullpath, err) @@ -149,7 +167,7 @@ func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath we return nil } -func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int, fn func(key, value []byte) bool) error { +func enumerate(iter *rocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int, fn func(key, value []byte) bool) error { if len(lastKey) == 0 { iter.Seek(prefix) @@ -157,11 +175,7 @@ func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey iter.Seek(lastKey) if !includeLastKey { - k := iter.Key() - v := iter.Value() - key := k.Data() - defer k.Free() - defer v.Free() + key := iter.Key().Data() if !bytes.HasPrefix(key, prefix) { return nil @@ -184,21 +198,13 @@ func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey } } - k := iter.Key() - v := iter.Value() - key := k.Data() - value := v.Data() + key := iter.Key().Data() if !bytes.HasPrefix(key, prefix) { - k.Free() - v.Free() break } - ret := fn(key, value) - - k.Free() - v.Free() + ret := fn(key, iter.Value().Data()) if !ret { break @@ -225,8 +231,10 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, ful lastFileStart = genDirectoryKeyPrefix(fullpath, startFileName) } - ro := gorocksdb.NewDefaultReadOptions() + ro := rocksdb.NewDefaultReadOptions() + defer ro.Destroy() ro.SetFillCache(false) + iter := store.db.NewIterator(ro) defer iter.Close() err = enumerate(iter, directoryPrefix, lastFileStart, inclusive, limit, func(key, value []byte) bool { @@ -290,4 +298,5 @@ func hashToBytes(dir string) []byte { func (store *RocksDBStore) Shutdown() { store.db.Close() + store.options.close() }