You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

131 lines
3.4 KiB

7 years ago
6 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package cassandra
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/filer2"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/util"
  7. "github.com/gocql/gocql"
  8. )
  9. func init() {
  10. filer2.Stores = append(filer2.Stores, &CassandraStore{})
  11. }
  12. type CassandraStore struct {
  13. cluster *gocql.ClusterConfig
  14. session *gocql.Session
  15. }
  16. func (store *CassandraStore) GetName() string {
  17. return "cassandra"
  18. }
  19. func (store *CassandraStore) Initialize(configuration util.Configuration) (err error) {
  20. return store.initialize(
  21. configuration.GetString("keyspace"),
  22. configuration.GetStringSlice("hosts"),
  23. )
  24. }
  25. func (store *CassandraStore) initialize(keyspace string, hosts []string) (err error) {
  26. store.cluster = gocql.NewCluster(hosts...)
  27. store.cluster.Keyspace = keyspace
  28. store.cluster.Consistency = gocql.LocalQuorum
  29. store.session, err = store.cluster.CreateSession()
  30. if err != nil {
  31. glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
  32. }
  33. return
  34. }
  35. func (store *CassandraStore) InsertEntry(entry *filer2.Entry) (err error) {
  36. dir, name := entry.FullPath.DirAndName()
  37. meta, err := entry.EncodeAttributesAndChunks()
  38. if err != nil {
  39. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  40. }
  41. if err := store.session.Query(
  42. "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ",
  43. dir, name, meta, entry.TtlSec).Exec(); err != nil {
  44. return fmt.Errorf("insert %s: %s", entry.FullPath, err)
  45. }
  46. return nil
  47. }
  48. func (store *CassandraStore) UpdateEntry(entry *filer2.Entry) (err error) {
  49. return store.InsertEntry(entry)
  50. }
  51. func (store *CassandraStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
  52. dir, name := fullpath.DirAndName()
  53. var data []byte
  54. if err := store.session.Query(
  55. "SELECT meta FROM filemeta WHERE directory=? AND name=?",
  56. dir, name).Consistency(gocql.One).Scan(&data); err != nil {
  57. if err != gocql.ErrNotFound {
  58. return nil, filer2.ErrNotFound
  59. }
  60. }
  61. if len(data) == 0 {
  62. return nil, fmt.Errorf("not found: %s", fullpath)
  63. }
  64. entry = &filer2.Entry{
  65. FullPath: fullpath,
  66. }
  67. err = entry.DecodeAttributesAndChunks(data)
  68. if err != nil {
  69. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  70. }
  71. return entry, nil
  72. }
  73. func (store *CassandraStore) DeleteEntry(fullpath filer2.FullPath) error {
  74. dir, name := fullpath.DirAndName()
  75. if err := store.session.Query(
  76. "DELETE FROM filemeta WHERE directory=? AND name=?",
  77. dir, name).Exec(); err != nil {
  78. return fmt.Errorf("delete %s : %v", fullpath, err)
  79. }
  80. return nil
  81. }
  82. func (store *CassandraStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool,
  83. limit int) (entries []*filer2.Entry, err error) {
  84. cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
  85. if inclusive {
  86. cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
  87. }
  88. var data []byte
  89. var name string
  90. iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter()
  91. for iter.Scan(&name, &data) {
  92. entry := &filer2.Entry{
  93. FullPath: filer2.NewFullPath(string(fullpath), name),
  94. }
  95. if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil {
  96. err = decodeErr
  97. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  98. break
  99. }
  100. entries = append(entries, entry)
  101. }
  102. if err := iter.Close(); err != nil {
  103. glog.V(0).Infof("list iterator close: %v", err)
  104. }
  105. return entries, err
  106. }