From 8342f651f393a2b939de39af8a92f42077ca00fb Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Tue, 3 May 2022 15:18:28 +0500 Subject: [PATCH] fix scanNamed --- docker/compose/local-ydb-compose.yml | 20 ++++---- weed/filer/ydb/ydb_queries.go | 12 ++--- weed/filer/ydb/ydb_store.go | 75 ++++++++++++++-------------- weed/filer/ydb/ydb_store_kv.go | 13 +++-- weed/filer/ydb/ydb_types.go | 4 +- 5 files changed, 64 insertions(+), 60 deletions(-) diff --git a/docker/compose/local-ydb-compose.yml b/docker/compose/local-ydb-compose.yml index 33f550600..a17b77b8a 100644 --- a/docker/compose/local-ydb-compose.yml +++ b/docker/compose/local-ydb-compose.yml @@ -12,7 +12,7 @@ services: - GRPC_TLS_PORT=2135 - GRPC_PORT=2136 - MON_PORT=8765 - server: + s3: image: chrislusf/seaweedfs:local ports: - 9333:9333 @@ -20,14 +20,16 @@ services: - 8084:8080 - 18084:18080 - 8888:8888 + - 8000:8000 - 18888:18888 - command: "server -ip=server -filer -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1" + command: "server -ip=s3 -filer -master.volumeSizeLimitMB=16 -volume.max=0 -volume -volume.preStopSeconds=1 -s3 -s3.config=/etc/seaweedfs/s3.json -s3.port=8000 -s3.allowEmptyFolder=false -s3.allowDeleteBucketNotEmpty=false" volumes: - - ./master-cloud.toml:/etc/seaweedfs/master.toml + - ./s3.json:/etc/seaweedfs/s3.json environment: - - WEED_LEVELDB2_ENABLED=false - - WEED_YDB_ENABLED=true - - WEED_YDB_DSN=grpc://ydb:2136/?database=local - - YDB_ANONYMOUS_CREDENTIALS=1 - depends_on: - - ydb \ No newline at end of file + WEED_LEVELDB2_ENABLED: "false" + WEED_YDB_ENABLED: "true" + WEED_YDB_DSN: "grpc://ydb:2136/?database=local" + WEED_YDB_PREFIX: "seaweedfs" + YDB_ANONYMOUS_CREDENTIALS: 1 + WEED_MASTER_VOLUME_GROWTH_COPY_1: 1 + WEED_MASTER_VOLUME_GROWTH_COPY_OTHER: 1 \ No newline at end of file diff --git a/weed/filer/ydb/ydb_queries.go b/weed/filer/ydb/ydb_queries.go index 659cc2158..bc2f37b10 100644 --- a/weed/filer/ydb/ydb_queries.go +++ b/weed/filer/ydb/ydb_queries.go @@ -5,8 +5,8 @@ import asql "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" const ( insertQuery = ` DECLARE $dir_hash AS int64; - DECLARE $name AS Utf8; DECLARE $directory AS Utf8; + DECLARE $name AS Utf8; DECLARE $meta AS String; UPSERT INTO ` + asql.DEFAULT_TABLE + ` @@ -16,8 +16,8 @@ const ( updateQuery = ` DECLARE $dir_hash AS int64; - DECLARE $name AS Utf8; DECLARE $directory AS Utf8; + DECLARE $name AS Utf8; DECLARE $meta AS String; REPLACE INTO ` + asql.DEFAULT_TABLE + ` @@ -31,7 +31,7 @@ const ( DECLARE $name AS Utf8; DELETE FROM ` + asql.DEFAULT_TABLE + ` - WHERE dir_hash == $dir_hash AND name == $name; + WHERE dir_hash = $dir_hash AND name = $name; COMMIT;` findQuery = ` @@ -40,14 +40,14 @@ const ( SELECT meta FROM ` + asql.DEFAULT_TABLE + ` - WHERE dir_hash == $dir_hash AND name == $name;` + WHERE dir_hash = $dir_hash AND name = $name;` deleteFolderChildrenQuery = ` DECLARE $dir_hash AS int64; DECLARE $directory AS Utf8; DELETE FROM ` + asql.DEFAULT_TABLE + ` - WHERE dir_hash == $dir_hash AND directory == $directory; + WHERE dir_hash = $dir_hash AND directory = $directory; COMMIT;` listDirectoryQuery = ` @@ -59,6 +59,6 @@ const ( SELECT name, meta FROM ` + asql.DEFAULT_TABLE + ` - WHERE dir_hash == $dir_hash AND directory == $directory and name %s $start_name and name LIKE $prefix + WHERE dir_hash = $dir_hash AND directory = $directory and name %s $start_name and name LIKE $prefix ORDER BY name ASC LIMIT $limit;` ) diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go index 6d6389dde..678f58143 100644 --- a/weed/filer/ydb/ydb_store.go +++ b/weed/filer/ydb/ydb_store.go @@ -55,7 +55,7 @@ func (store *YdbStore) Initialize(configuration util.Configuration, prefix strin return store.initialize( configuration.GetString("filer.options.buckets_folder"), configuration.GetString(prefix+"dsn"), - configuration.GetString(prefix+"tablePathPrefix"), + configuration.GetString(prefix+"prefix"), configuration.GetBool(prefix+"useBucketPrefix"), configuration.GetInt(prefix+"connectionTimeOut"), configuration.GetInt(prefix+"poolSizeLimit"), @@ -64,7 +64,6 @@ func (store *YdbStore) Initialize(configuration util.Configuration, prefix strin func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix string, useBucketPrefix bool, connectionTimeOut int, poolSizeLimit int) (err error) { store.dirBuckets = dirBuckets - store.tablePathPrefix = tablePathPrefix store.SupportBucketTable = useBucketPrefix store.dbs = make(map[string]bool) ctx, cancel := context.WithCancel(context.Background()) @@ -83,7 +82,7 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix dsn = os.Getenv("YDB_CONNECTION_STRING") } store.DB, err = ydb.Open(ctx, dsn, opts...) - if err != nil { + if err != nil || store.DB == nil { if store.DB != nil { _ = store.DB.Close(ctx) store.DB = nil @@ -92,16 +91,12 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix } store.tablePathPrefix = path.Join(store.DB.Name(), tablePathPrefix) - if err = sugar.RemoveRecursive(ctx, store.DB, store.tablePathPrefix); err != nil { - return fmt.Errorf("RemoveRecursive %s : %v", store.tablePathPrefix, err) - } if err = sugar.MakeRecursive(ctx, store.DB, store.tablePathPrefix); err != nil { return fmt.Errorf("MakeRecursive %s : %v", store.tablePathPrefix, err) } - tablePath := path.Join(store.tablePathPrefix, abstract_sql.DEFAULT_TABLE) - if err = store.createTable(ctx, tablePath); err != nil { - glog.Errorf("createTable %s: %v", tablePath, err) + if err = store.createTable(ctx, store.tablePathPrefix); err != nil { + glog.Errorf("createTable %s: %v", store.tablePathPrefix, err) } return err } @@ -126,9 +121,15 @@ func (store *YdbStore) doTxOrDB(ctx context.Context, query *string, params *tabl return nil }) } - if err != nil && processResultFunc != nil && res != nil { - if err = processResultFunc(res); err != nil { - return fmt.Errorf("process resul: %v", err) + if err != nil { + return err + } + if res != nil { + defer func() { _ = res.Close() }() + if processResultFunc != nil { + if err = processResultFunc(res); err != nil { + return fmt.Errorf("process result: %v", err) + } } } return err @@ -167,15 +168,14 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e table.ValueParam("$name", types.UTF8Value(name))) err = store.doTxOrDB(ctx, &queryWithPragma, queryParams, roTX, func(res result.Result) error { - defer func() { - _ = res.Close() - }() - for res.NextRow() { - if err := res.ScanNamed(named.Required("meta", &data)); err != nil { - return fmt.Errorf("scanNamed %s : %v", entry.FullPath, err) + for res.NextResultSet(ctx) { + for res.NextRow() { + if err = res.ScanNamed(named.OptionalWithDefault("meta", &data)); err != nil { + return fmt.Errorf("scanNamed %s : %v", fullpath, err) + } + entryFound = true + return nil } - entryFound = true - return nil } return res.Err() }) @@ -185,9 +185,12 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e if !entryFound { return nil, filer_pb.ErrNotFound } - entry.FullPath = fullpath + + entry = &filer.Entry{ + FullPath: fullpath, + } if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { - return nil, fmt.Errorf("decode %s : %v", entry.FullPath, err) + return nil, fmt.Errorf("decode %s : %v", fullpath, err) } return entry, nil @@ -232,17 +235,14 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath table.ValueParam("$limit", types.Uint64Value(uint64(limit))), ) err = store.doTxOrDB(ctx, &queryWithPragma, queryParams, roTX, func(res result.Result) error { - defer func() { - _ = res.Close() - }() + var name string + var data []byte for res.NextResultSet(ctx) { for res.NextRow() { - var name string - var data []byte if err := res.ScanNamed( - named.Required("name", &name), - named.Required("meta", &data)); err != nil { - return fmt.Errorf("scanNamed %s : %v", dir, err) + named.OptionalWithDefault("name", &name), + named.OptionalWithDefault("meta", &data)); err != nil { + return fmt.Errorf("list scanNamed %s : %v", dir, err) } lastFileName = name entry := &filer.Entry{ @@ -304,7 +304,7 @@ func (store *YdbStore) OnBucketCreation(bucket string) { defer store.dbsLock.Unlock() if err := store.createTable(context.Background(), - path.Join(store.tablePathPrefix, bucket, abstract_sql.DEFAULT_TABLE)); err != nil { + path.Join(store.tablePathPrefix, bucket)); err != nil { glog.Errorf("createTable %s: %v", bucket, err) } @@ -319,7 +319,7 @@ func (store *YdbStore) OnBucketDeletion(bucket string) { defer store.dbsLock.Unlock() if err := store.deleteTable(context.Background(), - path.Join(store.tablePathPrefix, bucket, abstract_sql.DEFAULT_TABLE)); err != nil { + path.Join(store.tablePathPrefix, bucket)); err != nil { glog.Errorf("deleteTable %s: %v", bucket, err) } @@ -331,7 +331,7 @@ func (store *YdbStore) OnBucketDeletion(bucket string) { func (store *YdbStore) createTable(ctx context.Context, prefix string) error { return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - return s.CreateTable(ctx, prefix, createTableOptions()...) + return s.CreateTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE), createTableOptions()...) }) } @@ -340,7 +340,7 @@ func (store *YdbStore) deleteTable(ctx context.Context, prefix string) error { return nil } return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - return s.DropTable(ctx, prefix) + return s.DropTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE)) }) } @@ -366,15 +366,14 @@ func (store *YdbStore) getPrefix(ctx context.Context, dir string) (tablePathPref store.dbsLock.Lock() defer store.dbsLock.Unlock() + tablePathPrefix = path.Join(store.tablePathPrefix, bucket) if _, found := store.dbs[bucket]; !found { - if err := store.createTable(ctx, - path.Join(store.tablePathPrefix, bucket, abstract_sql.DEFAULT_TABLE)); err == nil { + if err := store.createTable(ctx, tablePathPrefix); err == nil { store.dbs[bucket] = true } else { - glog.Errorf("createTable %s: %v", bucket, err) + glog.Errorf("createTable %s: %v", tablePathPrefix, err) } } - tablePathPrefix = path.Join(store.tablePathPrefix, bucket) } return } diff --git a/weed/filer/ydb/ydb_store_kv.go b/weed/filer/ydb/ydb_store_kv.go index 069c35224..6b1e9b99a 100644 --- a/weed/filer/ydb/ydb_store_kv.go +++ b/weed/filer/ydb/ydb_store_kv.go @@ -3,6 +3,7 @@ package ydb import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" "github.com/chrislusf/seaweedfs/weed/util" @@ -42,12 +43,14 @@ func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err return fmt.Errorf("kv get execute %s: %v", util.NewFullPath(dirStr, name).Name(), err) } defer func() { _ = res.Close() }() - for res.NextRow() { - if err := res.ScanNamed(named.Required("meta", &value)); err != nil { - return fmt.Errorf("scanNamed %s : %v", util.NewFullPath(dirStr, name).Name(), err) + for res.NextResultSet(ctx) { + for res.NextRow() { + if err := res.ScanNamed(named.OptionalWithDefault("meta", &value)); err != nil { + return fmt.Errorf("scanNamed %s : %v", util.NewFullPath(dirStr, name).Name(), err) + } + valueFound = true + return nil } - valueFound = true - return nil } return res.Err() }) diff --git a/weed/filer/ydb/ydb_types.go b/weed/filer/ydb/ydb_types.go index 07e69e6a5..aab3d0f87 100644 --- a/weed/filer/ydb/ydb_types.go +++ b/weed/filer/ydb/ydb_types.go @@ -22,16 +22,16 @@ type FileMetas []FileMeta func (fm *FileMeta) queryParameters() *table.QueryParameters { return table.NewQueryParameters( table.ValueParam("$dir_hash", types.Int64Value(fm.DirHash)), - table.ValueParam("$name", types.UTF8Value(fm.Name)), table.ValueParam("$directory", types.UTF8Value(fm.Directory)), + table.ValueParam("$name", types.UTF8Value(fm.Name)), table.ValueParam("$meta", types.StringValue(fm.Meta))) } func createTableOptions() []options.CreateTableOption { return []options.CreateTableOption{ options.WithColumn("dir_hash", types.Optional(types.TypeInt64)), - options.WithColumn("name", types.Optional(types.TypeUTF8)), options.WithColumn("directory", types.Optional(types.TypeUTF8)), + options.WithColumn("name", types.Optional(types.TypeUTF8)), options.WithColumn("meta", types.Optional(types.TypeString)), options.WithPrimaryKeyColumn("dir_hash", "name"), }