Browse Source

fix ceph tests

pull/5580/head
Konstantin Lebedev 8 months ago
parent
commit
c9f3eead8c
  1. 37
      weed/filer/abstract_sql/abstract_sql_store.go
  2. 2
      weed/filer/arangodb/arangodb_store.go
  3. 2
      weed/filer/cassandra/cassandra_store.go
  4. 2
      weed/filer/elastic/v7/elastic_store.go
  5. 2
      weed/filer/etcd/etcd_store.go
  6. 4
      weed/filer/filer.go
  7. 20
      weed/filer/filer_search.go
  8. 2
      weed/filer/filerstore.go
  9. 4
      weed/filer/filerstore_translate_path.go
  10. 4
      weed/filer/filerstore_wrapper.go
  11. 2
      weed/filer/hbase/hbase_store.go
  12. 2
      weed/filer/leveldb/leveldb_store.go
  13. 2
      weed/filer/leveldb2/leveldb2_store.go
  14. 2
      weed/filer/leveldb3/leveldb3_store.go
  15. 2
      weed/filer/mongodb/mongodb_store.go
  16. 2
      weed/filer/mysql/mysql_sql_gen.go
  17. 2
      weed/filer/redis/universal_redis_store.go
  18. 2
      weed/filer/redis2/universal_redis_store.go
  19. 2
      weed/filer/redis3/universal_redis_store.go
  20. 2
      weed/filer/redis_lua/universal_redis_store.go
  21. 2
      weed/filer/rocksdb/rocksdb_store.go
  22. 2
      weed/filer/tikv/tikv_store.go
  23. 2
      weed/filer/ydb/ydb_store.go
  24. 3
      weed/pb/filer.proto
  25. 30
      weed/pb/filer_pb/filer.pb.go
  26. 7
      weed/s3api/s3api_object_handlers.go
  27. 132
      weed/s3api/s3api_object_handlers_list.go
  28. 2
      weed/s3api/s3api_xsd_generated.go
  29. 14
      weed/server/filer_grpc_server.go
  30. 2
      weed/server/master_grpc_server.go
  31. 2
      weed/topology/data_node.go
  32. 2
      weed/topology/topology.go

37
weed/filer/abstract_sql/abstract_sql_store.go

@ -333,37 +333,58 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context,
return lastFileName, nil
}
func (store *AbstractSqlStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *AbstractSqlStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
bucketDir := fmt.Sprintf("/buckets/%s", bucket)
if err != nil {
return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
}
glog.V(5).Infof("ListRecursivePrefixedEntries lastFileName %s shortPath %v, prefix %v, sql %s", lastFileName, string(shortPath), prefix, store.GetSqlListRecursive(bucket))
rows, err := db.QueryContext(ctx, store.GetSqlListRecursive(bucket), startFileName, util.HashStringToLong(string(shortPath)), prefix+"%", string(shortPath)+prefix+"%", limit+1)
shortDir := string(shortPath)
var dirPrefix string
if shortDir == "/" {
dirPrefix = fmt.Sprintf("/%s%%", prefix)
} else {
dirPrefix = fmt.Sprintf("%s/%s%%", shortDir, prefix)
}
glog.V(0).Infof("ListRecursivePrefixedEntries %s lastFileName %s shortPath %v, prefix %v, startFileName %s, limit %d, delimiter %v, dirPrefix %s", string(dirPath), lastFileName, string(shortPath), prefix, startFileName, limit, delimiter, dirPrefix)
rows, err := db.QueryContext(ctx, store.GetSqlListRecursive(bucket), startFileName, util.HashStringToLong(shortDir), prefix+"%", dirPrefix, limit+1)
if err != nil {
glog.Errorf("list %s : %v", dirPath, err)
return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
defer rows.Close()
for rows.Next() {
var dir, name string
var dir, name, fileName string
var data []byte
if err = rows.Scan(&dir, &name, &data); err != nil {
glog.V(0).Infof("scan %s : %v", dirPath, err)
return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err)
}
glog.V(0).Infof("scan dir %s name %v", dir, name)
if len(dir) != 1 {
fileName = fmt.Sprintf("%s/%s", dir, name)
} else {
fileName = dir + name
}
lastFileName = fmt.Sprintf("%s/%s", dir, name)
entry := &filer.Entry{
FullPath: util.NewFullPath(string(dirPath), lastFileName),
FullPath: util.NewFullPath(bucketDir, fileName),
}
glog.V(0).Infof("scan shortDir %s dir %s name %v, lastFileName %s, FullPath %s", shortDir, dir, name, lastFileName, string(entry.FullPath))
if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
}
//if !delimiter && shortDir != dir && entry.IsDirectory() {
if !delimiter && entry.IsDirectory() {
glog.V(0).Infof("scan isDir %v skip %v", entry.IsDirectory(), entry.FullPath)
continue
}
if delimiter && shortDir != dir && !entry.IsDirectory() {
glog.V(0).Infof("scan isDir %v skip %v", entry.IsDirectory(), entry.FullPath)
continue
}
if !eachEntryFunc(entry) {
break
}

2
weed/filer/arangodb/arangodb_store.go

@ -291,7 +291,7 @@ func (store *ArangodbStore) ListDirectoryEntries(ctx context.Context, dirPath ut
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *ArangodbStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *ArangodbStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}

2
weed/filer/cassandra/cassandra_store.go

@ -183,7 +183,7 @@ func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, d
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *CassandraStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *CassandraStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}

