From ab86d263c91d496e2659f494ace9749392f157a4 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 27 May 2018 00:01:15 -0700 Subject: [PATCH] add Cassandra --- weed/filer2/cassandra/README.txt | 12 ++ weed/filer2/cassandra/cassandra_store.go | 136 +++++++++++++++++++++++ weed/filer2/configuration.go | 13 +++ weed/server/filer_server.go | 1 + 4 files changed, 162 insertions(+) create mode 100644 weed/filer2/cassandra/README.txt create mode 100644 weed/filer2/cassandra/cassandra_store.go diff --git a/weed/filer2/cassandra/README.txt b/weed/filer2/cassandra/README.txt new file mode 100644 index 000000000..2d176229f --- /dev/null +++ b/weed/filer2/cassandra/README.txt @@ -0,0 +1,12 @@ +1. create a keyspace + +CREATE KEYSPACE seaweedfs WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1}; + +2. create filemeta table + + CREATE TABLE filemeta ( + directory varchar, + name varchar, + meta blob, + PRIMARY KEY (directory, name) + ) WITH CLUSTERING ORDER BY (name ASC); diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go new file mode 100644 index 000000000..316b6cce0 --- /dev/null +++ b/weed/filer2/cassandra/cassandra_store.go @@ -0,0 +1,136 @@ +package cassandra + +import ( + "fmt" + "github.com/gocql/gocql" + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/spf13/viper" +) + +func init() { + filer2.Stores = append(filer2.Stores, &CassandraStore{}) +} + +type CassandraStore struct { + cluster *gocql.ClusterConfig + session *gocql.Session +} + +func (store *CassandraStore) GetName() string { + return "cassandra" +} + +func (store *CassandraStore) Initialize(viper *viper.Viper) (err error) { + return store.initialize( + viper.GetString("keyspace"), + viper.GetStringSlice("hosts"), + ) +} + +func (store *CassandraStore) initialize(keyspace string, hosts []string) (err error) { + store.cluster = gocql.NewCluster(hosts...) + store.cluster.Keyspace = keyspace + store.cluster.Consistency = gocql.LocalQuorum + store.session, err = store.cluster.CreateSession() + if err != nil { + glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace) + } + return +} + +func (store *CassandraStore) InsertEntry(entry *filer2.Entry) (err error) { + + dir, name := entry.FullPath.DirAndName() + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %s", entry.FullPath, err) + } + + if err := store.session.Query( + "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?)", + dir, name, meta).Exec(); err != nil { + return fmt.Errorf("insert %s: %s", entry.FullPath, err) + } + + return nil +} + +func (store *CassandraStore) UpdateEntry(entry *filer2.Entry) (err error) { + + return store.InsertEntry(entry) +} + +func (store *CassandraStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) { + + dir, name := fullpath.DirAndName() + var data []byte + if err := store.session.Query( + "SELECT meta FROM filemeta WHERE directory=? AND name=?", + dir, name).Consistency(gocql.One).Scan(&data); err != nil { + if err != gocql.ErrNotFound { + return nil, fmt.Errorf("read entry %s: %v", fullpath, err) + } + } + + if len(data) == 0 { + return nil, fmt.Errorf("not found: %s", fullpath) + } + + entry = &filer2.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks(data) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *CassandraStore) DeleteEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) { + + entry, err = store.FindEntry(fullpath) + if err != nil { + return nil, nil + } + + dir, name := fullpath.DirAndName() + + if err := store.session.Query( + "DELETE FROM filemeta WHERE directory=? AND name=?", + dir, name).Exec(); err != nil { + return entry, fmt.Errorf("delete %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *CassandraStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, + limit int) (entries []*filer2.Entry, err error) { + + cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?" + if inclusive { + cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?" + } + + var data []byte + var name string + iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter() + for iter.Scan(&name, &data) { + entry := &filer2.Entry{ + FullPath: filer2.NewFullPath(string(fullpath), name), + } + if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil { + err = decodeErr + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + break + } + entries = append(entries, entry) + } + if err := iter.Close(); err != nil { + glog.V(0).Infof("list iterator close: %v", err) + } + + return entries, err +} diff --git a/weed/filer2/configuration.go b/weed/filer2/configuration.go index 91a0d6f46..0112a883b 100644 --- a/weed/filer2/configuration.go +++ b/weed/filer2/configuration.go @@ -61,6 +61,19 @@ sslmode = "disable" connection_max_idle = 100 connection_max_open = 100 +[cassandra] +# CREATE TABLE filemeta ( +# directory varchar, +# name varchar, +# meta blob, +# PRIMARY KEY (directory, name) +# ) WITH CLUSTERING ORDER BY (name ASC); +enabled = false +keyspace="seaweedfs" +hosts=[ + "localhost:9042", +] + ` ) diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 4517733ae..6c8a2c079 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -12,6 +12,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/filer2" + _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra" _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb" _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"