Browse Source

fix scanNamed

pull/2996/head
Konstantin Lebedev 3 years ago
parent
commit
8342f651f3
  1. 20
      docker/compose/local-ydb-compose.yml
  2. 12
      weed/filer/ydb/ydb_queries.go
  3. 67
      weed/filer/ydb/ydb_store.go
  4. 5
      weed/filer/ydb/ydb_store_kv.go
  5. 4
      weed/filer/ydb/ydb_types.go

20
docker/compose/local-ydb-compose.yml

@ -12,7 +12,7 @@ services:
- GRPC_TLS_PORT=2135 - GRPC_TLS_PORT=2135
- GRPC_PORT=2136 - GRPC_PORT=2136
- MON_PORT=8765 - MON_PORT=8765
server:
s3:
image: chrislusf/seaweedfs:local image: chrislusf/seaweedfs:local
ports: ports:
- 9333:9333 - 9333:9333
@ -20,14 +20,16 @@ services:
- 8084:8080 - 8084:8080
- 18084:18080 - 18084:18080
- 8888:8888 - 8888:8888
- 8000:8000
- 18888:18888 - 18888:18888
command: "server -ip=server -filer -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1"
command: "server -ip=s3 -filer -master.volumeSizeLimitMB=16 -volume.max=0 -volume -volume.preStopSeconds=1 -s3 -s3.config=/etc/seaweedfs/s3.json -s3.port=8000 -s3.allowEmptyFolder=false -s3.allowDeleteBucketNotEmpty=false"
volumes: volumes:
- ./master-cloud.toml:/etc/seaweedfs/master.toml
- ./s3.json:/etc/seaweedfs/s3.json
environment: environment:
- WEED_LEVELDB2_ENABLED=false
- WEED_YDB_ENABLED=true
- WEED_YDB_DSN=grpc://ydb:2136/?database=local
- YDB_ANONYMOUS_CREDENTIALS=1
depends_on:
- ydb
WEED_LEVELDB2_ENABLED: "false"
WEED_YDB_ENABLED: "true"
WEED_YDB_DSN: "grpc://ydb:2136/?database=local"
WEED_YDB_PREFIX: "seaweedfs"
YDB_ANONYMOUS_CREDENTIALS: 1
WEED_MASTER_VOLUME_GROWTH_COPY_1: 1
WEED_MASTER_VOLUME_GROWTH_COPY_OTHER: 1

12
weed/filer/ydb/ydb_queries.go