2
weed/filer/elastic/v7/elastic_store.go

@ -103,7 +103,7 @@ func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *ElasticStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *ElasticStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}

2
weed/filer/etcd/etcd_store.go

@ -177,7 +177,7 @@ func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_
return nil
}
func (store *EtcdStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *EtcdStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}

4
weed/filer/filer.go

@ -354,7 +354,7 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e
}
func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, recursive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) {
func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, recursive bool, delimiter bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) {
listFn := func(entry *Entry) bool {
select {
case <-ctx.Done():
@ -372,7 +372,7 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
}
glog.V(5).Infof("doListDirectoryEntries recursive %v path: %+v, prefix %s", recursive, p, prefix)
if recursive {
lastFileName, err = f.Store.ListRecursivePrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, listFn)
lastFileName, err = f.Store.ListRecursivePrefixedEntries(ctx, p, startFileName, inclusive, delimiter, limit, prefix, listFn)
} else {
lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, listFn)
}

20
weed/filer/filer_search.go

@ -28,7 +28,7 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start
limit = math.MaxInt32 - 1
}
_, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, false, limit+1, prefix, namePattern, namePatternExclude, func(entry *Entry) bool {
_, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, false, false, limit+1, prefix, namePattern, namePatternExclude, func(entry *Entry) bool {
entries = append(entries, entry)
return true
})
@ -42,7 +42,7 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start
}
// For now, prefix and namePattern are mutually exclusive
func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, recursive bool, limit int64, prefix string, namePattern string, namePatternExclude string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, recursive bool, delimiter bool, limit int64, prefix string, namePattern string, namePatternExclude string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
glog.V(5).Infof("StreamListDirectoryEntries p %v startFileName %s prefix %s namePattern %v, recursive %v", p, startFileName, prefix, namePattern, recursive)
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
@ -55,24 +55,24 @@ func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath,
}
var missedCount int64
missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, recursive, limit, prefix, restNamePattern, namePatternExclude, eachEntryFunc)
missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, recursive, delimiter, limit, prefix, restNamePattern, namePatternExclude, eachEntryFunc)
for missedCount > 0 && err == nil {
missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, recursive, missedCount, prefix, restNamePattern, namePatternExclude, eachEntryFunc)
missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, recursive, delimiter, missedCount, prefix, restNamePattern, namePatternExclude, eachEntryFunc)
}
return
}
func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, recursive bool, limit int64, prefix, restNamePattern string, namePatternExclude string, eachEntryFunc ListEachEntryFunc) (missedCount int64, lastFileName string, err error) {
func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, recursive bool, delimiter bool, limit int64, prefix, restNamePattern string, namePatternExclude string, eachEntryFunc ListEachEntryFunc) (missedCount int64, lastFileName string, err error) {
glog.V(5).Infof("doListPatternMatchedEntries startFileName %v, recursive %v", startFileName, recursive)
if len(restNamePattern) == 0 && len(namePatternExclude) == 0 {
lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, recursive, limit, prefix, eachEntryFunc)
lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, recursive, delimiter, limit, prefix, eachEntryFunc)
return 0, lastFileName, err
}
lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, recursive, limit, prefix, func(entry *Entry) bool {
lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, recursive, delimiter, limit, prefix, func(entry *Entry) bool {
nameToTest := entry.Name()
if len(namePatternExclude) > 0 {
if matched, matchErr := filepath.Match(namePatternExclude, nameToTest); matchErr == nil && matched {
@ -97,13 +97,13 @@ func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath
return
}
func (f *Filer) doListValidEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, recursive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
func (f *Filer) doListValidEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, recursive bool, delimiter bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
glog.V(5).Infof("doListValidEntries p %v startFileName %v, recursive %v", p, startFileName, recursive)
var expiredCount int64
expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, recursive, limit, prefix, eachEntryFunc)
expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, recursive, delimiter, limit, prefix, eachEntryFunc)
for expiredCount > 0 && err == nil {
expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, recursive, expiredCount, prefix, eachEntryFunc)
expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, recursive, delimiter, expiredCount, prefix, eachEntryFunc)
}
return
}

