You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

132 lines
3.6 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package cassandra
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer2"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. "github.com/gocql/gocql"
  9. )
  10. func init() {
  11. filer2.Stores = append(filer2.Stores, &CassandraStore{})
  12. }
  13. type CassandraStore struct {
  14. cluster *gocql.ClusterConfig
  15. session *gocql.Session
  16. }
  17. func (store *CassandraStore) GetName() string {
  18. return "cassandra"
  19. }
  20. func (store *CassandraStore) Initialize(configuration util.Configuration) (err error) {
  21. return store.initialize(
  22. configuration.GetString("keyspace"),
  23. configuration.GetStringSlice("hosts"),
  24. )
  25. }
  26. func (store *CassandraStore) initialize(keyspace string, hosts []string) (err error) {
  27. store.cluster = gocql.NewCluster(hosts...)
  28. store.cluster.Keyspace = keyspace
  29. store.cluster.Consistency = gocql.LocalQuorum
  30. store.session, err = store.cluster.CreateSession()
  31. if err != nil {
  32. glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
  33. }
  34. return
  35. }
  36. func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  37. dir, name := entry.FullPath.DirAndName()
  38. meta, err := entry.EncodeAttributesAndChunks()
  39. if err != nil {
  40. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  41. }
  42. if err := store.session.Query(
  43. "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ",
  44. dir, name, meta, entry.TtlSec).Exec(); err != nil {
  45. return fmt.Errorf("insert %s: %s", entry.FullPath, err)
  46. }
  47. return nil
  48. }
  49. func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  50. return store.InsertEntry(ctx, entry)
  51. }
  52. func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
  53. dir, name := fullpath.DirAndName()
  54. var data []byte
  55. if err := store.session.Query(
  56. "SELECT meta FROM filemeta WHERE directory=? AND name=?",
  57. dir, name).Consistency(gocql.One).Scan(&data); err != nil {
  58. if err != gocql.ErrNotFound {
  59. return nil, filer2.ErrNotFound
  60. }
  61. }
  62. if len(data) == 0 {
  63. return nil, fmt.Errorf("not found: %s", fullpath)
  64. }
  65. entry = &filer2.Entry{
  66. FullPath: fullpath,
  67. }
  68. err = entry.DecodeAttributesAndChunks(data)
  69. if err != nil {
  70. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  71. }
  72. return entry, nil
  73. }
  74. func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error {
  75. dir, name := fullpath.DirAndName()
  76. if err := store.session.Query(
  77. "DELETE FROM filemeta WHERE directory=? AND name=?",
  78. dir, name).Exec(); err != nil {
  79. return fmt.Errorf("delete %s : %v", fullpath, err)
  80. }
  81. return nil
  82. }
  83. func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
  84. limit int) (entries []*filer2.Entry, err error) {
  85. cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
  86. if inclusive {
  87. cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
  88. }
  89. var data []byte
  90. var name string
  91. iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter()
  92. for iter.Scan(&name, &data) {
  93. entry := &filer2.Entry{
  94. FullPath: filer2.NewFullPath(string(fullpath), name),
  95. }
  96. if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil {
  97. err = decodeErr
  98. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  99. break
  100. }
  101. entries = append(entries, entry)
  102. }
  103. if err := iter.Close(); err != nil {
  104. glog.V(0).Infof("list iterator close: %v", err)
  105. }
  106. return entries, err
  107. }