@ -5,8 +5,8 @@ import asql "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
const ( const (
insertQuery = ` insertQuery = `
DECLARE $dir_hash AS int64; DECLARE $dir_hash AS int64;
DECLARE $name AS Utf8;
DECLARE $directory AS Utf8; DECLARE $directory AS Utf8;
DECLARE $name AS Utf8;
DECLARE $meta AS String; DECLARE $meta AS String;
UPSERT INTO ` + asql.DEFAULT_TABLE + ` UPSERT INTO ` + asql.DEFAULT_TABLE + `
@ -16,8 +16,8 @@ const (
updateQuery = ` updateQuery = `
DECLARE $dir_hash AS int64; DECLARE $dir_hash AS int64;
DECLARE $name AS Utf8;
DECLARE $directory AS Utf8; DECLARE $directory AS Utf8;
DECLARE $name AS Utf8;
DECLARE $meta AS String; DECLARE $meta AS String;
REPLACE INTO ` + asql.DEFAULT_TABLE + ` REPLACE INTO ` + asql.DEFAULT_TABLE + `
@ -31,7 +31,7 @@ const (
DECLARE $name AS Utf8; DECLARE $name AS Utf8;
DELETE FROM ` + asql.DEFAULT_TABLE + ` DELETE FROM ` + asql.DEFAULT_TABLE + `
WHERE dir_hash == $dir_hash AND name == $name;
WHERE dir_hash = $dir_hash AND name = $name;
COMMIT;` COMMIT;`
findQuery = ` findQuery = `
@ -40,14 +40,14 @@ const (
SELECT meta SELECT meta
FROM ` + asql.DEFAULT_TABLE + ` FROM ` + asql.DEFAULT_TABLE + `
WHERE dir_hash == $dir_hash AND name == $name;`
WHERE dir_hash = $dir_hash AND name = $name;`
deleteFolderChildrenQuery = ` deleteFolderChildrenQuery = `
DECLARE $dir_hash AS int64; DECLARE $dir_hash AS int64;
DECLARE $directory AS Utf8; DECLARE $directory AS Utf8;
DELETE FROM ` + asql.DEFAULT_TABLE + ` DELETE FROM ` + asql.DEFAULT_TABLE + `
WHERE dir_hash == $dir_hash AND directory == $directory;
WHERE dir_hash = $dir_hash AND directory = $directory;
COMMIT;` COMMIT;`
listDirectoryQuery = ` listDirectoryQuery = `
@ -59,6 +59,6 @@ const (
SELECT name, meta SELECT name, meta
FROM ` + asql.DEFAULT_TABLE + ` FROM ` + asql.DEFAULT_TABLE + `
WHERE dir_hash == $dir_hash AND directory == $directory and name %s $start_name and name LIKE $prefix
WHERE dir_hash = $dir_hash AND directory = $directory and name %s $start_name and name LIKE $prefix
ORDER BY name ASC LIMIT $limit;` ORDER BY name ASC LIMIT $limit;`
) )

67
weed/filer/ydb/ydb_store.go

@ -55,7 +55,7 @@ func (store *YdbStore) Initialize(configuration util.Configuration, prefix strin
return store.initialize( return store.initialize(
configuration.GetString("filer.options.buckets_folder"), configuration.GetString("filer.options.buckets_folder"),
configuration.GetString(prefix+"dsn"), configuration.GetString(prefix+"dsn"),
configuration.GetString(prefix+"tablePathPrefix"),
configuration.GetString(prefix+"prefix"),
configuration.GetBool(prefix+"useBucketPrefix"), configuration.GetBool(prefix+"useBucketPrefix"),
configuration.GetInt(prefix+"connectionTimeOut"), configuration.GetInt(prefix+"connectionTimeOut"),
configuration.GetInt(prefix+"poolSizeLimit"), configuration.GetInt(prefix+"poolSizeLimit"),
@ -64,7 +64,6 @@ func (store *YdbStore) Initialize(configuration util.Configuration, prefix strin
func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix string, useBucketPrefix bool, connectionTimeOut int, poolSizeLimit int) (err error) { func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix string, useBucketPrefix bool, connectionTimeOut int, poolSizeLimit int) (err error) {
store.dirBuckets = dirBuckets store.dirBuckets = dirBuckets
store.tablePathPrefix = tablePathPrefix
store.SupportBucketTable = useBucketPrefix store.SupportBucketTable = useBucketPrefix
store.dbs = make(map[string]bool) store.dbs = make(map[string]bool)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -83,7 +82,7 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix
dsn = os.Getenv("YDB_CONNECTION_STRING") dsn = os.Getenv("YDB_CONNECTION_STRING")
} }
store.DB, err = ydb.Open(ctx, dsn, opts...) store.DB, err = ydb.Open(ctx, dsn, opts...)
if err != nil {
if err != nil || store.DB == nil {
if store.DB != nil { if store.DB != nil {
_ = store.DB.Close(ctx) _ = store.DB.Close(ctx)
store.DB = nil store.DB = nil
@ -92,16 +91,12 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix
} }
store.tablePathPrefix = path.Join(store.DB.Name(), tablePathPrefix) store.tablePathPrefix = path.Join(store.DB.Name(), tablePathPrefix)
if err = sugar.RemoveRecursive(ctx, store.DB, store.tablePathPrefix); err != nil {
return fmt.Errorf("RemoveRecursive %s : %v", store.tablePathPrefix, err)
}
if err = sugar.MakeRecursive(ctx, store.DB, store.tablePathPrefix); err != nil { if err = sugar.MakeRecursive(ctx, store.DB, store.tablePathPrefix); err != nil {
return fmt.Errorf("MakeRecursive %s : %v", store.tablePathPrefix, err) return fmt.Errorf("MakeRecursive %s : %v", store.tablePathPrefix, err)
} }
tablePath := path.Join(store.tablePathPrefix, abstract_sql.DEFAULT_TABLE)
if err = store.createTable(ctx, tablePath); err != nil {
glog.Errorf("createTable %s: %v", tablePath, err)
if err = store.createTable(ctx, store.tablePathPrefix); err != nil {
glog.Errorf("createTable %s: %v", store.tablePathPrefix, err)
} }
return err return err
} }
@ -126,9 +121,15 @@ func (store *YdbStore) doTxOrDB(ctx context.Context, query *string, params *tabl
return nil return nil
}) })
} }
if err != nil && processResultFunc != nil && res != nil {
if err != nil {
return err
}
if res != nil {
defer func() { _ = res.Close() }()
if processResultFunc != nil {
if err = processResultFunc(res); err != nil { if err = processResultFunc(res); err != nil {
return fmt.Errorf("process resul: %v", err)
return fmt.Errorf("process result: %v", err)
}
} }
} }
return err return err
@ -167,16 +168,15 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e
table.ValueParam("$name", types.UTF8Value(name))) table.ValueParam("$name", types.UTF8Value(name)))
err = store.doTxOrDB(ctx, &queryWithPragma, queryParams, roTX, func(res result.Result) error { err = store.doTxOrDB(ctx, &queryWithPragma, queryParams, roTX, func(res result.Result) error {
defer func() {
_ = res.Close()
}()
for res.NextResultSet(ctx) {
for res.NextRow() { for res.NextRow() {
if err := res.ScanNamed(named.Required("meta", &data)); err != nil {
return fmt.Errorf("scanNamed %s : %v", entry.FullPath, err)
if err = res.ScanNamed(named.OptionalWithDefault("meta", &data)); err != nil {
return fmt.Errorf("scanNamed %s : %v", fullpath, err)
} }
entryFound = true entryFound = true
return nil return nil
} }
}
return res.Err() return res.Err()
}) })
if err != nil { if err != nil {
@ -185,9 +185,12 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e
if !entryFound { if !entryFound {
return nil, filer_pb.ErrNotFound return nil, filer_pb.ErrNotFound
} }
entry.FullPath = fullpath
entry = &filer.Entry{
FullPath: fullpath,
}
if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
return nil, fmt.Errorf("decode %s : %v", entry.FullPath, err)
return nil, fmt.Errorf("decode %s : %v", fullpath, err)
} }
return entry, nil return entry, nil
@ -232,17 +235,14 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath
table.ValueParam("$limit", types.Uint64Value(uint64(limit))), table.ValueParam("$limit", types.Uint64Value(uint64(limit))),
) )
err = store.doTxOrDB(ctx, &queryWithPragma, queryParams, roTX, func(res result.Result) error { err = store.doTxOrDB(ctx, &queryWithPragma, queryParams, roTX, func(res result.Result) error {
defer func() {
_ = res.Close()
}()
for res.NextResultSet(ctx) {
for res.NextRow() {
var name string var name string
var data []byte var data []byte
for res.NextResultSet(ctx) {
for res.NextRow() {
if err := res.ScanNamed( if err := res.ScanNamed(
named.Required("name", &name),
named.Required("meta", &data)); err != nil {
return fmt.Errorf("scanNamed %s : %v", dir, err)
named.OptionalWithDefault("name", &name),
named.OptionalWithDefault("meta", &data)); err != nil {
return fmt.Errorf("list scanNamed %s : %v", dir, err)
} }
lastFileName = name lastFileName = name
entry := &filer.Entry{ entry := &filer.Entry{
@ -304,7 +304,7 @@ func (store *YdbStore) OnBucketCreation(bucket string) {
defer store.dbsLock.Unlock() defer store.dbsLock.Unlock()
if err := store.createTable(context.Background(), if err := store.createTable(context.Background(),
path.Join(store.tablePathPrefix, bucket, abstract_sql.DEFAULT_TABLE)); err != nil {
path.Join(store.tablePathPrefix, bucket)); err != nil {
glog.Errorf("createTable %s: %v", bucket, err) glog.Errorf("createTable %s: %v", bucket, err)
} }
@ -319,7 +319,7 @@ func (store *YdbStore) OnBucketDeletion(bucket string) {
defer store.dbsLock.Unlock() defer store.dbsLock.Unlock()
if err := store.deleteTable(context.Background(), if err := store.deleteTable(context.Background(),
path.Join(store.tablePathPrefix, bucket, abstract_sql.DEFAULT_TABLE)); err != nil {
path.Join(store.tablePathPrefix, bucket)); err != nil {
glog.Errorf("deleteTable %s: %v", bucket, err) glog.Errorf("deleteTable %s: %v", bucket, err)
} }
@ -331,7 +331,7 @@ func (store *YdbStore) OnBucketDeletion(bucket string) {
func (store *YdbStore) createTable(ctx context.Context, prefix string) error { func (store *YdbStore) createTable(ctx context.Context, prefix string) error {
return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
return s.CreateTable(ctx, prefix, createTableOptions()...)
return s.CreateTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE), createTableOptions()...)
}) })
} }
@ -340,7 +340,7 @@ func (store *YdbStore) deleteTable(ctx context.Context, prefix string) error {
return nil return nil
} }
return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
return s.DropTable(ctx, prefix)
return s.DropTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE))
}) })
} }
@ -366,15 +366,14 @@ func (store *YdbStore) getPrefix(ctx context.Context, dir string) (tablePathPref
store.dbsLock.Lock() store.dbsLock.Lock()
defer store.dbsLock.Unlock() defer store.dbsLock.Unlock()
tablePathPrefix = path.Join(store.tablePathPrefix, bucket)
if _, found := store.dbs[bucket]; !found { if _, found := store.dbs[bucket]; !found {
if err := store.createTable(ctx,
path.Join(store.tablePathPrefix, bucket, abstract_sql.DEFAULT_TABLE)); err == nil {
if err := store.createTable(ctx, tablePathPrefix); err == nil {
store.dbs[bucket] = true store.dbs[bucket] = true
} else { } else {
glog.Errorf("createTable %s: %v", bucket, err)
glog.Errorf("createTable %s: %v", tablePathPrefix, err)
} }
} }
tablePathPrefix = path.Join(store.tablePathPrefix, bucket)
} }
return return
} }

