Browse Source

Batch and pipeline multiple Redis calls together

pull/4508/head
Patrick Schmidt 2 years ago
parent
commit
5ee984b338
No known key found for this signature in database GPG Key ID: A2237A6FDD420655
  1. 251
      weed/filer/redis2/universal_redis_store.go

251
weed/filer/redis2/universal_redis_store.go

@ -14,7 +14,7 @@ import (
)
const (
DIR_LIST_MARKER = "\x00"
DirListMarker = "\x00"
)
type UniversalRedis2Store struct {
@ -38,104 +38,156 @@ func (store *UniversalRedis2Store) loadSuperLargeDirectories(superLargeDirectori
func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
return ctx, nil
}
func (store *UniversalRedis2Store) CommitTransaction(ctx context.Context) error {
func (store *UniversalRedis2Store) CommitTransaction(context.Context) error {
return nil
}
func (store *UniversalRedis2Store) RollbackTransaction(ctx context.Context) error {
func (store *UniversalRedis2Store) RollbackTransaction(context.Context) error {
return nil
}
func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
var insertCmd *redis.StatusCmd
var addCmd *redis.IntCmd
if err = store.doInsertEntry(ctx, entry); err != nil {
return err
}
_, err = store.Client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
insertCmd, err = store.insertOrUpdate(ctx, pipe, entry)
if err != nil {
return err
}
dir, name := entry.FullPath.DirAndName()
if store.isSuperLargeDirectory(dir) {
return nil
}
dir, name := entry.FullPath.DirAndName()
if store.isSuperLargeDirectory(dir) {
return nil
}
if name != "" {
if err = store.Client.ZAddNX(ctx, genDirectoryListKey(dir), &redis.Z{Score: 0, Member: name}).Err(); err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
if name != "" {
addCmd = pipe.ZAddNX(ctx, genDirectoryListKey(dir), &redis.Z{Score: 0, Member: name})
}
return err
})
if err != nil {
return err
}
if insertCmd.Err() != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
if addCmd.Err() != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
}
return nil
}
func (store *UniversalRedis2Store) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
value, err := entry.EncodeAttributesAndChunks()
func (store *UniversalRedis2Store) insertOrUpdate(ctx context.Context, client redis.Cmdable, entry *filer.Entry) (*redis.StatusCmd, error) {
data, err := encodeEntry(entry)
if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
return nil, fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.GetChunks()) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}
// Don't use Redis' native TTL support because we need to clean the entry from its parent directory set as well
return client.Set(ctx, string(entry.FullPath), data, 0), nil
}
if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
cmd, err := store.insertOrUpdate(ctx, store.Client, entry)
if err != nil {
return err
}
return nil
if cmd.Err() != nil {
err = fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
return err
}
func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) {
res, err := store.Client.Get(ctx, string(fullpath)).Result()
if err == redis.Nil {
return nil, filer_pb.ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("get %s : %v", fullpath, err)
}
return store.doInsertEntry(ctx, entry)
return store.dataToEntry(res, fullpath)
}
func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
func (store *UniversalRedis2Store) FindEntries(ctx context.Context, fullpaths []util.FullPath) ([]*filer.Entry, error) {
paths := make([]string, len(fullpaths))
for i, fullpath := range fullpaths {
paths[i] = string(fullpath)
}
data, err := store.Client.Get(ctx, string(fullpath)).Result()
if err == redis.Nil {
return nil, filer_pb.ErrNotFound
cmds := make([]*redis.StringCmd, len(fullpaths))
_, err := store.Client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
for i, fullpath := range fullpaths {
cmds[i] = pipe.Get(ctx, string(fullpath))
}
return nil
})
if err != nil && err != redis.Nil {
return nil, fmt.Errorf("get %s : %w", fullpaths, err)
}
if err != nil {
return nil, fmt.Errorf("get %s : %v", fullpath, err)
// Convert results to filer entries
entries := make([]*filer.Entry, 0, len(fullpaths))
for i, cmd := range cmds {
if cmd.Err() == redis.Nil {
glog.V(0).Infof("get %s : %v", fullpaths[i], filer_pb.ErrNotFound)
continue
}
entry, err := store.dataToEntry(cmd.Val(), fullpaths[i])
if err != nil {
return nil, fmt.Errorf("get %s : %w", fullpaths[i], err)
}
entries = append(entries, entry)
}
return entries, nil
}
func (store *UniversalRedis2Store) dataToEntry(data string, fullpath util.FullPath) (entry *filer.Entry, err error) {
entry = &filer.Entry{
FullPath: fullpath,
}
err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data)))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
return entry, nil
return
}
func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
cmds := make([]*redis.IntCmd, 3)
_, err := store.Client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
cmds[0] = pipe.Del(ctx, genDirectoryListKey(string(fullpath)))
cmds[1] = pipe.Del(ctx, string(fullpath))
_, err = store.Client.Del(ctx, genDirectoryListKey(string(fullpath))).Result()
if err != nil {
return fmt.Errorf("delete dir list %s : %v", fullpath, err)
}
dir, name := fullpath.DirAndName()
if store.isSuperLargeDirectory(dir) {
return nil
}
_, err = store.Client.Del(ctx, string(fullpath)).Result()
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
if name != "" {
cmds[2] = pipe.ZRem(ctx, genDirectoryListKey(dir), name)
}
return nil
})
if err != nil && err != redis.Nil {
return fmt.Errorf("delete entry %s : %v", fullpath, err)
}
dir, name := fullpath.DirAndName()
if store.isSuperLargeDirectory(dir) {
return nil
if cmds[0].Err() != nil && cmds[0].Err() != redis.Nil {
return fmt.Errorf("delete dir list %s : %v", fullpath, err)
}
if name != "" {
_, err = store.Client.ZRem(ctx, genDirectoryListKey(dir), name).Result()
if err != nil {
return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err)
}
if cmds[1].Err() != nil && cmds[1].Err() != redis.Nil {
return fmt.Errorf("delete entry %s : %v", fullpath, err)
}
if cmds[2] != nil && cmds[2].Err() != nil && cmds[2].Err() != redis.Nil {
return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err)
}
return nil
}
func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
if store.isSuperLargeDirectory(string(fullpath)) {
return nil
}
@ -147,21 +199,33 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)
}
if len(members) == 0 {
return
}
for _, fileName := range members {
path := util.NewFullPath(string(fullpath), fileName)
_, err = store.Client.Del(ctx, string(path)).Result()
if err != nil {
cmds := make([]*redis.IntCmd, len(members)*2)
_, err = store.Client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
for i, fileName := range members {
path := string(util.NewFullPath(string(fullpath), fileName))
cmds[i*2] = pipe.Del(ctx, path)
// not efficient, but need to remove if it is a directory
cmds[i*2+1] = pipe.Del(ctx, genDirectoryListKey(path))
}
return nil
})
if err != nil && err != redis.Nil {
return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err)
}
for _, cmd := range cmds {
if cmd.Err() != nil && cmd.Err() != redis.Nil {
return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err)
}
// not efficient, but need to remove if it is a directory
store.Client.Del(ctx, genDirectoryListKey(string(path)))
}
return nil
return
}
func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(context.Context, util.FullPath, string, bool, int64, string, filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
@ -187,38 +251,65 @@ func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dir
if err != nil {
return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
if len(members) == 0 {
return lastFileName, nil
}
// fetch entry meta
var paths []util.FullPath
for _, fileName := range members {
path := util.NewFullPath(string(dirPath), fileName)
entry, err := store.FindEntry(ctx, path)
lastFileName = fileName
if err != nil {
glog.V(0).Infof("list %s : %v", path, err)
if err == filer_pb.ErrNotFound {
continue
}
paths = append(paths, path)
}
entries, err := store.FindEntries(ctx, paths)
if err != nil {
glog.V(0).Infof("list %s : %v", dirPath, err)
return lastFileName, err
}
// Clean expired keys
var expiredEntries []*filer.Entry
for _, entry := range entries {
if entry.TtlSec > 0 && entry.Attr.Crtime.Add(time.Duration(entry.TtlSec)*time.Second).Before(time.Now()) {
expiredEntries = append(expiredEntries, entry)
} else {
if entry.TtlSec > 0 {
if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
store.Client.Del(ctx, string(path)).Result()
store.Client.ZRem(ctx, dirListKey, fileName).Result()
continue
}
}
lastFileName = entry.Name()
if !eachEntryFunc(entry) {
break
}
}
}
if len(expiredEntries) == 0 {
return lastFileName, nil
}
return lastFileName, err
_, _ = store.Client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
for _, entry := range expiredEntries {
pipe.Del(ctx, string(entry.FullPath))
pipe.ZRem(ctx, dirListKey, entry.Name())
}
return nil
})
return lastFileName, nil
}
func (store *UniversalRedis2Store) Shutdown() {
_ = store.Client.Close()
}
func genDirectoryListKey(dir string) (dirList string) {
return dir + DIR_LIST_MARKER
return dir + DirListMarker
}
func (store *UniversalRedis2Store) Shutdown() {
store.Client.Close()
func encodeEntry(entry *filer.Entry) ([]byte, error) {
data, err := entry.EncodeAttributesAndChunks()
if err != nil {
return nil, fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.GetChunks()) > filer.CountEntryChunksForGzip {
data = util.MaybeGzipData(data)
}
return data, nil
}
Loading…
Cancel
Save