2
weed/filer/filerstore.go

@ -32,7 +32,7 @@ type FilerStore interface {
DeleteFolderChildren(context.Context, util.FullPath) (err error)
ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
BeginTransaction(ctx context.Context) (context.Context, error)
CommitTransaction(ctx context.Context) error

4
weed/filer/filerstore_translate_path.go

@ -118,12 +118,12 @@ func (t *FilerStorePathTranslator) ListDirectoryEntries(ctx context.Context, dir
})
}
func (t *FilerStorePathTranslator) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (string, error) {
func (t *FilerStorePathTranslator) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (string, error) {
glog.V(5).Infof("ListRecursivePrefixedEntries dirPath %v", dirPath)
newFullPath := t.translatePath(dirPath)
return t.actualStore.ListRecursivePrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool {
return t.actualStore.ListRecursivePrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, delimiter, limit, prefix, func(entry *Entry) bool {
entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
return eachEntryFunc(entry)
})

4
weed/filer/filerstore_wrapper.go

@ -274,7 +274,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context,
return lastFileName, err
}
func (fsw *FilerStoreWrapper) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
func (fsw *FilerStoreWrapper) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
actualStore := fsw.getActualStore(dirPath + "/")
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixRecursiveList").Inc()
start := time.Now()
@ -290,7 +290,7 @@ func (fsw *FilerStoreWrapper) ListRecursivePrefixedEntries(ctx context.Context,
filer_pb.AfterEntryDeserialization(entry.GetChunks())
return eachEntryFunc(entry)
}
lastFileName, err = actualStore.ListRecursivePrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, adjustedEntryFunc)
lastFileName, err = actualStore.ListRecursivePrefixedEntries(ctx, dirPath, startFileName, includeStartFile, delimiter, limit, prefix, adjustedEntryFunc)
if err == ErrUnsupportedListDirectoryPrefixed {
lastFileName, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, adjustedEntryFunc)
}

2
weed/filer/hbase/hbase_store.go

@ -152,7 +152,7 @@ func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *HbaseStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *HbaseStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}

2
weed/filer/leveldb/leveldb_store.go

@ -174,7 +174,7 @@ func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, dirPath wee
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *LevelDBStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *LevelDBStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}

2
weed/filer/leveldb2/leveldb2_store.go

@ -178,7 +178,7 @@ func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, dirPath we
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *LevelDB2Store) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *LevelDB2Store) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}

2
weed/filer/leveldb3/leveldb3_store.go

@ -301,7 +301,7 @@ func (store *LevelDB3Store) ListDirectoryEntries(ctx context.Context, dirPath we
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *LevelDB3Store) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *LevelDB3Store) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}

2
weed/filer/mongodb/mongodb_store.go

@ -232,7 +232,7 @@ func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *MongodbStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *MongodbStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}

2
weed/filer/mysql/mysql_sql_gen.go

@ -50,7 +50,7 @@ func (gen *SqlGenMysql) GetSqlListInclusive(tableName string) string {
}
func (gen *SqlGenMysql) GetSqlListRecursive(tableName string) string {
return fmt.Sprintf("SELECT `directory`, `name`, `meta` FROM `%s` WHERE `directory` || '/' || `name` > ? AND ((dirhash == ? AND `name` like ?) OR `directory` like ?) ORDER BY `directory`,`name` ASC LIMIT ?", tableName)
return fmt.Sprintf("SELECT `directory`, `name`, `meta` FROM `%s` WHERE `directory` || '/' || `name` > ? AND ((`dirhash` == ? AND `name` like ?) OR `directory` like ?) ORDER BY `directory` || '/' || `name` ASC LIMIT ?", tableName)
}
func (gen *SqlGenMysql) GetSqlCreateTable(tableName string) string {

2
weed/filer/redis/universal_redis_store.go

@ -138,7 +138,7 @@ func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Conte
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *UniversalRedisStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *UniversalRedisStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}

