You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

142 lines
3.8 KiB

7 years ago
7 years ago
6 years ago
7 years ago
7 years ago
7 years ago
6 years ago
6 years ago
6 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package cassandra
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/joeslay/seaweedfs/weed/filer2"
  6. "github.com/joeslay/seaweedfs/weed/glog"
  7. "github.com/joeslay/seaweedfs/weed/util"
  8. "github.com/gocql/gocql"
  9. )
  10. func init() {
  11. filer2.Stores = append(filer2.Stores, &CassandraStore{})
  12. }
  13. type CassandraStore struct {
  14. cluster *gocql.ClusterConfig
  15. session *gocql.Session
  16. }
  17. func (store *CassandraStore) GetName() string {
  18. return "cassandra"
  19. }
  20. func (store *CassandraStore) Initialize(configuration util.Configuration) (err error) {
  21. return store.initialize(
  22. configuration.GetString("keyspace"),
  23. configuration.GetStringSlice("hosts"),
  24. )
  25. }
  26. func (store *CassandraStore) initialize(keyspace string, hosts []string) (err error) {
  27. store.cluster = gocql.NewCluster(hosts...)
  28. store.cluster.Keyspace = keyspace
  29. store.cluster.Consistency = gocql.LocalQuorum
  30. store.session, err = store.cluster.CreateSession()
  31. if err != nil {
  32. glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
  33. }
  34. return
  35. }
  36. func (store *CassandraStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  37. return ctx, nil
  38. }
  39. func (store *CassandraStore) CommitTransaction(ctx context.Context) error {
  40. return nil
  41. }
  42. func (store *CassandraStore) RollbackTransaction(ctx context.Context) error {
  43. return nil
  44. }
  45. func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  46. dir, name := entry.FullPath.DirAndName()
  47. meta, err := entry.EncodeAttributesAndChunks()
  48. if err != nil {
  49. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  50. }
  51. if err := store.session.Query(
  52. "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ",
  53. dir, name, meta, entry.TtlSec).Exec(); err != nil {
  54. return fmt.Errorf("insert %s: %s", entry.FullPath, err)
  55. }
  56. return nil
  57. }
  58. func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  59. return store.InsertEntry(ctx, entry)
  60. }
  61. func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
  62. dir, name := fullpath.DirAndName()
  63. var data []byte
  64. if err := store.session.Query(
  65. "SELECT meta FROM filemeta WHERE directory=? AND name=?",
  66. dir, name).Consistency(gocql.One).Scan(&data); err != nil {
  67. if err != gocql.ErrNotFound {
  68. return nil, filer2.ErrNotFound
  69. }
  70. }
  71. if len(data) == 0 {
  72. return nil, filer2.ErrNotFound
  73. }
  74. entry = &filer2.Entry{
  75. FullPath: fullpath,
  76. }
  77. err = entry.DecodeAttributesAndChunks(data)
  78. if err != nil {
  79. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  80. }
  81. return entry, nil
  82. }
  83. func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error {
  84. dir, name := fullpath.DirAndName()
  85. if err := store.session.Query(
  86. "DELETE FROM filemeta WHERE directory=? AND name=?",
  87. dir, name).Exec(); err != nil {
  88. return fmt.Errorf("delete %s : %v", fullpath, err)
  89. }
  90. return nil
  91. }
  92. func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
  93. limit int) (entries []*filer2.Entry, err error) {
  94. cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
  95. if inclusive {
  96. cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
  97. }
  98. var data []byte
  99. var name string
  100. iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter()
  101. for iter.Scan(&name, &data) {
  102. entry := &filer2.Entry{
  103. FullPath: filer2.NewFullPath(string(fullpath), name),
  104. }
  105. if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil {
  106. err = decodeErr
  107. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  108. break
  109. }
  110. entries = append(entries, entry)
  111. }
  112. if err := iter.Close(); err != nil {
  113. glog.V(0).Infof("list iterator close: %v", err)
  114. }
  115. return entries, err
  116. }