From b669607fcdb959c9a5c8ba7accd5b65e54cafbc6 Mon Sep 17 00:00:00 2001 From: tam-i13 <46927823+tam-i13@users.noreply.github.com> Date: Wed, 26 Nov 2025 06:35:19 +0300 Subject: [PATCH] Add error list each entry func (#7485) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * added error return in type ListEachEntryFunc * return error if errClose * fix fmt.Errorf * fix return errClose * use %w fmt.Errorf * added entry in messege error * add callbackErr in ListDirectoryEntries * fix error * add log * clear err when the scanner stops on io.EOF, so returning err doesn’t surface EOF as a failure. * more info in error * add ctx to logs, error handling * fix return eachEntryFunc * fix * fix log * fix return * fix foundationdb test s * fix eachEntryFunc * fix return resEachEntryFuncErr * Update weed/filer/filer.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update weed/filer/elastic/v7/elastic_store.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update weed/filer/hbase/hbase_store.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update weed/filer/foundationdb/foundationdb_store.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update weed/filer/ydb/ydb_store.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * fix * add scanErr --------- Co-authored-by: Roman Tamarov Co-authored-by: Chris Lu Co-authored-by: chrislu Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../foundationdb_concurrent_test.go | 12 ++-- .../foundationdb_integration_test.go | 12 ++-- test/foundationdb/mock_integration_test.go | 19 +++++-- weed/filer/abstract_sql/abstract_sql_store.go | 12 +++- weed/filer/arangodb/arangodb_store.go | 8 ++- weed/filer/cassandra/cassandra_store.go | 17 +++++- weed/filer/cassandra2/cassandra_store.go | 18 +++++- weed/filer/elastic/v7/elastic_store.go | 18 ++++-- weed/filer/etcd/etcd_store.go | 12 +++- weed/filer/filer.go | 13 +++-- weed/filer/filer_search.go | 27 ++++++--- weed/filer/filerstore.go | 5 +- weed/filer/filerstore_translate_path.go | 7 ++- weed/filer/filerstore_wrapper.go | 22 +++++--- weed/filer/foundationdb/foundationdb_store.go | 12 +++- .../foundationdb/foundationdb_store_test.go | 56 +++++++++---------- weed/filer/hbase/hbase_store.go | 21 ++++--- weed/filer/leveldb/leveldb_store.go | 9 ++- weed/filer/leveldb2/leveldb2_store.go | 9 ++- weed/filer/leveldb3/leveldb3_store.go | 9 ++- weed/filer/mongodb/mongodb_store.go | 14 ++++- weed/filer/redis/universal_redis_store.go | 9 ++- weed/filer/redis2/universal_redis_store.go | 9 ++- weed/filer/redis3/universal_redis_store.go | 19 ++++++- weed/filer/redis_lua/universal_redis_store.go | 9 ++- weed/filer/rocksdb/rocksdb_store.go | 20 ++++++- weed/filer/store_test/test_suite.go | 13 +++-- weed/filer/tarantool/tarantool_store.go | 9 ++- weed/filer/tikv/tikv_store.go | 24 +++++++- weed/filer/ydb/ydb_store.go | 7 ++- weed/mount/meta_cache/meta_cache.go | 4 +- weed/mount/weedfs_dir_read.go | 17 +++--- weed/server/filer_grpc_server.go | 8 +-- .../server/filer_grpc_server_traverse_meta.go | 7 ++- 34 files changed, 350 insertions(+), 137 deletions(-) diff --git a/test/foundationdb/foundationdb_concurrent_test.go b/test/foundationdb/foundationdb_concurrent_test.go index b0ecaf742..de49ecc61 100644 --- a/test/foundationdb/foundationdb_concurrent_test.go +++ b/test/foundationdb/foundationdb_concurrent_test.go @@ -65,9 +65,9 @@ func TestFoundationDBStore_ConcurrentInserts(t *testing.T) { expectedTotal := numGoroutines * entriesPerGoroutine actualCount := 0 - _, err := store.ListDirectoryEntries(ctx, "/concurrent", "", true, 10000, func(entry *filer.Entry) bool { + _, err := store.ListDirectoryEntries(ctx, "/concurrent", "", true, 10000, func(entry *filer.Entry) (bool, error) { actualCount++ - return true + return true, nil }) if err != nil { t.Fatalf("ListDirectoryEntries failed: %v", err) @@ -265,9 +265,9 @@ func TestFoundationDBStore_ConcurrentTransactions(t *testing.T) { totalExpectedEntries := successCount * entriesPerTransaction actualCount := 0 - _, err := store.ListDirectoryEntries(ctx, "/transactions", "", true, 10000, func(entry *filer.Entry) bool { + _, err := store.ListDirectoryEntries(ctx, "/transactions", "", true, 10000, func(entry *filer.Entry) (bool, error) { actualCount++ - return true + return true, nil }) if err != nil { t.Fatalf("ListDirectoryEntries failed: %v", err) @@ -335,9 +335,9 @@ func TestFoundationDBStore_ConcurrentDirectoryOperations(t *testing.T) { dirPath := fmt.Sprintf("/worker%d/dir%d", w, d) fileCount := 0 - _, err := store.ListDirectoryEntries(ctx, dirPath, "", true, 1000, func(entry *filer.Entry) bool { + _, err := store.ListDirectoryEntries(ctx, dirPath, "", true, 1000, func(entry *filer.Entry) (bool, error) { fileCount++ - return true + return true, nil }) if err != nil { t.Errorf("ListDirectoryEntries failed for %s: %v", dirPath, err) diff --git a/test/foundationdb/foundationdb_integration_test.go b/test/foundationdb/foundationdb_integration_test.go index 5fdf993d7..63ed41ef9 100644 --- a/test/foundationdb/foundationdb_integration_test.go +++ b/test/foundationdb/foundationdb_integration_test.go @@ -115,9 +115,9 @@ func TestFoundationDBStore_DirectoryOperations(t *testing.T) { // Test ListDirectoryEntries var listedFiles []string - lastFileName, err := store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) bool { + lastFileName, err := store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) (bool, error) { listedFiles = append(listedFiles, entry.Name()) - return true + return true, nil }) if err != nil { t.Fatalf("ListDirectoryEntries failed: %v", err) @@ -132,9 +132,9 @@ func TestFoundationDBStore_DirectoryOperations(t *testing.T) { // Test ListDirectoryPrefixedEntries var prefixedFiles []string - _, err = store.ListDirectoryPrefixedEntries(ctx, testDir, "", true, 100, "file", func(entry *filer.Entry) bool { + _, err = store.ListDirectoryPrefixedEntries(ctx, testDir, "", true, 100, "file", func(entry *filer.Entry) (bool, error) { prefixedFiles = append(prefixedFiles, entry.Name()) - return true + return true, nil }) if err != nil { t.Fatalf("ListDirectoryPrefixedEntries failed: %v", err) @@ -153,9 +153,9 @@ func TestFoundationDBStore_DirectoryOperations(t *testing.T) { // Verify children are deleted var remainingFiles []string - _, err = store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) bool { + _, err = store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) (bool, error) { remainingFiles = append(remainingFiles, entry.Name()) - return true + return true, nil }) if err != nil { t.Fatalf("ListDirectoryEntries after delete failed: %v", err) diff --git a/test/foundationdb/mock_integration_test.go b/test/foundationdb/mock_integration_test.go index 5073ba5b3..9639932ba 100644 --- a/test/foundationdb/mock_integration_test.go +++ b/test/foundationdb/mock_integration_test.go @@ -2,6 +2,7 @@ package foundationdb import ( "context" + "fmt" "sort" "strings" "testing" @@ -157,14 +158,20 @@ func (store *MockFoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Con continue } - if !eachEntryFunc(entry) { + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr) break } + if !resEachEntryFunc { + break + } + lastFileName = entry.Name() count++ } - return lastFileName, nil + return lastFileName, err } func (store *MockFoundationDBStore) KvPut(ctx context.Context, key []byte, value []byte) error { @@ -390,9 +397,9 @@ func TestMockFoundationDBStore_DirectoryOperations(t *testing.T) { // Test ListDirectoryEntries var listedFiles []string - lastFileName, err := store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) bool { + lastFileName, err := store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) (bool, error) { listedFiles = append(listedFiles, entry.Name()) - return true + return true, nil }) if err != nil { t.Fatalf("ListDirectoryEntries failed: %v", err) @@ -409,9 +416,9 @@ func TestMockFoundationDBStore_DirectoryOperations(t *testing.T) { // Verify children are deleted var remainingFiles []string - _, err = store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) bool { + _, err = store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) (bool, error) { remainingFiles = append(remainingFiles, entry.Name()) - return true + return true, nil }) if err != nil { t.Fatalf("ListDirectoryEntries after delete failed: %v", err) diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index a83b33341..0b27104cf 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -326,17 +326,23 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err) } - if !eachEntryFunc(entry) { + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } } - return lastFileName, nil + return lastFileName, err } func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { - return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil) + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) } func (store *AbstractSqlStore) Shutdown() { diff --git a/weed/filer/arangodb/arangodb_store.go b/weed/filer/arangodb/arangodb_store.go index 0a3a06d16..7b2184c62 100644 --- a/weed/filer/arangodb/arangodb_store.go +++ b/weed/filer/arangodb/arangodb_store.go @@ -335,7 +335,13 @@ sort d.name asc break } - if !eachEntryFunc(entry) { + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go index 0d0c17e1d..968a2b3c3 100644 --- a/weed/filer/cassandra/cassandra_store.go +++ b/weed/filer/cassandra/cassandra_store.go @@ -206,12 +206,23 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath u glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) break } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", entry.FullPath, resEachEntryFuncErr) + glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", entry.FullPath, resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } } - if err = iter.Close(); err != nil { - glog.V(0).InfofCtx(ctx, "list iterator close: %v", err) + if errClose := iter.Close(); errClose != nil { + glog.V(0).InfofCtx(ctx, "list iterator close: %v", errClose) + if err == nil { + return lastFileName, errClose + } } return lastFileName, err diff --git a/weed/filer/cassandra2/cassandra_store.go b/weed/filer/cassandra2/cassandra_store.go index 8ff7f5874..7ce3d32c1 100644 --- a/weed/filer/cassandra2/cassandra_store.go +++ b/weed/filer/cassandra2/cassandra_store.go @@ -206,12 +206,24 @@ func (store *Cassandra2Store) ListDirectoryEntries(ctx context.Context, dirPath glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) break } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", entry.FullPath, resEachEntryFuncErr) + glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", entry.FullPath, resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } } - if err = iter.Close(); err != nil { - glog.V(0).InfofCtx(ctx, "list iterator close: %v", err) + + if errClose := iter.Close(); errClose != nil { + glog.V(0).InfofCtx(ctx, "list iterator close: %v", errClose) + if err == nil { + return lastFileName, errClose + } } return lastFileName, err diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index 5b88025e4..159330dec 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -198,12 +198,12 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e } func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { - _, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) bool { + _, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) (bool, error) { if err := store.DeleteEntry(ctx, entry.FullPath); err != nil { glog.ErrorfCtx(ctx, "elastic delete %s: %v.", entry.FullPath, err) - return false + return false, err } - return true + return true, nil }) return } @@ -258,9 +258,17 @@ func (store *ElasticStore) listDirectoryEntries( if fileName == startFileName && !inclusive { continue } - if !eachEntryFunc(esEntry.Entry) { - break + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(esEntry.Entry) + if resEachEntryFuncErr != nil { + glog.ErrorfCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr) + return lastFileName, fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", fileName, resEachEntryFuncErr) } + + if !resEachEntryFunc { + return lastFileName, nil + } + lastFileName = fileName } } diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go index d300a7048..0b83bacc8 100644 --- a/weed/filer/etcd/etcd_store.go +++ b/weed/filer/etcd/etcd_store.go @@ -9,7 +9,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/transport" - "go.etcd.io/etcd/client/v3" + clientv3 "go.etcd.io/etcd/client/v3" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -212,9 +212,17 @@ func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) break } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr) break } + + if !resEachEntryFunc { + break + } + lastFileName = fileName } diff --git a/weed/filer/filer.go b/weed/filer/filer.go index b68004a8b..f9f3d4fb2 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -369,10 +369,11 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta var s3ExpiredEntries []*Entry var hasValidEntries bool - lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool { + lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) (bool, error) { select { case <-ctx.Done(): - return false + glog.Errorf("Context is done.") + return false, fmt.Errorf("context canceled: %w", ctx.Err()) default: if entry.TtlSec > 0 { if entry.IsExpireS3Enabled() { @@ -380,13 +381,13 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta // Collect for deletion after iteration completes to avoid DB deadlock s3ExpiredEntries = append(s3ExpiredEntries, entry) expiredCount++ - return true + return true, nil } } else if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { // Collect for deletion after iteration completes to avoid DB deadlock expiredEntries = append(expiredEntries, entry) expiredCount++ - return true + return true, nil } } // Track that we found at least one valid (non-expired) entry @@ -496,9 +497,9 @@ func (f *Filer) DeleteEmptyParentDirectories(ctx context.Context, dirPath util.F // IsDirectoryEmpty checks if a directory contains any entries func (f *Filer) IsDirectoryEmpty(ctx context.Context, dirPath util.FullPath) (bool, error) { isEmpty := true - _, err := f.Store.ListDirectoryPrefixedEntries(ctx, dirPath, "", true, 1, "", func(entry *Entry) bool { + _, err := f.Store.ListDirectoryPrefixedEntries(ctx, dirPath, "", true, 1, "", func(entry *Entry) (bool, error) { isEmpty = false - return false // Stop after first entry + return false, nil // Stop after first entry }) return isEmpty, err } diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go index 6c7ba0747..294fc0e7f 100644 --- a/weed/filer/filer_search.go +++ b/weed/filer/filer_search.go @@ -2,10 +2,11 @@ package filer import ( "context" - "github.com/seaweedfs/seaweedfs/weed/util" "math" "path/filepath" "strings" + + "github.com/seaweedfs/seaweedfs/weed/util" ) func splitPattern(pattern string) (prefix string, restPattern string) { @@ -27,9 +28,9 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start limit = math.MaxInt32 - 1 } - _, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, namePatternExclude, func(entry *Entry) bool { + _, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, namePatternExclude, func(entry *Entry) (bool, error) { entries = append(entries, entry) - return true + return true, nil }) hasMore = int64(len(entries)) >= limit+1 @@ -68,24 +69,32 @@ func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath return 0, lastFileName, err } - lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool { + lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) (bool, error) { nameToTest := entry.Name() if len(namePatternExclude) > 0 { if matched, matchErr := filepath.Match(namePatternExclude, nameToTest); matchErr == nil && matched { missedCount++ - return true + return true, nil } } if len(restNamePattern) > 0 { if matched, matchErr := filepath.Match(restNamePattern, nameToTest[len(prefix):]); matchErr == nil && !matched { missedCount++ - return true + return true, nil } } - if !eachEntryFunc(entry) { - return false + + res, resErr := eachEntryFunc(entry) + + if resErr != nil { + return false, resErr + } + + if !res { + return false, nil } - return true + + return true, nil }) if err != nil { return diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go index 87e212ea5..968943608 100644 --- a/weed/filer/filerstore.go +++ b/weed/filer/filerstore.go @@ -3,8 +3,9 @@ package filer import ( "context" "errors" - "github.com/seaweedfs/seaweedfs/weed/util" "io" + + "github.com/seaweedfs/seaweedfs/weed/util" ) const CountEntryChunksForGzip = 50 @@ -16,7 +17,7 @@ var ( ErrKvNotFound = errors.New("kv: not found") ) -type ListEachEntryFunc func(entry *Entry) bool +type ListEachEntryFunc func(entry *Entry) (bool, error) type FilerStore interface { // GetName gets the name to locate the configuration in filer.toml file diff --git a/weed/filer/filerstore_translate_path.go b/weed/filer/filerstore_translate_path.go index 900154fde..97d388466 100644 --- a/weed/filer/filerstore_translate_path.go +++ b/weed/filer/filerstore_translate_path.go @@ -2,9 +2,10 @@ package filer import ( "context" - "github.com/seaweedfs/seaweedfs/weed/util" "math" "strings" + + "github.com/seaweedfs/seaweedfs/weed/util" ) var ( @@ -111,7 +112,7 @@ func (t *FilerStorePathTranslator) ListDirectoryEntries(ctx context.Context, dir newFullPath := t.translatePath(dirPath) - return t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit, func(entry *Entry) bool { + return t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit, func(entry *Entry) (bool, error) { entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath return eachEntryFunc(entry) }) @@ -125,7 +126,7 @@ func (t *FilerStorePathTranslator) ListDirectoryPrefixedEntries(ctx context.Cont limit = math.MaxInt32 - 1 } - return t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool { + return t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) (bool, error) { entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath return eachEntryFunc(entry) }) diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go index ea039d444..8694db984 100644 --- a/weed/filer/filerstore_wrapper.go +++ b/weed/filer/filerstore_wrapper.go @@ -2,6 +2,7 @@ package filer import ( "context" + "fmt" "io" "math" "strings" @@ -254,7 +255,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath }() // glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit) - return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool { + return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) (bool, error) { fsw.maybeReadHardLink(ctx, entry) filer_pb.AfterEntryDeserialization(entry.GetChunks()) return eachEntryFunc(entry) @@ -273,7 +274,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, limit = math.MaxInt32 - 1 } // glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit) - adjustedEntryFunc := func(entry *Entry) bool { + adjustedEntryFunc := func(entry *Entry) (bool, error) { fsw.maybeReadHardLink(ctx, entry) filer_pb.AfterEntryDeserialization(entry.GetChunks()) return eachEntryFunc(entry) @@ -293,9 +294,9 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u } var notPrefixed []*Entry - lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool { + lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) (bool, error) { notPrefixed = append(notPrefixed, entry) - return true + return true, nil }) if err != nil { return @@ -306,7 +307,14 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u for _, entry := range notPrefixed { if strings.HasPrefix(entry.Name(), prefix) { count++ - if !eachEntryFunc(entry) { + res, resErr := eachEntryFunc(entry) + + if resErr != nil { + err = fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", entry.Name(), resErr) + return + } + + if !res { return } if count >= limit { @@ -316,9 +324,9 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u } if count < limit && lastFileName < prefix { notPrefixed = notPrefixed[:0] - lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool { + lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) (bool, error) { notPrefixed = append(notPrefixed, entry) - return true + return true, nil }) if err != nil { return diff --git a/weed/filer/foundationdb/foundationdb_store.go b/weed/filer/foundationdb/foundationdb_store.go index 509ee4b86..720afd7bc 100644 --- a/weed/filer/foundationdb/foundationdb_store.go +++ b/weed/filer/foundationdb/foundationdb_store.go @@ -318,12 +318,12 @@ func (store *FoundationDBStore) deleteFolderChildrenInBatches(ctx context.Contex var subDirectories []util.FullPath // List entries - we'll process BATCH_SIZE at a time - _, err := store.ListDirectoryEntries(ctxNoTxn, fullpath, "", true, int64(BATCH_SIZE), func(entry *filer.Entry) bool { + _, err := store.ListDirectoryEntries(ctxNoTxn, fullpath, "", true, int64(BATCH_SIZE), func(entry *filer.Entry) (bool, error) { entriesToDelete = append(entriesToDelete, entry.FullPath) if entry.IsDirectory() { subDirectories = append(subDirectories, entry.FullPath) } - return true + return true, nil }) if err != nil { @@ -474,9 +474,15 @@ func (store *FoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Context continue } - if !eachEntryFunc(entry) { + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + glog.ErrorfCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr) + return lastFileName, fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", fileName, resEachEntryFuncErr) + } + if !resEachEntryFunc { break } + lastFileName = fileName } diff --git a/weed/filer/foundationdb/foundationdb_store_test.go b/weed/filer/foundationdb/foundationdb_store_test.go index 215c98c76..73255d67d 100644 --- a/weed/filer/foundationdb/foundationdb_store_test.go +++ b/weed/filer/foundationdb/foundationdb_store_test.go @@ -372,16 +372,16 @@ func containsString(s, substr string) bool { func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) { // This test validates that DeleteFolderChildren always uses batching // to safely handle large directories, regardless of transaction context - + store := getTestStore(t) defer store.Shutdown() - + ctx := context.Background() testDir := util.FullPath(fmt.Sprintf("/test_batch_delete_%d", time.Now().UnixNano())) - + // Create a large directory (> 100 entries to trigger batching) const NUM_ENTRIES = 250 - + t.Logf("Creating %d test entries...", NUM_ENTRIES) for i := 0; i < NUM_ENTRIES; i++ { entry := &filer.Entry{ @@ -397,11 +397,11 @@ func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) { t.Fatalf("Failed to insert test entry %d: %v", i, err) } } - + // Test 1: DeleteFolderChildren outside transaction should succeed t.Run("OutsideTransaction", func(t *testing.T) { testDir1 := util.FullPath(fmt.Sprintf("/test_batch_1_%d", time.Now().UnixNano())) - + // Create entries for i := 0; i < NUM_ENTRIES; i++ { entry := &filer.Entry{ @@ -415,28 +415,28 @@ func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) { } store.InsertEntry(ctx, entry) } - + // Delete with batching err := store.DeleteFolderChildren(ctx, testDir1) if err != nil { t.Errorf("DeleteFolderChildren outside transaction should succeed, got error: %v", err) } - + // Verify all entries deleted var count int - store.ListDirectoryEntries(ctx, testDir1, "", true, 1000, func(entry *filer.Entry) bool { + store.ListDirectoryEntries(ctx, testDir1, "", true, 1000, func(entry *filer.Entry) (bool, error) { count++ - return true + return true, nil }) if count != 0 { t.Errorf("Expected all entries to be deleted, found %d", count) } }) - + // Test 2: DeleteFolderChildren with transaction context - uses its own batched transactions t.Run("WithTransactionContext", func(t *testing.T) { testDir2 := util.FullPath(fmt.Sprintf("/test_batch_2_%d", time.Now().UnixNano())) - + // Create entries for i := 0; i < NUM_ENTRIES; i++ { entry := &filer.Entry{ @@ -450,38 +450,38 @@ func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) { } store.InsertEntry(ctx, entry) } - + // Start a transaction (DeleteFolderChildren will ignore it and use its own batching) txCtx, err := store.BeginTransaction(ctx) if err != nil { t.Fatalf("BeginTransaction failed: %v", err) } - + // Delete large directory - should succeed with batching err = store.DeleteFolderChildren(txCtx, testDir2) if err != nil { t.Errorf("DeleteFolderChildren should succeed with batching even when transaction context present, got: %v", err) } - + // Rollback transaction (DeleteFolderChildren used its own transactions, so this doesn't affect deletions) store.RollbackTransaction(txCtx) - + // Verify entries are still deleted (because DeleteFolderChildren managed its own transactions) var count int - store.ListDirectoryEntries(ctx, testDir2, "", true, 1000, func(entry *filer.Entry) bool { + store.ListDirectoryEntries(ctx, testDir2, "", true, 1000, func(entry *filer.Entry) (bool, error) { count++ - return true + return true, nil }) - + if count != 0 { t.Errorf("Expected all entries to be deleted, found %d (DeleteFolderChildren uses its own transactions)", count) } }) - + // Test 3: Nested directories with batching t.Run("NestedDirectories", func(t *testing.T) { testDir3 := util.FullPath(fmt.Sprintf("/test_batch_3_%d", time.Now().UnixNano())) - + // Create nested structure for i := 0; i < 50; i++ { // Files in root @@ -495,7 +495,7 @@ func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) { }, } store.InsertEntry(ctx, entry) - + // Subdirectory subDir := &filer.Entry{ FullPath: util.NewFullPath(string(testDir3), fmt.Sprintf("dir_%02d", i)), @@ -507,7 +507,7 @@ func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) { }, } store.InsertEntry(ctx, subDir) - + // Files in subdirectory for j := 0; j < 3; j++ { subEntry := &filer.Entry{ @@ -522,24 +522,24 @@ func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) { store.InsertEntry(ctx, subEntry) } } - + // Delete all with batching err := store.DeleteFolderChildren(ctx, testDir3) if err != nil { t.Errorf("DeleteFolderChildren should handle nested directories, got: %v", err) } - + // Verify all deleted var count int - store.ListDirectoryEntries(ctx, testDir3, "", true, 1000, func(entry *filer.Entry) bool { + store.ListDirectoryEntries(ctx, testDir3, "", true, 1000, func(entry *filer.Entry) (bool, error) { count++ - return true + return true, nil }) if count != 0 { t.Errorf("Expected all nested entries to be deleted, found %d", count) } }) - + // Cleanup store.DeleteFolderChildren(ctx, testDir) } diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go index 8642146e6..7663fef9d 100644 --- a/weed/filer/hbase/hbase_store.go +++ b/weed/filer/hbase/hbase_store.go @@ -4,13 +4,14 @@ import ( "bytes" "context" "fmt" + "io" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/tsuna/gohbase" "github.com/tsuna/gohbase/hrpc" - "io" ) func init() { @@ -163,12 +164,12 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa scanner := store.Client.Scan(scan) defer scanner.Close() for { - res, err := scanner.Next() - if err == io.EOF { + res, scanErr := scanner.Next() + if scanErr == io.EOF { break } - if err != nil { - return lastFileName, err + if scanErr != nil { + return lastFileName, scanErr } if len(res.Cells) == 0 { continue @@ -206,12 +207,18 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) break } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + return lastFileName, fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", entry.FullPath, resEachEntryFuncErr) + } + + if !resEachEntryFunc { break } } - return lastFileName, nil + return lastFileName, err } func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) { diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go index ff1465c23..fea6e0a3d 100644 --- a/weed/filer/leveldb/leveldb_store.go +++ b/weed/filer/leveldb/leveldb_store.go @@ -209,7 +209,14 @@ func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) break } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } } diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go index 1bd6fe597..c3bb2ac55 100644 --- a/weed/filer/leveldb2/leveldb2_store.go +++ b/weed/filer/leveldb2/leveldb2_store.go @@ -216,7 +216,14 @@ func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, di glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) break } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } } diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go index eb8b4e578..439143ceb 100644 --- a/weed/filer/leveldb3/leveldb3_store.go +++ b/weed/filer/leveldb3/leveldb3_store.go @@ -345,7 +345,14 @@ func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, di glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) break } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } } diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go index 21463dc32..f87878f08 100644 --- a/weed/filer/mongodb/mongodb_store.go +++ b/weed/filer/mongodb/mongodb_store.go @@ -319,14 +319,22 @@ func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dir break } - if !eachEntryFunc(entry) { + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr) break } + if !resEachEntryFunc { + break + } } - if err := cur.Close(ctx); err != nil { - glog.V(0).InfofCtx(ctx, "list iterator close: %v", err) + if errClose := cur.Close(ctx); errClose != nil { + glog.V(0).InfofCtx(ctx, "list iterator close: %v", errClose) + if err == nil { + return lastFileName, errClose + } } return lastFileName, err diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go index 407491a04..7c2a0e47b 100644 --- a/weed/filer/redis/universal_redis_store.go +++ b/weed/filer/redis/universal_redis_store.go @@ -191,7 +191,14 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirP continue } } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } } diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go index 1fa384f29..0dbf7a72a 100644 --- a/weed/filer/redis2/universal_redis_store.go +++ b/weed/filer/redis2/universal_redis_store.go @@ -206,7 +206,14 @@ func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dir continue } } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } } diff --git a/weed/filer/redis3/universal_redis_store.go b/weed/filer/redis3/universal_redis_store.go index 699683d91..84cd42908 100644 --- a/weed/filer/redis3/universal_redis_store.go +++ b/weed/filer/redis3/universal_redis_store.go @@ -140,6 +140,7 @@ func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dir dirListKey := genDirectoryListKey(string(dirPath)) counter := int64(0) + var callbackErr error err = listChildren(ctx, store, dirListKey, startFileName, func(fileName string) bool { if startFileName != "" { if !includeStartFile && startFileName == fileName { @@ -164,9 +165,18 @@ func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dir } } counter++ - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr) + callbackErr = resEachEntryFuncErr + return false + } + + if !resEachEntryFunc { return false } + if counter >= limit { return false } @@ -174,6 +184,13 @@ func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dir return true }) + if callbackErr != nil { + return lastFileName, fmt.Errorf( + "failed to process eachEntryFunc for dir %q, entry %q: %w", + dirPath, lastFileName, callbackErr, + ) + } + return lastFileName, err } diff --git a/weed/filer/redis_lua/universal_redis_store.go b/weed/filer/redis_lua/universal_redis_store.go index 20b83a2a9..35f6d4991 100644 --- a/weed/filer/redis_lua/universal_redis_store.go +++ b/weed/filer/redis_lua/universal_redis_store.go @@ -173,7 +173,14 @@ func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, d continue } } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } } diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go index 044dc1342..2283efa6f 100644 --- a/weed/filer/rocksdb/rocksdb_store.go +++ b/weed/filer/rocksdb/rocksdb_store.go @@ -251,6 +251,7 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir defer ro.Destroy() ro.SetFillCache(false) + var callbackErr error iter := store.db.NewIterator(ro) defer iter.Close() err = enumerate(iter, directoryPrefix, lastFileStart, includeStartFile, limit, startFileName, func(key, value []byte) bool { @@ -269,11 +270,28 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) return false } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr) + callbackErr = resEachEntryFuncErr + return false + } + + if !resEachEntryFunc { return false } + return true }) + + if callbackErr != nil { + return lastFileName, fmt.Errorf( + "failed to process eachEntryFunc for dir %q, entry %q: %w", + dirPath, lastFileName, callbackErr, + ) + } + if err != nil { return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err) } diff --git a/weed/filer/store_test/test_suite.go b/weed/filer/store_test/test_suite.go index fda694f26..ae334cdd8 100644 --- a/weed/filer/store_test/test_suite.go +++ b/weed/filer/store_test/test_suite.go @@ -3,11 +3,12 @@ package store_test import ( "context" "fmt" + "os" + "testing" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/stretchr/testify/assert" - "os" - "testing" ) func TestFilerStore(t *testing.T, store filer.FilerStore) { @@ -23,16 +24,16 @@ func TestFilerStore(t *testing.T, store filer.FilerStore) { { var counter int - lastFileName, err := store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), "", false, 3, func(entry *filer.Entry) bool { + lastFileName, err := store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), "", false, 3, func(entry *filer.Entry) (bool, error) { counter++ - return true + return true, nil }) assert.Nil(t, err, "list directory") assert.Equal(t, 3, counter, "directory list counter") assert.Equal(t, "f00002", lastFileName, "directory list last file") - lastFileName, err = store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), lastFileName, false, 1024, func(entry *filer.Entry) bool { + lastFileName, err = store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), lastFileName, false, 1024, func(entry *filer.Entry) (bool, error) { counter++ - return true + return true, nil }) assert.Nil(t, err, "list directory") assert.Equal(t, 1027, counter, "directory list counter") diff --git a/weed/filer/tarantool/tarantool_store.go b/weed/filer/tarantool/tarantool_store.go index 4c9f8a600..1bcd31830 100644 --- a/weed/filer/tarantool/tarantool_store.go +++ b/weed/filer/tarantool/tarantool_store.go @@ -305,7 +305,14 @@ func (store *TarantoolStore) ListDirectoryEntries(ctx context.Context, dirPath w glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) break } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } } diff --git a/weed/filer/tikv/tikv_store.go b/weed/filer/tikv/tikv_store.go index 3708ddec5..307d2b3fb 100644 --- a/weed/filer/tikv/tikv_store.go +++ b/weed/filer/tikv/tikv_store.go @@ -223,6 +223,7 @@ func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat if err != nil { return lastFileName, err } + var callbackErr error err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { iter, err := txn.Iter(lastFileStart, nil) if err != nil { @@ -283,12 +284,33 @@ func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat // Only increment counter for non-expired entries i++ - if err := iter.Next(); !eachEntryFunc(entry) || err != nil { + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr) + callbackErr = resEachEntryFuncErr + break + } + + nextErr := iter.Next() + if nextErr != nil { + err = nextErr + break + } + + if !resEachEntryFunc { break } } return err }) + + if callbackErr != nil { + return lastFileName, fmt.Errorf( + "failed to process eachEntryFunc for dir %q, entry %q: %w", + dirPath, lastFileName, callbackErr, + ) + } + if err != nil { return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err) } diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go index 90b13aa04..ee94d13e1 100644 --- a/weed/filer/ydb/ydb_store.go +++ b/weed/filer/ydb/ydb_store.go @@ -313,7 +313,12 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath return fmt.Errorf("decode entry %s: %w", entry.FullPath, decodeErr) } - if !eachEntryFunc(entry) { + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + return fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", entry.FullPath, resEachEntryFuncErr) + } + + if !resEachEntryFunc { return nil } diff --git a/weed/mount/meta_cache/meta_cache.go b/weed/mount/meta_cache/meta_cache.go index 0f0b1de30..9578aff72 100644 --- a/weed/mount/meta_cache/meta_cache.go +++ b/weed/mount/meta_cache/meta_cache.go @@ -146,9 +146,9 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full glog.Warningf("unsynchronized dir: %v", dirPath) } - _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool { + _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) (bool, error) { if entry.TtlSec > 0 && entry.Crtime.Add(time.Duration(entry.TtlSec)*time.Second).Before(time.Now()) { - return true + return true, nil } mc.mapIdFromFilerToLocal(entry) return eachEntryFunc(entry) diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go index 6e18b50e8..ebf0d9191 100644 --- a/weed/mount/weedfs_dir_read.go +++ b/weed/mount/weedfs_dir_read.go @@ -2,13 +2,14 @@ package mount import ( "context" + "math" + "sync" + "github.com/hanwen/go-fuse/v2/fuse" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache" "github.com/seaweedfs/seaweedfs/weed/util" - "math" - "sync" ) type DirectoryHandleId uint64 @@ -153,7 +154,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl } var dirEntry fuse.DirEntry - processEachEntryFn := func(entry *filer.Entry) bool { + processEachEntryFn := func(entry *filer.Entry) (bool, error) { dirEntry.Name = entry.Name() dirEntry.Mode = toSyscallMode(entry.Mode) inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, isPlusMode) @@ -161,13 +162,13 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl if !isPlusMode { if !out.AddDirEntry(dirEntry) { isEarlyTerminated = true - return false + return false, nil } } else { entryOut := out.AddDirLookupEntry(dirEntry) if entryOut == nil { isEarlyTerminated = true - return false + return false, nil } if fh, found := wfs.fhMap.FindFileHandle(inode); found { glog.V(4).Infof("readdir opened file %s", dirPath.Child(dirEntry.Name)) @@ -175,7 +176,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl } wfs.outputFilerEntry(entryOut, inode, entry) } - return true + return true, nil } if input.Offset < directoryStreamBaseOffset { @@ -206,7 +207,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl entryCurrentIndex := input.Offset - dh.entryStreamOffset for uint64(len(dh.entryStream)) > entryCurrentIndex { entry := dh.entryStream[entryCurrentIndex] - if processEachEntryFn(entry) { + if process, _ := processEachEntryFn(entry); process { lastEntryName = entry.Name() entryCurrentIndex++ } else { @@ -221,7 +222,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) return fuse.EIO } - listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool { + listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) (bool, error) { dh.entryStream = append(dh.entryStream, entry) return processEachEntryFn(entry) }) diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 02eceebde..ef5225181 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -56,19 +56,19 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file var listErr error for limit > 0 { var hasEntries bool - lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) bool { + lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) (bool, error) { hasEntries = true if err = stream.Send(&filer_pb.ListEntriesResponse{ Entry: entry.ToProtoEntry(), }); err != nil { - return false + return false, err } limit-- if limit == 0 { - return false + return false, nil } - return true + return true, nil }) if listErr != nil { diff --git a/weed/server/filer_grpc_server_traverse_meta.go b/weed/server/filer_grpc_server_traverse_meta.go index 841e7b88b..9e317e83f 100644 --- a/weed/server/filer_grpc_server_traverse_meta.go +++ b/weed/server/filer_grpc_server_traverse_meta.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -63,13 +64,13 @@ func (fs *FilerServer) iterateDirectory(ctx context.Context, dirPath util.FullPa var listErr error for { var hasEntries bool - lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) bool { + lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) (bool, error) { hasEntries = true if fnErr := fn(entry); fnErr != nil { err = fnErr - return false + return false, err } - return true + return true, nil }) if listErr != nil { return listErr