You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

94 lines
2.4 KiB

  1. package cassandra_store
import (
	"fmt"
	"strings"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/gocql/gocql"
)
/*
The store requires a table like this:

CREATE TABLE seaweed_files (
   path varchar,
   fids list<varchar>,
   PRIMARY KEY (path)
);

The store needs to match the flat_namespace.FlatNamespaceStore interface:

	Put(fullFileName string, fid string) (err error)
	Get(fullFileName string) (fid string, err error)
	Delete(fullFileName string) (fid string, err error)
*/
  20. type CassandraStore struct {
  21. cluster *gocql.ClusterConfig
  22. session *gocql.Session
  23. }
  24. func NewCassandraStore(keyspace string, hosts ...string) (c *CassandraStore, err error) {
  25. c = &CassandraStore{}
  26. s := strings.Split(hosts, ",")
  27. if len(s) == 1 {
  28. c.cluster = gocql.NewCluster(hosts...)
  29. } else if len(s) > 1 {
  30. c.cluster = gocql.NewCluster(s[0], s[1])
  31. }
  32. c.cluster.Keyspace = keyspace
  33. c.cluster.Consistency = gocql.Quorum
  34. c.session, err = c.cluster.CreateSession()
  35. if err != nil {
  36. glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
  37. }
  38. return
  39. }
  40. func (c *CassandraStore) Put(fullFileName string, fid string) (err error) {
  41. var input []string
  42. input = append(input, fid)
  43. if err := c.session.Query(
  44. `INSERT INTO seaweed_files (path, fids) VALUES (?, ?)`,
  45. fullFileName, input).Exec(); err != nil {
  46. glog.V(0).Infof("Failed to save file %s with id %s: %v", fullFileName, fid, err)
  47. return err
  48. }
  49. return nil
  50. }
  51. func (c *CassandraStore) Get(fullFileName string) (fid string, err error) {
  52. var output []string
  53. if err := c.session.Query(
  54. `select fids FROM seaweed_files WHERE path = ? LIMIT 1`,
  55. fullFileName).Consistency(gocql.One).Scan(&output); err != nil {
  56. if err != gocql.ErrNotFound {
  57. glog.V(0).Infof("Failed to find file %s: %v", fullFileName, fid, err)
  58. return "", filer.ErrNotFound
  59. }
  60. }
  61. if len(output) == 0 {
  62. return "", fmt.Errorf("No file id found for %s", fullFileName)
  63. }
  64. return output[0], nil
  65. }
  66. // Currently the fid is not returned
  67. func (c *CassandraStore) Delete(fullFileName string) (err error) {
  68. if err := c.session.Query(
  69. `DELETE FROM seaweed_files WHERE path = ?`,
  70. fullFileName).Exec(); err != nil {
  71. if err != gocql.ErrNotFound {
  72. glog.V(0).Infof("Failed to delete file %s: %v", fullFileName, err)
  73. }
  74. return err
  75. }
  76. return nil
  77. }
  78. func (c *CassandraStore) Close() {
  79. if c.session != nil {
  80. c.session.Close()
  81. }
  82. }