2
weed/filer/redis2/universal_redis_store.go

@ -165,7 +165,7 @@ func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Cont
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *UniversalRedis2Store) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *UniversalRedis2Store) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}

2
weed/filer/redis3/universal_redis_store.go

@ -135,7 +135,7 @@ func (store *UniversalRedis3Store) ListDirectoryPrefixedEntries(ctx context.Cont
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *UniversalRedis3Store) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *UniversalRedis3Store) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}

2
weed/filer/redis_lua/universal_redis_store.go

@ -133,7 +133,7 @@ func (store *UniversalRedisLuaStore) ListDirectoryPrefixedEntries(ctx context.Co
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *UniversalRedisLuaStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *UniversalRedisLuaStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}

2
weed/filer/rocksdb/rocksdb_store.go

@ -235,7 +235,7 @@ func (store *RocksDBStore) ListDirectoryEntries(ctx context.Context, dirPath wee
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *RocksDBStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *RocksDBStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}

2
weed/filer/tikv/tikv_store.go

@ -210,7 +210,7 @@ func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.F
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *TikvStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *TikvStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, delimiter bool, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}

2
weed/filer/ydb/ydb_store.go

@ -289,7 +289,7 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath
return lastFileName, nil
}
func (store *YdbStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *YdbStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedRecursivePrefixed
}

3
weed/pb/filer.proto

