Chris Lu
3 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 771 additions and 27 deletions
-
4.github/workflows/go.yml
-
2Makefile
-
6README.md
-
3docker/Makefile
-
35docker/compose/local-ydb-compose.yml
-
12go.mod
-
20go.sum
-
1weed/command/imports.go
-
13weed/command/scaffold/filer.toml
-
2weed/filer/abstract_sql/abstract_sql_store.go
-
8weed/filer/abstract_sql/abstract_sql_store_kv.go
-
4weed/filer/arangodb/arangodb_store.go
-
2weed/filer/cassandra/cassandra_store.go
-
2weed/filer/etcd/etcd_store.go
-
2weed/filer/filerstore.go
-
2weed/filer/hbase/hbase_store.go
-
2weed/filer/leveldb/leveldb_store.go
-
2weed/filer/leveldb2/leveldb2_store.go
-
2weed/filer/leveldb3/leveldb3_store.go
-
2weed/filer/mongodb/mongodb_store.go
-
2weed/filer/redis/universal_redis_store.go
-
2weed/filer/redis2/universal_redis_store.go
-
2weed/filer/redis3/universal_redis_store.go
-
2weed/filer/redis_lua/universal_redis_store.go
-
9weed/filer/ydb/doc.go
-
27weed/filer/ydb/readme.md
-
85weed/filer/ydb/ydb_queries.go
-
403weed/filer/ydb/ydb_store.go
-
79weed/filer/ydb/ydb_store_kv.go
-
60weed/filer/ydb/ydb_types.go
-
1weed/server/filer_server.go
@ -0,0 +1,35 @@ |
|||
version: '2' |
|||
|
|||
services: |
|||
ydb: |
|||
image: cr.yandex/yc/yandex-docker-local-ydb |
|||
ports: |
|||
- 2135:2135 |
|||
- 8765:8765 |
|||
- 2136:2136 |
|||
environment: |
|||
- YDB_DEFAULT_LOG_LEVEL=DEBUG |
|||
- GRPC_TLS_PORT=2135 |
|||
- GRPC_PORT=2136 |
|||
- MON_PORT=8765 |
|||
s3: |
|||
image: chrislusf/seaweedfs:local |
|||
ports: |
|||
- 9333:9333 |
|||
- 19333:19333 |
|||
- 8084:8080 |
|||
- 18084:18080 |
|||
- 8888:8888 |
|||
- 8000:8000 |
|||
- 18888:18888 |
|||
command: "server -ip=s3 -filer -master.volumeSizeLimitMB=16 -volume.max=0 -volume -volume.preStopSeconds=1 -s3 -s3.config=/etc/seaweedfs/s3.json -s3.port=8000 -s3.allowEmptyFolder=false -s3.allowDeleteBucketNotEmpty=false" |
|||
volumes: |
|||
- ./s3.json:/etc/seaweedfs/s3.json |
|||
environment: |
|||
WEED_LEVELDB2_ENABLED: "false" |
|||
WEED_YDB_ENABLED: "true" |
|||
WEED_YDB_DSN: "grpc://ydb:2136/?database=local" |
|||
WEED_YDB_PREFIX: "seaweedfs" |
|||
YDB_ANONYMOUS_CREDENTIALS: 1 |
|||
WEED_MASTER_VOLUME_GROWTH_COPY_1: 1 |
|||
WEED_MASTER_VOLUME_GROWTH_COPY_OTHER: 1 |
@ -0,0 +1,9 @@ |
|||
/* |
|||
|
|||
Package ydb is for YDB filer store. |
|||
|
|||
The referenced "github.com/ydb-platform/ydb-go-sdk/v3" library is too big when compiled. |
|||
So this is only compiled in "make full_install". |
|||
|
|||
*/ |
|||
package ydb |
@ -0,0 +1,27 @@ |
|||
## YDB |
|||
|
|||
database: https://github.com/ydb-platform/ydb |
|||
|
|||
go driver: https://github.com/ydb-platform/ydb-go-sdk |
|||
|
|||
options: |
|||
|
|||
``` |
|||
[ydb] |
|||
enabled=true |
|||
dsn=grpcs://ydb-ru.yandex.net:2135/?database=/ru/home/username/db |
|||
prefix="seaweedfs" |
|||
useBucketPrefix=true |
|||
poolSizeLimit=50 |
|||
dialTimeOut = 10 |
|||
``` |
|||
|
|||
Authenticate produced with one of next environment variables: |
|||
* `YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS=<path/to/sa_key_file>` — used service account key file by path |
|||
* `YDB_ANONYMOUS_CREDENTIALS="1"` — used for authenticate with anonymous access. Anonymous access needs for connect to testing YDB installation |
|||
* `YDB_METADATA_CREDENTIALS="1"` — used metadata service for authenticate to YDB from yandex cloud virtual machine or from yandex function |
|||
* `YDB_ACCESS_TOKEN_CREDENTIALS=<access_token>` — used for authenticate to YDB with short-life access token. For example, access token may be IAM token |
|||
* `YDB_CONNECTION_STRING="grpcs://endpoint/?database=database"` |
|||
|
|||
* i test using this dev database: |
|||
`make dev_ydb` |
@ -0,0 +1,85 @@ |
|||
//go:build ydb
|
|||
// +build ydb
|
|||
|
|||
package ydb |
|||
|
|||
import asql "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" |
|||
|
|||
const ( |
|||
insertQuery = ` |
|||
PRAGMA TablePathPrefix("%v"); |
|||
DECLARE $dir_hash AS int64; |
|||
DECLARE $directory AS Utf8; |
|||
DECLARE $name AS Utf8; |
|||
DECLARE $meta AS String; |
|||
DECLARE $expire_at AS Optional<uint32>; |
|||
|
|||
UPSERT INTO ` + asql.DEFAULT_TABLE + ` |
|||
(dir_hash, name, directory, meta, expire_at) |
|||
VALUES |
|||
($dir_hash, $name, $directory, $meta, $expire_at);` |
|||
|
|||
updateQuery = ` |
|||
PRAGMA TablePathPrefix("%v"); |
|||
DECLARE $dir_hash AS int64; |
|||
DECLARE $directory AS Utf8; |
|||
DECLARE $name AS Utf8; |
|||
DECLARE $meta AS String; |
|||
DECLARE $expire_at AS Optional<uint32>; |
|||
|
|||
REPLACE INTO ` + asql.DEFAULT_TABLE + ` |
|||
(dir_hash, name, directory, meta, expire_at) |
|||
VALUES |
|||
($dir_hash, $name, $directory, $meta, $expire_at);` |
|||
|
|||
deleteQuery = ` |
|||
PRAGMA TablePathPrefix("%v"); |
|||
DECLARE $dir_hash AS int64; |
|||
DECLARE $name AS Utf8; |
|||
|
|||
DELETE FROM ` + asql.DEFAULT_TABLE + ` |
|||
WHERE dir_hash = $dir_hash AND name = $name;` |
|||
|
|||
findQuery = ` |
|||
PRAGMA TablePathPrefix("%v"); |
|||
DECLARE $dir_hash AS int64; |
|||
DECLARE $name AS Utf8; |
|||
|
|||
SELECT meta |
|||
FROM ` + asql.DEFAULT_TABLE + ` |
|||
WHERE dir_hash = $dir_hash AND name = $name;` |
|||
|
|||
deleteFolderChildrenQuery = ` |
|||
PRAGMA TablePathPrefix("%v"); |
|||
DECLARE $dir_hash AS int64; |
|||
DECLARE $directory AS Utf8; |
|||
|
|||
DELETE FROM ` + asql.DEFAULT_TABLE + ` |
|||
WHERE dir_hash = $dir_hash AND directory = $directory;` |
|||
|
|||
listDirectoryQuery = ` |
|||
PRAGMA TablePathPrefix("%v"); |
|||
DECLARE $dir_hash AS int64; |
|||
DECLARE $directory AS Utf8; |
|||
DECLARE $start_name AS Utf8; |
|||
DECLARE $prefix AS Utf8; |
|||
DECLARE $limit AS Uint64; |
|||
|
|||
SELECT name, meta |
|||
FROM ` + asql.DEFAULT_TABLE + ` |
|||
WHERE dir_hash = $dir_hash AND directory = $directory and name > $start_name and name LIKE $prefix |
|||
ORDER BY name ASC LIMIT $limit;` |
|||
|
|||
listInclusiveDirectoryQuery = ` |
|||
PRAGMA TablePathPrefix("%v"); |
|||
DECLARE $dir_hash AS int64; |
|||
DECLARE $directory AS Utf8; |
|||
DECLARE $start_name AS Utf8; |
|||
DECLARE $prefix AS Utf8; |
|||
DECLARE $limit AS Uint64; |
|||
|
|||
SELECT name, meta |
|||
FROM ` + asql.DEFAULT_TABLE + ` |
|||
WHERE dir_hash = $dir_hash AND directory = $directory and name >= $start_name and name LIKE $prefix |
|||
ORDER BY name ASC LIMIT $limit;` |
|||
) |
@ -0,0 +1,403 @@ |
|||
//go:build ydb
|
|||
// +build ydb
|
|||
|
|||
package ydb |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
environ "github.com/ydb-platform/ydb-go-sdk-auth-environ" |
|||
"github.com/ydb-platform/ydb-go-sdk/v3" |
|||
"github.com/ydb-platform/ydb-go-sdk/v3/sugar" |
|||
"github.com/ydb-platform/ydb-go-sdk/v3/table" |
|||
"github.com/ydb-platform/ydb-go-sdk/v3/table/options" |
|||
"github.com/ydb-platform/ydb-go-sdk/v3/table/result" |
|||
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" |
|||
"github.com/ydb-platform/ydb-go-sdk/v3/table/types" |
|||
"os" |
|||
"path" |
|||
"strings" |
|||
"sync" |
|||
"time" |
|||
) |
|||
|
|||
const ( |
|||
defaultDialTimeOut = 10 |
|||
) |
|||
|
|||
var ( |
|||
roTX = table.TxControl( |
|||
table.BeginTx(table.WithOnlineReadOnly()), |
|||
table.CommitTx(), |
|||
) |
|||
rwTX = table.DefaultTxControl() |
|||
) |
|||
|
|||
type YdbStore struct { |
|||
DB ydb.Connection |
|||
dirBuckets string |
|||
tablePathPrefix string |
|||
SupportBucketTable bool |
|||
dbs map[string]bool |
|||
dbsLock sync.Mutex |
|||
} |
|||
|
|||
func init() { |
|||
filer.Stores = append(filer.Stores, &YdbStore{}) |
|||
} |
|||
|
|||
func (store *YdbStore) GetName() string { |
|||
return "ydb" |
|||
} |
|||
|
|||
func (store *YdbStore) Initialize(configuration util.Configuration, prefix string) (err error) { |
|||
return store.initialize( |
|||
configuration.GetString("filer.options.buckets_folder"), |
|||
configuration.GetString(prefix+"dsn"), |
|||
configuration.GetString(prefix+"prefix"), |
|||
configuration.GetBool(prefix+"useBucketPrefix"), |
|||
configuration.GetInt(prefix+"dialTimeOut"), |
|||
configuration.GetInt(prefix+"poolSizeLimit"), |
|||
) |
|||
} |
|||
|
|||
func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix string, useBucketPrefix bool, dialTimeOut int, poolSizeLimit int) (err error) { |
|||
store.dirBuckets = dirBuckets |
|||
store.SupportBucketTable = useBucketPrefix |
|||
if store.SupportBucketTable { |
|||
glog.V(0).Infof("enabled BucketPrefix") |
|||
} |
|||
store.dbs = make(map[string]bool) |
|||
ctx, cancel := context.WithCancel(context.Background()) |
|||
defer cancel() |
|||
if dialTimeOut == 0 { |
|||
dialTimeOut = defaultDialTimeOut |
|||
} |
|||
opts := []ydb.Option{ |
|||
ydb.WithDialTimeout(time.Duration(dialTimeOut) * time.Second), |
|||
environ.WithEnvironCredentials(ctx), |
|||
} |
|||
if poolSizeLimit > 0 { |
|||
opts = append(opts, ydb.WithSessionPoolSizeLimit(poolSizeLimit)) |
|||
} |
|||
if dsn == "" { |
|||
dsn = os.Getenv("YDB_CONNECTION_STRING") |
|||
} |
|||
store.DB, err = ydb.Open(ctx, dsn, opts...) |
|||
if err != nil || store.DB == nil { |
|||
if store.DB != nil { |
|||
_ = store.DB.Close(ctx) |
|||
store.DB = nil |
|||
} |
|||
return fmt.Errorf("can not connect to %s error: %v", dsn, err) |
|||
} |
|||
|
|||
store.tablePathPrefix = path.Join(store.DB.Name(), tablePathPrefix) |
|||
if err = sugar.MakeRecursive(ctx, store.DB, store.tablePathPrefix); err != nil { |
|||
return fmt.Errorf("MakeRecursive %s : %v", store.tablePathPrefix, err) |
|||
} |
|||
|
|||
if err = store.createTable(ctx, store.tablePathPrefix); err != nil { |
|||
glog.Errorf("createTable %s: %v", store.tablePathPrefix, err) |
|||
} |
|||
return err |
|||
} |
|||
|
|||
func (store *YdbStore) doTxOrDB(ctx context.Context, query *string, params *table.QueryParameters, tc *table.TransactionControl, processResultFunc func(res result.Result) error) (err error) { |
|||
var res result.Result |
|||
if tx, ok := ctx.Value("tx").(table.Transaction); ok { |
|||
res, err = tx.Execute(ctx, *query, params, options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) |
|||
if err != nil { |
|||
return fmt.Errorf("execute transaction: %v", err) |
|||
} |
|||
} else { |
|||
err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { |
|||
_, res, err = s.Execute(ctx, tc, *query, |
|||
params, options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) |
|||
if err != nil { |
|||
return fmt.Errorf("execute statement: %v", err) |
|||
} |
|||
return nil |
|||
}) |
|||
} |
|||
if err != nil { |
|||
return err |
|||
} |
|||
if res != nil { |
|||
defer func() { _ = res.Close() }() |
|||
if processResultFunc != nil { |
|||
if err = processResultFunc(res); err != nil { |
|||
return fmt.Errorf("process result: %v", err) |
|||
} |
|||
} |
|||
} |
|||
return err |
|||
} |
|||
|
|||
func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Entry, isUpdate bool) (err error) { |
|||
dir, name := entry.FullPath.DirAndName() |
|||
meta, err := entry.EncodeAttributesAndChunks() |
|||
if err != nil { |
|||
return fmt.Errorf("encode %s: %s", entry.FullPath, err) |
|||
} |
|||
|
|||
if len(entry.Chunks) > filer.CountEntryChunksForGzip { |
|||
meta = util.MaybeGzipData(meta) |
|||
} |
|||
tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) |
|||
fileMeta := FileMeta{util.HashStringToLong(dir), name, *shortDir, meta} |
|||
var query *string |
|||
if isUpdate { |
|||
query = withPragma(tablePathPrefix, updateQuery) |
|||
} else { |
|||
query = withPragma(tablePathPrefix, insertQuery) |
|||
} |
|||
return store.doTxOrDB(ctx, query, fileMeta.queryParameters(entry.TtlSec), rwTX, nil) |
|||
} |
|||
|
|||
func (store *YdbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { |
|||
return store.insertOrUpdateEntry(ctx, entry, false) |
|||
} |
|||
|
|||
func (store *YdbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { |
|||
return store.insertOrUpdateEntry(ctx, entry, true) |
|||
} |
|||
|
|||
func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { |
|||
dir, name := fullpath.DirAndName() |
|||
var data []byte |
|||
entryFound := false |
|||
tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) |
|||
query := withPragma(tablePathPrefix, findQuery) |
|||
queryParams := table.NewQueryParameters( |
|||
table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), |
|||
table.ValueParam("$name", types.UTF8Value(name))) |
|||
|
|||
err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error { |
|||
for res.NextResultSet(ctx) { |
|||
for res.NextRow() { |
|||
if err = res.ScanNamed(named.OptionalWithDefault("meta", &data)); err != nil { |
|||
return fmt.Errorf("scanNamed %s : %v", fullpath, err) |
|||
} |
|||
entryFound = true |
|||
return nil |
|||
} |
|||
} |
|||
return res.Err() |
|||
}) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
if !entryFound { |
|||
return nil, filer_pb.ErrNotFound |
|||
} |
|||
|
|||
entry = &filer.Entry{ |
|||
FullPath: fullpath, |
|||
} |
|||
if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { |
|||
return nil, fmt.Errorf("decode %s : %v", fullpath, err) |
|||
} |
|||
|
|||
return entry, nil |
|||
} |
|||
|
|||
func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { |
|||
dir, name := fullpath.DirAndName() |
|||
tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) |
|||
query := withPragma(tablePathPrefix, deleteQuery) |
|||
queryParams := table.NewQueryParameters( |
|||
table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), |
|||
table.ValueParam("$name", types.UTF8Value(name))) |
|||
|
|||
return store.doTxOrDB(ctx, query, queryParams, rwTX, nil) |
|||
} |
|||
|
|||
func (store *YdbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { |
|||
dir, _ := fullpath.DirAndName() |
|||
tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) |
|||
query := withPragma(tablePathPrefix, deleteFolderChildrenQuery) |
|||
queryParams := table.NewQueryParameters( |
|||
table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), |
|||
table.ValueParam("$directory", types.UTF8Value(*shortDir))) |
|||
|
|||
return store.doTxOrDB(ctx, query, queryParams, rwTX, nil) |
|||
} |
|||
|
|||
func (store *YdbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { |
|||
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil) |
|||
} |
|||
|
|||
func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { |
|||
dir := string(dirPath) |
|||
tablePathPrefix, shortDir := store.getPrefix(ctx, &dir) |
|||
var query *string |
|||
if includeStartFile { |
|||
query = withPragma(tablePathPrefix, listInclusiveDirectoryQuery) |
|||
} else { |
|||
query = withPragma(tablePathPrefix, listDirectoryQuery) |
|||
} |
|||
queryParams := table.NewQueryParameters( |
|||
table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))), |
|||
table.ValueParam("$directory", types.UTF8Value(*shortDir)), |
|||
table.ValueParam("$start_name", types.UTF8Value(startFileName)), |
|||
table.ValueParam("$prefix", types.UTF8Value(prefix+"%")), |
|||
table.ValueParam("$limit", types.Uint64Value(uint64(limit))), |
|||
) |
|||
err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error { |
|||
var name string |
|||
var data []byte |
|||
for res.NextResultSet(ctx) { |
|||
for res.NextRow() { |
|||
if err := res.ScanNamed( |
|||
named.OptionalWithDefault("name", &name), |
|||
named.OptionalWithDefault("meta", &data)); err != nil { |
|||
return fmt.Errorf("list scanNamed %s : %v", dir, err) |
|||
} |
|||
lastFileName = name |
|||
entry := &filer.Entry{ |
|||
FullPath: util.NewFullPath(dir, name), |
|||
} |
|||
if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { |
|||
return fmt.Errorf("scan decode %s : %v", entry.FullPath, err) |
|||
} |
|||
if !eachEntryFunc(entry) { |
|||
break |
|||
} |
|||
} |
|||
} |
|||
return res.Err() |
|||
}) |
|||
if err != nil { |
|||
return lastFileName, err |
|||
} |
|||
return lastFileName, nil |
|||
} |
|||
|
|||
func (store *YdbStore) BeginTransaction(ctx context.Context) (context.Context, error) { |
|||
session, err := store.DB.Table().CreateSession(ctx) |
|||
if err != nil { |
|||
return ctx, err |
|||
} |
|||
tx, err := session.BeginTransaction(ctx, table.TxSettings(table.WithSerializableReadWrite())) |
|||
if err != nil { |
|||
return ctx, err |
|||
} |
|||
return context.WithValue(ctx, "tx", tx), nil |
|||
} |
|||
|
|||
func (store *YdbStore) CommitTransaction(ctx context.Context) error { |
|||
if tx, ok := ctx.Value("tx").(table.Transaction); ok { |
|||
_, err := tx.CommitTx(ctx) |
|||
return err |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (store *YdbStore) RollbackTransaction(ctx context.Context) error { |
|||
if tx, ok := ctx.Value("tx").(table.Transaction); ok { |
|||
return tx.Rollback(ctx) |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (store *YdbStore) Shutdown() { |
|||
_ = store.DB.Close(context.Background()) |
|||
} |
|||
|
|||
func (store *YdbStore) CanDropWholeBucket() bool { |
|||
return store.SupportBucketTable |
|||
} |
|||
|
|||
func (store *YdbStore) OnBucketCreation(bucket string) { |
|||
store.dbsLock.Lock() |
|||
defer store.dbsLock.Unlock() |
|||
|
|||
if err := store.createTable(context.Background(), |
|||
path.Join(store.tablePathPrefix, bucket)); err != nil { |
|||
glog.Errorf("createTable %s: %v", bucket, err) |
|||
} |
|||
|
|||
if store.dbs == nil { |
|||
return |
|||
} |
|||
store.dbs[bucket] = true |
|||
} |
|||
|
|||
func (store *YdbStore) OnBucketDeletion(bucket string) { |
|||
store.dbsLock.Lock() |
|||
defer store.dbsLock.Unlock() |
|||
|
|||
if err := store.deleteTable(context.Background(), |
|||
path.Join(store.tablePathPrefix, bucket)); err != nil { |
|||
glog.Errorf("deleteTable %s: %v", bucket, err) |
|||
} |
|||
|
|||
if store.dbs == nil { |
|||
return |
|||
} |
|||
delete(store.dbs, bucket) |
|||
} |
|||
|
|||
func (store *YdbStore) createTable(ctx context.Context, prefix string) error { |
|||
return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { |
|||
return s.CreateTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE), createTableOptions()...) |
|||
}) |
|||
} |
|||
|
|||
func (store *YdbStore) deleteTable(ctx context.Context, prefix string) error { |
|||
if !store.SupportBucketTable { |
|||
return nil |
|||
} |
|||
if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { |
|||
return s.DropTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE)) |
|||
}); err != nil { |
|||
return err |
|||
} |
|||
glog.V(4).Infof("deleted table %s", prefix) |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (store *YdbStore) getPrefix(ctx context.Context, dir *string) (tablePathPrefix *string, shortDir *string) { |
|||
tablePathPrefix = &store.tablePathPrefix |
|||
shortDir = dir |
|||
if !store.SupportBucketTable { |
|||
return |
|||
} |
|||
|
|||
prefixBuckets := store.dirBuckets + "/" |
|||
if strings.HasPrefix(*dir, prefixBuckets) { |
|||
// detect bucket
|
|||
bucketAndDir := (*dir)[len(prefixBuckets):] |
|||
var bucket string |
|||
if t := strings.Index(bucketAndDir, "/"); t > 0 { |
|||
bucket = bucketAndDir[:t] |
|||
} else if t < 0 { |
|||
bucket = bucketAndDir |
|||
} |
|||
if bucket == "" { |
|||
return |
|||
} |
|||
|
|||
store.dbsLock.Lock() |
|||
defer store.dbsLock.Unlock() |
|||
|
|||
tablePathPrefixWithBucket := path.Join(store.tablePathPrefix, bucket) |
|||
if _, found := store.dbs[bucket]; !found { |
|||
if err := store.createTable(ctx, tablePathPrefixWithBucket); err == nil { |
|||
store.dbs[bucket] = true |
|||
glog.V(4).Infof("created table %s", tablePathPrefixWithBucket) |
|||
} else { |
|||
glog.Errorf("createTable %s: %v", tablePathPrefixWithBucket, err) |
|||
} |
|||
} |
|||
tablePathPrefix = &tablePathPrefixWithBucket |
|||
} |
|||
return |
|||
} |
@ -0,0 +1,79 @@ |
|||
//go:build ydb
|
|||
// +build ydb
|
|||
|
|||
package ydb |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"github.com/ydb-platform/ydb-go-sdk/v3/table/options" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"github.com/ydb-platform/ydb-go-sdk/v3/table" |
|||
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" |
|||
"github.com/ydb-platform/ydb-go-sdk/v3/table/types" |
|||
) |
|||
|
|||
func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { |
|||
dirStr, dirHash, name := abstract_sql.GenDirAndName(key) |
|||
fileMeta := FileMeta{dirHash, name, dirStr, value} |
|||
return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { |
|||
_, _, err = s.Execute(ctx, rwTX, *withPragma(&store.tablePathPrefix, insertQuery), |
|||
fileMeta.queryParameters(0), |
|||
options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) |
|||
if err != nil { |
|||
return fmt.Errorf("kv put execute %s: %v", util.NewFullPath(dirStr, name).Name(), err) |
|||
} |
|||
return nil |
|||
}) |
|||
} |
|||
|
|||
func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { |
|||
dirStr, dirHash, name := abstract_sql.GenDirAndName(key) |
|||
valueFound := false |
|||
err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { |
|||
_, res, err := s.Execute(ctx, roTX, *withPragma(&store.tablePathPrefix, findQuery), |
|||
table.NewQueryParameters( |
|||
table.ValueParam("$dir_hash", types.Int64Value(dirHash)), |
|||
table.ValueParam("$name", types.UTF8Value(name))), |
|||
options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) |
|||
if err != nil { |
|||
return fmt.Errorf("kv get execute %s: %v", util.NewFullPath(dirStr, name).Name(), err) |
|||
} |
|||
defer func() { _ = res.Close() }() |
|||
for res.NextResultSet(ctx) { |
|||
for res.NextRow() { |
|||
if err := res.ScanNamed(named.OptionalWithDefault("meta", &value)); err != nil { |
|||
return fmt.Errorf("scanNamed %s : %v", util.NewFullPath(dirStr, name).Name(), err) |
|||
} |
|||
valueFound = true |
|||
return nil |
|||
} |
|||
} |
|||
return res.Err() |
|||
}) |
|||
|
|||
if !valueFound { |
|||
return nil, filer.ErrKvNotFound |
|||
} |
|||
|
|||
return value, nil |
|||
} |
|||
|
|||
func (store *YdbStore) KvDelete(ctx context.Context, key []byte) (err error) { |
|||
dirStr, dirHash, name := abstract_sql.GenDirAndName(key) |
|||
return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { |
|||
_, _, err = s.Execute(ctx, rwTX, *withPragma(&store.tablePathPrefix, insertQuery), |
|||
table.NewQueryParameters( |
|||
table.ValueParam("$dir_hash", types.Int64Value(dirHash)), |
|||
table.ValueParam("$name", types.UTF8Value(name))), |
|||
options.WithQueryCachePolicy(options.WithQueryCachePolicyKeepInCache())) |
|||
if err != nil { |
|||
return fmt.Errorf("kv delete %s: %v", util.NewFullPath(dirStr, name).Name(), err) |
|||
} |
|||
return nil |
|||
}) |
|||
|
|||
} |
@ -0,0 +1,60 @@ |
|||
//go:build ydb
|
|||
// +build ydb
|
|||
|
|||
package ydb |
|||
|
|||
import ( |
|||
"fmt" |
|||
"github.com/ydb-platform/ydb-go-sdk/v3/table" |
|||
"github.com/ydb-platform/ydb-go-sdk/v3/table/options" |
|||
"github.com/ydb-platform/ydb-go-sdk/v3/table/types" |
|||
) |
|||
|
|||
//go:generate ydbgen
|
|||
|
|||
//ydb:gen
|
|||
type FileMeta struct { |
|||
DirHash int64 `ydb:"type:int64"` |
|||
Name string `ydb:"type:utf8"` |
|||
Directory string `ydb:"type:utf8"` |
|||
Meta []byte `ydb:"type:string"` |
|||
} |
|||
|
|||
//ydb:gen scan,value
|
|||
type FileMetas []FileMeta |
|||
|
|||
func (fm *FileMeta) queryParameters(ttlSec int32) *table.QueryParameters { |
|||
var expireAtValue types.Value |
|||
if ttlSec > 0 { |
|||
expireAtValue = types.Uint32Value(uint32(ttlSec)) |
|||
} else { |
|||
expireAtValue = types.NullValue(types.TypeUint32) |
|||
} |
|||
return table.NewQueryParameters( |
|||
table.ValueParam("$dir_hash", types.Int64Value(fm.DirHash)), |
|||
table.ValueParam("$directory", types.UTF8Value(fm.Directory)), |
|||
table.ValueParam("$name", types.UTF8Value(fm.Name)), |
|||
table.ValueParam("$meta", types.StringValue(fm.Meta)), |
|||
table.ValueParam("$expire_at", expireAtValue)) |
|||
} |
|||
|
|||
func createTableOptions() []options.CreateTableOption { |
|||
columnUnit := options.TimeToLiveUnitSeconds |
|||
return []options.CreateTableOption{ |
|||
options.WithColumn("dir_hash", types.Optional(types.TypeInt64)), |
|||
options.WithColumn("directory", types.Optional(types.TypeUTF8)), |
|||
options.WithColumn("name", types.Optional(types.TypeUTF8)), |
|||
options.WithColumn("meta", types.Optional(types.TypeString)), |
|||
options.WithColumn("expire_at", types.Optional(types.TypeUint32)), |
|||
options.WithPrimaryKeyColumn("dir_hash", "name"), |
|||
options.WithTimeToLiveSettings(options.TimeToLiveSettings{ |
|||
ColumnName: "expire_at", |
|||
ColumnUnit: &columnUnit, |
|||
Mode: options.TimeToLiveModeValueSinceUnixEpoch}, |
|||
), |
|||
} |
|||
} |
|||
func withPragma(prefix *string, query string) *string { |
|||
queryWithPragma := fmt.Sprintf(query, *prefix) |
|||
return &queryWithPragma |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue