Browse Source

Add error list each entry func (#7485)

* added error return in type ListEachEntryFunc

* return error if errClose

* fix fmt.Errorf

* fix return errClose

* use %w fmt.Errorf

* added entry in messege error

* add callbackErr in ListDirectoryEntries

* fix error

* add log

* clear err when the scanner stops on io.EOF, so returning err doesn’t surface EOF as a failure.

* more info in error

* add ctx to logs, error handling

* fix return eachEntryFunc

* fix

* fix log

* fix return

* fix foundationdb test s

* fix eachEntryFunc

* fix return resEachEntryFuncErr

* Update weed/filer/filer.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/filer/elastic/v7/elastic_store.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/filer/hbase/hbase_store.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/filer/foundationdb/foundationdb_store.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/filer/ydb/ydb_store.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* fix

* add scanErr

---------

Co-authored-by: Roman Tamarov <r.tamarov@kryptonite.ru>
Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
Co-authored-by: chrislu <chris.lu@gmail.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
pull/7553/head
tam-i13 3 weeks ago
committed by GitHub
parent
commit
b669607fcd
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 12
      test/foundationdb/foundationdb_concurrent_test.go
  2. 12
      test/foundationdb/foundationdb_integration_test.go
  3. 19
      test/foundationdb/mock_integration_test.go
  4. 12
      weed/filer/abstract_sql/abstract_sql_store.go
  5. 8
      weed/filer/arangodb/arangodb_store.go
  6. 17
      weed/filer/cassandra/cassandra_store.go
  7. 18
      weed/filer/cassandra2/cassandra_store.go
  8. 18
      weed/filer/elastic/v7/elastic_store.go
  9. 12
      weed/filer/etcd/etcd_store.go
  10. 13
      weed/filer/filer.go
  11. 27
      weed/filer/filer_search.go
  12. 5
      weed/filer/filerstore.go
  13. 7
      weed/filer/filerstore_translate_path.go
  14. 22
      weed/filer/filerstore_wrapper.go
  15. 12
      weed/filer/foundationdb/foundationdb_store.go
  16. 12
      weed/filer/foundationdb/foundationdb_store_test.go
  17. 21
      weed/filer/hbase/hbase_store.go
  18. 9
      weed/filer/leveldb/leveldb_store.go
  19. 9
      weed/filer/leveldb2/leveldb2_store.go
  20. 9
      weed/filer/leveldb3/leveldb3_store.go
  21. 14
      weed/filer/mongodb/mongodb_store.go
  22. 9
      weed/filer/redis/universal_redis_store.go
  23. 9
      weed/filer/redis2/universal_redis_store.go
  24. 19
      weed/filer/redis3/universal_redis_store.go
  25. 9
      weed/filer/redis_lua/universal_redis_store.go
  26. 20
      weed/filer/rocksdb/rocksdb_store.go
  27. 13
      weed/filer/store_test/test_suite.go
  28. 9
      weed/filer/tarantool/tarantool_store.go
  29. 24
      weed/filer/tikv/tikv_store.go
  30. 7
      weed/filer/ydb/ydb_store.go
  31. 4
      weed/mount/meta_cache/meta_cache.go
  32. 17
      weed/mount/weedfs_dir_read.go
  33. 8
      weed/server/filer_grpc_server.go
  34. 7
      weed/server/filer_grpc_server_traverse_meta.go

12
test/foundationdb/foundationdb_concurrent_test.go

@ -65,9 +65,9 @@ func TestFoundationDBStore_ConcurrentInserts(t *testing.T) {
expectedTotal := numGoroutines * entriesPerGoroutine
actualCount := 0
_, err := store.ListDirectoryEntries(ctx, "/concurrent", "", true, 10000, func(entry *filer.Entry) bool {
_, err := store.ListDirectoryEntries(ctx, "/concurrent", "", true, 10000, func(entry *filer.Entry) (bool, error) {
actualCount++
return true
return true, nil
})
if err != nil {
t.Fatalf("ListDirectoryEntries failed: %v", err)
@ -265,9 +265,9 @@ func TestFoundationDBStore_ConcurrentTransactions(t *testing.T) {
totalExpectedEntries := successCount * entriesPerTransaction
actualCount := 0
_, err := store.ListDirectoryEntries(ctx, "/transactions", "", true, 10000, func(entry *filer.Entry) bool {
_, err := store.ListDirectoryEntries(ctx, "/transactions", "", true, 10000, func(entry *filer.Entry) (bool, error) {
actualCount++
return true
return true, nil
})
if err != nil {
t.Fatalf("ListDirectoryEntries failed: %v", err)
@ -335,9 +335,9 @@ func TestFoundationDBStore_ConcurrentDirectoryOperations(t *testing.T) {
dirPath := fmt.Sprintf("/worker%d/dir%d", w, d)
fileCount := 0
_, err := store.ListDirectoryEntries(ctx, dirPath, "", true, 1000, func(entry *filer.Entry) bool {
_, err := store.ListDirectoryEntries(ctx, dirPath, "", true, 1000, func(entry *filer.Entry) (bool, error) {
fileCount++
return true
return true, nil
})
if err != nil {
t.Errorf("ListDirectoryEntries failed for %s: %v", dirPath, err)

12
test/foundationdb/foundationdb_integration_test.go

@ -115,9 +115,9 @@ func TestFoundationDBStore_DirectoryOperations(t *testing.T) {
// Test ListDirectoryEntries
var listedFiles []string
lastFileName, err := store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) bool {
lastFileName, err := store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) (bool, error) {
listedFiles = append(listedFiles, entry.Name())
return true
return true, nil
})
if err != nil {
t.Fatalf("ListDirectoryEntries failed: %v", err)
@ -132,9 +132,9 @@ func TestFoundationDBStore_DirectoryOperations(t *testing.T) {
// Test ListDirectoryPrefixedEntries
var prefixedFiles []string
_, err = store.ListDirectoryPrefixedEntries(ctx, testDir, "", true, 100, "file", func(entry *filer.Entry) bool {
_, err = store.ListDirectoryPrefixedEntries(ctx, testDir, "", true, 100, "file", func(entry *filer.Entry) (bool, error) {
prefixedFiles = append(prefixedFiles, entry.Name())
return true
return true, nil
})
if err != nil {
t.Fatalf("ListDirectoryPrefixedEntries failed: %v", err)
@ -153,9 +153,9 @@ func TestFoundationDBStore_DirectoryOperations(t *testing.T) {
// Verify children are deleted
var remainingFiles []string
_, err = store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) bool {
_, err = store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) (bool, error) {
remainingFiles = append(remainingFiles, entry.Name())
return true
return true, nil
})
if err != nil {
t.Fatalf("ListDirectoryEntries after delete failed: %v", err)

19
test/foundationdb/mock_integration_test.go

@ -2,6 +2,7 @@ package foundationdb
import (
"context"
"fmt"
"sort"
"strings"
"testing"
@ -157,14 +158,20 @@ func (store *MockFoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Con
continue
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
break
}
if !resEachEntryFunc {
break
}
lastFileName = entry.Name()
count++
}
return lastFileName, nil
return lastFileName, err
}
func (store *MockFoundationDBStore) KvPut(ctx context.Context, key []byte, value []byte) error {
@ -390,9 +397,9 @@ func TestMockFoundationDBStore_DirectoryOperations(t *testing.T) {
// Test ListDirectoryEntries
var listedFiles []string
lastFileName, err := store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) bool {
lastFileName, err := store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) (bool, error) {
listedFiles = append(listedFiles, entry.Name())
return true
return true, nil
})
if err != nil {
t.Fatalf("ListDirectoryEntries failed: %v", err)
@ -409,9 +416,9 @@ func TestMockFoundationDBStore_DirectoryOperations(t *testing.T) {
// Verify children are deleted
var remainingFiles []string
_, err = store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) bool {
_, err = store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) (bool, error) {
remainingFiles = append(remainingFiles, entry.Name())
return true
return true, nil
})
if err != nil {
t.Fatalf("ListDirectoryEntries after delete failed: %v", err)

12
weed/filer/abstract_sql/abstract_sql_store.go

@ -326,17 +326,23 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context,
return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
break
}
if !resEachEntryFunc {
break
}
}
return lastFileName, nil
return lastFileName, err
}
func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil)
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *AbstractSqlStore) Shutdown() {

8
weed/filer/arangodb/arangodb_store.go

@ -335,7 +335,13 @@ sort d.name asc
break
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
break
}
if !resEachEntryFunc {
break
}

17
weed/filer/cassandra/cassandra_store.go

@ -206,12 +206,23 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath u
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
break
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
err = fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", entry.FullPath, resEachEntryFuncErr)
glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", entry.FullPath, resEachEntryFuncErr)
break
}
if !resEachEntryFunc {
break
}
}
if err = iter.Close(); err != nil {
glog.V(0).InfofCtx(ctx, "list iterator close: %v", err)
if errClose := iter.Close(); errClose != nil {
glog.V(0).InfofCtx(ctx, "list iterator close: %v", errClose)
if err == nil {
return lastFileName, errClose
}
}
return lastFileName, err

18
weed/filer/cassandra2/cassandra_store.go

@ -206,12 +206,24 @@ func (store *Cassandra2Store) ListDirectoryEntries(ctx context.Context, dirPath
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
break
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
err = fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", entry.FullPath, resEachEntryFuncErr)
glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", entry.FullPath, resEachEntryFuncErr)
break
}
if !resEachEntryFunc {
break
}
}
if err = iter.Close(); err != nil {
glog.V(0).InfofCtx(ctx, "list iterator close: %v", err)
if errClose := iter.Close(); errClose != nil {
glog.V(0).InfofCtx(ctx, "list iterator close: %v", errClose)
if err == nil {
return lastFileName, errClose
}
}
return lastFileName, err

18
weed/filer/elastic/v7/elastic_store.go

@ -198,12 +198,12 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e
}
func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
_, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) bool {
_, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) (bool, error) {
if err := store.DeleteEntry(ctx, entry.FullPath); err != nil {
glog.ErrorfCtx(ctx, "elastic delete %s: %v.", entry.FullPath, err)
return false
return false, err
}
return true
return true, nil
})
return
}
@ -258,9 +258,17 @@ func (store *ElasticStore) listDirectoryEntries(
if fileName == startFileName && !inclusive {
continue
}
if !eachEntryFunc(esEntry.Entry) {
break
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(esEntry.Entry)
if resEachEntryFuncErr != nil {
glog.ErrorfCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr)
return lastFileName, fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", fileName, resEachEntryFuncErr)
}
if !resEachEntryFunc {
return lastFileName, nil
}
lastFileName = fileName
}
}

12
weed/filer/etcd/etcd_store.go

@ -9,7 +9,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
@ -212,9 +212,17 @@ func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
break
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
break
}
if !resEachEntryFunc {
break
}
lastFileName = fileName
}

13
weed/filer/filer.go

@ -369,10 +369,11 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
var s3ExpiredEntries []*Entry
var hasValidEntries bool
lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) (bool, error) {
select {
case <-ctx.Done():
return false
glog.Errorf("Context is done.")
return false, fmt.Errorf("context canceled: %w", ctx.Err())
default:
if entry.TtlSec > 0 {
if entry.IsExpireS3Enabled() {
@ -380,13 +381,13 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
// Collect for deletion after iteration completes to avoid DB deadlock
s3ExpiredEntries = append(s3ExpiredEntries, entry)
expiredCount++
return true
return true, nil
}
} else if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
// Collect for deletion after iteration completes to avoid DB deadlock
expiredEntries = append(expiredEntries, entry)
expiredCount++
return true
return true, nil
}
}
// Track that we found at least one valid (non-expired) entry
@ -496,9 +497,9 @@ func (f *Filer) DeleteEmptyParentDirectories(ctx context.Context, dirPath util.F
// IsDirectoryEmpty checks if a directory contains any entries
func (f *Filer) IsDirectoryEmpty(ctx context.Context, dirPath util.FullPath) (bool, error) {
isEmpty := true
_, err := f.Store.ListDirectoryPrefixedEntries(ctx, dirPath, "", true, 1, "", func(entry *Entry) bool {
_, err := f.Store.ListDirectoryPrefixedEntries(ctx, dirPath, "", true, 1, "", func(entry *Entry) (bool, error) {
isEmpty = false
return false // Stop after first entry
return false, nil // Stop after first entry
})
return isEmpty, err
}

27
weed/filer/filer_search.go

@ -2,10 +2,11 @@ package filer
import (
"context"
"github.com/seaweedfs/seaweedfs/weed/util"
"math"
"path/filepath"
"strings"
"github.com/seaweedfs/seaweedfs/weed/util"
)
func splitPattern(pattern string) (prefix string, restPattern string) {
@ -27,9 +28,9 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start
limit = math.MaxInt32 - 1
}
_, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, namePatternExclude, func(entry *Entry) bool {
_, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, namePatternExclude, func(entry *Entry) (bool, error) {
entries = append(entries, entry)
return true
return true, nil
})
hasMore = int64(len(entries)) >= limit+1
@ -68,24 +69,32 @@ func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath
return 0, lastFileName, err
}
lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) (bool, error) {
nameToTest := entry.Name()
if len(namePatternExclude) > 0 {
if matched, matchErr := filepath.Match(namePatternExclude, nameToTest); matchErr == nil && matched {
missedCount++
return true
return true, nil
}
}
if len(restNamePattern) > 0 {
if matched, matchErr := filepath.Match(restNamePattern, nameToTest[len(prefix):]); matchErr == nil && !matched {
missedCount++
return true
return true, nil
}
}
if !eachEntryFunc(entry) {
return false
res, resErr := eachEntryFunc(entry)
if resErr != nil {
return false, resErr
}
if !res {
return false, nil
}
return true
return true, nil
})
if err != nil {
return

5
weed/filer/filerstore.go

@ -3,8 +3,9 @@ package filer
import (
"context"
"errors"
"github.com/seaweedfs/seaweedfs/weed/util"
"io"
"github.com/seaweedfs/seaweedfs/weed/util"
)
const CountEntryChunksForGzip = 50
@ -16,7 +17,7 @@ var (
ErrKvNotFound = errors.New("kv: not found")
)
type ListEachEntryFunc func(entry *Entry) bool
type ListEachEntryFunc func(entry *Entry) (bool, error)
type FilerStore interface {
// GetName gets the name to locate the configuration in filer.toml file

7
weed/filer/filerstore_translate_path.go

@ -2,9 +2,10 @@ package filer
import (
"context"
"github.com/seaweedfs/seaweedfs/weed/util"
"math"
"strings"
"github.com/seaweedfs/seaweedfs/weed/util"
)
var (
@ -111,7 +112,7 @@ func (t *FilerStorePathTranslator) ListDirectoryEntries(ctx context.Context, dir
newFullPath := t.translatePath(dirPath)
return t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
return t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit, func(entry *Entry) (bool, error) {
entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
return eachEntryFunc(entry)
})
@ -125,7 +126,7 @@ func (t *FilerStorePathTranslator) ListDirectoryPrefixedEntries(ctx context.Cont
limit = math.MaxInt32 - 1
}
return t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool {
return t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) (bool, error) {
entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
return eachEntryFunc(entry)
})

22
weed/filer/filerstore_wrapper.go

@ -2,6 +2,7 @@ package filer
import (
"context"
"fmt"
"io"
"math"
"strings"
@ -254,7 +255,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath
}()
// glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) (bool, error) {
fsw.maybeReadHardLink(ctx, entry)
filer_pb.AfterEntryDeserialization(entry.GetChunks())
return eachEntryFunc(entry)
@ -273,7 +274,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context,
limit = math.MaxInt32 - 1
}
// glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
adjustedEntryFunc := func(entry *Entry) bool {
adjustedEntryFunc := func(entry *Entry) (bool, error) {
fsw.maybeReadHardLink(ctx, entry)
filer_pb.AfterEntryDeserialization(entry.GetChunks())
return eachEntryFunc(entry)
@ -293,9 +294,9 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u
}
var notPrefixed []*Entry
lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) (bool, error) {
notPrefixed = append(notPrefixed, entry)
return true
return true, nil
})
if err != nil {
return
@ -306,7 +307,14 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u
for _, entry := range notPrefixed {
if strings.HasPrefix(entry.Name(), prefix) {
count++
if !eachEntryFunc(entry) {
res, resErr := eachEntryFunc(entry)
if resErr != nil {
err = fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", entry.Name(), resErr)
return
}
if !res {
return
}
if count >= limit {
@ -316,9 +324,9 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u
}
if count < limit && lastFileName < prefix {
notPrefixed = notPrefixed[:0]
lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool {
lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) (bool, error) {
notPrefixed = append(notPrefixed, entry)
return true
return true, nil
})
if err != nil {
return

12
weed/filer/foundationdb/foundationdb_store.go

@ -318,12 +318,12 @@ func (store *FoundationDBStore) deleteFolderChildrenInBatches(ctx context.Contex
var subDirectories []util.FullPath
// List entries - we'll process BATCH_SIZE at a time
_, err := store.ListDirectoryEntries(ctxNoTxn, fullpath, "", true, int64(BATCH_SIZE), func(entry *filer.Entry) bool {
_, err := store.ListDirectoryEntries(ctxNoTxn, fullpath, "", true, int64(BATCH_SIZE), func(entry *filer.Entry) (bool, error) {
entriesToDelete = append(entriesToDelete, entry.FullPath)
if entry.IsDirectory() {
subDirectories = append(subDirectories, entry.FullPath)
}
return true
return true, nil
})
if err != nil {
@ -474,9 +474,15 @@ func (store *FoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Context
continue
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
glog.ErrorfCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr)
return lastFileName, fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", fileName, resEachEntryFuncErr)
}
if !resEachEntryFunc {
break
}
lastFileName = fileName
}

12
weed/filer/foundationdb/foundationdb_store_test.go

@ -424,9 +424,9 @@ func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) {
// Verify all entries deleted
var count int
store.ListDirectoryEntries(ctx, testDir1, "", true, 1000, func(entry *filer.Entry) bool {
store.ListDirectoryEntries(ctx, testDir1, "", true, 1000, func(entry *filer.Entry) (bool, error) {
count++
return true
return true, nil
})
if count != 0 {
t.Errorf("Expected all entries to be deleted, found %d", count)
@ -468,9 +468,9 @@ func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) {
// Verify entries are still deleted (because DeleteFolderChildren managed its own transactions)
var count int
store.ListDirectoryEntries(ctx, testDir2, "", true, 1000, func(entry *filer.Entry) bool {
store.ListDirectoryEntries(ctx, testDir2, "", true, 1000, func(entry *filer.Entry) (bool, error) {
count++
return true
return true, nil
})
if count != 0 {
@ -531,9 +531,9 @@ func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) {
// Verify all deleted
var count int
store.ListDirectoryEntries(ctx, testDir3, "", true, 1000, func(entry *filer.Entry) bool {
store.ListDirectoryEntries(ctx, testDir3, "", true, 1000, func(entry *filer.Entry) (bool, error) {
count++
return true
return true, nil
})
if count != 0 {
t.Errorf("Expected all nested entries to be deleted, found %d", count)

21
weed/filer/hbase/hbase_store.go

@ -4,13 +4,14 @@ import (
"bytes"
"context"
"fmt"
"io"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/tsuna/gohbase"
"github.com/tsuna/gohbase/hrpc"
"io"
)
func init() {
@ -163,12 +164,12 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
scanner := store.Client.Scan(scan)
defer scanner.Close()
for {
res, err := scanner.Next()
if err == io.EOF {
res, scanErr := scanner.Next()
if scanErr == io.EOF {
break
}
if err != nil {
return lastFileName, err
if scanErr != nil {
return lastFileName, scanErr
}
if len(res.Cells) == 0 {
continue
@ -206,12 +207,18 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
break
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
return lastFileName, fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", entry.FullPath, resEachEntryFuncErr)
}
if !resEachEntryFunc {
break
}
}
return lastFileName, nil
return lastFileName, err
}
func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) {

9
weed/filer/leveldb/leveldb_store.go

@ -209,7 +209,14 @@ func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
break
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
break
}
if !resEachEntryFunc {
break
}
}

9
weed/filer/leveldb2/leveldb2_store.go

@ -216,7 +216,14 @@ func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, di
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
break
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
break
}
if !resEachEntryFunc {
break
}
}

9
weed/filer/leveldb3/leveldb3_store.go

@ -345,7 +345,14 @@ func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, di
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
break
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
break
}
if !resEachEntryFunc {
break
}
}

14
weed/filer/mongodb/mongodb_store.go

@ -319,14 +319,22 @@ func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
break
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
break
}
if !resEachEntryFunc {
break
}
}
if err := cur.Close(ctx); err != nil {
glog.V(0).InfofCtx(ctx, "list iterator close: %v", err)
if errClose := cur.Close(ctx); errClose != nil {
glog.V(0).InfofCtx(ctx, "list iterator close: %v", errClose)
if err == nil {
return lastFileName, errClose
}
}
return lastFileName, err

9
weed/filer/redis/universal_redis_store.go

@ -191,7 +191,14 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirP
continue
}
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
break
}
if !resEachEntryFunc {
break
}
}

9
weed/filer/redis2/universal_redis_store.go

@ -206,7 +206,14 @@ func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dir
continue
}
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
break
}
if !resEachEntryFunc {
break
}
}

19
weed/filer/redis3/universal_redis_store.go

@ -140,6 +140,7 @@ func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dir
dirListKey := genDirectoryListKey(string(dirPath))
counter := int64(0)
var callbackErr error
err = listChildren(ctx, store, dirListKey, startFileName, func(fileName string) bool {
if startFileName != "" {
if !includeStartFile && startFileName == fileName {
@ -164,9 +165,18 @@ func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dir
}
}
counter++
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr)
callbackErr = resEachEntryFuncErr
return false
}
if !resEachEntryFunc {
return false
}
if counter >= limit {
return false
}
@ -174,6 +184,13 @@ func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dir
return true
})
if callbackErr != nil {
return lastFileName, fmt.Errorf(
"failed to process eachEntryFunc for dir %q, entry %q: %w",
dirPath, lastFileName, callbackErr,
)
}
return lastFileName, err
}

9
weed/filer/redis_lua/universal_redis_store.go

@ -173,7 +173,14 @@ func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, d
continue
}
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
break
}
if !resEachEntryFunc {
break
}
}

20
weed/filer/rocksdb/rocksdb_store.go

@ -251,6 +251,7 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
defer ro.Destroy()
ro.SetFillCache(false)
var callbackErr error
iter := store.db.NewIterator(ro)
defer iter.Close()
err = enumerate(iter, directoryPrefix, lastFileStart, includeStartFile, limit, startFileName, func(key, value []byte) bool {
@ -269,11 +270,28 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
return false
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr)
callbackErr = resEachEntryFuncErr
return false
}
if !resEachEntryFunc {
return false
}
return true
})
if callbackErr != nil {
return lastFileName, fmt.Errorf(
"failed to process eachEntryFunc for dir %q, entry %q: %w",
dirPath, lastFileName, callbackErr,
)
}
if err != nil {
return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
}

13
weed/filer/store_test/test_suite.go

@ -3,11 +3,12 @@ package store_test
import (
"context"
"fmt"
"os"
"testing"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/stretchr/testify/assert"
"os"
"testing"
)
func TestFilerStore(t *testing.T, store filer.FilerStore) {
@ -23,16 +24,16 @@ func TestFilerStore(t *testing.T, store filer.FilerStore) {
{
var counter int
lastFileName, err := store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), "", false, 3, func(entry *filer.Entry) bool {
lastFileName, err := store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), "", false, 3, func(entry *filer.Entry) (bool, error) {
counter++
return true
return true, nil
})
assert.Nil(t, err, "list directory")
assert.Equal(t, 3, counter, "directory list counter")
assert.Equal(t, "f00002", lastFileName, "directory list last file")
lastFileName, err = store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), lastFileName, false, 1024, func(entry *filer.Entry) bool {
lastFileName, err = store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), lastFileName, false, 1024, func(entry *filer.Entry) (bool, error) {
counter++
return true
return true, nil
})
assert.Nil(t, err, "list directory")
assert.Equal(t, 1027, counter, "directory list counter")

