diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index a83b33341..78c08d617 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -326,7 +326,13 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err) } - if !eachEntryFunc(entry) { + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("Failed in process eachEntryFnc: ", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } diff --git a/weed/filer/arangodb/arangodb_store.go b/weed/filer/arangodb/arangodb_store.go index 0a3a06d16..a2ba3c068 100644 --- a/weed/filer/arangodb/arangodb_store.go +++ b/weed/filer/arangodb/arangodb_store.go @@ -335,7 +335,13 @@ sort d.name asc break } - if !eachEntryFunc(entry) { + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("Failed in process eachEntryFnc: ", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go index 0d0c17e1d..fe34ec6db 100644 --- a/weed/filer/cassandra/cassandra_store.go +++ b/weed/filer/cassandra/cassandra_store.go @@ -206,12 +206,20 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath u glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) break } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = resEachEntryFuncErr + glog.V(0).Infof("Failed in process eachEntryFnc: %v", err) + break + } + + if !resEachEntryFunc { break } } - if err = iter.Close(); err != nil { - glog.V(0).InfofCtx(ctx, "list iterator close: %v", err) + if errClose := iter.Close(); errClose != nil { + glog.V(0).Infof("list iterator close: %v", errClose) } return lastFileName, err diff --git a/weed/filer/cassandra2/cassandra_store.go b/weed/filer/cassandra2/cassandra_store.go index 8ff7f5874..7dc37121e 100644 --- a/weed/filer/cassandra2/cassandra_store.go +++ b/weed/filer/cassandra2/cassandra_store.go @@ -206,12 +206,21 @@ func (store *Cassandra2Store) ListDirectoryEntries(ctx context.Context, dirPath glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) break } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = resEachEntryFuncErr + glog.V(0).Infof("Failed in process eachEntryFnc: %v", err) + break + } + + if !resEachEntryFunc { break } } - if err = iter.Close(); err != nil { - glog.V(0).InfofCtx(ctx, "list iterator close: %v", err) + + if errClose := iter.Close(); errClose != nil { + glog.V(0).Infof("list iterator close: %v", errClose) } return lastFileName, err diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go index d300a7048..841e2476c 100644 --- a/weed/filer/etcd/etcd_store.go +++ b/weed/filer/etcd/etcd_store.go @@ -9,7 +9,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/transport" - "go.etcd.io/etcd/client/v3" + clientv3 "go.etcd.io/etcd/client/v3" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -212,9 +212,17 @@ func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) break } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("Failed in process eachEntryFnc: ", resEachEntryFuncErr) break } + + if !resEachEntryFunc { + break + } + lastFileName = fileName } diff --git a/weed/filer/filer.go b/weed/filer/filer.go index d3d2de948..dde7126f9 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -373,10 +373,11 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta var s3ExpiredEntries []*Entry var hasValidEntries bool - lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool { + lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) (bool, error) { select { case <-ctx.Done(): - return false + glog.Errorf("Context is done.") + return false, fmt.Errorf("Context is done. Error : %s", ctx.Err()) default: if entry.TtlSec > 0 { if entry.IsExpireS3Enabled() { @@ -384,13 +385,13 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta // Collect for deletion after iteration completes to avoid DB deadlock s3ExpiredEntries = append(s3ExpiredEntries, entry) expiredCount++ - return true + return true, nil } } else if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { // Collect for deletion after iteration completes to avoid DB deadlock expiredEntries = append(expiredEntries, entry) expiredCount++ - return true + return true, nil } } // Track that we found at least one valid (non-expired) entry @@ -500,9 +501,9 @@ func (f *Filer) DeleteEmptyParentDirectories(ctx context.Context, dirPath util.F // IsDirectoryEmpty checks if a directory contains any entries func (f *Filer) IsDirectoryEmpty(ctx context.Context, dirPath util.FullPath) (bool, error) { isEmpty := true - _, err := f.Store.ListDirectoryPrefixedEntries(ctx, dirPath, "", true, 1, "", func(entry *Entry) bool { + _, err := f.Store.ListDirectoryPrefixedEntries(ctx, dirPath, "", true, 1, "", func(entry *Entry) (bool, error) { isEmpty = false - return false // Stop after first entry + return false, nil // Stop after first entry }) return isEmpty, err } diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go index 6c7ba0747..294fc0e7f 100644 --- a/weed/filer/filer_search.go +++ b/weed/filer/filer_search.go @@ -2,10 +2,11 @@ package filer import ( "context" - "github.com/seaweedfs/seaweedfs/weed/util" "math" "path/filepath" "strings" + + "github.com/seaweedfs/seaweedfs/weed/util" ) func splitPattern(pattern string) (prefix string, restPattern string) { @@ -27,9 +28,9 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start limit = math.MaxInt32 - 1 } - _, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, namePatternExclude, func(entry *Entry) bool { + _, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, namePatternExclude, func(entry *Entry) (bool, error) { entries = append(entries, entry) - return true + return true, nil }) hasMore = int64(len(entries)) >= limit+1 @@ -68,24 +69,32 @@ func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath return 0, lastFileName, err } - lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool { + lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) (bool, error) { nameToTest := entry.Name() if len(namePatternExclude) > 0 { if matched, matchErr := filepath.Match(namePatternExclude, nameToTest); matchErr == nil && matched { missedCount++ - return true + return true, nil } } if len(restNamePattern) > 0 { if matched, matchErr := filepath.Match(restNamePattern, nameToTest[len(prefix):]); matchErr == nil && !matched { missedCount++ - return true + return true, nil } } - if !eachEntryFunc(entry) { - return false + + res, resErr := eachEntryFunc(entry) + + if resErr != nil { + return false, resErr + } + + if !res { + return false, nil } - return true + + return true, nil }) if err != nil { return diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go index 87e212ea5..968943608 100644 --- a/weed/filer/filerstore.go +++ b/weed/filer/filerstore.go @@ -3,8 +3,9 @@ package filer import ( "context" "errors" - "github.com/seaweedfs/seaweedfs/weed/util" "io" + + "github.com/seaweedfs/seaweedfs/weed/util" ) const CountEntryChunksForGzip = 50 @@ -16,7 +17,7 @@ var ( ErrKvNotFound = errors.New("kv: not found") ) -type ListEachEntryFunc func(entry *Entry) bool +type ListEachEntryFunc func(entry *Entry) (bool, error) type FilerStore interface { // GetName gets the name to locate the configuration in filer.toml file diff --git a/weed/filer/filerstore_translate_path.go b/weed/filer/filerstore_translate_path.go index 900154fde..97d388466 100644 --- a/weed/filer/filerstore_translate_path.go +++ b/weed/filer/filerstore_translate_path.go @@ -2,9 +2,10 @@ package filer import ( "context" - "github.com/seaweedfs/seaweedfs/weed/util" "math" "strings" + + "github.com/seaweedfs/seaweedfs/weed/util" ) var ( @@ -111,7 +112,7 @@ func (t *FilerStorePathTranslator) ListDirectoryEntries(ctx context.Context, dir newFullPath := t.translatePath(dirPath) - return t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit, func(entry *Entry) bool { + return t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit, func(entry *Entry) (bool, error) { entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath return eachEntryFunc(entry) }) @@ -125,7 +126,7 @@ func (t *FilerStorePathTranslator) ListDirectoryPrefixedEntries(ctx context.Cont limit = math.MaxInt32 - 1 } - return t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool { + return t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) (bool, error) { entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath return eachEntryFunc(entry) }) diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go index ea039d444..a3643928e 100644 --- a/weed/filer/filerstore_wrapper.go +++ b/weed/filer/filerstore_wrapper.go @@ -2,6 +2,7 @@ package filer import ( "context" + "fmt" "io" "math" "strings" @@ -254,7 +255,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath }() // glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit) - return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool { + return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) (bool, error) { fsw.maybeReadHardLink(ctx, entry) filer_pb.AfterEntryDeserialization(entry.GetChunks()) return eachEntryFunc(entry) @@ -273,7 +274,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, limit = math.MaxInt32 - 1 } // glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit) - adjustedEntryFunc := func(entry *Entry) bool { + adjustedEntryFunc := func(entry *Entry) (bool, error) { fsw.maybeReadHardLink(ctx, entry) filer_pb.AfterEntryDeserialization(entry.GetChunks()) return eachEntryFunc(entry) @@ -293,9 +294,9 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u } var notPrefixed []*Entry - lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool { + lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) (bool, error) { notPrefixed = append(notPrefixed, entry) - return true + return true, nil }) if err != nil { return @@ -306,7 +307,14 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u for _, entry := range notPrefixed { if strings.HasPrefix(entry.Name(), prefix) { count++ - if !eachEntryFunc(entry) { + res, resErr := eachEntryFunc(entry) + + if resErr != nil { + err = fmt.Errorf("Failed in process eachEntryFnc: ", resErr) + return + } + + if !res { return } if count >= limit { @@ -316,9 +324,9 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u } if count < limit && lastFileName < prefix { notPrefixed = notPrefixed[:0] - lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool { + lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) (bool, error) { notPrefixed = append(notPrefixed, entry) - return true + return true, nil }) if err != nil { return diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go index 8642146e6..0255241d6 100644 --- a/weed/filer/hbase/hbase_store.go +++ b/weed/filer/hbase/hbase_store.go @@ -4,13 +4,14 @@ import ( "bytes" "context" "fmt" + "io" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/tsuna/gohbase" "github.com/tsuna/gohbase/hrpc" - "io" ) func init() { @@ -206,7 +207,14 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) break } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("Failed in process eachEntryFnc: ", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } } diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go index ff1465c23..b88db92d4 100644 --- a/weed/filer/leveldb/leveldb_store.go +++ b/weed/filer/leveldb/leveldb_store.go @@ -209,7 +209,14 @@ func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) break } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("Failed in process eachEntryFnc: ", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } } diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go index 1bd6fe597..1e09f0f6f 100644 --- a/weed/filer/leveldb2/leveldb2_store.go +++ b/weed/filer/leveldb2/leveldb2_store.go @@ -216,7 +216,14 @@ func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, di glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) break } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("Failed in process eachEntryFnc: ", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } } diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go index eb8b4e578..c9928a3de 100644 --- a/weed/filer/leveldb3/leveldb3_store.go +++ b/weed/filer/leveldb3/leveldb3_store.go @@ -345,7 +345,14 @@ func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, di glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err) break } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("Failed in process eachEntryFnc: ", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } } diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go index 21463dc32..9464a4425 100644 --- a/weed/filer/mongodb/mongodb_store.go +++ b/weed/filer/mongodb/mongodb_store.go @@ -319,10 +319,15 @@ func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dir break } - if !eachEntryFunc(entry) { + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("Failed in process eachEntryFnc: ", resEachEntryFuncErr) break } + if !resEachEntryFunc { + break + } } if err := cur.Close(ctx); err != nil { diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go index 407491a04..e24f81db1 100644 --- a/weed/filer/redis/universal_redis_store.go +++ b/weed/filer/redis/universal_redis_store.go @@ -191,7 +191,14 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirP continue } } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("Failed in process eachEntryFnc: ", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } } diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go index 1fa384f29..39cd8276a 100644 --- a/weed/filer/redis2/universal_redis_store.go +++ b/weed/filer/redis2/universal_redis_store.go @@ -206,7 +206,14 @@ func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dir continue } } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("Failed in process eachEntryFnc: ", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } } diff --git a/weed/filer/redis3/universal_redis_store.go b/weed/filer/redis3/universal_redis_store.go index 699683d91..8b4a90fe3 100644 --- a/weed/filer/redis3/universal_redis_store.go +++ b/weed/filer/redis3/universal_redis_store.go @@ -164,9 +164,16 @@ func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dir } } counter++ - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("Failed in process eachEntryFnc: ", resEachEntryFuncErr) + } + + if !resEachEntryFunc { return false } + if counter >= limit { return false } diff --git a/weed/filer/redis_lua/universal_redis_store.go b/weed/filer/redis_lua/universal_redis_store.go index 20b83a2a9..a406f3889 100644 --- a/weed/filer/redis_lua/universal_redis_store.go +++ b/weed/filer/redis_lua/universal_redis_store.go @@ -173,7 +173,14 @@ func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, d continue } } - if !eachEntryFunc(entry) { + + resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry) + if resEachEntryFuncErr != nil { + err = fmt.Errorf("Failed in process eachEntryFnc: ", resEachEntryFuncErr) + break + } + + if !resEachEntryFunc { break } } diff --git a/weed/filer/store_test/test_suite.go b/weed/filer/store_test/test_suite.go index fda694f26..ae334cdd8 100644 --- a/weed/filer/store_test/test_suite.go +++ b/weed/filer/store_test/test_suite.go @@ -3,11 +3,12 @@ package store_test import ( "context" "fmt" + "os" + "testing" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/stretchr/testify/assert" - "os" - "testing" ) func TestFilerStore(t *testing.T, store filer.FilerStore) { @@ -23,16 +24,16 @@ func TestFilerStore(t *testing.T, store filer.FilerStore) { { var counter int - lastFileName, err := store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), "", false, 3, func(entry *filer.Entry) bool { + lastFileName, err := store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), "", false, 3, func(entry *filer.Entry) (bool, error) { counter++ - return true + return true, nil }) assert.Nil(t, err, "list directory") assert.Equal(t, 3, counter, "directory list counter") assert.Equal(t, "f00002", lastFileName, "directory list last file") - lastFileName, err = store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), lastFileName, false, 1024, func(entry *filer.Entry) bool { + lastFileName, err = store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), lastFileName, false, 1024, func(entry *filer.Entry) (bool, error) { counter++ - return true + return true, nil }) assert.Nil(t, err, "list directory") assert.Equal(t, 1027, counter, "directory list counter") diff --git a/weed/mount/meta_cache/meta_cache.go b/weed/mount/meta_cache/meta_cache.go index 0f0b1de30..9578aff72 100644 --- a/weed/mount/meta_cache/meta_cache.go +++ b/weed/mount/meta_cache/meta_cache.go @@ -146,9 +146,9 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full glog.Warningf("unsynchronized dir: %v", dirPath) } - _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool { + _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) (bool, error) { if entry.TtlSec > 0 && entry.Crtime.Add(time.Duration(entry.TtlSec)*time.Second).Before(time.Now()) { - return true + return true, nil } mc.mapIdFromFilerToLocal(entry) return eachEntryFunc(entry) diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go index 6e18b50e8..ebf0d9191 100644 --- a/weed/mount/weedfs_dir_read.go +++ b/weed/mount/weedfs_dir_read.go @@ -2,13 +2,14 @@ package mount import ( "context" + "math" + "sync" + "github.com/hanwen/go-fuse/v2/fuse" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache" "github.com/seaweedfs/seaweedfs/weed/util" - "math" - "sync" ) type DirectoryHandleId uint64 @@ -153,7 +154,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl } var dirEntry fuse.DirEntry - processEachEntryFn := func(entry *filer.Entry) bool { + processEachEntryFn := func(entry *filer.Entry) (bool, error) { dirEntry.Name = entry.Name() dirEntry.Mode = toSyscallMode(entry.Mode) inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, isPlusMode) @@ -161,13 +162,13 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl if !isPlusMode { if !out.AddDirEntry(dirEntry) { isEarlyTerminated = true - return false + return false, nil } } else { entryOut := out.AddDirLookupEntry(dirEntry) if entryOut == nil { isEarlyTerminated = true - return false + return false, nil } if fh, found := wfs.fhMap.FindFileHandle(inode); found { glog.V(4).Infof("readdir opened file %s", dirPath.Child(dirEntry.Name)) @@ -175,7 +176,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl } wfs.outputFilerEntry(entryOut, inode, entry) } - return true + return true, nil } if input.Offset < directoryStreamBaseOffset { @@ -206,7 +207,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl entryCurrentIndex := input.Offset - dh.entryStreamOffset for uint64(len(dh.entryStream)) > entryCurrentIndex { entry := dh.entryStream[entryCurrentIndex] - if processEachEntryFn(entry) { + if process, _ := processEachEntryFn(entry); process { lastEntryName = entry.Name() entryCurrentIndex++ } else { @@ -221,7 +222,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) return fuse.EIO } - listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool { + listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) (bool, error) { dh.entryStream = append(dh.entryStream, entry) return processEachEntryFn(entry) }) diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 02eceebde..ef5225181 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -56,19 +56,19 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file var listErr error for limit > 0 { var hasEntries bool - lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) bool { + lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) (bool, error) { hasEntries = true if err = stream.Send(&filer_pb.ListEntriesResponse{ Entry: entry.ToProtoEntry(), }); err != nil { - return false + return false, err } limit-- if limit == 0 { - return false + return false, nil } - return true + return true, nil }) if listErr != nil { diff --git a/weed/server/filer_grpc_server_traverse_meta.go b/weed/server/filer_grpc_server_traverse_meta.go index 841e7b88b..9e317e83f 100644 --- a/weed/server/filer_grpc_server_traverse_meta.go +++ b/weed/server/filer_grpc_server_traverse_meta.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -63,13 +64,13 @@ func (fs *FilerServer) iterateDirectory(ctx context.Context, dirPath util.FullPa var listErr error for { var hasEntries bool - lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) bool { + lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) (bool, error) { hasEntries = true if fnErr := fn(entry); fnErr != nil { err = fnErr - return false + return false, err } - return true + return true, nil }) if listErr != nil { return listErr