diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index bd1ac9734..f5dcf6e03 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -172,7 +172,7 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat return nil } -func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) { +func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { sqlText := store.SqlListExclusive if includeStartFile { sqlText = store.SqlListInclusive @@ -180,7 +180,7 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(dirPath)), startFileName, string(dirPath), prefix+"%", limit+1) if err != nil { - return nil, false, fmt.Errorf("list %s : %v", dirPath, err) + return lastFileName, fmt.Errorf("list %s : %v", dirPath, err) } defer rows.Close() @@ -189,30 +189,29 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, var data []byte if err = rows.Scan(&name, &data); err != nil { glog.V(0).Infof("scan %s : %v", dirPath, err) - return nil, false, fmt.Errorf("scan %s: %v", dirPath, err) + return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err) } + lastFileName = name entry := &filer.Entry{ FullPath: util.NewFullPath(string(dirPath), name), } if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err) - return nil, false, fmt.Errorf("scan decode %s : %v", entry.FullPath, err) + return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err) } - entries = append(entries, entry) - } + if !eachEntryFunc(entry) { + break + } - hasMore = int64(len(entries)) == limit+1 - if hasMore { - entries = entries[:limit] } - return entries, hasMore, nil + return lastFileName, nil } -func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) { - return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "") +func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil) } func (store *AbstractSqlStore) Shutdown() { diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go index 06fb3af46..fd2ce91a6 100644 --- a/weed/filer/cassandra/cassandra_store.go +++ b/weed/filer/cassandra/cassandra_store.go @@ -168,11 +168,11 @@ func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath return nil } -func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) { - return nil, false, filer.ErrUnsupportedListDirectoryPrefixed +func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed } -func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) { +func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { if _, ok := store.isSuperLargeDirectory(string(dirPath)); ok { return // nil, filer.ErrUnsupportedSuperLargeDirectoryListing @@ -190,23 +190,21 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath u entry := &filer.Entry{ FullPath: util.NewFullPath(string(dirPath), name), } + lastFileName = name if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil { err = decodeErr glog.V(0).Infof("list %s : %v", entry.FullPath, err) break } - entries = append(entries, entry) + if !eachEntryFunc(entry) { + break + } } if err := iter.Close(); err != nil { glog.V(0).Infof("list iterator close: %v", err) } - hasMore = int64(len(entries)) == limit+1 - if hasMore { - entries = entries[:limit] - } - - return entries, hasMore, err + return lastFileName, err } func (store *CassandraStore) Shutdown() { diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index 0e055e1fc..1e7f55599 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -96,8 +96,8 @@ func (store *ElasticStore) RollbackTransaction(ctx context.Context) error { return nil } -func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) { - return nil, false, filer.ErrUnsupportedListDirectoryPrefixed +func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed } func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { @@ -187,26 +187,28 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e } func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { - if entries, _, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil { - for _, entry := range entries { - store.DeleteEntry(ctx, entry.FullPath) + _, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) bool { + if err := store.DeleteEntry(ctx, entry.FullPath); err != nil { + glog.Errorf("elastic delete %s: %v.", entry.FullPath, err) + return false } - } - return nil + return true + }) + return } -func (store *ElasticStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) { +func (store *ElasticStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { if string(dirPath) == "/" { - return store.listRootDirectoryEntries(ctx, startFileName, includeStartFile, limit) + return store.listRootDirectoryEntries(ctx, startFileName, includeStartFile, limit, eachEntryFunc) } - return store.listDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) + return store.listDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc) } -func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) { +func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { indexResult, err := store.client.CatIndices().Do(ctx) if err != nil { glog.Errorf("list indices %v.", err) - return entries, false, err + return } for _, index := range indexResult { if index.Index == indexKV { @@ -216,32 +218,33 @@ func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFi if entry, err := store.FindEntry(ctx, weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil { fileName := getFileName(entry.FullPath) + lastFileName = fileName if fileName == startFileName && !inclusive { continue } limit-- if limit < 0 { - hasMore = true break } - entries = append(entries, entry) + if !eachEntryFunc(entry) { + break + } } } } - return entries, hasMore, nil + return } func (store *ElasticStore) listDirectoryEntries( - ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int64, -) (entries []*filer.Entry, hasMore bool, err error) { + ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { first := true index := getIndex(fullpath) nextStart := "" parentId := weed_util.Md5String([]byte(fullpath)) - if _, err := store.client.Refresh(index).Do(ctx); err != nil { + if _, err = store.client.Refresh(index).Do(ctx); err != nil { if elastic.IsNotFound(err) { store.client.CreateIndex(index).Do(ctx) - return entries, hasMore, nil + return } } for { @@ -249,7 +252,7 @@ func (store *ElasticStore) listDirectoryEntries( if (startFileName == "" && first) || inclusive { if result, err = store.search(ctx, index, parentId); err != nil { glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err) - return entries, hasMore, err + return } } else { fullPath := string(fullpath) + "/" + startFileName @@ -259,7 +262,7 @@ func (store *ElasticStore) listDirectoryEntries( after := weed_util.Md5String([]byte(fullPath)) if result, err = store.searchAfter(ctx, index, parentId, after); err != nil { glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err) - return entries, hasMore, err + return } } first = false @@ -271,22 +274,21 @@ func (store *ElasticStore) listDirectoryEntries( if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil { limit-- if limit < 0 { - hasMore = true - return entries, hasMore, nil + return lastFileName, nil } nextStart = string(esEntry.Entry.FullPath) fileName := getFileName(esEntry.Entry.FullPath) + lastFileName = fileName if fileName == startFileName && !inclusive { continue } - entries = append(entries, esEntry.Entry) + if !eachEntryFunc(esEntry.Entry) { + break + } } } - if len(result.Hits.Hits) < store.maxPageSize { - break - } } - return entries, hasMore, nil + return } func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) { diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go index b7acc2049..8159c634d 100644 --- a/weed/filer/etcd/etcd_store.go +++ b/weed/filer/etcd/etcd_store.go @@ -139,17 +139,17 @@ func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_ return nil } -func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) { - return nil, false, filer.ErrUnsupportedListDirectoryPrefixed +func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed } -func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) { +func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { directoryPrefix := genDirectoryKeyPrefix(dirPath, "") resp, err := store.client.Get(ctx, string(directoryPrefix), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) if err != nil { - return nil, false, fmt.Errorf("list %s : %v", dirPath, err) + return lastFileName, fmt.Errorf("list %s : %v", dirPath, err) } for _, kv := range resp.Kvs { @@ -160,9 +160,9 @@ func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, dirPath weed_u if fileName == startFileName && !includeStartFile { continue } + lastFileName = fileName limit-- if limit < 0 { - hasMore = true break } entry := &filer.Entry{ @@ -173,10 +173,12 @@ func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, dirPath weed_u glog.V(0).Infof("list %s : %v", entry.FullPath, err) break } - entries = append(entries, entry) + if !eachEntryFunc(entry) { + break + } } - return entries, hasMore, err + return lastFileName, err } func genKey(dirPath, fileName string) (key []byte) { diff --git a/weed/filer/filer.go b/weed/filer/filer.go index dd0fcf2cf..e59887763 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -281,22 +281,19 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e } -func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string) (entries []*Entry, hasMore bool, expiredCount int64, lastFileName string, err error) { - listedEntries, listHasMore, listErr := f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix) - hasMore = listHasMore - if listErr != nil { - return listedEntries, hasMore, expiredCount, "", listErr - } - for _, entry := range listedEntries { - lastFileName = entry.Name() +func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) { + lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool { if entry.TtlSec > 0 { if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { f.Store.DeleteOneEntry(ctx, entry) expiredCount++ - continue + return true } } - entries = append(entries, entry) + return eachEntryFunc(entry) + }) + if err != nil { + return expiredCount, lastFileName, err } return } diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go index 0fa57dd06..8c9688ceb 100644 --- a/weed/filer/filer_search.go +++ b/weed/filer/filer_search.go @@ -32,49 +32,76 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start var missedCount int64 var lastFileName string - entries, hasMore, missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, limit, prefix, restNamePattern) + missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, limit+1, prefix, restNamePattern, func(entry *Entry) bool { + entries = append(entries, entry) + return true + }) for missedCount > 0 && err == nil { - var makeupEntries []*Entry - makeupEntries, hasMore, missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, missedCount, prefix, restNamePattern) - for _, entry := range makeupEntries { + missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, missedCount+1, prefix, restNamePattern, func(entry *Entry) bool { entries = append(entries, entry) - } + return true + }) + } + + hasMore = int64(len(entries)) >= limit+1 + if hasMore { + entries = entries[:limit] } return entries, hasMore, err } -func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix, restNamePattern string) (matchedEntries []*Entry, hasMore bool, missedCount int64, lastFileName string, err error) { - var foundEntries []*Entry +// For now, prefix and namePattern are mutually exclusive +func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) { + if strings.HasSuffix(string(p), "/") && len(p) > 1 { + p = p[0 : len(p)-1] + } - foundEntries, hasMore, lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix) - if err != nil { - return + prefixInNamePattern, restNamePattern := splitPattern(namePattern) + if prefixInNamePattern != "" { + prefix = prefixInNamePattern + } + var missedCount int64 + + missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, limit, prefix, restNamePattern, eachEntryFunc) + + for missedCount > 0 && err == nil { + missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, missedCount, prefix, restNamePattern, eachEntryFunc) } + + return +} + +func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix, restNamePattern string, eachEntryFunc ListEachEntryFunc) (missedCount int64, lastFileName string, err error) { + if len(restNamePattern) == 0 { - return foundEntries, false, 0, lastFileName, nil + lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, eachEntryFunc) + return 0, lastFileName, err } - for _, entry := range foundEntries { + + lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool { nameToTest := strings.ToLower(entry.Name()) if matched, matchErr := filepath.Match(restNamePattern, nameToTest[len(prefix):]); matchErr == nil && matched { - matchedEntries = append(matchedEntries, entry) + if !eachEntryFunc(entry) { + return false + } } else { missedCount++ } + return true + }) + if err != nil { + return } return } -func (f *Filer) doListValidEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string) (entries []*Entry, hasMore bool, lastFileName string, err error) { - var makeupEntries []*Entry +func (f *Filer) doListValidEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) { var expiredCount int64 - entries, hasMore, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix) + expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix, eachEntryFunc) for expiredCount > 0 && err == nil { - makeupEntries, hasMore, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix) - if err == nil { - entries = append(entries, makeupEntries...) - } + expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix, eachEntryFunc) } return } diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go index a28c4d01b..8955a25c7 100644 --- a/weed/filer/filerstore.go +++ b/weed/filer/filerstore.go @@ -13,6 +13,8 @@ var ( ErrKvNotFound = errors.New("kv: not found") ) +type ListEachEntryFunc func(entry *Entry) bool + type FilerStore interface { // GetName gets the name to locate the configuration in filer.toml file GetName() string @@ -24,8 +26,8 @@ type FilerStore interface { FindEntry(context.Context, util.FullPath) (entry *Entry, err error) DeleteEntry(context.Context, util.FullPath) (err error) DeleteFolderChildren(context.Context, util.FullPath) (err error) - ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) ([]*Entry, bool, error) - ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) ([]*Entry, bool, error) + ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) + ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) BeginTransaction(ctx context.Context) (context.Context, error) CommitTransaction(ctx context.Context) error diff --git a/weed/filer/filerstore_translate_path.go b/weed/filer/filerstore_translate_path.go index 0e2ddd914..00bf82ed4 100644 --- a/weed/filer/filerstore_translate_path.go +++ b/weed/filer/filerstore_translate_path.go @@ -106,32 +106,24 @@ func (t *FilerStorePathTranlator) DeleteFolderChildren(ctx context.Context, fp u return t.actualStore.DeleteFolderChildren(ctx, newFullPath) } -func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) ([]*Entry, bool, error) { +func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) { newFullPath := t.translatePath(dirPath) - entries, hasMore, err := t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit) - if err != nil { - return nil, hasMore, err - } - for _, entry := range entries { + return t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit, func(entry *Entry) bool { entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath - } - return entries, hasMore, err + return eachEntryFunc(entry) + }) } -func (t *FilerStorePathTranlator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) ([]*Entry, bool, error) { +func (t *FilerStorePathTranlator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (string, error) { newFullPath := t.translatePath(dirPath) - entries, hasMore, err := t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix) - if err != nil { - return nil, hasMore, err - } - for _, entry := range entries { + return t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool { entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath - } - return entries, hasMore, nil + return eachEntryFunc(entry) + }) } func (t *FilerStorePathTranlator) BeginTransaction(ctx context.Context) (context.Context, error) { diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go index 2bb7793f0..64baac371 100644 --- a/weed/filer/filerstore_wrapper.go +++ b/weed/filer/filerstore_wrapper.go @@ -194,7 +194,7 @@ func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util. return actualStore.DeleteFolderChildren(ctx, fp) } -func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) ([]*Entry, bool, error) { +func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) { actualStore := fsw.getActualStore(dirPath + "/") stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "list").Inc() start := time.Now() @@ -203,18 +203,14 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath }() glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit) - entries, hasMore, err := actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) - if err != nil { - return nil, hasMore, err - } - for _, entry := range entries { + return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool { fsw.maybeReadHardLink(ctx, entry) filer_pb.AfterEntryDeserialization(entry.Chunks) - } - return entries, hasMore, err + return eachEntryFunc(entry) + }) } -func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) ([]*Entry, bool, error) { +func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) { actualStore := fsw.getActualStore(dirPath + "/") stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixList").Inc() start := time.Now() @@ -222,48 +218,52 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds()) }() glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit) - entries, hasMore, err := actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix) + lastFileName, err = actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, eachEntryFunc) if err == ErrUnsupportedListDirectoryPrefixed { - entries, hasMore, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix) - } - if err != nil { - return nil, hasMore, err - } - for _, entry := range entries { - fsw.maybeReadHardLink(ctx, entry) - filer_pb.AfterEntryDeserialization(entry.Chunks) + lastFileName, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool { + fsw.maybeReadHardLink(ctx, entry) + filer_pb.AfterEntryDeserialization(entry.Chunks) + return eachEntryFunc(entry) + }) } - return entries, hasMore, nil + return lastFileName, err } -func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*Entry, hasMore bool, err error) { +func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) { actualStore := fsw.getActualStore(dirPath + "/") - entries, hasMore, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) - if err != nil { - return nil, hasMore, err - } if prefix == "" { + return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc) + } + + var notPrefixed []*Entry + lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool { + notPrefixed = append(notPrefixed, entry) + return true + }) + if err != nil { return } count := int64(0) - var lastFileName string - notPrefixed := entries - entries = nil for count < limit && len(notPrefixed) > 0 { for _, entry := range notPrefixed { - lastFileName = entry.Name() if strings.HasPrefix(entry.Name(), prefix) { count++ - entries = append(entries, entry) + if !eachEntryFunc(entry) { + return + } if count >= limit { break } } } if count < limit { - notPrefixed, hasMore, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit) + notPrefixed = notPrefixed[:0] + _, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool { + notPrefixed = append(notPrefixed, entry) + return true + }) if err != nil { return } diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go index c93374b15..2e4491515 100644 --- a/weed/filer/hbase/hbase_store.go +++ b/weed/filer/hbase/hbase_store.go @@ -148,20 +148,18 @@ func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.Ful return } -func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) ([]*filer.Entry, bool, error) { - return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "") +func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) } -func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) ([]*filer.Entry, bool, error) { +func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}} expectedPrefix := []byte(dirPath.Child(prefix)) scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family)) if err != nil { - return nil, false, err + return lastFileName, err } - var hasMore bool - var entries []*filer.Entry scanner := store.Client.Scan(scan) defer scanner.Close() for { @@ -170,7 +168,7 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa break } if err != nil { - return entries, hasMore, err + return lastFileName, err } if len(res.Cells) == 0 { continue @@ -187,6 +185,8 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa continue } + lastFileName = fileName + value := cell.Value if fileName == startFileName && !includeStartFile { @@ -195,7 +195,6 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa limit-- if limit < 0 { - hasMore = true break } entry := &filer.Entry{ @@ -206,10 +205,12 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa glog.V(0).Infof("list %s : %v", entry.FullPath, err) break } - entries = append(entries, entry) + if !eachEntryFunc(entry) { + break + } } - return entries, hasMore, nil + return lastFileName, nil } func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) { diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go index c968ca2da..f0ae64769 100644 --- a/weed/filer/leveldb/leveldb_store.go +++ b/weed/filer/leveldb/leveldb_store.go @@ -162,11 +162,11 @@ func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath we return nil } -func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) { - return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "") +func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) } -func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) { +func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix) lastFileStart := directoryPrefix @@ -187,9 +187,9 @@ func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir if fileName == startFileName && !includeStartFile { continue } + lastFileName = fileName limit-- if limit < 0 { - hasMore = true break } entry := &filer.Entry{ @@ -200,11 +200,13 @@ func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir glog.V(0).Infof("list %s : %v", entry.FullPath, err) break } - entries = append(entries, entry) + if !eachEntryFunc(entry) { + break + } } iter.Release() - return entries, hasMore, err + return lastFileName, err } func genKey(dirPath, fileName string) (key []byte) { diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go index cc0d67eac..965721460 100644 --- a/weed/filer/leveldb2/leveldb2_store.go +++ b/weed/filer/leveldb2/leveldb2_store.go @@ -171,11 +171,11 @@ func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath w return nil } -func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) { - return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "") +func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) } -func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) { +func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { directoryPrefix, partitionId := genDirectoryKeyPrefix(dirPath, prefix, store.dbCount) lastFileStart := directoryPrefix @@ -196,9 +196,9 @@ func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, di if fileName == startFileName && !includeStartFile { continue } + lastFileName = fileName limit-- if limit < 0 { - hasMore = true break } entry := &filer.Entry{ @@ -211,11 +211,13 @@ func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, di glog.V(0).Infof("list %s : %v", entry.FullPath, err) break } - entries = append(entries, entry) + if !eachEntryFunc(entry) { + break + } } iter.Release() - return entries, hasMore, err + return lastFileName, err } func genKey(dirPath, fileName string, dbCount int) (key []byte, partitionId int) { diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go index afbcabaa6..24e00edc7 100644 --- a/weed/filer/leveldb3/leveldb3_store.go +++ b/weed/filer/leveldb3/leveldb3_store.go @@ -286,15 +286,15 @@ func (store *LevelDB3Store) DeleteFolderChildren(ctx context.Context, fullpath w return nil } -func (store *LevelDB3Store) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) { - return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "") +func (store *LevelDB3Store) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) } -func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) { +func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { db, _, shortPath, err := store.findDB(dirPath, true) if err != nil { - return nil, false, fmt.Errorf("findDB %s : %v", dirPath, err) + return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err) } directoryPrefix := genDirectoryKeyPrefix(shortPath, prefix) @@ -316,9 +316,9 @@ func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, di if fileName == startFileName && !includeStartFile { continue } + lastFileName = fileName limit-- if limit < 0 { - hasMore = true break } entry := &filer.Entry{ @@ -331,11 +331,13 @@ func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, di glog.V(0).Infof("list %s : %v", entry.FullPath, err) break } - entries = append(entries, entry) + if !eachEntryFunc(entry) { + break + } } iter.Release() - return entries, hasMore, err + return lastFileName, err } func genKey(dirPath, fileName string) (key []byte) { diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go index 2e9556f1f..1ef5056f4 100644 --- a/weed/filer/mongodb/mongodb_store.go +++ b/weed/filer/mongodb/mongodb_store.go @@ -178,11 +178,11 @@ func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath ut return nil } -func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) { - return nil, false, filer.ErrUnsupportedListDirectoryPrefixed +func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed } -func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) { +func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { var where = bson.M{"directory": string(dirPath), "name": bson.M{"$gt": startFileName}} if includeStartFile { @@ -190,38 +190,37 @@ func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath uti "$gte": startFileName, } } - optLimit := int64(limit + 1) + optLimit := int64(limit) opts := &options.FindOptions{Limit: &optLimit, Sort: bson.M{"name": 1}} cur, err := store.connect.Database(store.database).Collection(store.collectionName).Find(ctx, where, opts) for cur.Next(ctx) { var data Model err := cur.Decode(&data) if err != nil && err != mongo.ErrNoDocuments { - return nil, false, err + return lastFileName, err } entry := &filer.Entry{ FullPath: util.NewFullPath(string(dirPath), data.Name), } + lastFileName = data.Name if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data.Meta)); decodeErr != nil { err = decodeErr glog.V(0).Infof("list %s : %v", entry.FullPath, err) break } - entries = append(entries, entry) - } + if !eachEntryFunc(entry) { + break + } - hasMore = int64(len(entries)) == limit+1 - if hasMore { - entries = entries[:limit] } if err := cur.Close(ctx); err != nil { glog.V(0).Infof("list iterator close: %v", err) } - return entries, hasMore, err + return lastFileName, err } func (store *MongodbStore) Shutdown() { diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go index 8399b4e99..30d11a7f4 100644 --- a/weed/filer/redis/universal_redis_store.go +++ b/weed/filer/redis/universal_redis_store.go @@ -125,16 +125,16 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full return nil } -func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) { - return nil, false, filer.ErrUnsupportedListDirectoryPrefixed +func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed } -func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) { +func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { dirListKey := genDirectoryListKey(string(dirPath)) members, err := store.Client.SMembers(ctx, dirListKey).Result() if err != nil { - return nil, false, fmt.Errorf("list %s : %v", dirPath, err) + return lastFileName, fmt.Errorf("list %s : %v", dirPath, err) } // skip @@ -160,15 +160,15 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirP }) // limit - if limit < int64(len(entries)) { + if limit < int64(len(members)) { members = members[:limit] - hasMore = true } // fetch entry meta for _, fileName := range members { path := util.NewFullPath(string(dirPath), fileName) entry, err := store.FindEntry(ctx, path) + lastFileName = fileName if err != nil { glog.V(0).Infof("list %s : %v", path, err) if err == filer_pb.ErrNotFound { @@ -182,11 +182,13 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirP continue } } - entries = append(entries, entry) + if !eachEntryFunc(entry) { + break + } } } - return entries, hasMore, err + return lastFileName, err } func genDirectoryListKey(dir string) (dirList string) { diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go index 7b4e9d325..aab3d1f4a 100644 --- a/weed/filer/redis2/universal_redis_store.go +++ b/weed/filer/redis2/universal_redis_store.go @@ -149,11 +149,11 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful return nil } -func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) { - return nil, false, filer.ErrUnsupportedListDirectoryPrefixed +func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed } -func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) { +func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { dirListKey := genDirectoryListKey(string(dirPath)) start := int64(0) @@ -163,20 +163,16 @@ func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dir start++ } } - members, err := store.Client.ZRange(ctx, dirListKey, start, start+int64(limit)-1+1).Result() + members, err := store.Client.ZRange(ctx, dirListKey, start, start+int64(limit)-1).Result() if err != nil { - return nil, false, fmt.Errorf("list %s : %v", dirPath, err) - } - - hasMore = int64(len(entries)) == limit+1 - if hasMore { - members = members[:len(members)-1] + return lastFileName, fmt.Errorf("list %s : %v", dirPath, err) } // fetch entry meta for _, fileName := range members { path := util.NewFullPath(string(dirPath), fileName) entry, err := store.FindEntry(ctx, path) + lastFileName = fileName if err != nil { glog.V(0).Infof("list %s : %v", path, err) if err == filer_pb.ErrNotFound { @@ -190,11 +186,13 @@ func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dir continue } } - entries = append(entries, entry) + if !eachEntryFunc(entry) { + break + } } } - return entries, hasMore, err + return lastFileName, err } func genDirectoryListKey(dir string) (dirList string) { diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go index 98023e82e..70c301725 100644 --- a/weed/filer/rocksdb/rocksdb_store.go +++ b/weed/filer/rocksdb/rocksdb_store.go @@ -158,7 +158,7 @@ func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath we iter := store.db.NewIterator(ro) defer iter.Close() - _, err = enumerate(iter, directoryPrefix, nil, false, -1, func(key, value []byte) bool { + err = enumerate(iter, directoryPrefix, nil, false, -1, func(key, value []byte) bool { batch.Delete(key) return true }) @@ -175,7 +175,7 @@ func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath we return nil } -func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int64, fn func(key, value []byte) bool) (hasMore bool, err error) { +func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int64, fn func(key, value []byte) bool) (err error) { if len(lastKey) == 0 { iter.Seek(prefix) @@ -196,7 +196,6 @@ func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey if limit > 0 { i++ if i > limit { - hasMore = true break } } @@ -216,16 +215,16 @@ func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey } if err := iter.Err(); err != nil { - return hasMore, fmt.Errorf("prefix scan iterator: %v", err) + return fmt.Errorf("prefix scan iterator: %v", err) } - return hasMore, nil + return nil } -func (store *RocksDBStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) { - return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "") +func (store *RocksDBStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) } -func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) { +func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix) lastFileStart := directoryPrefix @@ -239,7 +238,7 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir iter := store.db.NewIterator(ro) defer iter.Close() - hasMore, err = enumerate(iter, directoryPrefix, lastFileStart, includeStartFile, limit, func(key, value []byte) bool { + err = enumerate(iter, directoryPrefix, lastFileStart, includeStartFile, limit, func(key, value []byte) bool { fileName := getNameFromKey(key) if fileName == "" { return true @@ -247,6 +246,7 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir entry := &filer.Entry{ FullPath: weed_util.NewFullPath(string(dirPath), fileName), } + lastFileName = fileName // println("list", entry.FullPath, "chunks", len(entry.Chunks)) if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil { @@ -254,14 +254,16 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir glog.V(0).Infof("list %s : %v", entry.FullPath, err) return false } - entries = append(entries, entry) + if !eachEntryFunc(entry) { + return false + } return true }) if err != nil { - return entries, false, fmt.Errorf("prefix list %s : %v", dirPath, err) + return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err) } - return entries, false, err + return lastFileName, err } func genKey(dirPath, fileName string) (key []byte) { diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index f2fbd99d3..2cc332635 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -319,14 +319,14 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) return nil, fuse.EIO } - listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int64(math.MaxInt32)) + listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool { + processEachEntryFn(entry.ToProtoEntry(), false) + return true + }) if listErr != nil { glog.Errorf("list meta cache: %v", listErr) return nil, fuse.EIO } - for _, cachedEntry := range listedEntries { - processEachEntryFn(cachedEntry.ToProtoEntry(), false) - } return } diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go index c16c4a938..f4e4d7d6e 100644 --- a/weed/filesys/meta_cache/meta_cache.go +++ b/weed/filesys/meta_cache/meta_cache.go @@ -117,22 +117,22 @@ func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err err return mc.localStore.DeleteEntry(ctx, fp) } -func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) ([]*filer.Entry, error) { +func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error { mc.RLock() defer mc.RUnlock() if !mc.visitedBoundary.HasVisited(dirPath) { - return nil, fmt.Errorf("unsynchronized dir: %v", dirPath) + return fmt.Errorf("unsynchronized dir: %v", dirPath) } - entries, _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) - if err != nil { - return nil, err - } - for _, entry := range entries { + _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool { mc.mapIdFromFilerToLocal(entry) + return eachEntryFunc(entry) + }) + if err != nil { + return err } - return entries, err + return err } func (mc *MetaCache) Shutdown() { diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 5cdf44e96..b0563d8bd 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -44,7 +44,7 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L }, nil } -func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) error { +func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) (err error) { glog.V(4).Infof("ListEntries %v", req) @@ -60,23 +60,12 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file lastFileName := req.StartFromFileName includeLastFile := req.InclusiveStartFrom + var listErr error for limit > 0 { - entries, hasMore, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "") - - if err != nil { - return err - } - if len(entries) == 0 { - return nil - } - - includeLastFile = false - - for _, entry := range entries { - - lastFileName = entry.Name() - - if err := stream.Send(&filer_pb.ListEntriesResponse{ + var hasEntries bool + lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", func(entry *filer.Entry) bool { + hasEntries = true + if err = stream.Send(&filer_pb.ListEntriesResponse{ Entry: &filer_pb.Entry{ Name: entry.Name(), IsDirectory: entry.IsDirectory(), @@ -88,19 +77,28 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file Content: entry.Content, }, }); err != nil { - return err + return false } limit-- if limit == 0 { - return nil + return false } - } + return true + }) - if !hasMore { - break + if listErr != nil { + return listErr + } + if err != nil { + return err + } + if !hasEntries { + return nil } + includeLastFile = false + } return nil