You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

97 lines
2.5 KiB

  1. package cassandra_store
import (
	"fmt"
	"strings"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/gocql/gocql"
)
  8. /*
  9. Basically you need a table just like this:
  10. CREATE TABLE seaweed_files (
  11. path varchar,
  12. fids list<varchar>,
  13. PRIMARY KEY (path)
  14. );
  15. Need to match flat_namespace.FlatNamespaceStore interface
  16. Put(fullFileName string, fid string) (err error)
  17. Get(fullFileName string) (fid string, err error)
  18. Delete(fullFileName string) (fid string, err error)
  19. */
  20. type CassandraStore struct {
  21. cluster *gocql.ClusterConfig
  22. session *gocql.Session
  23. }
  24. func NewCassandraStore(keyspace string, hosts ...string) (c *CassandraStore, err error) {
  25. c = &CassandraStore{}
  26. s := strings.Split(hosts, ",")
  27. if len(s) == 1 {
  28. fmt.Println("000")
  29. c.cluster = gocql.NewCluster(hosts...)
  30. } else if len(s) > 1 {
  31. fmt.Println("111",s[0])
  32. fmt.Println("222",s[1])
  33. c.cluster = gocql.NewCluster(s[0], s[1])
  34. }
  35. c.cluster.Keyspace = keyspace
  36. c.cluster.Consistency = gocql.Quorum
  37. c.session, err = c.cluster.CreateSession()
  38. if err != nil {
  39. glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
  40. }
  41. return
  42. }
  43. func (c *CassandraStore) Put(fullFileName string, fid string) (err error) {
  44. var input []string
  45. input = append(input, fid)
  46. if err := c.session.Query(
  47. `INSERT INTO seaweed_files (path, fids) VALUES (?, ?)`,
  48. fullFileName, input).Exec(); err != nil {
  49. glog.V(0).Infof("Failed to save file %s with id %s: %v", fullFileName, fid, err)
  50. return err
  51. }
  52. return nil
  53. }
  54. func (c *CassandraStore) Get(fullFileName string) (fid string, err error) {
  55. var output []string
  56. if err := c.session.Query(
  57. `select fids FROM seaweed_files WHERE path = ? LIMIT 1`,
  58. fullFileName).Consistency(gocql.One).Scan(&output); err != nil {
  59. if err != gocql.ErrNotFound {
  60. glog.V(0).Infof("Failed to find file %s: %v", fullFileName, fid, err)
  61. return "", filer.ErrNotFound
  62. }
  63. }
  64. if len(output) == 0 {
  65. return "", fmt.Errorf("No file id found for %s", fullFileName)
  66. }
  67. return output[0], nil
  68. }
  69. // Currently the fid is not returned
  70. func (c *CassandraStore) Delete(fullFileName string) (err error) {
  71. if err := c.session.Query(
  72. `DELETE FROM seaweed_files WHERE path = ?`,
  73. fullFileName).Exec(); err != nil {
  74. if err != gocql.ErrNotFound {
  75. glog.V(0).Infof("Failed to delete file %s: %v", fullFileName, err)
  76. }
  77. return err
  78. }
  79. return nil
  80. }
  81. func (c *CassandraStore) Close() {
  82. if c.session != nil {
  83. c.session.Close()
  84. }
  85. }