Browse Source

test ListDirectoryPrefixedEntries

pull/1431/head
Устюжанин Антон Александрович 5 years ago
parent
commit
33a9e5e2d1
  1. 69
      weed/filer2/abstract_sql/abstract_sql_store.go
  2. 31
      weed/filer2/cassandra/cassandra_store.go
  3. 34
      weed/filer2/etcd/etcd_store.go
  4. 10
      weed/filer2/filer.go
  5. 5
      weed/filer2/filerstore.go
  6. 31
      weed/filer2/leveldb/leveldb_store.go
  7. 31
      weed/filer2/leveldb2/leveldb2_store.go
  8. 31
      weed/filer2/mongodb/mongodb_store.go
  9. 2
      weed/filer2/mysql/mysql_store.go
  10. 30
      weed/filer2/redis/universal_redis_store.go
  11. 31
      weed/filer2/redis2/universal_redis_store.go
  12. 9
      weed/server/filer_grpc_server.go

69
weed/filer2/abstract_sql/abstract_sql_store.go

@ -150,8 +150,75 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
return nil return nil
} }
func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
//func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
// sqlText := store.SqlListExclusive
// if inclusive {
// sqlText = store.SqlListInclusive
// }
//
// rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(fullpath)), startFileName, string(fullpath), prefix, limit)
// if err != nil {
// return nil, fmt.Errorf("list %s : %v", fullpath, err)
// }
// defer rows.Close()
//
// for rows.Next() {
// var name string
// var data []byte
// if err = rows.Scan(&name, &data); err != nil {
// glog.V(0).Infof("scan %s : %v", fullpath, err)
// return nil, fmt.Errorf("scan %s: %v", fullpath, err)
// }
//
// entry := &filer2.Entry{
// FullPath: util.NewFullPath(string(fullpath), name),
// }
// if err = entry.DecodeAttributesAndChunks(data); err != nil {
// glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
// return nil, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
// }
//
// entries = append(entries, entry)
// }
//
// return entries, nil
//}
//func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
// return nil, fmt.Errorf("not implemented")
//
//}
func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
count := 0
notPrefixed, err := store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return notPrefixed, nil
}
for count < limit {
for _, entry := range notPrefixed {
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
}
}
if count >= limit {
break
}
notPrefixed, err = store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
}
return entries, nil
}
func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
sqlText := store.SqlListExclusive sqlText := store.SqlListExclusive
if inclusive { if inclusive {
sqlText = store.SqlListInclusive sqlText = store.SqlListInclusive

31
weed/filer2/cassandra/cassandra_store.go

@ -3,6 +3,7 @@ package cassandra
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"github.com/gocql/gocql" "github.com/gocql/gocql"
@ -126,6 +127,36 @@ func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath
return nil return nil
} }
func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
count := 0
notPrefixed, err := store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return notPrefixed, nil
}
for count < limit {
for _, entry := range notPrefixed {
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
}
}
if count >= limit {
break
}
notPrefixed, err = store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
}
return entries, nil
}
func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) { limit int) (entries []*filer2.Entry, err error) {

34
weed/filer2/etcd/etcd_store.go

@ -135,9 +135,37 @@ func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_
return nil return nil
} }
func (store *EtcdStore) ListDirectoryEntries(
ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
) (entries []*filer2.Entry, err error) {
func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
count := 0
notPrefixed, err := store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return notPrefixed, nil
}
for count < limit {
for _, entry := range notPrefixed {
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
}
}
if count >= limit {
break
}
notPrefixed, err = store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
}
return entries, nil
}
func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, "") directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
resp, err := store.client.Get(ctx, string(directoryPrefix), resp, err := store.client.Get(ctx, string(directoryPrefix),

10
weed/filer2/filer.go

@ -259,15 +259,15 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e
} }
func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) ([]*Entry, error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 { if strings.HasSuffix(string(p), "/") && len(p) > 1 {
p = p[0 : len(p)-1] p = p[0 : len(p)-1]
} }
var makeupEntries []*Entry var makeupEntries []*Entry
entries, expiredCount, lastFileName, err := f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
entries, expiredCount, lastFileName, err := f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix)
for expiredCount > 0 && err == nil { for expiredCount > 0 && err == nil {
makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount)
makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix)
if err == nil { if err == nil {
entries = append(entries, makeupEntries...) entries = append(entries, makeupEntries...)
} }
@ -276,8 +276,8 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start
return entries, err return entries, err
} }
func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int) (entries []*Entry, expiredCount int, lastFileName string, err error) {
listedEntries, listErr := f.Store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*Entry, expiredCount int, lastFileName string, err error) {
listedEntries, listErr := f.Store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix)
if listErr != nil { if listErr != nil {
return listedEntries, expiredCount, "", listErr return listedEntries, expiredCount, "", listErr
} }

5
weed/filer2/filerstore.go

@ -21,6 +21,7 @@ type FilerStore interface {
DeleteEntry(context.Context, util.FullPath) (err error) DeleteEntry(context.Context, util.FullPath) (err error)
DeleteFolderChildren(context.Context, util.FullPath) (err error) DeleteFolderChildren(context.Context, util.FullPath) (err error)
ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)
BeginTransaction(ctx context.Context) (context.Context, error) BeginTransaction(ctx context.Context) (context.Context, error)
CommitTransaction(ctx context.Context) error CommitTransaction(ctx context.Context) error
@ -112,14 +113,14 @@ func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.
return fsw.ActualStore.DeleteFolderChildren(ctx, fp) return fsw.ActualStore.DeleteFolderChildren(ctx, fp)
} }
func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc() stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc()
start := time.Now() start := time.Now()
defer func() { defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds()) stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
}() }()
entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
if err != nil { if err != nil {
return nil, err return nil, err
} }

31
weed/filer2/leveldb/leveldb_store.go

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"strings"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/errors"
@ -159,6 +160,36 @@ func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath we
return nil return nil
} }
func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
count := 0
notPrefixed, err := store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return notPrefixed, nil
}
for count < limit {
for _, entry := range notPrefixed {
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
}
}
if count >= limit {
break
}
notPrefixed, err = store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
}
return entries, nil
}
func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) { limit int) (entries []*filer2.Entry, err error) {

31
weed/filer2/leveldb2/leveldb2_store.go

@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"strings"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/errors"
@ -168,6 +169,36 @@ func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath w
return nil return nil
} }
func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
count := 0
notPrefixed, err := store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return notPrefixed, nil
}
for count < limit {
for _, entry := range notPrefixed {
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
}
}
if count >= limit {
break
}
notPrefixed, err = store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
}
return entries, nil
}
func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) { limit int) (entries []*filer2.Entry, err error) {

31
weed/filer2/mongodb/mongodb_store.go

@ -11,6 +11,7 @@ import (
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx" "go.mongodb.org/mongo-driver/x/bsonx"
"strings"
"time" "time"
) )
@ -167,6 +168,36 @@ func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath ut
return nil return nil
} }
func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
count := 0
notPrefixed, err := store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return notPrefixed, nil
}
for count < limit {
for _, entry := range notPrefixed {
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
}
}
if count >= limit {
break
}
notPrefixed, err = store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
}
return entries, nil
}
func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
var where = bson.M{"directory": string(fullpath), "name": bson.M{"$gt": startFileName}} var where = bson.M{"directory": string(fullpath), "name": bson.M{"$gt": startFileName}}

2
weed/filer2/mysql/mysql_store.go

@ -41,7 +41,7 @@ func (store *MysqlStore) Initialize(configuration util.Configuration, prefix str
func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int, func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int,
interpolateParams bool) (err error) { interpolateParams bool) (err error) {
//AND name like CONCAT(?,'%')
store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)" store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)"
store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?" store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?"
store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?" store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?"

30
weed/filer2/redis/universal_redis_store.go

@ -121,6 +121,36 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full
return nil return nil
} }
func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
count := 0
notPrefixed, err := store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return notPrefixed, nil
}
for count < limit {
for _, entry := range notPrefixed {
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
}
}
if count >= limit {
break
}
notPrefixed, err = store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
}
return entries, nil
}
func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) { limit int) (entries []*filer2.Entry, err error) {

31
weed/filer2/redis2/universal_redis_store.go

@ -3,6 +3,7 @@ package redis2
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/go-redis/redis" "github.com/go-redis/redis"
@ -116,6 +117,36 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful
return nil return nil
} }
func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
count := 0
notPrefixed, err := store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return notPrefixed, nil
}
for count < limit {
for _, entry := range notPrefixed {
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
}
}
if count >= limit {
break
}
notPrefixed, err = store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
}
return entries, nil
}
func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) { limit int) (entries []*filer2.Entry, err error) {

9
weed/server/filer_grpc_server.go

@ -6,7 +6,6 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
@ -59,7 +58,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
lastFileName := req.StartFromFileName lastFileName := req.StartFromFileName
includeLastFile := req.InclusiveStartFrom includeLastFile := req.InclusiveStartFrom
for limit > 0 { for limit > 0 {
entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit)
entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit, req.Prefix)
if err != nil { if err != nil {
return err return err
@ -74,12 +73,6 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
lastFileName = entry.Name() lastFileName = entry.Name()
if req.Prefix != "" {
if !strings.HasPrefix(entry.Name(), req.Prefix) {
continue
}
}
if err := stream.Send(&filer_pb.ListEntriesResponse{ if err := stream.Send(&filer_pb.ListEntriesResponse{
Entry: &filer_pb.Entry{ Entry: &filer_pb.Entry{
Name: entry.Name(), Name: entry.Name(),

Loading…
Cancel
Save