diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index 5b88025e4..97ba2e767 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -258,9 +258,17 @@ func (store *ElasticStore) listDirectoryEntries( if fileName == startFileName && !inclusive { continue } - if !eachEntryFunc(esEntry.Entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(esEntry.Entry) + if resEachEntryFuncErr != nil { + glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr) + return lastFileName, fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr) + } + + if !resEachEntryFunc { break } + lastFileName = fileName } } diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go index 044dc1342..2283efa6f 100644 --- a/weed/filer/rocksdb/rocksdb_store.go +++ b/weed/filer/rocksdb/rocksdb_store.go @@ -251,6 +251,7 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir defer ro.Destroy() ro.SetFillCache(false) + var callbackErr error iter := store.db.NewIterator(ro) defer iter.Close() err = enumerate(iter, directoryPrefix, lastFileStart, includeStartFile, limit, startFileName, func(key, value []byte) bool { @@ -269,11 +270,28 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) return false } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr) + callbackErr = resEachEntryFuncErr + return false + } + + if !resEachEntryFunc { return false } + return true }) + + if callbackErr != nil { + return lastFileName, fmt.Errorf( + "failed to process eachEntryFunc for dir %q, entry %q: %w", + dirPath, lastFileName, callbackErr, + ) + } + if err != nil { return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err) } diff --git a/weed/filer/tarantool/tarantool_store.go b/weed/filer/tarantool/tarantool_store.go index 4c9f8a600..1bcd31830 100644 --- a/weed/filer/tarantool/tarantool_store.go +++ b/weed/filer/tarantool/tarantool_store.go @@ -305,7 +305,14 @@ func (store *TarantoolStore) ListDirectoryEntries(ctx context.Context, dirPath w glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) break } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } } diff --git a/weed/filer/tikv/tikv_store.go b/weed/filer/tikv/tikv_store.go index 3708ddec5..a06213fbf 100644 --- a/weed/filer/tikv/tikv_store.go +++ b/weed/filer/tikv/tikv_store.go @@ -223,6 +223,7 @@ func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat if err != nil { return lastFileName, err } + var callbackErr error err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { iter, err := txn.Iter(lastFileStart, nil) if err != nil { @@ -283,12 +284,27 @@ func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat // Only increment counter for non-expired entries i++ - if err := iter.Next(); !eachEntryFunc(entry) || err != nil { + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr) + callbackErr = resEachEntryFuncErr + break + } + + if err := iter.Next(); !resEachEntryFunc || err != nil { break } } return err }) + + if callbackErr != nil { + return lastFileName, fmt.Errorf( + "failed to process eachEntryFunc for dir %q, entry %q: %w", + dirPath, lastFileName, callbackErr, + ) + } + if err != nil { return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err) } diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go index 90b13aa04..dca6f0bb3 100644 --- a/weed/filer/ydb/ydb_store.go +++ b/weed/filer/ydb/ydb_store.go @@ -313,7 +313,12 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath return fmt.Errorf("decode entry %s: %w", entry.FullPath, decodeErr) } - if !eachEntryFunc(entry) { + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + return fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr) + } + + if !resEachEntryFunc { return nil }