Browse Source

impl: TTL per entry for rocksdb; fix package name

pull/1730/head
qieqieplus 4 years ago
parent
commit
bbae13e415
  1. 40
      weed/filer/rocksdb/rocksdb_store.go
  2. 41
      weed/filer/rocksdb/rocksdb_ttl.go

40
weed/filer/rocksdb/rocksdb_store.go

@ -7,12 +7,14 @@ import (
"context" "context"
"crypto/md5" "crypto/md5"
"fmt" "fmt"
"io"
"github.com/tecbot/gorocksdb"
"github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
weed_util "github.com/chrislusf/seaweedfs/weed/util" weed_util "github.com/chrislusf/seaweedfs/weed/util"
rocksdb "github.com/tecbot/gorocksdb"
"io"
) )
func init() { func init() {
@ -20,15 +22,15 @@ func init() {
} }
type options struct { type options struct {
opt *rocksdb.Options
ro *rocksdb.ReadOptions
wo *rocksdb.WriteOptions
opt *gorocksdb.Options
ro *gorocksdb.ReadOptions
wo *gorocksdb.WriteOptions
} }
func (opt *options) init() { func (opt *options) init() {
opt.opt = rocksdb.NewDefaultOptions()
opt.ro = rocksdb.NewDefaultReadOptions()
opt.wo = rocksdb.NewDefaultWriteOptions()
opt.opt = gorocksdb.NewDefaultOptions()
opt.ro = gorocksdb.NewDefaultReadOptions()
opt.wo = gorocksdb.NewDefaultWriteOptions()
} }
func (opt *options) close() { func (opt *options) close() {
@ -39,7 +41,7 @@ func (opt *options) close() {
type RocksDBStore struct { type RocksDBStore struct {
path string path string
db *rocksdb.DB
db *gorocksdb.DB
options options
} }
@ -59,7 +61,13 @@ func (store *RocksDBStore) initialize(dir string) (err error) {
} }
store.options.init() store.options.init()
store.opt.SetCreateIfMissing(true) store.opt.SetCreateIfMissing(true)
store.db, err = rocksdb.OpenDb(store.opt, dir)
// reduce write amplification
// also avoid expired data stored in highest level never get compacted
store.opt.SetLevelCompactionDynamicLevelBytes(true)
store.opt.SetCompactionFilter(NewTTLFilter())
// store.opt.SetMaxBackgroundCompactions(2)
store.db, err = gorocksdb.OpenDb(store.opt, dir)
return return
} }
@ -116,7 +124,7 @@ func (store *RocksDBStore) FindEntry(ctx context.Context, fullpath weed_util.Ful
entry = &filer.Entry{ entry = &filer.Entry{
FullPath: fullpath, FullPath: fullpath,
} }
err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data.Data()))
err = entry.DecodeAttributesAndChunks(data.Data())
if err != nil { if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
} }
@ -141,10 +149,10 @@ func (store *RocksDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.F
func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, "") directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
batch := rocksdb.NewWriteBatch()
batch := gorocksdb.NewWriteBatch()
defer batch.Destroy() defer batch.Destroy()
ro := rocksdb.NewDefaultReadOptions()
ro := gorocksdb.NewDefaultReadOptions()
defer ro.Destroy() defer ro.Destroy()
ro.SetFillCache(false) ro.SetFillCache(false)
@ -167,7 +175,7 @@ func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath we
return nil return nil
} }
func enumerate(iter *rocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int, fn func(key, value []byte) bool) error {
func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int, fn func(key, value []byte) bool) error {
if len(lastKey) == 0 { if len(lastKey) == 0 {
iter.Seek(prefix) iter.Seek(prefix)
@ -231,7 +239,7 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, ful
lastFileStart = genDirectoryKeyPrefix(fullpath, startFileName) lastFileStart = genDirectoryKeyPrefix(fullpath, startFileName)
} }
ro := rocksdb.NewDefaultReadOptions()
ro := gorocksdb.NewDefaultReadOptions()
defer ro.Destroy() defer ro.Destroy()
ro.SetFillCache(false) ro.SetFillCache(false)
@ -251,7 +259,7 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, ful
} }
// println("list", entry.FullPath, "chunks", len(entry.Chunks)) // println("list", entry.FullPath, "chunks", len(entry.Chunks))
if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(value)); decodeErr != nil {
if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil {
err = decodeErr err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err) glog.V(0).Infof("list %s : %v", entry.FullPath, err)
return false return false

41
weed/filer/rocksdb/rocksdb_ttl.go

@ -0,0 +1,41 @@
//+build rocksdb
package rocksdb
import (
"time"
"github.com/tecbot/gorocksdb"
"github.com/chrislusf/seaweedfs/weed/filer"
)
type TTLFilter struct {
skipLevel0 bool
}
func NewTTLFilter() gorocksdb.CompactionFilter {
return &TTLFilter{
skipLevel0: true,
}
}
func (t *TTLFilter) Filter(level int, key, val []byte) (remove bool, newVal []byte) {
// decode could be slow, causing write stall
// level >0 sst can run compaction in parallel
if t.skipLevel0 && level == 0 {
return false, val
}
entry := filer.Entry{}
if err := entry.DecodeAttributesAndChunks(val); err == nil {
if entry.TtlSec == 0 ||
entry.Crtime.Add(time.Duration(entry.TtlSec)*time.Second).Before(time.Now()) {
return false, val
}
}
return true, nil
}
func (t *TTLFilter) Name() string {
return "TTLFilter"
}
Loading…
Cancel
Save