You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

94 lines
2.4 KiB

  1. package cassandra_store
  2. import (
  3. "fmt"
  4. "strings"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/gocql/gocql"
  8. )
  9. /*
  10. Basically you need a table just like this:
  11. CREATE TABLE seaweed_files (
  12. path varchar,
  13. fids list<varchar>,
  14. PRIMARY KEY (path)
  15. );
  16. Need to match flat_namespace.FlatNamespaceStore interface
  17. Put(fullFileName string, fid string) (err error)
  18. Get(fullFileName string) (fid string, err error)
  19. Delete(fullFileName string) (fid string, err error)
  20. */
  21. type CassandraStore struct {
  22. cluster *gocql.ClusterConfig
  23. session *gocql.Session
  24. }
  25. func NewCassandraStore(keyspace string, hosts ...string) (c *CassandraStore, err error) {
  26. c = &CassandraStore{}
  27. s := strings.Split(hosts, ",")
  28. if len(s) == 1 {
  29. c.cluster = gocql.NewCluster(hosts...)
  30. } else if len(s) > 1 {
  31. c.cluster = gocql.NewCluster(s[0], s[1])
  32. }
  33. c.cluster.Keyspace = keyspace
  34. c.cluster.Consistency = gocql.Quorum
  35. c.session, err = c.cluster.CreateSession()
  36. if err != nil {
  37. glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
  38. }
  39. return
  40. }
  41. func (c *CassandraStore) Put(fullFileName string, fid string) (err error) {
  42. var input []string
  43. input = append(input, fid)
  44. if err := c.session.Query(
  45. `INSERT INTO seaweed_files (path, fids) VALUES (?, ?)`,
  46. fullFileName, input).Exec(); err != nil {
  47. glog.V(0).Infof("Failed to save file %s with id %s: %v", fullFileName, fid, err)
  48. return err
  49. }
  50. return nil
  51. }
  52. func (c *CassandraStore) Get(fullFileName string) (fid string, err error) {
  53. var output []string
  54. if err := c.session.Query(
  55. `select fids FROM seaweed_files WHERE path = ? LIMIT 1`,
  56. fullFileName).Consistency(gocql.One).Scan(&output); err != nil {
  57. if err != gocql.ErrNotFound {
  58. glog.V(0).Infof("Failed to find file %s: %v", fullFileName, fid, err)
  59. return "", filer.ErrNotFound
  60. }
  61. }
  62. if len(output) == 0 {
  63. return "", fmt.Errorf("No file id found for %s", fullFileName)
  64. }
  65. return output[0], nil
  66. }
  67. // Currently the fid is not returned
  68. func (c *CassandraStore) Delete(fullFileName string) (err error) {
  69. if err := c.session.Query(
  70. `DELETE FROM seaweed_files WHERE path = ?`,
  71. fullFileName).Exec(); err != nil {
  72. if err != gocql.ErrNotFound {
  73. glog.V(0).Infof("Failed to delete file %s: %v", fullFileName, err)
  74. }
  75. return err
  76. }
  77. return nil
  78. }
  79. func (c *CassandraStore) Close() {
  80. if c.session != nil {
  81. c.session.Close()
  82. }
  83. }