Browse Source

fix rocksdb enumerate (#6858)

pull/6714/merge
Bruce Zou 6 days ago
committed by GitHub
parent
commit
fa730abec7
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 26
      weed/filer/rocksdb/rocksdb_store.go
  2. 71
      weed/filer/rocksdb/rocksdb_store_test.go

26
weed/filer/rocksdb/rocksdb_store.go

@ -169,7 +169,7 @@ func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath we
iter := store.db.NewIterator(ro) iter := store.db.NewIterator(ro)
defer iter.Close() defer iter.Close()
err = enumerate(iter, directoryPrefix, nil, false, -1, func(key, value []byte) bool {
err = enumerate(iter, directoryPrefix, nil, false, -1, "", func(key, value []byte) bool {
batch.Delete(key) batch.Delete(key)
return true return true
}) })
@ -186,23 +186,16 @@ func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath we
return nil return nil
} }
func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int64, fn func(key, value []byte) bool) (err error) {
func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int64, startFileName string, fn func(key, value []byte) bool) (err error) {
if len(lastKey) == 0 { if len(lastKey) == 0 {
iter.Seek(prefix) iter.Seek(prefix)
} else { } else {
iter.Seek(lastKey) iter.Seek(lastKey)
if !includeLastKey {
if iter.Valid() {
if bytes.Equal(iter.Key().Data(), lastKey) {
iter.Next()
}
}
}
} }
i := int64(0) i := int64(0)
for ; iter.Valid(); iter.Next() {
for iter.Valid() {
if limit > 0 { if limit > 0 {
i++ i++
@ -217,12 +210,23 @@ func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey
break break
} }
fileName := getNameFromKey(key)
if fileName == "" {
iter.Next()
continue
}
if fileName == startFileName && !includeLastKey {
iter.Next()
continue
}
ret := fn(key, iter.Value().Data()) ret := fn(key, iter.Value().Data())
if !ret { if !ret {
break break
} }
iter.Next()
} }
if err := iter.Err(); err != nil { if err := iter.Err(); err != nil {
@ -249,7 +253,7 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
iter := store.db.NewIterator(ro) iter := store.db.NewIterator(ro)
defer iter.Close() defer iter.Close()
err = enumerate(iter, directoryPrefix, lastFileStart, includeStartFile, limit, func(key, value []byte) bool {
err = enumerate(iter, directoryPrefix, lastFileStart, includeStartFile, limit, startFileName, func(key, value []byte) bool {
fileName := getNameFromKey(key) fileName := getNameFromKey(key)
if fileName == "" { if fileName == "" {
return true return true

71
weed/filer/rocksdb/rocksdb_store_test.go

@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
) )
@ -112,3 +113,73 @@ func BenchmarkInsertEntry(b *testing.B) {
store.InsertEntry(ctx, entry) store.InsertEntry(ctx, entry)
} }
} }
func TestListDirectoryWithPrefix(t *testing.T) {
testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", 255, nil)
dir := t.TempDir()
store := &RocksDBStore{}
store.initialize(dir)
testFiler.SetStore(store)
ctx := context.Background()
files := []string{
"/bucket1/test-prefix1/file1.txt",
"/bucket1/test-prefix1/file2.txt",
"/bucket1/test-prefix1-extra.txt",
}
expected1 := []string{
"/bucket1/test-prefix1",
"/bucket1/test-prefix1-extra.txt",
}
expected2 := []string{
"/bucket1/test-prefix1/file1.txt",
"/bucket1/test-prefix1/file2.txt",
}
for _, file := range files {
fullpath := util.FullPath(file)
entry := &filer.Entry{
FullPath: fullpath,
Attr: filer.Attr{
Mode: 0644,
Uid: 1,
Gid: 1,
},
}
if err := testFiler.CreateEntry(ctx, entry, false, false, nil, false, testFiler.MaxFilenameLength); err != nil {
t.Fatalf("Failed to create entry %s: %v", fullpath, err)
}
}
prefix1 := "test-prefix1"
entries1, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/bucket1"), "", false, 100, prefix1, "", "")
if err != nil {
t.Fatalf("Failed to list entries with prefix %s: %v", prefix1, err)
}
if len(entries1) != 2 {
t.Errorf("Expected 2 entries with prefix %s, got %d", prefix1, len(entries1))
} else {
t.Logf("Found %d entries with prefix %s", len(entries1), prefix1)
}
for i, entry := range entries1 {
if string(entry.FullPath) != expected1[i] {
t.Errorf("Expected entry %s, got %s", expected1[i], entry.FullPath)
}
}
entries2, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/bucket1/test-prefix1"), "", false, 100, "", "", "")
if err != nil {
t.Fatalf("Failed to list entries with prefix %s: %v", prefix1, err)
}
if len(entries2) != 2 {
t.Errorf("Expected 2 entries with prefix %s, got %d", prefix1, len(entries1))
}
for i, entry := range entries2 {
if string(entry.FullPath) != expected2[i] {
t.Errorf("Expected entry %s, got %s", expected2[i], entry.FullPath)
}
}
}
Loading…
Cancel
Save