Browse Source

filer: redis, redis cluster, cassandra support super large directory

pull/1699/head
Chris Lu 4 years ago
parent
commit
90d785a15f
  1. 6
      weed/command/scaffold.go
  2. 44
      weed/filer/cassandra/cassandra_store.go
  3. 8
      weed/filer/filerstore.go
  4. 4
      weed/filer/redis2/redis_cluster_store.go
  5. 4
      weed/filer/redis2/redis_store.go
  6. 32
      weed/filer/redis2/universal_redis_store.go

6
weed/command/scaffold.go

@ -138,12 +138,16 @@ hosts=[
]
username=""
password=""
# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
superLargeDirectories = []
[redis2]
enabled = false
address = "localhost:6379"
password = ""
database = 0
# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
superLargeDirectories = []
[redis_cluster2]
enabled = false
@ -160,6 +164,8 @@ password = ""
readOnly = true
# automatically use the closest Redis server for reads
routeByLatency = true
# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
superLargeDirectories = []
[etcd]
enabled = false

44
weed/filer/cassandra/cassandra_store.go

@ -16,8 +16,9 @@ func init() {
}
type CassandraStore struct {
cluster *gocql.ClusterConfig
session *gocql.Session
cluster *gocql.ClusterConfig
session *gocql.Session
superLargeDirectoryHash map[string]string
}
func (store *CassandraStore) GetName() string {
@ -30,10 +31,16 @@ func (store *CassandraStore) Initialize(configuration util.Configuration, prefix
configuration.GetStringSlice(prefix+"hosts"),
configuration.GetString(prefix+"username"),
configuration.GetString(prefix+"password"),
configuration.GetStringSlice(prefix+"superLargeDirectories"),
)
}
func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string) (err error) {
func (store *CassandraStore) isSuperLargeDirectory(dir string) (dirHash string, isSuperLargeDirectory bool) {
dirHash, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
return
}
func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string) (err error) {
store.cluster = gocql.NewCluster(hosts...)
if username != "" && password != "" {
store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password}
@ -44,6 +51,19 @@ func (store *CassandraStore) initialize(keyspace string, hosts []string, usernam
if err != nil {
glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
}
// set directory hash
store.superLargeDirectoryHash = make(map[string]string)
existingHash := make(map[string]string)
for _, dir := range superLargeDirectories {
// adding dir hash to avoid duplicated names
dirHash := util.Md5String([]byte(dir))[:4]
store.superLargeDirectoryHash[dir] = dirHash
if existingDir, found := existingHash[dirHash]; found {
glog.Fatalf("directory %s has the same hash as %s", dir, existingDir)
}
existingHash[dirHash] = dir
}
return
}
@ -60,6 +80,10 @@ func (store *CassandraStore) RollbackTransaction(ctx context.Context) error {
func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
dir, name := entry.FullPath.DirAndName()
if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
dir, name = dirHash+name, ""
}
meta, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
@ -86,6 +110,10 @@ func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer.Entry
func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
dir, name := fullpath.DirAndName()
if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
dir, name = dirHash+name, ""
}
var data []byte
if err := store.session.Query(
"SELECT meta FROM filemeta WHERE directory=? AND name=?",
@ -113,6 +141,9 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPa
func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
dir, name := fullpath.DirAndName()
if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
dir, name = dirHash+name, ""
}
if err := store.session.Query(
"DELETE FROM filemeta WHERE directory=? AND name=?",
@ -124,6 +155,9 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.Full
}
func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok {
return nil // filer.ErrUnsupportedSuperLargeDirectoryListing
}
if err := store.session.Query(
"DELETE FROM filemeta WHERE directory=?",
@ -141,6 +175,10 @@ func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, f
func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer.Entry, err error) {
if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok {
return // nil, filer.ErrUnsupportedSuperLargeDirectoryListing
}
cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
if inclusive {
cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"

8
weed/filer/filerstore.go

@ -7,10 +7,10 @@ import (
)
var (
ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
ErrKvNotImplemented = errors.New("kv not implemented yet")
ErrKvNotFound = errors.New("kv: not found")
ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
ErrUnsupportedSuperLargeDirectoryListing = errors.New("unsupported super large directory listing")
ErrKvNotImplemented = errors.New("kv not implemented yet")
ErrKvNotFound = errors.New("kv: not found")
)
type FilerStore interface {

4
weed/filer/redis2/redis_cluster_store.go

@ -28,15 +28,17 @@ func (store *RedisCluster2Store) Initialize(configuration util.Configuration, pr
configuration.GetString(prefix+"password"),
configuration.GetBool(prefix+"useReadOnly"),
configuration.GetBool(prefix+"routeByLatency"),
configuration.GetStringSlice(prefix+"superLargeDirectories"),
)
}
func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) {
store.Client = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addresses,
Password: password,
ReadOnly: readOnly,
RouteByLatency: routeByLatency,
})
store.loadSuperLargeDirectories(superLargeDirectories)
return
}

4
weed/filer/redis2/redis_store.go

@ -23,14 +23,16 @@ func (store *Redis2Store) Initialize(configuration util.Configuration, prefix st
configuration.GetString(prefix+"address"),
configuration.GetString(prefix+"password"),
configuration.GetInt(prefix+"database"),
configuration.GetStringSlice(prefix+"superLargeDirectories"),
)
}
func (store *Redis2Store) initialize(hostPort string, password string, database int) (err error) {
func (store *Redis2Store) initialize(hostPort string, password string, database int, superLargeDirectories []string) (err error) {
store.Client = redis.NewClient(&redis.Options{
Addr: hostPort,
Password: password,
DB: database,
})
store.loadSuperLargeDirectories(superLargeDirectories)
return
}

32
weed/filer/redis2/universal_redis_store.go

@ -19,6 +19,27 @@ const (
type UniversalRedis2Store struct {
Client redis.UniversalClient
superLargeDirectoryHash map[string]string
}
func (store *UniversalRedis2Store) isSuperLargeDirectory(dir string) (dirHash string, isSuperLargeDirectory bool) {
dirHash, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
return
}
func (store *UniversalRedis2Store) loadSuperLargeDirectories(superLargeDirectories []string) {
// set directory hash
store.superLargeDirectoryHash = make(map[string]string)
existingHash := make(map[string]string)
for _, dir := range superLargeDirectories {
// adding dir hash to avoid duplicated names
dirHash := util.Md5String([]byte(dir))[:4]
store.superLargeDirectoryHash[dir] = dirHash
if existingDir, found := existingHash[dirHash]; found {
glog.Fatalf("directory %s has the same hash as %s", dir, existingDir)
}
existingHash[dirHash] = dir
}
}
func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
@ -47,6 +68,10 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
}
dir, name := entry.FullPath.DirAndName()
if _, found := store.isSuperLargeDirectory(dir); found {
return nil
}
if name != "" {
if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
@ -96,6 +121,9 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti
}
dir, name := fullpath.DirAndName()
if _, found := store.isSuperLargeDirectory(dir); found {
return nil
}
if name != "" {
_, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result()
if err != nil {
@ -108,6 +136,10 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti
func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
if _, found := store.isSuperLargeDirectory(string(fullpath)); found {
return nil
}
members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result()
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)

Loading…
Cancel
Save