You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

72 lines
2.3 KiB

  1. package ydb
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
  7. "github.com/yandex-cloud/ydb-go-sdk/v2"
  8. "github.com/yandex-cloud/ydb-go-sdk/v2/table"
  9. )
  10. func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  11. dirStr, dirHash, name := abstract_sql.GenDirAndName(key)
  12. fileMeta := FileMeta{dirHash, name, dirStr, value}
  13. return table.Retry(ctx, store.DB.Table().Pool(),
  14. table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) {
  15. stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dirStr), insertQuery))
  16. if err != nil {
  17. return fmt.Errorf("kv put: %v", err)
  18. }
  19. _, _, err = stmt.Execute(ctx, rwTX, fileMeta.QueryParameters())
  20. return fmt.Errorf("kv put: %v", err)
  21. }),
  22. )
  23. }
  24. func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  25. dirStr, dirHash, name := abstract_sql.GenDirAndName(key)
  26. var res *table.Result
  27. err = table.Retry(ctx, store.DB.Table().Pool(),
  28. table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) {
  29. stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dirStr), findQuery))
  30. if err != nil {
  31. return err
  32. }
  33. _, res, err = stmt.Execute(ctx, roTX, table.NewQueryParameters(
  34. table.ValueParam("$dir_hash", ydb.Int64Value(dirHash)),
  35. table.ValueParam("$name", ydb.UTF8Value(name))))
  36. return err
  37. }),
  38. )
  39. if err != nil {
  40. return nil, fmt.Errorf("kv get: %v", err)
  41. }
  42. defer res.Close()
  43. for res.NextResultSet(ctx) {
  44. for res.NextRow() {
  45. if err = res.Scan(&value); err != nil {
  46. return nil, fmt.Errorf("kv get: %v", err)
  47. }
  48. return
  49. }
  50. }
  51. return nil, filer.ErrKvNotFound
  52. }
  53. func (store *YdbStore) KvDelete(ctx context.Context, key []byte) (err error) {
  54. dirStr, dirHash, name := abstract_sql.GenDirAndName(key)
  55. return table.Retry(ctx, store.DB.Table().Pool(),
  56. table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) {
  57. stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dirStr), deleteQuery))
  58. if err != nil {
  59. return fmt.Errorf("kv delete: %s", err)
  60. }
  61. _, _, err = stmt.Execute(ctx, rwTX, table.NewQueryParameters(
  62. table.ValueParam("$dir_hash", ydb.Int64Value(dirHash)),
  63. table.ValueParam("$name", ydb.UTF8Value(name))))
  64. return fmt.Errorf("kv delete: %s", err)
  65. }),
  66. )
  67. }