added error return in type ListEachEntryFunc

pull/7485/head
Roman Tamarov · 3 weeks ago · commit f87fc65017
1. weed/filer/abstract_sql/abstract_sql_store.go (8 changes)
2. weed/filer/arangodb/arangodb_store.go (8 changes)
3. weed/filer/cassandra/cassandra_store.go (14 changes)
4. weed/filer/cassandra2/cassandra_store.go (15 changes)
5. weed/filer/etcd/etcd_store.go (12 changes)
6. weed/filer/filer.go (13 changes)
7. weed/filer/filer_search.go (27 changes)
8. weed/filer/filerstore.go (5 changes)
9. weed/filer/filerstore_translate_path.go (7 changes)
10. weed/filer/filerstore_wrapper.go (22 changes)
11. weed/filer/hbase/hbase_store.go (12 changes)
12. weed/filer/leveldb/leveldb_store.go (9 changes)
13. weed/filer/leveldb2/leveldb2_store.go (9 changes)
14. weed/filer/leveldb3/leveldb3_store.go (9 changes)
15. weed/filer/mongodb/mongodb_store.go (7 changes)
16. weed/filer/redis/universal_redis_store.go (9 changes)
17. weed/filer/redis2/universal_redis_store.go (9 changes)
18. weed/filer/redis3/universal_redis_store.go (9 changes)
19. weed/filer/redis_lua/universal_redis_store.go (9 changes)
20. weed/filer/store_test/test_suite.go (13 changes)
21. weed/mount/meta_cache/meta_cache.go (4 changes)
22. weed/mount/weedfs_dir_read.go (17 changes)
23. weed/server/filer_grpc_server.go (8 changes)
24. weed/server/filer_grpc_server_traverse_meta.go (7 changes)
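
The heart of the commit is the one-line type change in weed/filer/filerstore.go: the list callback gains an error return, so a callback can abort iteration with a real failure instead of only a silent stop. Before and after, as the filerstore.go diff below shows:

	// before: the callback could only signal "keep going" or "stop"
	type ListEachEntryFunc func(entry *Entry) bool

	// after: it can also fail, and every store now propagates that failure
	type ListEachEntryFunc func(entry *Entry) (bool, error)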

weed/filer/abstract_sql/abstract_sql_store.go (8 changes)

@@ -326,7 +326,13 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context,
 			return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
 		}
-		if !eachEntryFunc(entry) {
+		resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+		if resEachEntryFuncErr != nil {
+			err = fmt.Errorf("failed to process eachEntryFunc: %v", resEachEntryFuncErr)
+			break
+		}
+		if !resEachEntryFunc {
 			break
 		}
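
Every store backend in this commit applies the same mechanical rewrite of its iteration loop, as this abstract_sql hunk shows; the remaining store diffs below repeat it. A self-contained sketch of the pattern (walk and its types are hypothetical stand-ins, not SeaweedFS code):

	package main

	import (
		"errors"
		"fmt"
	)

	type Entry struct{ Name string }

	// Same shape as filer.ListEachEntryFunc after this commit.
	type ListEachEntryFunc func(entry *Entry) (bool, error)

	// walk mimics the per-store loop: stop with an error if the callback
	// fails, stop quietly if it returns false, otherwise keep iterating.
	func walk(entries []*Entry, eachEntryFunc ListEachEntryFunc) (err error) {
		for _, entry := range entries {
			keepGoing, cbErr := eachEntryFunc(entry)
			if cbErr != nil {
				err = fmt.Errorf("failed to process eachEntryFunc: %v", cbErr)
				break
			}
			if !keepGoing {
				break
			}
		}
		return err
	}

	func main() {
		entries := []*Entry{{"a"}, {"b"}, {"c"}}
		err := walk(entries, func(e *Entry) (bool, error) {
			if e.Name == "b" {
				return false, errors.New("boom")
			}
			return true, nil
		})
		fmt.Println(err) // failed to process eachEntryFunc: boom
	}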

weed/filer/arangodb/arangodb_store.go (8 changes)

@@ -335,7 +335,13 @@ sort d.name asc
 			break
 		}
-		if !eachEntryFunc(entry) {
+		resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+		if resEachEntryFuncErr != nil {
+			err = fmt.Errorf("failed to process eachEntryFunc: %v", resEachEntryFuncErr)
+			break
+		}
+		if !resEachEntryFunc {
 			break
 		}

weed/filer/cassandra/cassandra_store.go (14 changes)

@@ -206,12 +206,20 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath u
 			glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 			break
 		}
-		if !eachEntryFunc(entry) {
+		resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+		if resEachEntryFuncErr != nil {
+			err = resEachEntryFuncErr
+			glog.V(0).Infof("failed to process eachEntryFunc: %v", err)
+			break
+		}
+		if !resEachEntryFunc {
 			break
 		}
 	}
-	if err = iter.Close(); err != nil {
-		glog.V(0).InfofCtx(ctx, "list iterator close: %v", err)
+	if errClose := iter.Close(); errClose != nil {
+		glog.V(0).Infof("list iterator close: %v", errClose)
 	}
 	return lastFileName, err

weed/filer/cassandra2/cassandra_store.go (15 changes)

@@ -206,12 +206,21 @@ func (store *Cassandra2Store) ListDirectoryEntries(ctx context.Context, dirPath
 			glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 			break
 		}
-		if !eachEntryFunc(entry) {
+		resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+		if resEachEntryFuncErr != nil {
+			err = resEachEntryFuncErr
+			glog.V(0).Infof("failed to process eachEntryFunc: %v", err)
+			break
+		}
+		if !resEachEntryFunc {
 			break
 		}
 	}
-	if err = iter.Close(); err != nil {
-		glog.V(0).InfofCtx(ctx, "list iterator close: %v", err)
-	}
+	if errClose := iter.Close(); errClose != nil {
+		glog.V(0).Infof("list iterator close: %v", errClose)
 	}
 	return lastFileName, err

weed/filer/etcd/etcd_store.go (12 changes)

@@ -9,7 +9,7 @@ import (
 	"go.etcd.io/etcd/client/pkg/v3/transport"
-	"go.etcd.io/etcd/client/v3"
+	clientv3 "go.etcd.io/etcd/client/v3"

 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -212,9 +212,17 @@ func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat
 			glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 			break
 		}
-		if !eachEntryFunc(entry) {
+		resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+		if resEachEntryFuncErr != nil {
+			err = fmt.Errorf("failed to process eachEntryFunc: %v", resEachEntryFuncErr)
+			break
+		}
+		if !resEachEntryFunc {
+			break
+		}
 		lastFileName = fileName
 	}

weed/filer/filer.go (13 changes)

@@ -373,10 +373,11 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
 	var s3ExpiredEntries []*Entry
 	var hasValidEntries bool
-	lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
+	lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) (bool, error) {
 		select {
 		case <-ctx.Done():
-			return false
+			glog.Errorf("Context is done.")
+			return false, fmt.Errorf("context is done: %v", ctx.Err())
 		default:
 			if entry.TtlSec > 0 {
 				if entry.IsExpireS3Enabled() {
@@ -384,13 +385,13 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
 					// Collect for deletion after iteration completes to avoid DB deadlock
 					s3ExpiredEntries = append(s3ExpiredEntries, entry)
 					expiredCount++
-					return true
+					return true, nil
 				}
 			} else if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
 				// Collect for deletion after iteration completes to avoid DB deadlock
 				expiredEntries = append(expiredEntries, entry)
 				expiredCount++
-				return true
+				return true, nil
 			}
 		}
 		// Track that we found at least one valid (non-expired) entry
@@ -500,9 +501,9 @@ func (f *Filer) DeleteEmptyParentDirectories(ctx context.Context, dirPath util.F
 // IsDirectoryEmpty checks if a directory contains any entries
 func (f *Filer) IsDirectoryEmpty(ctx context.Context, dirPath util.FullPath) (bool, error) {
 	isEmpty := true
-	_, err := f.Store.ListDirectoryPrefixedEntries(ctx, dirPath, "", true, 1, "", func(entry *Entry) bool {
+	_, err := f.Store.ListDirectoryPrefixedEntries(ctx, dirPath, "", true, 1, "", func(entry *Entry) (bool, error) {
 		isEmpty = false
-		return false // Stop after first entry
+		return false, nil // Stop after first entry
 	})
 	return isEmpty, err
 }
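
On the caller side, the new signature distinguishes "stop early, no problem" (false, nil), as in IsDirectoryEmpty above, from "abort with an error" (false, err), which now surfaces as the List call's own return value. A hedged usage sketch, assuming store is a filer.FilerStore and ctx, dirPath, and the "log-" prefix are illustrative values in scope:

	var names []string
	_, err := store.ListDirectoryPrefixedEntries(ctx, dirPath, "", true, 100, "log-",
		func(entry *filer.Entry) (bool, error) {
			if entry.IsDirectory() {
				// abort the whole listing; err below will be non-nil
				return false, fmt.Errorf("unexpected directory %s", entry.FullPath)
			}
			names = append(names, entry.Name())
			return true, nil // keep scanning
		})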

weed/filer/filer_search.go (27 changes)

@@ -2,10 +2,11 @@ package filer

 import (
 	"context"
-	"github.com/seaweedfs/seaweedfs/weed/util"
 	"math"
 	"path/filepath"
 	"strings"
+
+	"github.com/seaweedfs/seaweedfs/weed/util"
 )

 func splitPattern(pattern string) (prefix string, restPattern string) {
@@ -27,9 +28,9 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start
 		limit = math.MaxInt32 - 1
 	}

-	_, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, namePatternExclude, func(entry *Entry) bool {
+	_, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, namePatternExclude, func(entry *Entry) (bool, error) {
 		entries = append(entries, entry)
-		return true
+		return true, nil
 	})

 	hasMore = int64(len(entries)) >= limit+1
@@ -68,24 +69,32 @@ func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath
 		return 0, lastFileName, err
 	}
-	lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
+	lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) (bool, error) {
 		nameToTest := entry.Name()
 		if len(namePatternExclude) > 0 {
 			if matched, matchErr := filepath.Match(namePatternExclude, nameToTest); matchErr == nil && matched {
 				missedCount++
-				return true
+				return true, nil
 			}
 		}
 		if len(restNamePattern) > 0 {
 			if matched, matchErr := filepath.Match(restNamePattern, nameToTest[len(prefix):]); matchErr == nil && !matched {
 				missedCount++
-				return true
+				return true, nil
 			}
 		}
-		if !eachEntryFunc(entry) {
-			return false
+		res, resErr := eachEntryFunc(entry)
+		if resErr != nil {
+			return false, resErr
+		}
+		if !res {
+			return false, nil
 		}
-		return true
+		return true, nil
 	})
 	if err != nil {
 		return

weed/filer/filerstore.go (5 changes)

@@ -3,8 +3,9 @@ package filer

 import (
 	"context"
 	"errors"
-	"github.com/seaweedfs/seaweedfs/weed/util"
 	"io"
+
+	"github.com/seaweedfs/seaweedfs/weed/util"
 )

 const CountEntryChunksForGzip = 50
@@ -16,7 +17,7 @@ var (
 	ErrKvNotFound = errors.New("kv: not found")
 )

-type ListEachEntryFunc func(entry *Entry) bool
+type ListEachEntryFunc func(entry *Entry) (bool, error)

 type FilerStore interface {
 	// GetName gets the name to locate the configuration in filer.toml file

weed/filer/filerstore_translate_path.go (7 changes)

@@ -2,9 +2,10 @@ package filer

 import (
 	"context"
-	"github.com/seaweedfs/seaweedfs/weed/util"
 	"math"
 	"strings"
+
+	"github.com/seaweedfs/seaweedfs/weed/util"
 )

 var (
@@ -111,7 +112,7 @@ func (t *FilerStorePathTranslator) ListDirectoryEntries(ctx context.Context, dir
 	newFullPath := t.translatePath(dirPath)

-	return t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
+	return t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit, func(entry *Entry) (bool, error) {
 		entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
 		return eachEntryFunc(entry)
 	})
@@ -125,7 +126,7 @@ func (t *FilerStorePathTranslator) ListDirectoryPrefixedEntries(ctx context.Cont
 		limit = math.MaxInt32 - 1
 	}

-	return t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool {
+	return t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) (bool, error) {
 		entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
 		return eachEntryFunc(entry)
 	})

weed/filer/filerstore_wrapper.go (22 changes)

@@ -2,6 +2,7 @@ package filer

 import (
 	"context"
+	"fmt"
 	"io"
 	"math"
 	"strings"
@@ -254,7 +255,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath
 	}()
 	// glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)

-	return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
+	return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) (bool, error) {
 		fsw.maybeReadHardLink(ctx, entry)
 		filer_pb.AfterEntryDeserialization(entry.GetChunks())
 		return eachEntryFunc(entry)
@@ -273,7 +274,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context,
 		limit = math.MaxInt32 - 1
 	}
 	// glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
-	adjustedEntryFunc := func(entry *Entry) bool {
+	adjustedEntryFunc := func(entry *Entry) (bool, error) {
 		fsw.maybeReadHardLink(ctx, entry)
 		filer_pb.AfterEntryDeserialization(entry.GetChunks())
 		return eachEntryFunc(entry)
@@ -293,9 +294,9 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u
 	}

 	var notPrefixed []*Entry
-	lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
+	lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) (bool, error) {
 		notPrefixed = append(notPrefixed, entry)
-		return true
+		return true, nil
 	})
 	if err != nil {
 		return
@@ -306,7 +307,14 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u
 	for _, entry := range notPrefixed {
 		if strings.HasPrefix(entry.Name(), prefix) {
 			count++
-			if !eachEntryFunc(entry) {
+			res, resErr := eachEntryFunc(entry)
+			if resErr != nil {
+				err = fmt.Errorf("failed to process eachEntryFunc: %v", resErr)
+				return
+			}
+			if !res {
 				return
 			}
 			if count >= limit {
@@ -316,9 +324,9 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u
 	}

 	if count < limit && lastFileName < prefix {
 		notPrefixed = notPrefixed[:0]
-		lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool {
+		lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) (bool, error) {
 			notPrefixed = append(notPrefixed, entry)
-			return true
+			return true, nil
 		})
 		if err != nil {
 			return

weed/filer/hbase/hbase_store.go (12 changes)

@@ -4,13 +4,14 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"io"
+
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/util"
 	"github.com/tsuna/gohbase"
 	"github.com/tsuna/gohbase/hrpc"
-	"io"
 )

 func init() {
@@ -206,7 +207,14 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
 			glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 			break
 		}
-		if !eachEntryFunc(entry) {
+		resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+		if resEachEntryFuncErr != nil {
+			err = fmt.Errorf("failed to process eachEntryFunc: %v", resEachEntryFuncErr)
+			break
+		}
+		if !resEachEntryFunc {
 			break
 		}
 	}

weed/filer/leveldb/leveldb_store.go (9 changes)

@@ -209,7 +209,14 @@ func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
 			glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 			break
 		}
-		if !eachEntryFunc(entry) {
+		resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+		if resEachEntryFuncErr != nil {
+			err = fmt.Errorf("failed to process eachEntryFunc: %v", resEachEntryFuncErr)
+			break
+		}
+		if !resEachEntryFunc {
 			break
 		}
 	}

weed/filer/leveldb2/leveldb2_store.go (9 changes)

@@ -216,7 +216,14 @@ func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, di
 			glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 			break
 		}
-		if !eachEntryFunc(entry) {
+		resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+		if resEachEntryFuncErr != nil {
+			err = fmt.Errorf("failed to process eachEntryFunc: %v", resEachEntryFuncErr)
+			break
+		}
+		if !resEachEntryFunc {
 			break
 		}
 	}

weed/filer/leveldb3/leveldb3_store.go (9 changes)

@@ -345,7 +345,14 @@ func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, di
 			glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
 			break
 		}
-		if !eachEntryFunc(entry) {
+		resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+		if resEachEntryFuncErr != nil {
+			err = fmt.Errorf("failed to process eachEntryFunc: %v", resEachEntryFuncErr)
+			break
+		}
+		if !resEachEntryFunc {
 			break
 		}
 	}

weed/filer/mongodb/mongodb_store.go (7 changes)

@@ -319,10 +319,15 @@ func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
 			break
 		}

-		if !eachEntryFunc(entry) {
+		resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+		if resEachEntryFuncErr != nil {
+			err = fmt.Errorf("failed to process eachEntryFunc: %v", resEachEntryFuncErr)
 			break
 		}
+		if !resEachEntryFunc {
+			break
+		}
 	}

 	if err := cur.Close(ctx); err != nil {

weed/filer/redis/universal_redis_store.go (9 changes)

@@ -191,7 +191,14 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirP
 				continue
 			}
 		}
-		if !eachEntryFunc(entry) {
+		resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+		if resEachEntryFuncErr != nil {
+			err = fmt.Errorf("failed to process eachEntryFunc: %v", resEachEntryFuncErr)
+			break
+		}
+		if !resEachEntryFunc {
 			break
 		}
 	}

weed/filer/redis2/universal_redis_store.go (9 changes)

@@ -206,7 +206,14 @@ func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dir
 				continue
 			}
 		}
-		if !eachEntryFunc(entry) {
+		resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+		if resEachEntryFuncErr != nil {
+			err = fmt.Errorf("failed to process eachEntryFunc: %v", resEachEntryFuncErr)
+			break
+		}
+		if !resEachEntryFunc {
 			break
 		}
 	}

weed/filer/redis3/universal_redis_store.go (9 changes)

@@ -164,9 +164,16 @@ func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dir
 			}
 		}
 		counter++
-		if !eachEntryFunc(entry) {
+		resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+		if resEachEntryFuncErr != nil {
+			err = fmt.Errorf("failed to process eachEntryFunc: %v", resEachEntryFuncErr)
+			return false
+		}
+		if !resEachEntryFunc {
 			return false
 		}
 		if counter >= limit {
 			return false
 		}

weed/filer/redis_lua/universal_redis_store.go (9 changes)

@@ -173,7 +173,14 @@ func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, d
 				continue
 			}
 		}
-		if !eachEntryFunc(entry) {
+		resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+		if resEachEntryFuncErr != nil {
+			err = fmt.Errorf("failed to process eachEntryFunc: %v", resEachEntryFuncErr)
+			break
+		}
+		if !resEachEntryFunc {
 			break
 		}
 	}

weed/filer/store_test/test_suite.go (13 changes)

@@ -3,11 +3,12 @@ package store_test

 import (
 	"context"
 	"fmt"
+	"os"
+	"testing"
+
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/util"
 	"github.com/stretchr/testify/assert"
-	"os"
-	"testing"
 )

 func TestFilerStore(t *testing.T, store filer.FilerStore) {
@@ -23,16 +24,16 @@ func TestFilerStore(t *testing.T, store filer.FilerStore) {
 	{
 		var counter int
-		lastFileName, err := store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), "", false, 3, func(entry *filer.Entry) bool {
+		lastFileName, err := store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), "", false, 3, func(entry *filer.Entry) (bool, error) {
 			counter++
-			return true
+			return true, nil
 		})
 		assert.Nil(t, err, "list directory")
 		assert.Equal(t, 3, counter, "directory list counter")
 		assert.Equal(t, "f00002", lastFileName, "directory list last file")

-		lastFileName, err = store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), lastFileName, false, 1024, func(entry *filer.Entry) bool {
+		lastFileName, err = store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), lastFileName, false, 1024, func(entry *filer.Entry) (bool, error) {
 			counter++
-			return true
+			return true, nil
 		})
 		assert.Nil(t, err, "list directory")
 		assert.Equal(t, 1027, counter, "directory list counter")

weed/mount/meta_cache/meta_cache.go (4 changes)

@@ -146,9 +146,9 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full
 		glog.Warningf("unsynchronized dir: %v", dirPath)
 	}

-	_, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool {
+	_, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) (bool, error) {
 		if entry.TtlSec > 0 && entry.Crtime.Add(time.Duration(entry.TtlSec)*time.Second).Before(time.Now()) {
-			return true
+			return true, nil
 		}
 		mc.mapIdFromFilerToLocal(entry)
 		return eachEntryFunc(entry)

weed/mount/weedfs_dir_read.go (17 changes)

@@ -2,13 +2,14 @@ package mount

 import (
 	"context"
+	"math"
+	"sync"
+
 	"github.com/hanwen/go-fuse/v2/fuse"
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
 	"github.com/seaweedfs/seaweedfs/weed/util"
-	"math"
-	"sync"
 )

 type DirectoryHandleId uint64
@@ -153,7 +154,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
 	}

 	var dirEntry fuse.DirEntry
-	processEachEntryFn := func(entry *filer.Entry) bool {
+	processEachEntryFn := func(entry *filer.Entry) (bool, error) {
 		dirEntry.Name = entry.Name()
 		dirEntry.Mode = toSyscallMode(entry.Mode)
 		inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, isPlusMode)
@@ -161,13 +162,13 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
 		if !isPlusMode {
 			if !out.AddDirEntry(dirEntry) {
 				isEarlyTerminated = true
-				return false
+				return false, nil
 			}
 		} else {
 			entryOut := out.AddDirLookupEntry(dirEntry)
 			if entryOut == nil {
 				isEarlyTerminated = true
-				return false
+				return false, nil
 			}
 			if fh, found := wfs.fhMap.FindFileHandle(inode); found {
 				glog.V(4).Infof("readdir opened file %s", dirPath.Child(dirEntry.Name))
@@ -175,7 +176,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
 			}
 			wfs.outputFilerEntry(entryOut, inode, entry)
 		}
-		return true
+		return true, nil
 	}

 	if input.Offset < directoryStreamBaseOffset {
@@ -206,7 +207,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
 		entryCurrentIndex := input.Offset - dh.entryStreamOffset
 		for uint64(len(dh.entryStream)) > entryCurrentIndex {
 			entry := dh.entryStream[entryCurrentIndex]
-			if processEachEntryFn(entry) {
+			if process, _ := processEachEntryFn(entry); process {
 				lastEntryName = entry.Name()
 				entryCurrentIndex++
 			} else {
@@ -221,7 +222,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
 			glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
 			return fuse.EIO
 		}
-		listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
+		listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) (bool, error) {
 			dh.entryStream = append(dh.entryStream, entry)
 			return processEachEntryFn(entry)
 		})

weed/server/filer_grpc_server.go (8 changes)

@@ -56,19 +56,19 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
 	var listErr error
 	for limit > 0 {
 		var hasEntries bool
-		lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) bool {
+		lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) (bool, error) {
 			hasEntries = true
 			if err = stream.Send(&filer_pb.ListEntriesResponse{
 				Entry: entry.ToProtoEntry(),
 			}); err != nil {
-				return false
+				return false, err
 			}
 			limit--
 			if limit == 0 {
-				return false
+				return false, nil
 			}
-			return true
+			return true, nil
 		})

 		if listErr != nil {

weed/server/filer_grpc_server_traverse_meta.go (7 changes)

@@ -3,6 +3,7 @@ package weed_server

 import (
 	"context"
 	"fmt"
+
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -63,13 +64,13 @@ func (fs *FilerServer) iterateDirectory(ctx context.Context, dirPath util.FullPa
 	var listErr error
 	for {
 		var hasEntries bool
-		lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) bool {
+		lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) (bool, error) {
 			hasEntries = true
 			if fnErr := fn(entry); fnErr != nil {
 				err = fnErr
-				return false
+				return false, err
 			}
-			return true
+			return true, nil
 		})
 		if listErr != nil {
 			return listErr
