From 90d785a15fb8c0379e068aab94066cf5e3622071 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 22 Dec 2020 02:26:05 -0800 Subject: [PATCH] filer: redis, redis cluster, cassandra support super large directory --- weed/command/scaffold.go | 6 +++ weed/filer/cassandra/cassandra_store.go | 44 ++++++++++++++++++++-- weed/filer/filerstore.go | 8 ++-- weed/filer/redis2/redis_cluster_store.go | 4 +- weed/filer/redis2/redis_store.go | 4 +- weed/filer/redis2/universal_redis_store.go | 32 ++++++++++++++++ 6 files changed, 89 insertions(+), 9 deletions(-) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 1ab763004..04a988027 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -138,12 +138,16 @@ hosts=[ ] username="" password="" +# This changes the data layout. Only add new directories. Removing/Updating will cause data loss. +superLargeDirectories = [] [redis2] enabled = false address = "localhost:6379" password = "" database = 0 +# This changes the data layout. Only add new directories. Removing/Updating will cause data loss. +superLargeDirectories = [] [redis_cluster2] enabled = false @@ -160,6 +164,8 @@ password = "" readOnly = true # automatically use the closest Redis server for reads routeByLatency = true +# This changes the data layout. Only add new directories. Removing/Updating will cause data loss. +superLargeDirectories = [] [etcd] enabled = false diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go index ae8cb7a86..49f5625d9 100644 --- a/weed/filer/cassandra/cassandra_store.go +++ b/weed/filer/cassandra/cassandra_store.go @@ -16,8 +16,9 @@ func init() { } type CassandraStore struct { - cluster *gocql.ClusterConfig - session *gocql.Session + cluster *gocql.ClusterConfig + session *gocql.Session + superLargeDirectoryHash map[string]string } func (store *CassandraStore) GetName() string { @@ -30,10 +31,16 @@ func (store *CassandraStore) Initialize(configuration util.Configuration, prefix configuration.GetStringSlice(prefix+"hosts"), configuration.GetString(prefix+"username"), configuration.GetString(prefix+"password"), + configuration.GetStringSlice(prefix+"superLargeDirectories"), ) } -func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string) (err error) { +func (store *CassandraStore) isSuperLargeDirectory(dir string) (dirHash string, isSuperLargeDirectory bool) { + dirHash, isSuperLargeDirectory = store.superLargeDirectoryHash[dir] + return +} + +func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string) (err error) { store.cluster = gocql.NewCluster(hosts...) if username != "" && password != "" { store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password} @@ -44,6 +51,19 @@ func (store *CassandraStore) initialize(keyspace string, hosts []string, usernam if err != nil { glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace) } + + // set directory hash + store.superLargeDirectoryHash = make(map[string]string) + existingHash := make(map[string]string) + for _, dir := range superLargeDirectories { + // adding dir hash to avoid duplicated names + dirHash := util.Md5String([]byte(dir))[:4] + store.superLargeDirectoryHash[dir] = dirHash + if existingDir, found := existingHash[dirHash]; found { + glog.Fatalf("directory %s has the same hash as %s", dir, existingDir) + } + existingHash[dirHash] = dir + } return } @@ -60,6 +80,10 @@ func (store *CassandraStore) RollbackTransaction(ctx context.Context) error { func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { dir, name := entry.FullPath.DirAndName() + if dirHash, ok := store.isSuperLargeDirectory(dir); ok { + dir, name = dirHash+name, "" + } + meta, err := entry.EncodeAttributesAndChunks() if err != nil { return fmt.Errorf("encode %s: %s", entry.FullPath, err) @@ -86,6 +110,10 @@ func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer.Entry func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { dir, name := fullpath.DirAndName() + if dirHash, ok := store.isSuperLargeDirectory(dir); ok { + dir, name = dirHash+name, "" + } + var data []byte if err := store.session.Query( "SELECT meta FROM filemeta WHERE directory=? AND name=?", @@ -113,6 +141,9 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPa func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { dir, name := fullpath.DirAndName() + if dirHash, ok := store.isSuperLargeDirectory(dir); ok { + dir, name = dirHash+name, "" + } if err := store.session.Query( "DELETE FROM filemeta WHERE directory=? AND name=?", @@ -124,6 +155,9 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.Full } func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { + if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok { + return nil // filer.ErrUnsupportedSuperLargeDirectoryListing + } if err := store.session.Query( "DELETE FROM filemeta WHERE directory=?", @@ -141,6 +175,10 @@ func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, f func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) { + if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok { + return // nil, filer.ErrUnsupportedSuperLargeDirectoryListing + } + cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?" if inclusive { cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?" diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go index b5a03c1d1..f1e6c6c35 100644 --- a/weed/filer/filerstore.go +++ b/weed/filer/filerstore.go @@ -7,10 +7,10 @@ import ( ) var ( - ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing") - ErrKvNotImplemented = errors.New("kv not implemented yet") - ErrKvNotFound = errors.New("kv: not found") - + ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing") + ErrUnsupportedSuperLargeDirectoryListing = errors.New("unsupported super large directory listing") + ErrKvNotImplemented = errors.New("kv not implemented yet") + ErrKvNotFound = errors.New("kv: not found") ) type FilerStore interface { diff --git a/weed/filer/redis2/redis_cluster_store.go b/weed/filer/redis2/redis_cluster_store.go index d155dbe88..c7742bb19 100644 --- a/weed/filer/redis2/redis_cluster_store.go +++ b/weed/filer/redis2/redis_cluster_store.go @@ -28,15 +28,17 @@ func (store *RedisCluster2Store) Initialize(configuration util.Configuration, pr configuration.GetString(prefix+"password"), configuration.GetBool(prefix+"useReadOnly"), configuration.GetBool(prefix+"routeByLatency"), + configuration.GetStringSlice(prefix+"superLargeDirectories"), ) } -func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) { +func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) { store.Client = redis.NewClusterClient(&redis.ClusterOptions{ Addrs: addresses, Password: password, ReadOnly: readOnly, RouteByLatency: routeByLatency, }) + store.loadSuperLargeDirectories(superLargeDirectories) return } diff --git a/weed/filer/redis2/redis_store.go b/weed/filer/redis2/redis_store.go index ed04c817b..da404ed4c 100644 --- a/weed/filer/redis2/redis_store.go +++ b/weed/filer/redis2/redis_store.go @@ -23,14 +23,16 @@ func (store *Redis2Store) Initialize(configuration util.Configuration, prefix st configuration.GetString(prefix+"address"), configuration.GetString(prefix+"password"), configuration.GetInt(prefix+"database"), + configuration.GetStringSlice(prefix+"superLargeDirectories"), ) } -func (store *Redis2Store) initialize(hostPort string, password string, database int) (err error) { +func (store *Redis2Store) initialize(hostPort string, password string, database int, superLargeDirectories []string) (err error) { store.Client = redis.NewClient(&redis.Options{ Addr: hostPort, Password: password, DB: database, }) + store.loadSuperLargeDirectories(superLargeDirectories) return } diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go index 0374314c0..c6b566305 100644 --- a/weed/filer/redis2/universal_redis_store.go +++ b/weed/filer/redis2/universal_redis_store.go @@ -19,6 +19,27 @@ const ( type UniversalRedis2Store struct { Client redis.UniversalClient + superLargeDirectoryHash map[string]string +} + +func (store *UniversalRedis2Store) isSuperLargeDirectory(dir string) (dirHash string, isSuperLargeDirectory bool) { + dirHash, isSuperLargeDirectory = store.superLargeDirectoryHash[dir] + return +} + +func (store *UniversalRedis2Store) loadSuperLargeDirectories(superLargeDirectories []string) { + // set directory hash + store.superLargeDirectoryHash = make(map[string]string) + existingHash := make(map[string]string) + for _, dir := range superLargeDirectories { + // adding dir hash to avoid duplicated names + dirHash := util.Md5String([]byte(dir))[:4] + store.superLargeDirectoryHash[dir] = dirHash + if existingDir, found := existingHash[dirHash]; found { + glog.Fatalf("directory %s has the same hash as %s", dir, existingDir) + } + existingHash[dirHash] = dir + } } func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) { @@ -47,6 +68,10 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer } dir, name := entry.FullPath.DirAndName() + if _, found := store.isSuperLargeDirectory(dir); found { + return nil + } + if name != "" { if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil { return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) @@ -96,6 +121,9 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti } dir, name := fullpath.DirAndName() + if _, found := store.isSuperLargeDirectory(dir); found { + return nil + } if name != "" { _, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result() if err != nil { @@ -108,6 +136,10 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { + if _, found := store.isSuperLargeDirectory(string(fullpath)); found { + return nil + } + members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result() if err != nil { return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)