You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

78 lines
2.8 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package ydb
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. "github.com/ydb-platform/ydb-go-sdk/v3/table"
  9. "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
  10. "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
  11. )
  12. func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  13. dirStr, dirHash, name := abstract_sql.GenDirAndName(key)
  14. fileMeta := FileMeta{dirHash, name, dirStr, value}
  15. return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) {
  16. stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dirStr), insertQuery))
  17. if err != nil {
  18. return fmt.Errorf("kv put prepare %s: %v", util.NewFullPath(dirStr, name).Name(), err)
  19. }
  20. _, _, err = stmt.Execute(ctx, rwTX, fileMeta.queryParameters())
  21. if err != nil {
  22. return fmt.Errorf("kv put execute %s: %v", util.NewFullPath(dirStr, name).Name(), err)
  23. }
  24. return nil
  25. })
  26. }
  27. func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  28. dirStr, dirHash, name := abstract_sql.GenDirAndName(key)
  29. valueFound := false
  30. err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
  31. stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dirStr), findQuery))
  32. if err != nil {
  33. return fmt.Errorf("kv get prepare %s: %v", util.NewFullPath(dirStr, name), err)
  34. }
  35. _, res, err := stmt.Execute(ctx, roTX, table.NewQueryParameters(
  36. table.ValueParam("$dir_hash", types.Int64Value(dirHash)),
  37. table.ValueParam("$name", types.UTF8Value(name))))
  38. if err != nil {
  39. return fmt.Errorf("kv get execute %s: %v", util.NewFullPath(dirStr, name).Name(), err)
  40. }
  41. defer func() { _ = res.Close() }()
  42. for res.NextRow() {
  43. if err := res.ScanNamed(named.Required("meta", &value)); err != nil {
  44. return fmt.Errorf("scanNamed %s : %v", util.NewFullPath(dirStr, name).Name(), err)
  45. }
  46. valueFound = true
  47. return nil
  48. }
  49. return res.Err()
  50. })
  51. if !valueFound {
  52. return nil, filer.ErrKvNotFound
  53. }
  54. return value, nil
  55. }
  56. func (store *YdbStore) KvDelete(ctx context.Context, key []byte) (err error) {
  57. dirStr, dirHash, name := abstract_sql.GenDirAndName(key)
  58. return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) {
  59. stmt, err := s.Prepare(ctx, withPragma(store.getPrefix(ctx, dirStr), insertQuery))
  60. if err != nil {
  61. return fmt.Errorf("Prepare %s: %v", util.NewFullPath(dirStr, name).Name(), err)
  62. }
  63. _, _, err = stmt.Execute(ctx, rwTX, table.NewQueryParameters(
  64. table.ValueParam("$dir_hash", types.Int64Value(dirHash)),
  65. table.ValueParam("$name", types.UTF8Value(name))))
  66. if err != nil {
  67. return fmt.Errorf("kv delete %s: %v", util.NewFullPath(dirStr, name).Name(), err)
  68. }
  69. return nil
  70. })
  71. }