|
|
@ -4,17 +4,20 @@ import ( |
|
|
|
"context" |
|
|
|
"fmt" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/filer" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/glog" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/util" |
|
|
|
environ "github.com/ydb-platform/ydb-go-sdk-auth-environ" |
|
|
|
ydb "github.com/ydb-platform/ydb-go-sdk/v3" |
|
|
|
"github.com/ydb-platform/ydb-go-sdk-auth-environ" |
|
|
|
"github.com/ydb-platform/ydb-go-sdk/v3" |
|
|
|
"github.com/ydb-platform/ydb-go-sdk/v3/sugar" |
|
|
|
"github.com/ydb-platform/ydb-go-sdk/v3/table" |
|
|
|
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" |
|
|
|
"github.com/ydb-platform/ydb-go-sdk/v3/table/types" |
|
|
|
"os" |
|
|
|
"path" |
|
|
|
"strings" |
|
|
|
"sync" |
|
|
|
"time" |
|
|
|
) |
|
|
|
|
|
|
@ -31,10 +34,12 @@ var ( |
|
|
|
) |
|
|
|
|
|
|
|
type YdbStore struct { |
|
|
|
SupportBucketTable bool |
|
|
|
DB ydb.Connection |
|
|
|
dirBuckets string |
|
|
|
tablePathPrefix string |
|
|
|
SupportBucketTable bool |
|
|
|
dbs map[string]bool |
|
|
|
dbsLock sync.Mutex |
|
|
|
} |
|
|
|
|
|
|
|
func init() { |
|
|
@ -60,6 +65,7 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix |
|
|
|
store.dirBuckets = dirBuckets |
|
|
|
store.tablePathPrefix = tablePathPrefix |
|
|
|
store.SupportBucketTable = useBucketPrefix |
|
|
|
store.dbs = make(map[string]bool) |
|
|
|
ctx, cancel := context.WithCancel(context.Background()) |
|
|
|
defer cancel() |
|
|
|
if connectionTimeOut == 0 { |
|
|
@ -72,6 +78,9 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix |
|
|
|
if poolSizeLimit > 0 { |
|
|
|
opts = append(opts, ydb.WithSessionPoolSizeLimit(poolSizeLimit)) |
|
|
|
} |
|
|
|
if dsn == "" { |
|
|
|
dsn = os.Getenv("YDB_CONNECTION_STRING") |
|
|
|
} |
|
|
|
store.DB, err = ydb.Open(ctx, dsn, opts...) |
|
|
|
if err != nil { |
|
|
|
_ = store.DB.Close(ctx) |
|
|
@ -79,6 +88,7 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix |
|
|
|
return fmt.Errorf("can not connect to %s error:%v", dsn, err) |
|
|
|
} |
|
|
|
defer func() { _ = store.DB.Close(ctx) }() |
|
|
|
|
|
|
|
store.tablePathPrefix = path.Join(store.DB.Name(), tablePathPrefix) |
|
|
|
if err = sugar.RemoveRecursive(ctx, store.DB, store.tablePathPrefix); err != nil { |
|
|
|
return fmt.Errorf("RemoveRecursive %s : %v", store.tablePathPrefix, err) |
|
|
@ -86,6 +96,18 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix |
|
|
|
if err = sugar.MakeRecursive(ctx, store.DB, store.tablePathPrefix); err != nil { |
|
|
|
return fmt.Errorf("MakeRecursive %s : %v", store.tablePathPrefix, err) |
|
|
|
} |
|
|
|
|
|
|
|
whoAmI, err := store.DB.Discovery().WhoAmI(ctx) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("connect to %s error:%v", dsn, err) |
|
|
|
} |
|
|
|
glog.V(0).Infof("connected to ydb: %s", whoAmI.String()) |
|
|
|
|
|
|
|
tablePath := path.Join(store.tablePathPrefix, abstract_sql.DEFAULT_TABLE) |
|
|
|
if err := store.createTable(ctx, tablePath); err != nil { |
|
|
|
glog.Errorf("createTable %s: %v", tablePath, err) |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
@ -102,11 +124,11 @@ func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Ent |
|
|
|
|
|
|
|
fileMeta := FileMeta{util.HashStringToLong(dir), name, dir, meta} |
|
|
|
return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { |
|
|
|
stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), query)) |
|
|
|
stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dir), query)) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("Prepare %s : %v", dir, err) |
|
|
|
} |
|
|
|
_, _, err = stmt.Execute(ctx, rwTX, fileMeta.QueryParameters()) |
|
|
|
_, _, err = stmt.Execute(ctx, rwTX, fileMeta.queryParameters()) |
|
|
|
return err |
|
|
|
}) |
|
|
|
} |
|
|
@ -124,7 +146,7 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e |
|
|
|
var data []byte |
|
|
|
entryFound := false |
|
|
|
err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { |
|
|
|
stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), findQuery)) |
|
|
|
stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dir), findQuery)) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("Prepare %s : %v", entry.FullPath, err) |
|
|
|
} |
|
|
@ -163,7 +185,7 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e |
|
|
|
func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { |
|
|
|
dir, name := fullpath.DirAndName() |
|
|
|
return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { |
|
|
|
stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), deleteQuery)) |
|
|
|
stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dir), deleteQuery)) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("Prepare %s : %v", dir, err) |
|
|
|
} |
|
|
@ -177,7 +199,7 @@ func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) |
|
|
|
func (store *YdbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { |
|
|
|
dir, _ := fullpath.DirAndName() |
|
|
|
return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { |
|
|
|
stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), deleteFolderChildrenQuery)) |
|
|
|
stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dir), deleteFolderChildrenQuery)) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("Prepare %s : %v", dir, err) |
|
|
|
} |
|
|
@ -199,7 +221,7 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath |
|
|
|
startFileCompOp = ">=" |
|
|
|
} |
|
|
|
err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { |
|
|
|
stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), fmt.Sprintf(ListDirectoryQuery, startFileCompOp))) |
|
|
|
stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dir), fmt.Sprintf(listDirectoryQuery, startFileCompOp))) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("Prepare %s : %v", dir, err) |
|
|
|
} |
|
|
@ -277,22 +299,91 @@ func (store *YdbStore) Shutdown() { |
|
|
|
_ = store.DB.Close(context.Background()) |
|
|
|
} |
|
|
|
|
|
|
|
func (store *YdbStore) getPrefix(dir string) string { |
|
|
|
func (store *YdbStore) CanDropWholeBucket() bool { |
|
|
|
return store.SupportBucketTable |
|
|
|
} |
|
|
|
|
|
|
|
func (store *YdbStore) OnBucketCreation(bucket string) { |
|
|
|
store.dbsLock.Lock() |
|
|
|
defer store.dbsLock.Unlock() |
|
|
|
|
|
|
|
if err := store.createTable(context.Background(), bucket); err != nil { |
|
|
|
glog.Errorf("createTable %s: %v", bucket, err) |
|
|
|
} |
|
|
|
|
|
|
|
if store.dbs == nil { |
|
|
|
return |
|
|
|
} |
|
|
|
store.dbs[bucket] = true |
|
|
|
} |
|
|
|
|
|
|
|
func (store *YdbStore) OnBucketDeletion(bucket string) { |
|
|
|
store.dbsLock.Lock() |
|
|
|
defer store.dbsLock.Unlock() |
|
|
|
|
|
|
|
if err := store.deleteTable(context.Background(), bucket); err != nil { |
|
|
|
glog.Errorf("deleteTable %s: %v", bucket, err) |
|
|
|
} |
|
|
|
|
|
|
|
if store.dbs == nil { |
|
|
|
return |
|
|
|
} |
|
|
|
delete(store.dbs, bucket) |
|
|
|
} |
|
|
|
|
|
|
|
func (store *YdbStore) createTable(ctx context.Context, prefix string) error { |
|
|
|
e, err := store.DB.Scheme().DescribePath(ctx, prefix) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("describe path %s error:%v", prefix, err) |
|
|
|
} |
|
|
|
if e.IsTable() { |
|
|
|
return nil |
|
|
|
} |
|
|
|
return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { |
|
|
|
return s.CreateTable(ctx, prefix, createTableOptions()...) |
|
|
|
}) |
|
|
|
} |
|
|
|
|
|
|
|
func (store *YdbStore) deleteTable(ctx context.Context, prefix string) error { |
|
|
|
if !store.SupportBucketTable { |
|
|
|
return store.tablePathPrefix |
|
|
|
return nil |
|
|
|
} |
|
|
|
return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error { |
|
|
|
return s.DropTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE)) |
|
|
|
}) |
|
|
|
} |
|
|
|
|
|
|
|
func (store *YdbStore) getPrefix(ctx context.Context, dir string) (tablePathPrefix string) { |
|
|
|
tablePathPrefix = store.tablePathPrefix |
|
|
|
if !store.SupportBucketTable { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
prefixBuckets := store.dirBuckets + "/" |
|
|
|
if strings.HasPrefix(dir, prefixBuckets) { |
|
|
|
// detect bucket
|
|
|
|
bucketAndDir := dir[len(prefixBuckets):] |
|
|
|
if t := strings.Index(bucketAndDir, "/"); t > 0 { |
|
|
|
return path.Join(bucketAndDir[:t], store.tablePathPrefix) |
|
|
|
t := strings.Index(bucketAndDir, "/") |
|
|
|
if t < 0 { |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
return store.tablePathPrefix |
|
|
|
} |
|
|
|
bucket := bucketAndDir[:t] |
|
|
|
|
|
|
|
func (store *YdbStore) withPragma(prefix, query string) string { |
|
|
|
return `PRAGMA TablePathPrefix("` + prefix + `");` + query |
|
|
|
if bucket != "" { |
|
|
|
return |
|
|
|
} |
|
|
|
store.dbsLock.Lock() |
|
|
|
defer store.dbsLock.Unlock() |
|
|
|
|
|
|
|
if _, found := store.dbs[bucket]; !found { |
|
|
|
if err := store.createTable(ctx, |
|
|
|
path.Join(store.tablePathPrefix, bucket, abstract_sql.DEFAULT_TABLE)); err == nil { |
|
|
|
store.dbs[bucket] = true |
|
|
|
} else { |
|
|
|
glog.Errorf("createTable %s: %v", bucket, err) |
|
|
|
} |
|
|
|
} |
|
|
|
tablePathPrefix = path.Join(store.tablePathPrefix, bucket) |
|
|
|
} |
|
|
|
return |
|
|
|
} |