Browse Source

filer store wrapper can implement the logic to filter by prefi

pull/1431/head
Konstantin Lebedev 4 years ago
parent
commit
2ea638f865
  1. 2
      weed/filer2/abstract_sql/abstract_sql_store.go
  2. 36
      weed/filer2/cassandra/cassandra_store.go
  3. 34
      weed/filer2/etcd/etcd_store.go
  4. 44
      weed/filer2/filerstore.go
  5. 36
      weed/filer2/leveldb/leveldb_store.go
  6. 40
      weed/filer2/leveldb2/leveldb2_store.go
  7. 35
      weed/filer2/mongodb/mongodb_store.go
  8. 34
      weed/filer2/redis/universal_redis_store.go
  9. 35
      weed/filer2/redis2/universal_redis_store.go

2
weed/filer2/abstract_sql/abstract_sql_store.go

@ -182,9 +182,9 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context,
return entries, nil return entries, nil
} }
func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "") return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "")
} }
func (store *AbstractSqlStore) Shutdown() { func (store *AbstractSqlStore) Shutdown() {

36
weed/filer2/cassandra/cassandra_store.go

@ -3,8 +3,6 @@ package cassandra
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"github.com/gocql/gocql" "github.com/gocql/gocql"
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
@ -128,39 +126,7 @@ func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath
} }
func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) { func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
count := 0
notPrefixed, err := store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return notPrefixed, nil
}
var lastFileName string
for count < limit {
for _, entry := range notPrefixed {
lastFileName = entry.Name()
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
}
}
if count >= limit {
break
}
notPrefixed, err = store.ListDirectoryEntries(ctx, fullpath, lastFileName, inclusive, limit)
if err != nil {
return nil, err
}
if len(notPrefixed) == 0 {
break
}
}
return entries, nil
return nil, fmt.Errorf("UNSUPPORTED")
} }
func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,

34
weed/filer2/etcd/etcd_store.go

@ -136,39 +136,7 @@ func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_
} }
func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) { func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
count := 0
notPrefixed, err := store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return notPrefixed, nil
}
var lastFileName string
for count < limit {
for _, entry := range notPrefixed {
lastFileName = entry.Name()
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
}
}
if count >= limit {
break
}
notPrefixed, err = store.ListDirectoryEntries(ctx, fullpath, lastFileName, inclusive, limit)
if err != nil {
return nil, err
}
if len(notPrefixed) == 0 {
break
}
}
return entries, nil
return nil, fmt.Errorf("UNSUPPORTED")
} }
func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {

44
weed/filer2/filerstore.go

@ -2,6 +2,8 @@ package filer2
import ( import (
"context" "context"
"fmt"
"strings"
"time" "time"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@ -22,6 +24,7 @@ type FilerStore interface {
DeleteFolderChildren(context.Context, util.FullPath) (err error) DeleteFolderChildren(context.Context, util.FullPath) (err error)
ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)
ListDirectoryUnSupPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error)
BeginTransaction(ctx context.Context) (context.Context, error) BeginTransaction(ctx context.Context) (context.Context, error)
CommitTransaction(ctx context.Context) error CommitTransaction(ctx context.Context) error
@ -136,8 +139,10 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context,
defer func() { defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds()) stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
}() }()
entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix) entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
if err == fmt.Errorf("UNSUPPORTED") {
entries, err = fsw.ListDirectoryUnSupPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -147,6 +152,43 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context,
return entries, nil return entries, nil
} }
func (fsw *FilerStoreWrapper) ListDirectoryUnSupPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
count := 0
notPrefixed, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return notPrefixed, nil
}
var lastFileName string
for count < limit {
for _, entry := range notPrefixed {
lastFileName = entry.Name()
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
}
}
if count >= limit {
break
}
notPrefixed, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, includeStartFile, limit)
if err != nil {
return nil, err
}
if len(notPrefixed) == 0 {
break
}
}
return entries, nil
}
func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) { func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
return fsw.ActualStore.BeginTransaction(ctx) return fsw.ActualStore.BeginTransaction(ctx)
} }

36
weed/filer2/leveldb/leveldb_store.go

