
add ListRecursive

pull/5759/head
Konstantin Lebedev 6 months ago
commit 762989aad3
  1. .github/workflows/s3tests.yml (80)
  2. Makefile (5)
  3. other/java/client/src/main/proto/filer.proto (2)
  4. weed/command/filer.go (1)
  5. weed/command/s3.go (2)
  6. weed/command/server.go (1)
  7. weed/filer/abstract_sql/abstract_sql_store.go (71)
  8. weed/filer/arangodb/arangodb_store.go (4)
  9. weed/filer/cassandra/cassandra_store.go (4)
  10. weed/filer/elastic/v7/elastic_store.go (4)
  11. weed/filer/etcd/etcd_store.go (4)
  12. weed/filer/filer.go (11)
  13. weed/filer/filer_search.go (20)
  14. weed/filer/filerstore.go (2)
  15. weed/filer/filerstore_translate_path.go (9)
  16. weed/filer/filerstore_wrapper.go (22)
  17. weed/filer/hbase/hbase_store.go (4)
  18. weed/filer/leveldb/leveldb_store.go (4)
  19. weed/filer/leveldb2/leveldb2_store.go (4)
  20. weed/filer/leveldb3/leveldb3_store.go (4)
  21. weed/filer/mongodb/mongodb_store.go (4)
  22. weed/filer/mysql/mysql_sql_gen.go (4)
  23. weed/filer/postgres/postgres_sql_gen.go (4)
  24. weed/filer/redis/universal_redis_store.go (4)
  25. weed/filer/redis2/universal_redis_store.go (4)
  26. weed/filer/redis3/universal_redis_store.go (4)
  27. weed/filer/redis_lua/universal_redis_store.go (4)
  28. weed/filer/rocksdb/rocksdb_store.go (4)
  29. weed/filer/sqlite/sqlite_store.go (12)
  30. weed/filer/sqlite/sqlite_store_gen.go (21)
  31. weed/filer/tikv/tikv_store.go (4)
  32. weed/filer/ydb/ydb_store.go (4)
  33. weed/pb/filer.proto (3)
  34. weed/pb/filer_pb/filer.pb.go (28)
  35. weed/pb/filer_pb/filer_grpc.pb.go (2)
  36. weed/s3api/s3api_object_handlers_list.go (115)
  37. weed/s3api/s3api_server.go (1)
  38. weed/server/filer_grpc_server.go (9)
  39. weed/server/filer_grpc_server_traverse_meta.go (2)

80
.github/workflows/s3tests.yml

@@ -284,3 +284,83 @@ jobs:
s3tests_boto3/functional/test_s3.py::test_bucket_list_long_name \
s3tests_boto3/functional/test_s3.py::test_bucket_list_special_prefix
kill -9 $pid || true
- name: Run Ceph S3 tests list recursive with SQL store
timeout-minutes: 15
env:
S3TEST_CONF: /__w/seaweedfs/seaweedfs/docker/compose/s3tests.conf
shell: bash
run: |
cd /__w/seaweedfs/seaweedfs/weed
go install -tags "sqlite" -buildvcs=false
export WEED_LEVELDB2_ENABLED="false" WEED_SQLITE_ENABLED="true" WEED_SQLITE_DBFILE="./filer.db"
set -x
weed -v 0 server -s3.allowListRecursive=true -filer -filer.maxMB=64 -s3 -ip.bind 0.0.0.0 \
-master.raftHashicorp -master.electionTimeout 1s -master.volumeSizeLimitMB=1024 \
-volume.max=100 -volume.preStopSeconds=1 -s3.port=8000 -metricsPort=9324 \
-s3.allowEmptyFolder=false -s3.allowDeleteBucketNotEmpty=true -s3.config=../docker/compose/s3.json &
pid=$!
sleep 10
cd /s3-tests
sed -i "s/assert prefixes == \['foo%2B1\/', 'foo\/', 'quux%20ab\/'\]/assert prefixes == \['foo\/', 'foo%2B1\/', 'quux%20ab\/'\]/" s3tests_boto3/functional/test_s3.py
tox -- \
s3tests_boto3/functional/test_s3.py::test_bucket_list_empty \
s3tests_boto3/functional/test_s3.py::test_bucket_list_distinct \
s3tests_boto3/functional/test_s3.py::test_bucket_list_many \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_many \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_delimiter_basic \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_encoding_basic \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_delimiter_prefix \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_delimiter_prefix_ends_with_delimiter \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_delimiter_alt \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_delimiter_prefix_underscore \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_delimiter_percentage \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_delimiter_whitespace \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_delimiter_dot \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_delimiter_unreadable \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_delimiter_empty \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_delimiter_none \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_delimiter_not_exist \
s3tests_boto3/functional/test_s3.py::test_bucket_list_prefix_delimiter_basic \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_prefix_delimiter_basic \
s3tests_boto3/functional/test_s3.py::test_bucket_list_prefix_delimiter_alt \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_prefix_delimiter_alt \
s3tests_boto3/functional/test_s3.py::test_bucket_list_prefix_delimiter_prefix_not_exist \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_prefix_delimiter_prefix_not_exist \
s3tests_boto3/functional/test_s3.py::test_bucket_list_prefix_delimiter_delimiter_not_exist \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_prefix_delimiter_delimiter_not_exist \
s3tests_boto3/functional/test_s3.py::test_bucket_list_prefix_delimiter_prefix_delimiter_not_exist \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_prefix_delimiter_prefix_delimiter_not_exist \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_fetchowner_notempty \
s3tests_boto3/functional/test_s3.py::test_bucket_list_prefix_basic \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_prefix_basic \
s3tests_boto3/functional/test_s3.py::test_bucket_list_prefix_alt \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_prefix_alt \
s3tests_boto3/functional/test_s3.py::test_bucket_list_prefix_empty \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_prefix_empty \
s3tests_boto3/functional/test_s3.py::test_bucket_list_prefix_none \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_prefix_none \
s3tests_boto3/functional/test_s3.py::test_bucket_list_prefix_not_exist \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_prefix_not_exist \
s3tests_boto3/functional/test_s3.py::test_bucket_list_prefix_unreadable \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_prefix_unreadable \
s3tests_boto3/functional/test_s3.py::test_bucket_list_maxkeys_one \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_maxkeys_one \
s3tests_boto3/functional/test_s3.py::test_bucket_list_maxkeys_zero \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_maxkeys_zero \
s3tests_boto3/functional/test_s3.py::test_bucket_list_marker_none \
s3tests_boto3/functional/test_s3.py::test_bucket_list_marker_empty \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_continuationtoken_empty \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_continuationtoken \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_both_continuationtoken_startafter \
s3tests_boto3/functional/test_s3.py::test_bucket_list_marker_unreadable \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_startafter_unreadable \
s3tests_boto3/functional/test_s3.py::test_bucket_list_marker_not_in_list \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_startafter_not_in_list \
s3tests_boto3/functional/test_s3.py::test_bucket_list_marker_after_list \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_startafter_after_list \
s3tests_boto3/functional/test_s3.py::test_bucket_list_objects_anonymous_fail \
s3tests_boto3/functional/test_s3.py::test_bucket_listv2_objects_anonymous_fail \
s3tests_boto3/functional/test_s3.py::test_bucket_list_long_name \
s3tests_boto3/functional/test_s3.py::test_bucket_list_special_prefix
kill -9 $pid || true

5
Makefile

@@ -6,7 +6,7 @@ debug ?= 0
all: install
install:
cd weed; go install
cd weed; go install -tags "sqlite"
warp_install:
go install github.com/minio/warp@v0.7.6
@@ -15,7 +15,8 @@ full_install:
cd weed; go install -tags "elastic gocdk sqlite ydb tikv rclone"
server: install
weed -v 0 server -s3 -filer -filer.maxMB=64 -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1 -s3.port=8000 -s3.allowEmptyFolder=false -s3.allowDeleteBucketNotEmpty=true -s3.config=./docker/compose/s3.json -metricsPort=9324
export WEED_LEVELDB2_ENABLED="false";export WEED_SQLITE_ENABLED="true"; export WEED_SQLITE_DBFILE="/tmp/filer.db"; \
weed -v 0 server -s3.allowListRecursive=true -dir /tmp -master.volumeSizeLimitMB=1024 -s3 -filer -filer.maxMB=64 -filer.port.public=7777 -volume.max=100 -volume.preStopSeconds=1 -s3.port=8000 -s3.allowEmptyFolder=false -s3.allowDeleteBucketNotEmpty=true -s3.config=./docker/compose/s3.json -metricsPort=9324
benchmark: install warp_install
pkill weed || true

2
other/java/client/src/main/proto/filer.proto

@@ -100,10 +100,12 @@ message ListEntriesRequest {
string startFromFileName = 3;
bool inclusiveStartFrom = 4;
uint32 limit = 5;
bool recursive = 6;
}
message ListEntriesResponse {
Entry entry = 1;
string dir = 2;
}
message RemoteEntry {

1
weed/command/filer.go

@@ -110,6 +110,7 @@ func init() {
filerS3Options.auditLogConfig = cmdFiler.Flag.String("s3.auditLogConfig", "", "path to the audit log config file")
filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders")
filerS3Options.allowDeleteBucketNotEmpty = cmdFiler.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
filerS3Options.allowListRecursive = cmdFiler.Flag.Bool("s3.allowListRecursive", false, "allows recursive listing of directories by prefix on the side of the filer store with SQL")
filerS3Options.localSocket = cmdFiler.Flag.String("s3.localSocket", "", "default to /tmp/seaweedfs-s3-<port>.sock")
// start webdav on filer

2
weed/command/s3.go

@@ -51,6 +51,7 @@ type S3Options struct {
metricsHttpPort *int
allowEmptyFolder *bool
allowDeleteBucketNotEmpty *bool
allowListRecursive *bool
auditLogConfig *string
localFilerSocket *string
dataCenter *string
@@ -77,6 +78,7 @@ func init() {
s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", true, "allow empty folders")
s3StandaloneOptions.allowDeleteBucketNotEmpty = cmdS3.Flag.Bool("allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
s3StandaloneOptions.allowListRecursive = cmdS3.Flag.Bool("allowListRecursive", false, "allows recursive listing of directories by prefix on the side of the filer store with SQL")
s3StandaloneOptions.localFilerSocket = cmdS3.Flag.String("localFilerSocket", "", "local filer socket path")
s3StandaloneOptions.localSocket = cmdS3.Flag.String("localSocket", "", "default to /tmp/seaweedfs-s3-<port>.sock")
}

1
weed/command/server.go

@@ -153,6 +153,7 @@ func init() {
s3Options.auditLogConfig = cmdServer.Flag.String("s3.auditLogConfig", "", "path to the audit log config file")
s3Options.allowEmptyFolder = cmdServer.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders")
s3Options.allowDeleteBucketNotEmpty = cmdServer.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
s3Options.allowListRecursive = cmdServer.Flag.Bool("s3.allowListRecursive", false, "allows recursive listing of directories by prefix on the side of the filer store with SQL")
s3Options.localSocket = cmdServer.Flag.String("s3.localSocket", "", "default to /tmp/seaweedfs-s3-<port>.sock")
iamOptions.port = cmdServer.Flag.Int("iam.port", 8111, "iam server http listen port")

71
weed/filer/abstract_sql/abstract_sql_store.go

@@ -21,6 +21,7 @@ type SqlGenerator interface {
GetSqlDeleteFolderChildren(tableName string) string
GetSqlListExclusive(tableName string) string
GetSqlListInclusive(tableName string) string
GetSqlListRecursive(tableName string) string
GetSqlCreateTable(tableName string) string
GetSqlDropTable(tableName string) string
}
@@ -334,6 +335,76 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context,
return lastFileName, nil
}
func (store *AbstractSqlStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
if err != nil {
return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
}
bucketDir := ""
if bucket != DEFAULT_TABLE {
bucketDir = fmt.Sprintf("/buckets/%s", bucket)
}
shortDir := string(shortPath)
namePrefix := prefix + "%"
var dirPrefix string
isPrefixEndsWithDelimiter := false
if delimiter {
if prefix == "" && len(startFileName) == 0 {
dirPrefix = shortDir
limit += 1
isPrefixEndsWithDelimiter = true
}
} else {
if strings.HasSuffix(shortDir, "/") {
dirPrefix = fmt.Sprintf("%s%s%%", shortDir, prefix)
} else {
dirPrefix = fmt.Sprintf("%s/%s%%", shortDir, prefix)
}
}
rows, err := db.QueryContext(ctx, store.GetSqlListRecursive(bucket), startFileName, util.HashStringToLong(shortDir), namePrefix, dirPrefix, limit+1)
if err != nil {
glog.Errorf("list %s : %v", dirPath, err)
return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
defer rows.Close()
for rows.Next() {
var dir, name, fileName string
var data []byte
if err = rows.Scan(&dir, &name, &data); err != nil {
glog.V(0).Infof("scan %s : %v", dirPath, err)
return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err)
}
if strings.HasSuffix(dir, "/") {
fileName = dir + name
} else {
fileName = fmt.Sprintf("%s/%s", dir, name)
}
lastFileName = fmt.Sprintf("%s%s", dir, name)
entry := &filer.Entry{
FullPath: util.NewFullPath(bucketDir, fileName),
}
if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
glog.Errorf("scan decode %s : %v", entry.FullPath, err)
return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
}
isDirectory := entry.IsDirectory() && entry.Attr.Mime == "" && entry.Attr.FileSize == 0
if !delimiter && isDirectory {
continue
}
glog.V(0).Infof("ListRecursivePrefixedEntries bucket %s, shortDir: %s, bucketDir: %s, lastFileName %s, fileName %s", bucket, shortDir, bucketDir, lastFileName, fileName)
if isPrefixEndsWithDelimiter && shortDir == lastFileName && isDirectory {
continue
}
if !eachEntryFunc(entry) {
break
}
}
return lastFileName, nil
}
func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil)
}
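
For readers tracing the non-delimiter branch above, here is a minimal sketch of how the five "?" placeholders of GetSqlListRecursive line up with the arguments passed to QueryContext. The helper name recursiveListArgs and the concrete directory, prefix, marker, and limit values are hypothetical; only the placeholder order comes from the query and the call in the diff.

```go
package main

import (
	"fmt"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/util"
)

// recursiveListArgs mirrors the argument construction of ListRecursivePrefixedEntries
// on the non-delimiter path: it returns the values bound to the five placeholders of
// GetSqlListRecursive, in order.
func recursiveListArgs(shortDir, prefix, startFileName string, limit int64) []interface{} {
	namePrefix := prefix + "%"                // entries directly in shortDir whose name starts with prefix
	dirPrefix := shortDir + "/" + prefix + "%" // full paths under shortDir that start with the prefix
	if strings.HasSuffix(shortDir, "/") {
		dirPrefix = shortDir + prefix + "%"
	}
	return []interface{}{
		startFileName,                   // CONCAT(`directory`, `name`) > ?  (resume after the marker)
		util.HashStringToLong(shortDir), // `dirhash` = ?
		namePrefix,                      // `name` LIKE ?
		dirPrefix,                       // CONCAT(`directory`, `name`) LIKE ?
		limit + 1,                       // LIMIT ?
	}
}

func main() {
	// Hypothetical listing of /photos with key prefix "2024-", resuming after one object.
	fmt.Println(recursiveListArgs("/photos", "2024-", "/photos/2024-01/a.jpg", 1000))
}
```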

4
weed/filer/arangodb/arangodb_store.go

@@ -291,6 +291,10 @@ func (store *ArangodbStore) ListDirectoryEntries(ctx context.Context, dirPath ut
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *ArangodbStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *ArangodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
targetCollection, err := store.extractBucketCollection(ctx, dirPath+"/")
if err != nil {

4
weed/filer/cassandra/cassandra_store.go

@@ -183,6 +183,10 @@ func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, d
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *CassandraStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
if _, ok := store.isSuperLargeDirectory(string(dirPath)); ok {

4
weed/filer/elastic/v7/elastic_store.go

@@ -103,6 +103,10 @@ func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *ElasticStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
index := getIndex(entry.FullPath, false)
dir, _ := entry.FullPath.DirAndName()

4
weed/filer/etcd/etcd_store.go

@@ -177,6 +177,10 @@ func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_
return nil
}
func (store *EtcdStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
lastFileStart := directoryPrefix

11
weed/filer/filer.go

@@ -342,8 +342,8 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e
}
func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) {
lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, recursive bool, delimiter bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) {
listFn := func(entry *Entry) bool {
select {
case <-ctx.Done():
return false
@@ -357,7 +357,12 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
}
return eachEntryFunc(entry)
}
})
}
if recursive {
lastFileName, err = f.Store.ListRecursivePrefixedEntries(ctx, p, startFileName, inclusive, delimiter, limit, prefix, listFn)
} else {
lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, listFn)
}
if err != nil {
return expiredCount, lastFileName, err
}

20
weed/filer/filer_search.go

@@ -27,7 +27,7 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start
limit = math.MaxInt32 - 1
}
_, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, namePatternExclude, func(entry *Entry) bool {
_, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, false, false, limit+1, prefix, namePattern, namePatternExclude, func(entry *Entry) bool {
entries = append(entries, entry)
return true
})
@@ -41,7 +41,7 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start
}
// For now, prefix and namePattern are mutually exclusive
func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, namePatternExclude string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, recursive bool, delimiter bool, limit int64, prefix string, namePattern string, namePatternExclude string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
p = p[0 : len(p)-1]
}
@@ -52,23 +52,23 @@ func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath,
}
var missedCount int64
missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, limit, prefix, restNamePattern, namePatternExclude, eachEntryFunc)
missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, recursive, delimiter, limit, prefix, restNamePattern, namePatternExclude, eachEntryFunc)
for missedCount > 0 && err == nil {
missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, missedCount, prefix, restNamePattern, namePatternExclude, eachEntryFunc)
missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, recursive, delimiter, missedCount, prefix, restNamePattern, namePatternExclude, eachEntryFunc)
}
return
}
func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix, restNamePattern string, namePatternExclude string, eachEntryFunc ListEachEntryFunc) (missedCount int64, lastFileName string, err error) {
func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, recursive bool, delimiter bool, limit int64, prefix, restNamePattern string, namePatternExclude string, eachEntryFunc ListEachEntryFunc) (missedCount int64, lastFileName string, err error) {
if len(restNamePattern) == 0 && len(namePatternExclude) == 0 {
lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, eachEntryFunc)
lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, recursive, delimiter, limit, prefix, eachEntryFunc)
return 0, lastFileName, err
}
lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, recursive, delimiter, limit, prefix, func(entry *Entry) bool {
nameToTest := entry.Name()
if len(namePatternExclude) > 0 {
if matched, matchErr := filepath.Match(namePatternExclude, nameToTest); matchErr == nil && matched {
@@ -93,11 +93,11 @@ func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath
return
}
func (f *Filer) doListValidEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
func (f *Filer) doListValidEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, recursive bool, delimiter bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
var expiredCount int64
expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix, eachEntryFunc)
expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, recursive, delimiter, limit, prefix, eachEntryFunc)
for expiredCount > 0 && err == nil {
expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix, eachEntryFunc)
expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, recursive, delimiter, expiredCount, prefix, eachEntryFunc)
}
return
}

2
weed/filer/filerstore.go

@@ -11,6 +11,7 @@ const CountEntryChunksForGzip = 50
var (
ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
ErrUnsupportedRecursivePrefixed = errors.New("unsupported recursive prefix listing")
ErrUnsupportedSuperLargeDirectoryListing = errors.New("unsupported super large directory listing")
ErrKvNotImplemented = errors.New("kv not implemented yet")
ErrKvNotFound = errors.New("kv: not found")
@@ -31,6 +32,7 @@ type FilerStore interface {
DeleteFolderChildren(context.Context, util.FullPath) (err error)
ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
BeginTransaction(ctx context.Context) (context.Context, error)
CommitTransaction(ctx context.Context) error

9
weed/filer/filerstore_translate_path.go

@@ -117,6 +117,15 @@ func (t *FilerStorePathTranslator) ListDirectoryEntries(ctx context.Context, dir
})
}
func (t *FilerStorePathTranslator) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (string, error) {
newFullPath := t.translatePath(dirPath)
return t.actualStore.ListRecursivePrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, delimiter, limit, prefix, func(entry *Entry) bool {
entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
return eachEntryFunc(entry)
})
}
func (t *FilerStorePathTranslator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (string, error) {
newFullPath := t.translatePath(dirPath)

22
weed/filer/filerstore_wrapper.go

@@ -274,6 +274,28 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context,
return lastFileName, err
}
func (fsw *FilerStoreWrapper) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
actualStore := fsw.getActualStore(dirPath + "/")
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixRecursiveList").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixRecursiveList").Observe(time.Since(start).Seconds())
}()
if limit > math.MaxInt32-1 {
limit = math.MaxInt32 - 1
}
adjustedEntryFunc := func(entry *Entry) bool {
fsw.maybeReadHardLink(ctx, entry)
filer_pb.AfterEntryDeserialization(entry.GetChunks())
return eachEntryFunc(entry)
}
lastFileName, err = actualStore.ListRecursivePrefixedEntries(ctx, dirPath, startFileName, includeStartFile, delimiter, limit, prefix, adjustedEntryFunc)
if err == ErrUnsupportedListDirectoryPrefixed {
lastFileName, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, adjustedEntryFunc)
}
return lastFileName, err
}
func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
actualStore := fsw.getActualStore(dirPath + "/")

4
weed/filer/hbase/hbase_store.go

@@ -152,6 +152,10 @@ func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *HbaseStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
expectedPrefix := []byte(dirPath.Child(prefix))

4
weed/filer/leveldb/leveldb_store.go

@@ -174,6 +174,10 @@ func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, dirPath wee
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *LevelDBStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)

4
weed/filer/leveldb2/leveldb2_store.go

@@ -180,6 +180,10 @@ func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, dirPath we
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *LevelDB2Store) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
directoryPrefix, partitionId := genDirectoryKeyPrefix(dirPath, prefix, store.dbCount)

4
weed/filer/leveldb3/leveldb3_store.go

@@ -304,6 +304,10 @@ func (store *LevelDB3Store) ListDirectoryEntries(ctx context.Context, dirPath we
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *LevelDB3Store) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
db, _, shortPath, err := store.findDB(dirPath, true)

4
weed/filer/mongodb/mongodb_store.go

@@ -232,6 +232,10 @@ func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *MongodbStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
var where = bson.M{"directory": string(dirPath), "name": bson.M{"$gt": startFileName}}
if includeStartFile {

4
weed/filer/mysql/mysql_sql_gen.go

@@ -49,6 +49,10 @@ func (gen *SqlGenMysql) GetSqlListInclusive(tableName string) string {
return fmt.Sprintf("SELECT `name`, `meta` FROM `%s` WHERE `dirhash` = ? AND `name` >= ? AND `directory` = ? AND `name` LIKE ? ORDER BY `name` ASC LIMIT ?", tableName)
}
func (gen *SqlGenMysql) GetSqlListRecursive(tableName string) string {
return fmt.Sprintf("SELECT `directory`, `name`, `meta` FROM `%s` WHERE CONCAT(`directory`, `name`) > ? AND ((`dirhash` = ? AND `name` like ?) OR CONCAT(`directory`, `name`) like ?) ORDER BY CONCAT(`directory`, `name`) ASC LIMIT ?", tableName)
}
func (gen *SqlGenMysql) GetSqlCreateTable(tableName string) string {
return fmt.Sprintf(gen.CreateTableSqlTemplate, tableName)
}

4
weed/filer/postgres/postgres_sql_gen.go

@@ -49,6 +49,10 @@ func (gen *SqlGenPostgres) GetSqlListInclusive(tableName string) string {
return fmt.Sprintf(`SELECT NAME, meta FROM "%s" WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5`, tableName)
}
func (gen *SqlGenPostgres) GetSqlListRecursive(tableName string) string {
return fmt.Sprintf(`SELECT NAME, meta FROM "%s" WHERE dirhash>$1 AND name>$2 AND directory like $3 AND name like $4 ORDER BY DIRECTORY,NAME ASC LIMIT $5`, tableName)
}
func (gen *SqlGenPostgres) GetSqlCreateTable(tableName string) string {
return fmt.Sprintf(gen.CreateTableSqlTemplate, tableName)
}

4
weed/filer/redis/universal_redis_store.go

@@ -138,6 +138,10 @@ func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Conte
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *UniversalRedisStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
dirListKey := genDirectoryListKey(string(dirPath))

4
weed/filer/redis2/universal_redis_store.go

@@ -165,6 +165,10 @@ func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Cont
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *UniversalRedis2Store) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
dirListKey := genDirectoryListKey(string(dirPath))

4
weed/filer/redis3/universal_redis_store.go

@@ -135,6 +135,10 @@ func (store *UniversalRedis3Store) ListDirectoryPrefixedEntries(ctx context.Cont
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *UniversalRedis3Store) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
dirListKey := genDirectoryListKey(string(dirPath))

4
weed/filer/redis_lua/universal_redis_store.go

@@ -133,6 +133,10 @@ func (store *UniversalRedisLuaStore) ListDirectoryPrefixedEntries(ctx context.Co
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *UniversalRedisLuaStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
dirListKey := genDirectoryListKey(string(dirPath))

4
weed/filer/rocksdb/rocksdb_store.go

@@ -235,6 +235,10 @@ func (store *RocksDBStore) ListDirectoryEntries(ctx context.Context, dirPath wee
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *RocksDBStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)

12
weed/filer/sqlite/sqlite_store.go

@@ -10,10 +10,10 @@ import (
"context"
"database/sql"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer/mysql"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql" "github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql"
"github.com/seaweedfs/seaweedfs/weed/filer/mysql"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
_ "modernc.org/sqlite" _ "modernc.org/sqlite"
) )
@@ -54,10 +54,12 @@ func (store *SqliteStore) Initialize(configuration util.Configuration, prefix st
func (store *SqliteStore) initialize(dbFile, createTable, upsertQuery string) (err error) {
store.SupportBucketTable = true
store.SqlGenerator = &mysql.SqlGenMysql{
CreateTableSqlTemplate: createTable,
DropTableSqlTemplate: "drop table `%s`",
UpsertQueryTemplate: upsertQuery,
store.SqlGenerator = &SqlGenSqlite{
SqlGenMysql: mysql.SqlGenMysql{
CreateTableSqlTemplate: createTable,
DropTableSqlTemplate: "drop table `%s`",
UpsertQueryTemplate: upsertQuery,
},
}
var dbErr error

21
weed/filer/sqlite/sqlite_store_gen.go

@@ -0,0 +1,21 @@
package sqlite
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer/mysql"
_ "github.com/go-sql-driver/mysql"
"github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql"
)
type SqlGenSqlite struct {
mysql.SqlGenMysql
}
var (
_ = abstract_sql.SqlGenerator(&SqlGenSqlite{})
)
func (gen *SqlGenSqlite) GetSqlListRecursive(tableName string) string {
return fmt.Sprintf("SELECT `directory`, `name`, `meta` FROM `%s` WHERE `directory` || `name` > ? AND ((`dirhash` = ? AND `name` like ?) OR `directory` || `name` like ?) ORDER BY `directory` || `name` ASC LIMIT ?", tableName)
}
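
SqlGenSqlite exists only because SQLite concatenates strings with || while the embedded MySQL generator uses CONCAT(); every other statement is inherited unchanged from SqlGenMysql. A hypothetical comparison, assuming the default filemeta table name and a build with the sqlite tag (for example go run -tags sqlite .):

```go
package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/filer/mysql"
	"github.com/seaweedfs/seaweedfs/weed/filer/sqlite"
)

func main() {
	mysqlGen := &mysql.SqlGenMysql{}
	sqliteGen := &sqlite.SqlGenSqlite{}

	// The only difference is the string-concatenation syntax in the recursive query.
	fmt.Println(mysqlGen.GetSqlListRecursive("filemeta"))  // uses CONCAT(`directory`, `name`)
	fmt.Println(sqliteGen.GetSqlListRecursive("filemeta")) // uses `directory` || `name`
}
```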

4
weed/filer/tikv/tikv_store.go

@@ -210,6 +210,10 @@ func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.F
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *TikvStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
lastFileName := ""
directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)

4
weed/filer/ydb/ydb_store.go

@@ -289,6 +289,10 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath
return lastFileName, nil
}
func (store *YdbStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}
func (store *YdbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
session, err := store.DB.Table().CreateSession(ctx)
if err != nil {

3
weed/pb/filer.proto

@@ -100,10 +100,13 @@ message ListEntriesRequest {
string startFromFileName = 3;
bool inclusiveStartFrom = 4;
uint32 limit = 5;
bool recursive = 6;
bool delimiter = 7;
}
message ListEntriesResponse {
Entry entry = 1;
string path = 2;
}
message RemoteEntry {
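
Not part of the commit: a hypothetical client-side sketch showing the two new request fields and the new path response field in use. The address assumes the default filer gRPC port (HTTP port 8888 plus 10000); the directory and prefix are made up.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"log"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	conn, err := grpc.Dial("localhost:18888", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	stream, err := filer_pb.NewSeaweedFilerClient(conn).ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
		Directory: "/buckets/mybucket/photos",
		Prefix:    "2024-",
		Limit:     1000,
		Recursive: true,  // new field: one recursive, prefix-filtered listing from the filer store
		Delimiter: false, // new field: true would keep directory entries so the caller can build common prefixes
	})
	if err != nil {
		log.Fatal(err)
	}
	for {
		resp, recvErr := stream.Recv()
		if recvErr == io.EOF {
			break
		}
		if recvErr != nil {
			log.Fatal(recvErr)
		}
		fmt.Println(resp.Path) // new field: full path of each streamed entry
	}
}
```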

28
weed/pb/filer_pb/filer.pb.go

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.32.0
// protoc v4.25.3
// protoc-gen-go v1.34.1
// protoc v5.26.1
// source: filer.proto
package filer_pb
@@ -132,6 +132,8 @@ type ListEntriesRequest struct {
StartFromFileName string `protobuf:"bytes,3,opt,name=startFromFileName,proto3" json:"startFromFileName,omitempty"`
InclusiveStartFrom bool `protobuf:"varint,4,opt,name=inclusiveStartFrom,proto3" json:"inclusiveStartFrom,omitempty"`
Limit uint32 `protobuf:"varint,5,opt,name=limit,proto3" json:"limit,omitempty"`
Recursive bool `protobuf:"varint,6,opt,name=recursive,proto3" json:"recursive,omitempty"`
Delimiter bool `protobuf:"varint,7,opt,name=delimiter,proto3" json:"delimiter,omitempty"`
}
func (x *ListEntriesRequest) Reset() {
@@ -201,12 +203,27 @@ func (x *ListEntriesRequest) GetLimit() uint32 {
return 0
}
func (x *ListEntriesRequest) GetRecursive() bool {
if x != nil {
return x.Recursive
}
return false
}
func (x *ListEntriesRequest) GetDelimiter() bool {
if x != nil {
return x.Delimiter
}
return false
}
type ListEntriesResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Entry *Entry `protobuf:"bytes,1,opt,name=entry,proto3" json:"entry,omitempty"`
Path string `protobuf:"bytes,2,opt,name=path,proto3" json:"path,omitempty"`
}
func (x *ListEntriesResponse) Reset() {
@@ -248,6 +265,13 @@ func (x *ListEntriesResponse) GetEntry() *Entry {
return nil
}
func (x *ListEntriesResponse) GetPath() string {
if x != nil {
return x.Path
}
return ""
}
type RemoteEntry struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache

2
weed/pb/filer_pb/filer_grpc.pb.go

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v4.25.3
// - protoc v5.26.1
// source: filer.proto
package filer_pb

115
weed/s3api/s3api_object_handlers_list.go

@@ -147,6 +147,78 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
prefixEndsOnDelimiter: strings.HasSuffix(originalPrefix, "/") && len(originalMarker) == 0,
}
if s3a.option.AllowListRecursive && (delimiter == "" || delimiter == "/") {
reqDir = bucketPrefix
if idx := strings.LastIndex(originalPrefix, "/"); idx > 0 {
reqDir += originalPrefix[:idx]
prefix = originalPrefix[idx+1:]
}
// This is necessary for SQL request with WHERE `directory` || `name` > originalMarker
if len(originalMarker) > 0 && originalMarker[0:1] != "/" {
marker = s3a.getStartFileFromKey(originalMarker)
} else {
marker = originalMarker
}
response = ListBucketResult{
Name: bucket,
Prefix: originalPrefix,
Marker: originalMarker,
MaxKeys: int(maxKeys),
Delimiter: delimiter,
}
if encodingTypeUrl {
response.EncodingType = s3.EncodingTypeUrl
}
if maxKeys == 0 {
return
}
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
doErr = s3a.doListFilerRecursiveEntries(client, reqDir, prefix, cursor, marker, delimiter, false,
func(path string, entry *filer_pb.Entry) {
key := path[len(bucketPrefix):]
if cursor.isTruncated {
nextMarker = cursor.nextMarker
return
}
defer func() {
if cursor.maxKeys == 0 {
cursor.isTruncated = true
cursor.nextMarker = s3a.getStartFileFromKey(key)
}
}()
if delimiter == "/" {
if entry.IsDirectoryKeyObject() {
contents = append(contents, newListEntry(entry, key+"/", "", "", bucketPrefix, fetchOwner, false, encodingTypeUrl))
cursor.maxKeys--
return
}
if entry.IsDirectory {
var prefixKey string
if encodingTypeUrl {
prefixKey = urlPathEscape(key + "/")
} else {
prefixKey = key + "/"
}
commonPrefixes = append(commonPrefixes, PrefixEntry{
Prefix: prefixKey,
})
cursor.maxKeys--
return
}
}
contents = append(contents, newListEntry(entry, key, "", "", bucketPrefix, fetchOwner, false, encodingTypeUrl))
cursor.maxKeys--
},
)
return nil
})
response.NextMarker = nextMarker
response.IsTruncated = len(nextMarker) != 0
response.Contents = contents
response.CommonPrefixes = commonPrefixes
return
}
// check filer
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
for {
@@ -248,6 +320,16 @@ type ListingCursor struct {
maxKeys uint16
isTruncated bool
prefixEndsOnDelimiter bool
nextMarker string
}
func (s3a *S3ApiServer) getStartFileFromKey(key string) string {
idx := strings.LastIndex(key, "/")
if idx == -1 {
return "/" + key
}
return fmt.Sprintf("/%s%s", key[0:idx], key[idx+1:len(key)])
}
// the prefix and marker may be in different directories
@@ -308,6 +390,39 @@ func toParentAndDescendants(dirAndName string) (dir, name string) {
return
}
func (s3a *S3ApiServer) doListFilerRecursiveEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, cursor *ListingCursor, marker, delimiter string, inclusiveStartFrom bool, eachEntryFn func(dir string, entry *filer_pb.Entry)) (err error) {
if prefix == "/" && delimiter == "/" {
return
}
request := &filer_pb.ListEntriesRequest{
Directory: dir,
Prefix: prefix,
Limit: uint32(cursor.maxKeys) + 1,
StartFromFileName: marker,
InclusiveStartFrom: inclusiveStartFrom,
Recursive: true,
Delimiter: delimiter == "/",
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, listErr := client.ListEntries(ctx, request)
if listErr != nil {
return fmt.Errorf("list entires %+v: %v", request, listErr)
}
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
break
} else {
return fmt.Errorf("iterating entires %+v: %v", request, recvErr)
}
}
eachEntryFn(resp.Path, resp.Entry)
}
return
}
func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, cursor *ListingCursor, marker, delimiter string, inclusiveStartFrom bool, eachEntryFn func(dir string, entry *filer_pb.Entry)) (nextMarker string, err error) {
// invariants
// prefix and marker should be under dir, marker may contain "/"
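
To see the effect from the S3 side, here is a hypothetical aws-sdk-go sketch (made-up credentials and bucket): against a gateway started with -s3.allowListRecursive=true, a ListObjectsV2 request without a delimiter is answered through doListFilerRecursiveEntries above rather than by walking each sub-directory.

```go
package main

import (
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	sess := session.Must(session.NewSession(&aws.Config{
		Endpoint:         aws.String("http://localhost:8000"), // weed S3 gateway port used in the diff
		Region:           aws.String("us-east-1"),
		Credentials:      credentials.NewStaticCredentials("accessKey", "secretKey", ""),
		S3ForcePathStyle: aws.Bool(true),
	}))
	svc := s3.New(sess)

	// No Delimiter: the gateway can fetch one recursive, prefix-filtered listing from the filer store.
	out, err := svc.ListObjectsV2(&s3.ListObjectsV2Input{
		Bucket: aws.String("mybucket"),
		Prefix: aws.String("photos/2024-"),
	})
	if err != nil {
		log.Fatal(err)
	}
	for _, obj := range out.Contents {
		fmt.Println(*obj.Key)
	}
}
```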

1
weed/s3api/s3api_server.go

@@ -32,6 +32,7 @@ type S3ApiServerOption struct {
GrpcDialOption grpc.DialOption
AllowEmptyFolder bool
AllowDeleteBucketNotEmpty bool
AllowListRecursive bool
LocalFilerSocket string
DataCenter string
FilerGroup string

9
weed/server/filer_grpc_server.go

@@ -47,8 +47,12 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
}
paginationLimit := filer.PaginationSize
if limit < paginationLimit {
if paginationLimit > limit {
paginationLimit = limit
// for skipping parent folders
if req.Recursive && !req.Delimiter {
paginationLimit *= 2
}
}
lastFileName := req.StartFromFileName
@@ -56,10 +60,11 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
var listErr error
for limit > 0 {
var hasEntries bool
lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) bool {
lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, req.Recursive, req.Delimiter, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) bool {
hasEntries = true
if err = stream.Send(&filer_pb.ListEntriesResponse{
Entry: entry.ToProtoEntry(),
Path: string(entry.FullPath),
}); err != nil {
return false
}

2
weed/server/filer_grpc_server_traverse_meta.go

@@ -63,7 +63,7 @@ func (fs *FilerServer) iterateDirectory(ctx context.Context, dirPath util.FullPa
var listErr error
for {
var hasEntries bool
lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) bool {
lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, false, false, 1024, "", "", "", func(entry *filer.Entry) bool {
hasEntries = true
if fnErr := fn(entry); fnErr != nil {
err = fnErr
