From 8f2e4be074a709d82096cac973b1b19fd3e83089 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 18 Sep 2021 14:04:30 -0700 Subject: [PATCH] wip --- weed/filer/redis3/kv_directory_children.go | 49 +++++ weed/filer/redis3/redis_cluster_store.go | 42 +++++ weed/filer/redis3/redis_store.go | 36 ++++ weed/filer/redis3/universal_redis_store.go | 175 ++++++++++++++++++ weed/filer/redis3/universal_redis_store_kv.go | 42 +++++ weed/util/bptree/bptree_store_test.go | 14 +- weed/util/bptree/serde.go | 10 + 7 files changed, 366 insertions(+), 2 deletions(-) create mode 100644 weed/filer/redis3/kv_directory_children.go create mode 100644 weed/filer/redis3/redis_cluster_store.go create mode 100644 weed/filer/redis3/redis_store.go create mode 100644 weed/filer/redis3/universal_redis_store.go create mode 100644 weed/filer/redis3/universal_redis_store_kv.go create mode 100644 weed/util/bptree/serde.go diff --git a/weed/filer/redis3/kv_directory_children.go b/weed/filer/redis3/kv_directory_children.go new file mode 100644 index 000000000..f3152c970 --- /dev/null +++ b/weed/filer/redis3/kv_directory_children.go @@ -0,0 +1,49 @@ +package redis3 + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/util/bptree" + "github.com/go-redis/redis/v8" + "github.com/golang/protobuf/proto" +) + +func insertChild(ctx context.Context, client redis.UniversalClient, key string, name string) error { + data, err := client.Get(ctx, key).Result() + if err != nil { + if err != redis.Nil { + return fmt.Errorf("read %s: %v", key, err) + } + } + rootNode := &bptree.ProtoNode{} + if err := proto.UnmarshalMerge([]byte(data), rootNode); err != nil { + return fmt.Errorf("decoding root for %s: %v", key, err) + } + tree := rootNode.ToBpTree() + tree.Add(bptree.String(name), nil) + return nil +} + +func removeChild(ctx context.Context, client redis.UniversalClient, key string, name string) error { + data, err := client.Get(ctx, key).Result() + if err != nil { + if err != redis.Nil { + return fmt.Errorf("read %s: %v", key, err) + } + } + rootNode := &bptree.ProtoNode{} + if err := proto.UnmarshalMerge([]byte(data), rootNode); err != nil { + return fmt.Errorf("decoding root for %s: %v", key, err) + } + tree := rootNode.ToBpTree() + tree.Add(bptree.String(name), nil) + return nil +} + +func removeChildren(ctx context.Context, client redis.UniversalClient, key string, onDeleteFn func(name string) error) error { + return nil +} + +func iterateChildren(ctx context.Context, client redis.UniversalClient, key string, eachFn func(name string) error) error { + return nil +} diff --git a/weed/filer/redis3/redis_cluster_store.go b/weed/filer/redis3/redis_cluster_store.go new file mode 100644 index 000000000..e0c620450 --- /dev/null +++ b/weed/filer/redis3/redis_cluster_store.go @@ -0,0 +1,42 @@ +package redis3 + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis/v8" +) + +func init() { + filer.Stores = append(filer.Stores, &RedisCluster3Store{}) +} + +type RedisCluster3Store struct { + UniversalRedis3Store +} + +func (store *RedisCluster3Store) GetName() string { + return "redis_cluster3" +} + +func (store *RedisCluster3Store) Initialize(configuration util.Configuration, prefix string) (err error) { + + configuration.SetDefault(prefix+"useReadOnly", false) + configuration.SetDefault(prefix+"routeByLatency", false) + + return store.initialize( + configuration.GetStringSlice(prefix+"addresses"), + configuration.GetString(prefix+"password"), + configuration.GetBool(prefix+"useReadOnly"), + configuration.GetBool(prefix+"routeByLatency"), + ) +} + +func (store *RedisCluster3Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) { + store.Client = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: addresses, + Password: password, + ReadOnly: readOnly, + RouteByLatency: routeByLatency, + }) + return +} diff --git a/weed/filer/redis3/redis_store.go b/weed/filer/redis3/redis_store.go new file mode 100644 index 000000000..fdbf994ec --- /dev/null +++ b/weed/filer/redis3/redis_store.go @@ -0,0 +1,36 @@ +package redis3 + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/go-redis/redis/v8" +) + +func init() { + filer.Stores = append(filer.Stores, &Redis3Store{}) +} + +type Redis3Store struct { + UniversalRedis3Store +} + +func (store *Redis3Store) GetName() string { + return "redis3" +} + +func (store *Redis3Store) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetString(prefix+"address"), + configuration.GetString(prefix+"password"), + configuration.GetInt(prefix+"database"), + ) +} + +func (store *Redis3Store) initialize(hostPort string, password string, database int) (err error) { + store.Client = redis.NewClient(&redis.Options{ + Addr: hostPort, + Password: password, + DB: database, + }) + return +} diff --git a/weed/filer/redis3/universal_redis_store.go b/weed/filer/redis3/universal_redis_store.go new file mode 100644 index 000000000..958338afe --- /dev/null +++ b/weed/filer/redis3/universal_redis_store.go @@ -0,0 +1,175 @@ +package redis3 + +import ( + "context" + "fmt" + "time" + + "github.com/go-redis/redis/v8" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +const ( + DIR_LIST_MARKER = "\x00" +) + +type UniversalRedis3Store struct { + Client redis.UniversalClient +} + +func (store *UniversalRedis3Store) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *UniversalRedis3Store) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *UniversalRedis3Store) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + + if len(entry.Chunks) > 50 { + value = util.MaybeGzipData(value) + } + + if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + + dir, name := entry.FullPath.DirAndName() + + if name != "" { + if err = insertChild(ctx, store.Client, genDirectoryListKey(dir), name); err != nil { + return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) + } + } + + return nil +} + +func (store *UniversalRedis3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + + return store.InsertEntry(ctx, entry) +} + +func (store *UniversalRedis3Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { + + data, err := store.Client.Get(ctx, string(fullpath)).Result() + if err == redis.Nil { + return nil, filer_pb.ErrNotFound + } + + if err != nil { + return nil, fmt.Errorf("get %s : %v", fullpath, err) + } + + entry = &filer.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data))) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *UniversalRedis3Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { + + _, err = store.Client.Del(ctx, genDirectoryListKey(string(fullpath))).Result() + if err != nil { + return fmt.Errorf("delete dir list %s : %v", fullpath, err) + } + + _, err = store.Client.Del(ctx, string(fullpath)).Result() + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + dir, name := fullpath.DirAndName() + + if name != "" { + if err = removeChild(ctx, store.Client, genDirectoryListKey(dir), name); err != nil { + return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err) + } + } + + return nil +} + +func (store *UniversalRedis3Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { + + return removeChildren(ctx, store.Client, genDirectoryListKey(string(fullpath)), func(name string) error { + path := util.NewFullPath(string(fullpath), name) + _, err = store.Client.Del(ctx, string(path)).Result() + if err != nil { + return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err) + } + return nil + }) + +} + +func (store *UniversalRedis3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed +} + +func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + + dirListKey := genDirectoryListKey(string(dirPath)) + start := int64(0) + if startFileName != "" { + start, _ = store.Client.ZRank(ctx, dirListKey, startFileName).Result() + if !includeStartFile { + start++ + } + } + members, err := store.Client.ZRange(ctx, dirListKey, start, start+int64(limit)-1).Result() + if err != nil { + return lastFileName, fmt.Errorf("list %s : %v", dirPath, err) + } + + // fetch entry meta + for _, fileName := range members { + path := util.NewFullPath(string(dirPath), fileName) + entry, err := store.FindEntry(ctx, path) + lastFileName = fileName + if err != nil { + glog.V(0).Infof("list %s : %v", path, err) + if err == filer_pb.ErrNotFound { + continue + } + } else { + if entry.TtlSec > 0 { + if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + store.Client.Del(ctx, string(path)).Result() + store.Client.ZRem(ctx, dirListKey, fileName).Result() + continue + } + } + if !eachEntryFunc(entry) { + break + } + } + } + + return lastFileName, err +} + +func genDirectoryListKey(dir string) (dirList string) { + return dir + DIR_LIST_MARKER +} + +func (store *UniversalRedis3Store) Shutdown() { + store.Client.Close() +} diff --git a/weed/filer/redis3/universal_redis_store_kv.go b/weed/filer/redis3/universal_redis_store_kv.go new file mode 100644 index 000000000..a9c440a37 --- /dev/null +++ b/weed/filer/redis3/universal_redis_store_kv.go @@ -0,0 +1,42 @@ +package redis3 + +import ( + "context" + "fmt" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/go-redis/redis/v8" +) + +func (store *UniversalRedis3Store) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + + _, err = store.Client.Set(ctx, string(key), value, 0).Result() + + if err != nil { + return fmt.Errorf("kv put: %v", err) + } + + return nil +} + +func (store *UniversalRedis3Store) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + + data, err := store.Client.Get(ctx, string(key)).Result() + + if err == redis.Nil { + return nil, filer.ErrKvNotFound + } + + return []byte(data), err +} + +func (store *UniversalRedis3Store) KvDelete(ctx context.Context, key []byte) (err error) { + + _, err = store.Client.Del(ctx, string(key)).Result() + + if err != nil { + return fmt.Errorf("kv delete: %v", err) + } + + return nil +} diff --git a/weed/util/bptree/bptree_store_test.go b/weed/util/bptree/bptree_store_test.go index 6ed4abca8..82dcbbf55 100644 --- a/weed/util/bptree/bptree_store_test.go +++ b/weed/util/bptree/bptree_store_test.go @@ -15,11 +15,21 @@ func TestAddRemove(t *testing.T) { println("delete", node.protoNodeId) return nil } - for i:=0;i<1024;i++{ + for i:=0;i<32;i++{ println("++++++++++", i) - tree.Add(String(fmt.Sprintf("%02d", i)), String(fmt.Sprintf("%02d", i))) + tree.Add(String(fmt.Sprintf("%02d", i)), nil) printTree(tree.root, "") } + + if !tree.Has(String("30")) { + t.Errorf("lookup error") + } + tree.RemoveWhere(String("30"), func(value ItemValue) bool { + return true + }) + if tree.Has(String("30")) { + t.Errorf("remove error") + } } func printTree(node *BpNode, prefix string) { diff --git a/weed/util/bptree/serde.go b/weed/util/bptree/serde.go new file mode 100644 index 000000000..2a98a774a --- /dev/null +++ b/weed/util/bptree/serde.go @@ -0,0 +1,10 @@ +package bptree + +func (protoNode *ProtoNode) ToBpTree() *BpTree { + node := protoNode.ToBpNode() + return &BpTree{root: node} +} + +func (protoNode *ProtoNode) ToBpNode() *BpNode { + return nil +} \ No newline at end of file