@ -4,8 +4,6 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"strings"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/opt"
@ -161,39 +159,7 @@ func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath we
} }
func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) { func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
count := 0
notPrefixed, err := store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return notPrefixed, nil
}
var lastFileName string
for count < limit {
for _, entry := range notPrefixed {
lastFileName = entry.Name()
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
}
}
if count >= limit {
break
}
notPrefixed, err = store.ListDirectoryEntries(ctx, fullpath, lastFileName, inclusive, limit)
if err != nil {
return nil, err
}
if len(notPrefixed) == 0 {
break
}
}
return entries, nil
return nil, fmt.Errorf("UNSUPPORTED")
} }
func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,

40
weed/filer2/leveldb2/leveldb2_store.go

@ -5,14 +5,12 @@ import (
"context" "context"
"crypto/md5" "crypto/md5"
"fmt" "fmt"
"io"
"os"
"strings"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/opt"
leveldb_util "github.com/syndtr/goleveldb/leveldb/util" leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
"io"
"os"
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
@ -170,39 +168,7 @@ func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath w
} }
func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) { func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
count := 0
notPrefixed, err := store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return notPrefixed, nil
}
var lastFileName string
for count < limit {
for _, entry := range notPrefixed {
lastFileName = entry.Name()
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
}
}
if count >= limit {
break
}
notPrefixed, err = store.ListDirectoryEntries(ctx, fullpath, lastFileName, inclusive, limit)
if err != nil {
return nil, err
}
if len(notPrefixed) == 0 {
break
}
}
return entries, nil
return nil, fmt.Errorf("UNSUPPORTED")
} }
func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,

35
weed/filer2/mongodb/mongodb_store.go

@ -11,7 +11,6 @@ import (
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx" "go.mongodb.org/mongo-driver/x/bsonx"
"strings"
"time" "time"
) )
@ -169,39 +168,7 @@ func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath ut
} }
func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) { func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
count := 0
notPrefixed, err := store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return notPrefixed, nil
}
var lastFileName string
for count < limit {
for _, entry := range notPrefixed {
lastFileName = entry.Name()
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
}
}
if count >= limit {
break
}
notPrefixed, err = store.ListDirectoryEntries(ctx, fullpath, lastFileName, inclusive, limit)
if err != nil {
return nil, err
}
if len(notPrefixed) == 0 {
break
}
}
return entries, nil
return nil, fmt.Errorf("UNSUPPORTED")
} }
func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {

34
weed/filer2/redis/universal_redis_store.go

@ -122,39 +122,7 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full
} }
func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) { func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
count := 0
notPrefixed, err := store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return notPrefixed, nil
}
var lastFileName string
for count < limit {
for _, entry := range notPrefixed {
lastFileName = entry.Name()
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
}
}
if count >= limit {
break
}
notPrefixed, err = store.ListDirectoryEntries(ctx, fullpath, lastFileName, inclusive, limit)
if err != nil {
return nil, err
}
if len(notPrefixed) == 0 {
break
}
}
return entries, nil
return nil, fmt.Errorf("UNSUPPORTED")
} }
func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,

35
weed/filer2/redis2/universal_redis_store.go

@ -3,7 +3,6 @@ package redis2
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/go-redis/redis" "github.com/go-redis/redis"
@ -118,39 +117,7 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful
} }
func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) { func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer2.Entry, err error) {
count := 0
notPrefixed, err := store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
if err != nil {
return nil, err
}
if prefix == "" {
return notPrefixed, nil
}
var lastFileName string
for count < limit {
for _, entry := range notPrefixed {
lastFileName = entry.Name()
if strings.HasPrefix(entry.Name(), prefix) {
count++
entries = append(entries, entry)
}
}
if count >= limit {
break
}
notPrefixed, err = store.ListDirectoryEntries(ctx, fullpath, lastFileName, inclusive, limit)
if err != nil {
return nil, err
}
if len(notPrefixed) == 0 {
break
}
}
return entries, nil
return nil, fmt.Errorf("UNSUPPORTED")
} }
func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,

Loading…
Cancel
Save