You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

130 lines
3.4 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package cassandra
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/filer2"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/gocql/gocql"
  7. )
  8. func init() {
  9. filer2.Stores = append(filer2.Stores, &CassandraStore{})
  10. }
  11. type CassandraStore struct {
  12. cluster *gocql.ClusterConfig
  13. session *gocql.Session
  14. }
  15. func (store *CassandraStore) GetName() string {
  16. return "cassandra"
  17. }
  18. func (store *CassandraStore) Initialize(configuration filer2.Configuration) (err error) {
  19. return store.initialize(
  20. configuration.GetString("keyspace"),
  21. configuration.GetStringSlice("hosts"),
  22. )
  23. }
  24. func (store *CassandraStore) initialize(keyspace string, hosts []string) (err error) {
  25. store.cluster = gocql.NewCluster(hosts...)
  26. store.cluster.Keyspace = keyspace
  27. store.cluster.Consistency = gocql.LocalQuorum
  28. store.session, err = store.cluster.CreateSession()
  29. if err != nil {
  30. glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
  31. }
  32. return
  33. }
  34. func (store *CassandraStore) InsertEntry(entry *filer2.Entry) (err error) {
  35. dir, name := entry.FullPath.DirAndName()
  36. meta, err := entry.EncodeAttributesAndChunks()
  37. if err != nil {
  38. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  39. }
  40. if err := store.session.Query(
  41. "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?)",
  42. dir, name, meta).Exec(); err != nil {
  43. return fmt.Errorf("insert %s: %s", entry.FullPath, err)
  44. }
  45. return nil
  46. }
  47. func (store *CassandraStore) UpdateEntry(entry *filer2.Entry) (err error) {
  48. return store.InsertEntry(entry)
  49. }
  50. func (store *CassandraStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
  51. dir, name := fullpath.DirAndName()
  52. var data []byte
  53. if err := store.session.Query(
  54. "SELECT meta FROM filemeta WHERE directory=? AND name=?",
  55. dir, name).Consistency(gocql.One).Scan(&data); err != nil {
  56. if err != gocql.ErrNotFound {
  57. return nil, fmt.Errorf("read entry %s: %v", fullpath, err)
  58. }
  59. }
  60. if len(data) == 0 {
  61. return nil, fmt.Errorf("not found: %s", fullpath)
  62. }
  63. entry = &filer2.Entry{
  64. FullPath: fullpath,
  65. }
  66. err = entry.DecodeAttributesAndChunks(data)
  67. if err != nil {
  68. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  69. }
  70. return entry, nil
  71. }
  72. func (store *CassandraStore) DeleteEntry(fullpath filer2.FullPath) error {
  73. dir, name := fullpath.DirAndName()
  74. if err := store.session.Query(
  75. "DELETE FROM filemeta WHERE directory=? AND name=?",
  76. dir, name).Exec(); err != nil {
  77. return fmt.Errorf("delete %s : %v", fullpath, err)
  78. }
  79. return nil
  80. }
  81. func (store *CassandraStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool,
  82. limit int) (entries []*filer2.Entry, err error) {
  83. cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
  84. if inclusive {
  85. cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
  86. }
  87. var data []byte
  88. var name string
  89. iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter()
  90. for iter.Scan(&name, &data) {
  91. entry := &filer2.Entry{
  92. FullPath: filer2.NewFullPath(string(fullpath), name),
  93. }
  94. if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil {
  95. err = decodeErr
  96. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  97. break
  98. }
  99. entries = append(entries, entry)
  100. }
  101. if err := iter.Close(); err != nil {
  102. glog.V(0).Infof("list iterator close: %v", err)
  103. }
  104. return entries, err
  105. }