5
weed/filer/ydb/ydb_store_kv.go

@ -3,6 +3,7 @@ package ydb
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
@ -42,13 +43,15 @@ func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err
return fmt.Errorf("kv get execute %s: %v", util.NewFullPath(dirStr, name).Name(), err) return fmt.Errorf("kv get execute %s: %v", util.NewFullPath(dirStr, name).Name(), err)
} }
defer func() { _ = res.Close() }() defer func() { _ = res.Close() }()
for res.NextResultSet(ctx) {
for res.NextRow() { for res.NextRow() {
if err := res.ScanNamed(named.Required("meta", &value)); err != nil {
if err := res.ScanNamed(named.OptionalWithDefault("meta", &value)); err != nil {
return fmt.Errorf("scanNamed %s : %v", util.NewFullPath(dirStr, name).Name(), err) return fmt.Errorf("scanNamed %s : %v", util.NewFullPath(dirStr, name).Name(), err)
} }
valueFound = true valueFound = true
return nil return nil
} }
}
return res.Err() return res.Err()
}) })

4
weed/filer/ydb/ydb_types.go

@ -22,16 +22,16 @@ type FileMetas []FileMeta
func (fm *FileMeta) queryParameters() *table.QueryParameters { func (fm *FileMeta) queryParameters() *table.QueryParameters {
return table.NewQueryParameters( return table.NewQueryParameters(
table.ValueParam("$dir_hash", types.Int64Value(fm.DirHash)), table.ValueParam("$dir_hash", types.Int64Value(fm.DirHash)),
table.ValueParam("$name", types.UTF8Value(fm.Name)),
table.ValueParam("$directory", types.UTF8Value(fm.Directory)), table.ValueParam("$directory", types.UTF8Value(fm.Directory)),
table.ValueParam("$name", types.UTF8Value(fm.Name)),
table.ValueParam("$meta", types.StringValue(fm.Meta))) table.ValueParam("$meta", types.StringValue(fm.Meta)))
} }
func createTableOptions() []options.CreateTableOption { func createTableOptions() []options.CreateTableOption {
return []options.CreateTableOption{ return []options.CreateTableOption{
options.WithColumn("dir_hash", types.Optional(types.TypeInt64)), options.WithColumn("dir_hash", types.Optional(types.TypeInt64)),
options.WithColumn("name", types.Optional(types.TypeUTF8)),
options.WithColumn("directory", types.Optional(types.TypeUTF8)), options.WithColumn("directory", types.Optional(types.TypeUTF8)),
options.WithColumn("name", types.Optional(types.TypeUTF8)),
options.WithColumn("meta", types.Optional(types.TypeString)), options.WithColumn("meta", types.Optional(types.TypeString)),
options.WithPrimaryKeyColumn("dir_hash", "name"), options.WithPrimaryKeyColumn("dir_hash", "name"),
} }

Loading…
Cancel
Save