Browse Source

implement rocksdb changes

pull/1748/head
Chris Lu 4 years ago
parent
commit
34a846009d
  1. 28
      weed/filer/rocksdb/rocksdb_store.go

28
weed/filer/rocksdb/rocksdb_store.go

@ -158,7 +158,7 @@ func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath we
iter := store.db.NewIterator(ro) iter := store.db.NewIterator(ro)
defer iter.Close() defer iter.Close()
err = enumerate(iter, directoryPrefix, nil, false, -1, func(key, value []byte) bool {
_, err = enumerate(iter, directoryPrefix, nil, false, -1, func(key, value []byte) bool {
batch.Delete(key) batch.Delete(key)
return true return true
}) })
@ -175,7 +175,7 @@ func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath we
return nil return nil
} }
func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int, fn func(key, value []byte) bool) error {
func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int, fn func(key, value []byte) bool) (hasMore bool, err error) {
if len(lastKey) == 0 { if len(lastKey) == 0 {
iter.Seek(prefix) iter.Seek(prefix)
@ -196,6 +196,7 @@ func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey
if limit > 0 { if limit > 0 {
i++ i++
if i > limit { if i > limit {
hasMore = true
break break
} }
} }
@ -215,22 +216,21 @@ func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey
} }
if err := iter.Err(); err != nil { if err := iter.Err(); err != nil {
return fmt.Errorf("prefix scan iterator: %v", err)
return hasMore, fmt.Errorf("prefix scan iterator: %v", err)
} }
return nil
return hasMore, nil
} }
func (store *RocksDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer.Entry, err error) {
return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "")
func (store *RocksDBStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int) (entries []*filer.Entry, hasMore bool, err error) {
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "")
} }
func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*filer.Entry, hasMore bool, err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, prefix)
directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
lastFileStart := directoryPrefix lastFileStart := directoryPrefix
if startFileName != "" { if startFileName != "" {
lastFileStart = genDirectoryKeyPrefix(fullpath, startFileName)
lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName)
} }
ro := gorocksdb.NewDefaultReadOptions() ro := gorocksdb.NewDefaultReadOptions()
@ -239,13 +239,13 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, ful
iter := store.db.NewIterator(ro) iter := store.db.NewIterator(ro)
defer iter.Close() defer iter.Close()
err = enumerate(iter, directoryPrefix, lastFileStart, inclusive, limit, func(key, value []byte) bool {
hasMore, err = enumerate(iter, directoryPrefix, lastFileStart, includeStartFile, limit, func(key, value []byte) bool {
fileName := getNameFromKey(key) fileName := getNameFromKey(key)
if fileName == "" { if fileName == "" {
return true return true
} }
entry := &filer.Entry{ entry := &filer.Entry{
FullPath: weed_util.NewFullPath(string(fullpath), fileName),
FullPath: weed_util.NewFullPath(string(dirPath), fileName),
} }
// println("list", entry.FullPath, "chunks", len(entry.Chunks)) // println("list", entry.FullPath, "chunks", len(entry.Chunks))
@ -258,10 +258,10 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, ful
return true return true
}) })
if err != nil { if err != nil {
return entries, fmt.Errorf("prefix list %s : %v", fullpath, err)
return entries, false, fmt.Errorf("prefix list %s : %v", dirPath, err)
} }
return entries, err
return entries, false, err
} }
func genKey(dirPath, fileName string) (key []byte) { func genKey(dirPath, fileName string) (key []byte) {

Loading…
Cancel
Save