@ -98,11 +98,12 @@ message ListEntriesRequest {
bool inclusiveStartFrom = 4;
uint32 limit = 5;
bool recursive = 6;
bool delimiter = 7;
}
message ListEntriesResponse {
Entry entry = 1;
string dir = 2;
string path = 2;
}
message RemoteEntry {

30
weed/pb/filer_pb/filer.pb.go

@ -133,6 +133,7 @@ type ListEntriesRequest struct {
InclusiveStartFrom bool `protobuf:"varint,4,opt,name=inclusiveStartFrom,proto3" json:"inclusiveStartFrom,omitempty"`
Limit uint32 `protobuf:"varint,5,opt,name=limit,proto3" json:"limit,omitempty"`
Recursive bool `protobuf:"varint,6,opt,name=recursive,proto3" json:"recursive,omitempty"`
Delimiter bool `protobuf:"varint,7,opt,name=delimiter,proto3" json:"delimiter,omitempty"`
}
func (x *ListEntriesRequest) Reset() {
@ -209,13 +210,20 @@ func (x *ListEntriesRequest) GetRecursive() bool {
return false
}
func (x *ListEntriesRequest) GetDelimiter() bool {
if x != nil {
return x.Delimiter
}
return false
}
type ListEntriesResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Entry *Entry `protobuf:"bytes,1,opt,name=entry,proto3" json:"entry,omitempty"`
Dir string `protobuf:"bytes,2,opt,name=dir,proto3" json:"dir,omitempty"`
Path string `protobuf:"bytes,2,opt,name=path,proto3" json:"path,omitempty"`
}
func (x *ListEntriesResponse) Reset() {
@ -257,9 +265,9 @@ func (x *ListEntriesResponse) GetEntry() *Entry {
return nil
}
func (x *ListEntriesResponse) GetDir() string {
func (x *ListEntriesResponse) GetPath() string {
if x != nil {
return x.Dir
return x.Path
}
return ""
}
@ -4368,7 +4376,7 @@ var file_filer_proto_rawDesc = []byte{
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x65, 0x6e, 0x74, 0x72,
0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f,
0x70, 0x62, 0x2e, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x05, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x22,
0xdc, 0x01, 0x0a, 0x12, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x52,
0xfa, 0x01, 0x0a, 0x12, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74,
0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63,
0x74, 0x6f, 0x72, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x18, 0x02,
@ -4381,12 +4389,14 @@ var file_filer_proto_rawDesc = []byte{
0x65, 0x53, 0x74, 0x61, 0x72, 0x74, 0x46, 0x72, 0x6f, 0x6d, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x69,
0x6d, 0x69, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74,
0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x63, 0x75, 0x72, 0x73, 0x69, 0x76, 0x65, 0x18, 0x06, 0x20,
0x01, 0x28, 0x08, 0x52, 0x09, 0x72, 0x65, 0x63, 0x75, 0x72, 0x73, 0x69, 0x76, 0x65, 0x22, 0x4e,
0x0a, 0x13, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x18, 0x01,
0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e,
0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x05, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03,
0x64, 0x69, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x64, 0x69, 0x72, 0x22, 0xc8,
0x01, 0x28, 0x08, 0x52, 0x09, 0x72, 0x65, 0x63, 0x75, 0x72, 0x73, 0x69, 0x76, 0x65, 0x12, 0x1c,
0x0a, 0x09, 0x64, 0x65, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x65, 0x72, 0x18, 0x07, 0x20, 0x01, 0x28,
0x08, 0x52, 0x09, 0x64, 0x65, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x65, 0x72, 0x22, 0x50, 0x0a, 0x13,
0x4c, 0x69, 0x73, 0x74, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01,
0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x45, 0x6e,
0x74, 0x72, 0x79, 0x52, 0x05, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61,
0x74, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x22, 0xc8,
0x01, 0x0a, 0x0b, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x21,
0x0a, 0x0c, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x4e, 0x61, 0x6d,

7
weed/s3api/s3api_object_handlers.go

@ -72,7 +72,7 @@ func removeDuplicateSlashes(object string) string {
return result.String()
}
func newListEntry(entry *filer_pb.Entry, dir string, name string, bucketPrefix string, fetchOwner bool, isDirectory bool) (listEntry ListEntry) {
func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bucketPrefix string, fetchOwner bool, isDirectory bool) (listEntry ListEntry) {
storageClass := "STANDARD"
if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok {
storageClass = string(v)
@ -81,8 +81,11 @@ func newListEntry(entry *filer_pb.Entry, dir string, name string, bucketPrefix s
if isDirectory {
keyFormat += "/"
}
if key == "" {
key = fmt.Sprintf(keyFormat, dir, name)[len(bucketPrefix):]
}
listEntry = ListEntry{
Key: fmt.Sprintf(keyFormat, dir, name)[len(bucketPrefix):],
Key: key,
LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
ETag: "\"" + filer.ETag(entry) + "\"",
Size: int64(filer.FileSize(entry)),

132
weed/s3api/s3api_object_handlers_list.go

@ -33,7 +33,7 @@ type ListBucketResultV2 struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"`
Name string `xml:"Name"`
Prefix string `xml:"Prefix"`
MaxKeys int `xml:"MaxKeys"`
MaxKeys uint16 `xml:"MaxKeys"`
Delimiter string `xml:"Delimiter,omitempty"`
IsTruncated bool `xml:"IsTruncated"`
Contents []ListEntry `xml:"Contents,omitempty"`
@ -51,7 +51,7 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ
// collect parameters
bucket, _ := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("ListObjectsV2Handler %s", bucket)
glog.V(0).Infof("ListObjectsV2Handler %s query %+v", bucket, r.URL.Query())
originalPrefix, startAfter, delimiter, continuationToken, encodingTypeUrl, fetchOwner, maxKeys := getListObjectsV2Args(r.URL.Query())
@ -106,7 +106,7 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
// collect parameters
bucket, _ := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("ListObjectsV1Handler %s", bucket)
glog.V(0).Infof("ListObjectsV1Handler %s query %+v", bucket, r.URL.Query())
originalPrefix, marker, delimiter, encodingTypeUrl, maxKeys := getListObjectsV1Args(r.URL.Query())
@ -114,8 +114,7 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxKeys)
return
}
response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter, encodingTypeUrl, true)
response, err := s3a.listFilerEntries(bucket, originalPrefix, uint16(maxKeys), marker, delimiter, encodingTypeUrl, true)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
@ -132,7 +131,7 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
writeSuccessResponseXML(w, r, response)
}
func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, maxKeys int, originalMarker string, delimiter string, encodingTypeUrl bool, fetchOwner bool) (response ListBucketResult, err error) {
func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, maxKeys uint16, originalMarker string, delimiter string, encodingTypeUrl bool, fetchOwner bool) (response ListBucketResult, err error) {
// convert full path prefix into directory name and prefix for entry name
requestDir, prefix, marker := normalizePrefixMarker(originalPrefix, originalMarker)
bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
@ -150,52 +149,78 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
prefixEndsOnDelimiter: strings.HasSuffix(originalPrefix, "/") && len(originalMarker) == 0,
}
// Todo remove force disable
if s3a.option.AllowListRecursive && prefix != "" && (delimiter == "" || delimiter == "/") {
if s3a.option.AllowListRecursive && (delimiter == "" || delimiter == "/") {
reqDir = bucketPrefix
if idx := strings.LastIndex(originalPrefix, "/"); idx > 0 {
reqDir += originalPrefix[:idx]
prefix = originalPrefix[idx+1:]
}
// This is necessary for SQL request with WHERE `directory` || '/' || `name` > originalMarker
if len(originalMarker) > 0 && originalMarker[0:1] != "/" {
if reqDir == bucketPrefix {
marker = "//" + originalMarker
} else {
marker = "/" + originalMarker
}
} else {
marker = originalMarker
}
response = ListBucketResult{
Name: bucket,
Prefix: originalPrefix,
Marker: originalMarker,
MaxKeys: maxKeys,
Delimiter: delimiter,
}
if encodingTypeUrl {
response.EncodingType = s3.EncodingTypeUrl
}
if maxKeys == 0 {
return
}
glog.V(0).Infof("listFilerEntries reqDir: %s, prefix: %s[%s], delimiter: %v, cursor: %+v, mmarker: %s[%s]", reqDir, prefix, originalPrefix, delimiter, cursor, marker, originalMarker)
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
glog.V(0).Infof("doListFilerRecursiveEntries reqDir: %s, prefix: %s, delimiter: %s, cursor: %+v", reqDir, prefix, delimiter, cursor)
nextMarker, doErr = s3a.doListFilerRecursiveEntries(client, reqDir, prefix, cursor, marker, delimiter, false,
func(dir string, entry *filer_pb.Entry) {
glog.V(5).Infof("doListFilerRecursiveEntries dir %s, shortDir %s, entry: %+v, cursor: %+v", dir, dir[len(bucketPrefix):], entry, cursor)
doErr = s3a.doListFilerRecursiveEntries(client, reqDir, prefix, cursor, marker, delimiter, false,
func(path string, entry *filer_pb.Entry) {
isCommonDir := strings.Index(path[len(reqDir)+1:], "/") != -1
key := path[len(bucketPrefix):]
glog.V(0).Infof("doListFilerRecursiveEntries path %s, shortDir %s, key: %+v, cursor: %+v, marker: %s[%s], nextMarker: %s, isCommonDir %v", path, path[len(reqDir):], key, cursor, marker, originalMarker, cursor.nextMarker, isCommonDir)
if cursor.isTruncated {
nextMarker = cursor.nextMarker
return
}
dirName, entryName, prefixName := entryUrlEncode(dir, entry.Name, encodingTypeUrl)
isCommonDir := strings.Index(dir[len(bucketPrefix):], "/") != -1
if cursor.prefixEndsOnDelimiter && !isCommonDir && entry.Name == prefix {
return
}
defer func() {
if cursor.maxKeys == 0 {
cursor.isTruncated = true
if strings.Index(key, "/") == -1 {
cursor.nextMarker = "//" + key
} else {
cursor.nextMarker = "/" + key
}
}
}()
if delimiter == "/" {
if entry.IsDirectory {
commonPrefixes = append(commonPrefixes, PrefixEntry{
Prefix: fmt.Sprintf("%s/%s/", dirName, prefixName)[len(bucketPrefix):],
Prefix: path[len(bucketPrefix):] + "/",
})
cursor.Decrease()
cursor.maxKeys--
return
// Todo use sql group by dir
} else if isCommonDir {
return
}
}
contents = append(contents, newListEntry(entry, dirName, entryName, bucketPrefix, fetchOwner, entry.IsDirectoryKeyObject()))
cursor.Decrease()
contents = append(contents, newListEntry(entry, key, "", "", bucketPrefix, fetchOwner, entry.IsDirectoryKeyObject()))
cursor.maxKeys--
},
)
return nil
})
response = ListBucketResult{
Name: bucket,
Prefix: originalPrefix,
Marker: originalMarker,
NextMarker: nextMarker,
MaxKeys: maxKeys,
Delimiter: delimiter,
IsTruncated: cursor.isTruncated,
Contents: contents,
CommonPrefixes: commonPrefixes,
}
if encodingTypeUrl {
response.EncodingType = s3.EncodingTypeUrl
}
response.NextMarker = nextMarker
response.IsTruncated = len(nextMarker) != 0
response.Contents = contents
response.CommonPrefixes = commonPrefixes
return
}
@ -205,11 +230,11 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
empty := true
nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, cursor, marker, delimiter, false, func(dir string, entry *filer_pb.Entry) {
empty = false
glog.V(5).Infof("doListFilerEntries dir: %s entry: %+v", dir, entry)
glog.V(0).Infof("doListFilerEntries dir: %s entry: %+v", dir, entry)
dirName, entryName, prefixName := entryUrlEncode(dir, entry.Name, encodingTypeUrl)
if entry.IsDirectory {
if entry.IsDirectoryKeyObject() {
contents = append(contents, newListEntry(entry, dirName, entryName, bucketPrefix, fetchOwner, true))
contents = append(contents, newListEntry(entry, "", dirName, entryName, bucketPrefix, fetchOwner, true))
cursor.maxKeys--
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
} else if delimiter == "/" { // A response can contain CommonPrefixes only if you specify a delimiter.
@ -252,7 +277,7 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
}
}
if !delimiterFound {
contents = append(contents, newListEntry(entry, dirName, entryName, bucketPrefix, fetchOwner, false))
contents = append(contents, newListEntry(entry, "", dirName, entryName, bucketPrefix, fetchOwner, false))
cursor.maxKeys--
}
}
@ -299,9 +324,10 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
}
type ListingCursor struct {
maxKeys int
maxKeys uint16
isTruncated bool
prefixEndsOnDelimiter bool
nextMarker string
}
func (l *ListingCursor) Decrease() {
@ -369,26 +395,24 @@ func toParentAndDescendants(dirAndName string) (dir, name string) {
return
}
func (s3a *S3ApiServer) doListFilerRecursiveEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, cursor *ListingCursor, marker, delimiter string, inclusiveStartFrom bool, eachEntryFn func(dir string, entry *filer_pb.Entry)) (nextMarker string, err error) {
func (s3a *S3ApiServer) doListFilerRecursiveEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, cursor *ListingCursor, marker, delimiter string, inclusiveStartFrom bool, eachEntryFn func(dir string, entry *filer_pb.Entry)) (err error) {
if prefix == "/" && delimiter == "/" {
return
}
request := &filer_pb.ListEntriesRequest{
Directory: dir,
Prefix: prefix,
Limit: uint32(cursor.maxKeys),
Limit: uint32(cursor.maxKeys) + 1,
StartFromFileName: marker,
InclusiveStartFrom: inclusiveStartFrom,
Recursive: true,
}
if cursor.prefixEndsOnDelimiter {
request.Limit += 1
Delimiter: delimiter == "/",
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, listErr := client.ListEntries(ctx, request)
if listErr != nil {
return "", fmt.Errorf("list entires %+v: %v", request, listErr)
return fmt.Errorf("list entires %+v: %v", request, listErr)
}
for {
resp, recvErr := stream.Recv()
@ -396,10 +420,10 @@ func (s3a *S3ApiServer) doListFilerRecursiveEntries(client filer_pb.SeaweedFiler
if recvErr == io.EOF {
break
} else {
return "", fmt.Errorf("iterating entires %+v: %v", request, recvErr)
return fmt.Errorf("iterating entires %+v: %v", request, recvErr)
}
}
eachEntryFn(resp.Dir, resp.Entry)
eachEntryFn(resp.Path, resp.Entry)
}
return
}
@ -408,7 +432,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
// invariants
// prefix and marker should be under dir, marker may contain "/"
// maxKeys should be updated for each recursion
// glog.V(4).Infof("doListFilerEntries dir: %s, prefix: %s, marker %s, maxKeys: %d, prefixEndsOnDelimiter: %+v", dir, prefix, marker, cursor.maxKeys, cursor.prefixEndsOnDelimiter)
glog.V(0).Infof("doListFilerEntries dir: %s, prefix: %s, marker %s, maxKeys: %d, prefixEndsOnDelimiter: %+v", dir, prefix, marker, cursor.maxKeys, cursor.prefixEndsOnDelimiter)
if prefix == "/" && delimiter == "/" {
return
}
@ -524,14 +548,16 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
return
}
func getListObjectsV2Args(values url.Values) (prefix, startAfter, delimiter string, token OptionalString, encodingTypeUrl bool, fetchOwner bool, maxkeys int) {
func getListObjectsV2Args(values url.Values) (prefix, startAfter, delimiter string, token OptionalString, encodingTypeUrl bool, fetchOwner bool, maxkeys uint16) {
prefix = values.Get("prefix")
token = OptionalString{set: values.Has("continuation-token"), string: values.Get("continuation-token")}
startAfter = values.Get("start-after")
delimiter = values.Get("delimiter")
encodingTypeUrl = values.Get("encoding-type") == s3.EncodingTypeUrl
if values.Get("max-keys") != "" {
maxkeys, _ = strconv.Atoi(values.Get("max-keys"))
if maxKeys, err := strconv.ParseUint(values.Get("max-keys"), 10, 16); err == nil {
maxkeys = uint16(maxKeys)
}
} else {
maxkeys = maxObjectListSizeLimit
}
@ -539,13 +565,15 @@ func getListObjectsV2Args(values url.Values) (prefix, startAfter, delimiter stri
return
}
func getListObjectsV1Args(values url.Values) (prefix, marker, delimiter string, encodingTypeUrl bool, maxkeys int) {
func getListObjectsV1Args(values url.Values) (prefix, marker, delimiter string, encodingTypeUrl bool, maxkeys int16) {
prefix = values.Get("prefix")
marker = values.Get("marker")
delimiter = values.Get("delimiter")
encodingTypeUrl = values.Get("encoding-type") == "url"
if values.Get("max-keys") != "" {
maxkeys, _ = strconv.Atoi(values.Get("max-keys"))
if maxKeys, err := strconv.ParseInt(values.Get("max-keys"), 10, 16); err == nil {
maxkeys = int16(maxKeys)
}
} else {
maxkeys = maxObjectListSizeLimit
}

2
weed/s3api/s3api_xsd_generated.go

@ -593,7 +593,7 @@ type ListBucketResult struct {
Prefix string `xml:"Prefix"`
Marker string `xml:"Marker"`
NextMarker string `xml:"NextMarker,omitempty"`
MaxKeys int `xml:"MaxKeys"`
MaxKeys uint16 `xml:"MaxKeys"`
Delimiter string `xml:"Delimiter,omitempty"`
EncodingType string `xml:"EncodingType,omitempty"`
IsTruncated bool `xml:"IsTruncated"`

14
weed/server/filer_grpc_server.go

@ -47,8 +47,11 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
}
paginationLimit := filer.PaginationSize
if limit < paginationLimit {
if paginationLimit > limit && !req.Delimiter {
paginationLimit = limit
if req.Recursive {
paginationLimit *= 2
}
}
lastFileName := req.StartFromFileName
@ -56,17 +59,15 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
var listErr error
for limit > 0 {
var hasEntries bool
lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, req.Recursive, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) bool {
//glog.V(0).Infof("StreamListDirectoryEntries req %+v", req)
lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, req.Recursive, req.Delimiter, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) bool {
hasEntries = true
glog.V(5).Infof("StreamListDirectoryEntries recursive %v, entry: %+v", req.Recursive, entry)
dir, _ := entry.FullPath.DirAndName()
if err = stream.Send(&filer_pb.ListEntriesResponse{
Entry: entry.ToProtoEntry(),
Dir: dir,
Path: string(entry.FullPath),
}); err != nil {
return false
}
limit--
if limit == 0 {
return false
@ -83,7 +84,6 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
if !hasEntries {
return nil
}
includeLastFile = false
}

2
weed/server/master_grpc_server.go

@ -200,7 +200,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
message.NewVids = append(message.NewVids, uint32(v.Id))
}
for _, v := range deletedVolumes {
glog.V(0).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url())
glog.V(1).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url())
message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
}
}

2
weed/topology/data_node.go

@ -77,7 +77,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
for _, v := range existingVolumes {
vid := v.Id
if _, ok := actualVolumeMap[vid]; !ok {
glog.V(0).Infoln("Deleting volume id:", vid)
glog.V(1).Infoln("Deleting volume id:", vid)
disk := dn.getOrCreateDisk(v.DiskType)
delete(disk.volumes, vid)
deletedVolumes = append(deletedVolumes, v)

2
weed/topology/topology.go

@ -273,7 +273,7 @@ func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
vl.EnsureCorrectWritables(&v)
}
func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
glog.Infof("removing volume info: %+v from %v", v, dn.id)
glog.V(1).Infof("removing volume info: %+v from %v", v, dn.id)
if v.ReplicaPlacement.GetCopyCount() > 1 {
stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(0)
}

Loading…
Cancel
Save