You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

289 lines
8.8 KiB

  1. package ydb
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. "github.com/yandex-cloud/ydb-go-sdk/v2"
  10. "github.com/yandex-cloud/ydb-go-sdk/v2/connect"
  11. "github.com/yandex-cloud/ydb-go-sdk/v2/table"
  12. "path"
  13. "strings"
  14. "time"
  15. )
  16. const (
  17. defaultConnectionTimeOut = 10
  18. )
  19. var (
  20. roTX = table.TxControl(
  21. table.BeginTx(table.WithOnlineReadOnly()),
  22. table.CommitTx(),
  23. )
  24. rwTX = table.TxControl(
  25. table.BeginTx(table.WithSerializableReadWrite()),
  26. table.CommitTx(),
  27. )
  28. )
  29. type YdbStore struct {
  30. SupportBucketTable bool
  31. DB *connect.Connection
  32. dirBuckets string
  33. tablePathPrefix string
  34. }
  35. func init() {
  36. filer.Stores = append(filer.Stores, &YdbStore{})
  37. }
  38. func (store *YdbStore) GetName() string {
  39. return "ydb"
  40. }
  41. func (store *YdbStore) Initialize(configuration util.Configuration, prefix string) (err error) {
  42. return store.initialize(
  43. configuration.GetString("filer.options.buckets_folder"),
  44. configuration.GetString(prefix+"coonectionUrl"),
  45. configuration.GetString(prefix+"tablePathPrefix"),
  46. configuration.GetBool(prefix+"useBucketPrefix"),
  47. configuration.GetInt(prefix+"connectionTimeOut"),
  48. )
  49. }
  50. func (store *YdbStore) initialize(dirBuckets string, sqlUrl string, tablePathPrefix string, useBucketPrefix bool, connectionTimeOut int) (err error) {
  51. store.dirBuckets = dirBuckets
  52. store.tablePathPrefix = tablePathPrefix
  53. store.SupportBucketTable = useBucketPrefix
  54. if connectionTimeOut == 0 {
  55. connectionTimeOut = defaultConnectionTimeOut
  56. }
  57. var cancel context.CancelFunc
  58. connCtx, cancel := context.WithTimeout(context.Background(), time.Duration(connectionTimeOut)*time.Second)
  59. defer cancel()
  60. connParams := connect.MustConnectionString(sqlUrl)
  61. store.DB, err = connect.New(connCtx, connParams)
  62. if err != nil {
  63. store.DB.Close()
  64. store.DB = nil
  65. return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err)
  66. }
  67. defer store.DB.Close()
  68. if err = store.DB.EnsurePathExists(connCtx, connParams.Database()); err != nil {
  69. return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
  70. }
  71. return nil
  72. }
  73. func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Entry, query string) (err error) {
  74. dir, name := entry.FullPath.DirAndName()
  75. meta, err := entry.EncodeAttributesAndChunks()
  76. if err != nil {
  77. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  78. }
  79. if len(entry.Chunks) > filer.CountEntryChunksForGzip {
  80. meta = util.MaybeGzipData(meta)
  81. }
  82. fileMeta := FileMeta{util.HashStringToLong(dir), name, dir, meta}
  83. return table.Retry(ctx, store.DB.Table().Pool(),
  84. table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) {
  85. stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), query))
  86. if err != nil {
  87. return err
  88. }
  89. _, _, err = stmt.Execute(ctx, rwTX, fileMeta.QueryParameters())
  90. return err
  91. }),
  92. )
  93. }
  94. func (store *YdbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  95. return store.insertOrUpdateEntry(ctx, entry, insertQuery)
  96. }
  97. func (store *YdbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  98. return store.insertOrUpdateEntry(ctx, entry, updateQuery)
  99. }
  100. func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
  101. dir, name := fullpath.DirAndName()
  102. var res *table.Result
  103. err = table.Retry(ctx, store.DB.Table().Pool(),
  104. table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) {
  105. stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), findQuery))
  106. if err != nil {
  107. return err
  108. }
  109. _, res, err = stmt.Execute(ctx, roTX, table.NewQueryParameters(
  110. table.ValueParam("$dir_hash", ydb.Int64Value(util.HashStringToLong(dir))),
  111. table.ValueParam("$name", ydb.UTF8Value(name))))
  112. return err
  113. }),
  114. )
  115. if err != nil {
  116. return nil, err
  117. }
  118. defer res.Close()
  119. for res.NextResultSet(ctx) {
  120. for res.NextRow() {
  121. res.SeekItem("meta")
  122. entry.FullPath = fullpath
  123. if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(res.String())); err != nil {
  124. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  125. }
  126. return entry, nil
  127. }
  128. }
  129. return nil, filer_pb.ErrNotFound
  130. }
  131. func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
  132. dir, name := fullpath.DirAndName()
  133. return table.Retry(ctx, store.DB.Table().Pool(),
  134. table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) {
  135. stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), deleteQuery))
  136. if err != nil {
  137. return err
  138. }
  139. _, _, err = stmt.Execute(ctx, rwTX, table.NewQueryParameters(
  140. table.ValueParam("$dir_hash", ydb.Int64Value(util.HashStringToLong(dir))),
  141. table.ValueParam("$name", ydb.UTF8Value(name))))
  142. return err
  143. }),
  144. )
  145. }
  146. func (store *YdbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
  147. dir, _ := fullpath.DirAndName()
  148. return table.Retry(ctx, store.DB.Table().Pool(),
  149. table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) {
  150. stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), deleteFolderChildrenQuery))
  151. if err != nil {
  152. return err
  153. }
  154. _, _, err = stmt.Execute(ctx, rwTX, table.NewQueryParameters(
  155. table.ValueParam("$dir_hash", ydb.Int64Value(util.HashStringToLong(dir))),
  156. table.ValueParam("$directory", ydb.UTF8Value(dir))))
  157. return err
  158. }),
  159. )
  160. }
  161. func (store *YdbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  162. return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil)
  163. }
  164. func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  165. dir := string(dirPath)
  166. var res *table.Result
  167. startFileCompOp := ">"
  168. if includeStartFile {
  169. startFileCompOp = ">="
  170. }
  171. err = table.Retry(ctx, store.DB.Table().Pool(),
  172. table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) {
  173. stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), fmt.Sprintf(ListDirectoryQuery, startFileCompOp)))
  174. if err != nil {
  175. return err
  176. }
  177. _, res, err = stmt.Execute(ctx, roTX, table.NewQueryParameters(
  178. table.ValueParam("$dir_hash", ydb.Int64Value(util.HashStringToLong(dir))),
  179. table.ValueParam("$directory", ydb.UTF8Value(dir)),
  180. table.ValueParam("$start_name", ydb.UTF8Value(startFileName)),
  181. table.ValueParam("$prefix", ydb.UTF8Value(prefix)),
  182. table.ValueParam("$limit", ydb.Int64Value(limit)),
  183. ))
  184. return err
  185. }),
  186. )
  187. if err != nil {
  188. return lastFileName, err
  189. }
  190. defer res.Close()
  191. for res.NextSet() {
  192. for res.NextRow() {
  193. res.SeekItem("name")
  194. name := res.UTF8()
  195. res.SeekItem("meta")
  196. data := res.String()
  197. if res.Err() != nil {
  198. glog.V(0).Infof("scan %s : %v", dirPath, err)
  199. return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err)
  200. }
  201. lastFileName = name
  202. entry := &filer.Entry{
  203. FullPath: util.NewFullPath(dir, name),
  204. }
  205. if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
  206. glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
  207. return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
  208. }
  209. if !eachEntryFunc(entry) {
  210. break
  211. }
  212. }
  213. }
  214. return lastFileName, nil
  215. }
  216. func (store *YdbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  217. session, err := store.DB.Table().Pool().Create(ctx)
  218. if err != nil {
  219. return ctx, err
  220. }
  221. tx, err := session.BeginTransaction(ctx, table.TxSettings(table.WithSerializableReadWrite()))
  222. if err != nil {
  223. return ctx, err
  224. }
  225. return context.WithValue(ctx, "tx", tx), nil
  226. }
  227. func (store *YdbStore) CommitTransaction(ctx context.Context) error {
  228. if tx, ok := ctx.Value("tx").(*table.Transaction); ok {
  229. return tx.Commit(ctx)
  230. }
  231. return nil
  232. }
  233. func (store *YdbStore) RollbackTransaction(ctx context.Context) error {
  234. if tx, ok := ctx.Value("tx").(*table.Transaction); ok {
  235. return tx.Rollback(ctx)
  236. }
  237. return nil
  238. }
  239. func (store *YdbStore) Shutdown() {
  240. store.DB.Close()
  241. }
  242. func (store *YdbStore) getPrefix(dir string) string {
  243. if !store.SupportBucketTable {
  244. return store.tablePathPrefix
  245. }
  246. prefixBuckets := store.dirBuckets + "/"
  247. if strings.HasPrefix(dir, prefixBuckets) {
  248. // detect bucket
  249. bucketAndDir := dir[len(prefixBuckets):]
  250. if t := strings.Index(bucketAndDir, "/"); t > 0 {
  251. return path.Join(bucketAndDir[:t], store.tablePathPrefix)
  252. }
  253. }
  254. return store.tablePathPrefix
  255. }
  256. func (store *YdbStore) withPragma(prefix, query string) string {
  257. return `PRAGMA TablePathPrefix("` + prefix + `");` + query
  258. }