9
weed/filer/tarantool/tarantool_store.go

@ -305,7 +305,14 @@ func (store *TarantoolStore) ListDirectoryEntries(ctx context.Context, dirPath w
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
break
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
break
}
if !resEachEntryFunc {
break
}
}

24
weed/filer/tikv/tikv_store.go

@ -223,6 +223,7 @@ func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat
if err != nil {
return lastFileName, err
}
var callbackErr error
err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
iter, err := txn.Iter(lastFileStart, nil)
if err != nil {
@ -283,12 +284,33 @@ func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat
// Only increment counter for non-expired entries
i++
if err := iter.Next(); !eachEntryFunc(entry) || err != nil {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr)
callbackErr = resEachEntryFuncErr
break
}
nextErr := iter.Next()
if nextErr != nil {
err = nextErr
break
}
if !resEachEntryFunc {
break
}
}
return err
})
if callbackErr != nil {
return lastFileName, fmt.Errorf(
"failed to process eachEntryFunc for dir %q, entry %q: %w",
dirPath, lastFileName, callbackErr,
)
}
if err != nil {
return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
}

7
weed/filer/ydb/ydb_store.go

@ -313,7 +313,12 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath
return fmt.Errorf("decode entry %s: %w", entry.FullPath, decodeErr)
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
return fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", entry.FullPath, resEachEntryFuncErr)
}
if !resEachEntryFunc {
return nil
}

4
weed/mount/meta_cache/meta_cache.go

@ -146,9 +146,9 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full
glog.Warningf("unsynchronized dir: %v", dirPath)
}
_, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool {
_, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) (bool, error) {
if entry.TtlSec > 0 && entry.Crtime.Add(time.Duration(entry.TtlSec)*time.Second).Before(time.Now()) {
return true
return true, nil
}
mc.mapIdFromFilerToLocal(entry)
return eachEntryFunc(entry)

17
weed/mount/weedfs_dir_read.go

@ -2,13 +2,14 @@ package mount
import (
"context"
"math"
"sync"
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
"github.com/seaweedfs/seaweedfs/weed/util"
"math"
"sync"
)
type DirectoryHandleId uint64
@ -153,7 +154,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
}
var dirEntry fuse.DirEntry
processEachEntryFn := func(entry *filer.Entry) bool {
processEachEntryFn := func(entry *filer.Entry) (bool, error) {
dirEntry.Name = entry.Name()
dirEntry.Mode = toSyscallMode(entry.Mode)
inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, isPlusMode)
@ -161,13 +162,13 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
if !isPlusMode {
if !out.AddDirEntry(dirEntry) {
isEarlyTerminated = true
return false
return false, nil
}
} else {
entryOut := out.AddDirLookupEntry(dirEntry)
if entryOut == nil {
isEarlyTerminated = true
return false
return false, nil
}
if fh, found := wfs.fhMap.FindFileHandle(inode); found {
glog.V(4).Infof("readdir opened file %s", dirPath.Child(dirEntry.Name))
@ -175,7 +176,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
}
wfs.outputFilerEntry(entryOut, inode, entry)
}
return true
return true, nil
}
if input.Offset < directoryStreamBaseOffset {
@ -206,7 +207,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
entryCurrentIndex := input.Offset - dh.entryStreamOffset
for uint64(len(dh.entryStream)) > entryCurrentIndex {
entry := dh.entryStream[entryCurrentIndex]
if processEachEntryFn(entry) {
if process, _ := processEachEntryFn(entry); process {
lastEntryName = entry.Name()
entryCurrentIndex++
} else {
@ -221,7 +222,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
return fuse.EIO
}
listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) (bool, error) {
dh.entryStream = append(dh.entryStream, entry)
return processEachEntryFn(entry)
})

8
weed/server/filer_grpc_server.go

@ -56,19 +56,19 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
var listErr error
for limit > 0 {
var hasEntries bool
lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) bool {
lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) (bool, error) {
hasEntries = true
if err = stream.Send(&filer_pb.ListEntriesResponse{
Entry: entry.ToProtoEntry(),
}); err != nil {
return false
return false, err
}
limit--
if limit == 0 {
return false
return false, nil
}
return true
return true, nil
})
if listErr != nil {

7
weed/server/filer_grpc_server_traverse_meta.go

@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@ -63,13 +64,13 @@ func (fs *FilerServer) iterateDirectory(ctx context.Context, dirPath util.FullPa
var listErr error
for {
var hasEntries bool
lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) bool {
lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) (bool, error) {
hasEntries = true
if fnErr := fn(entry); fnErr != nil {
err = fnErr
return false
return false, err
}
return true
return true, nil
})
if listErr != nil {
return listErr

Loading…
Cancel
Save