diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go index 6b0e65c3d..1d16c8169 100644 --- a/weed/filer/redis2/universal_redis_store.go +++ b/weed/filer/redis2/universal_redis_store.go @@ -14,7 +14,7 @@ import ( ) const ( - DIR_LIST_MARKER = "\x00" + DirListMarker = "\x00" ) type UniversalRedis2Store struct { @@ -38,104 +38,156 @@ func (store *UniversalRedis2Store) loadSuperLargeDirectories(superLargeDirectori func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) { return ctx, nil } -func (store *UniversalRedis2Store) CommitTransaction(ctx context.Context) error { + +func (store *UniversalRedis2Store) CommitTransaction(context.Context) error { return nil } -func (store *UniversalRedis2Store) RollbackTransaction(ctx context.Context) error { + +func (store *UniversalRedis2Store) RollbackTransaction(context.Context) error { return nil } func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + var insertCmd *redis.StatusCmd + var addCmd *redis.IntCmd - if err = store.doInsertEntry(ctx, entry); err != nil { - return err - } + _, err = store.Client.Pipelined(ctx, func(pipe redis.Pipeliner) error { + insertCmd, err = store.insertOrUpdate(ctx, pipe, entry) + if err != nil { + return err + } - dir, name := entry.FullPath.DirAndName() - if store.isSuperLargeDirectory(dir) { - return nil - } + dir, name := entry.FullPath.DirAndName() + if store.isSuperLargeDirectory(dir) { + return nil + } - if name != "" { - if err = store.Client.ZAddNX(ctx, genDirectoryListKey(dir), &redis.Z{Score: 0, Member: name}).Err(); err != nil { - return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) + if name != "" { + addCmd = pipe.ZAddNX(ctx, genDirectoryListKey(dir), &redis.Z{Score: 0, Member: name}) } + return err + }) + if err != nil { + return err + } + if insertCmd.Err() != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + if addCmd.Err() != nil { + return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) } - return nil } -func (store *UniversalRedis2Store) doInsertEntry(ctx context.Context, entry *filer.Entry) error { - value, err := entry.EncodeAttributesAndChunks() +func (store *UniversalRedis2Store) insertOrUpdate(ctx context.Context, client redis.Cmdable, entry *filer.Entry) (*redis.StatusCmd, error) { + data, err := encodeEntry(entry) if err != nil { - return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + return nil, fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if len(entry.GetChunks()) > filer.CountEntryChunksForGzip { - value = util.MaybeGzipData(value) - } + // Don't use Redis' native TTL support because we need to clean the entry from its parent directory set as well + return client.Set(ctx, string(entry.FullPath), data, 0), nil +} - if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil { - return fmt.Errorf("persisting %s : %v", entry.FullPath, err) +func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer.Entry) error { + cmd, err := store.insertOrUpdate(ctx, store.Client, entry) + if err != nil { + return err } - return nil + if cmd.Err() != nil { + err = fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + return err } -func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { +func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) { + res, err := store.Client.Get(ctx, string(fullpath)).Result() + if err == redis.Nil { + return nil, filer_pb.ErrNotFound + } + if err != nil { + return nil, fmt.Errorf("get %s : %v", fullpath, err) + } - return store.doInsertEntry(ctx, entry) + return store.dataToEntry(res, fullpath) } -func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { +func (store *UniversalRedis2Store) FindEntries(ctx context.Context, fullpaths []util.FullPath) ([]*filer.Entry, error) { + paths := make([]string, len(fullpaths)) + for i, fullpath := range fullpaths { + paths[i] = string(fullpath) + } - data, err := store.Client.Get(ctx, string(fullpath)).Result() - if err == redis.Nil { - return nil, filer_pb.ErrNotFound + cmds := make([]*redis.StringCmd, len(fullpaths)) + _, err := store.Client.Pipelined(ctx, func(pipe redis.Pipeliner) error { + for i, fullpath := range fullpaths { + cmds[i] = pipe.Get(ctx, string(fullpath)) + } + return nil + }) + if err != nil && err != redis.Nil { + return nil, fmt.Errorf("get %s : %w", fullpaths, err) } - if err != nil { - return nil, fmt.Errorf("get %s : %v", fullpath, err) + // Convert results to filer entries + entries := make([]*filer.Entry, 0, len(fullpaths)) + for i, cmd := range cmds { + if cmd.Err() == redis.Nil { + glog.V(0).Infof("get %s : %v", fullpaths[i], filer_pb.ErrNotFound) + continue + } + + entry, err := store.dataToEntry(cmd.Val(), fullpaths[i]) + if err != nil { + return nil, fmt.Errorf("get %s : %w", fullpaths[i], err) + } + entries = append(entries, entry) } + return entries, nil +} + +func (store *UniversalRedis2Store) dataToEntry(data string, fullpath util.FullPath) (entry *filer.Entry, err error) { entry = &filer.Entry{ FullPath: fullpath, } err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data))) - if err != nil { - return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) - } - - return entry, nil + return } -func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { +func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { + cmds := make([]*redis.IntCmd, 3) + _, err := store.Client.Pipelined(ctx, func(pipe redis.Pipeliner) error { + cmds[0] = pipe.Del(ctx, genDirectoryListKey(string(fullpath))) + cmds[1] = pipe.Del(ctx, string(fullpath)) - _, err = store.Client.Del(ctx, genDirectoryListKey(string(fullpath))).Result() - if err != nil { - return fmt.Errorf("delete dir list %s : %v", fullpath, err) - } + dir, name := fullpath.DirAndName() + if store.isSuperLargeDirectory(dir) { + return nil + } - _, err = store.Client.Del(ctx, string(fullpath)).Result() - if err != nil { - return fmt.Errorf("delete %s : %v", fullpath, err) + if name != "" { + cmds[2] = pipe.ZRem(ctx, genDirectoryListKey(dir), name) + } + return nil + }) + if err != nil && err != redis.Nil { + return fmt.Errorf("delete entry %s : %v", fullpath, err) } - dir, name := fullpath.DirAndName() - if store.isSuperLargeDirectory(dir) { - return nil + if cmds[0].Err() != nil && cmds[0].Err() != redis.Nil { + return fmt.Errorf("delete dir list %s : %v", fullpath, err) } - if name != "" { - _, err = store.Client.ZRem(ctx, genDirectoryListKey(dir), name).Result() - if err != nil { - return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err) - } + if cmds[1].Err() != nil && cmds[1].Err() != redis.Nil { + return fmt.Errorf("delete entry %s : %v", fullpath, err) + } + if cmds[2] != nil && cmds[2].Err() != nil && cmds[2].Err() != redis.Nil { + return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err) } - return nil } func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { - if store.isSuperLargeDirectory(string(fullpath)) { return nil } @@ -147,21 +199,33 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful if err != nil { return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err) } + if len(members) == 0 { + return + } - for _, fileName := range members { - path := util.NewFullPath(string(fullpath), fileName) - _, err = store.Client.Del(ctx, string(path)).Result() - if err != nil { + cmds := make([]*redis.IntCmd, len(members)*2) + _, err = store.Client.Pipelined(ctx, func(pipe redis.Pipeliner) error { + for i, fileName := range members { + path := string(util.NewFullPath(string(fullpath), fileName)) + cmds[i*2] = pipe.Del(ctx, path) + // not efficient, but need to remove if it is a directory + cmds[i*2+1] = pipe.Del(ctx, genDirectoryListKey(path)) + } + return nil + }) + if err != nil && err != redis.Nil { + return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err) + } + for _, cmd := range cmds { + if cmd.Err() != nil && cmd.Err() != redis.Nil { return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err) } - // not efficient, but need to remove if it is a directory - store.Client.Del(ctx, genDirectoryListKey(string(path))) } - return nil + return } -func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { +func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(context.Context, util.FullPath, string, bool, int64, string, filer.ListEachEntryFunc) (lastFileName string, err error) { return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed } @@ -187,38 +251,65 @@ func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dir if err != nil { return lastFileName, fmt.Errorf("list %s : %v", dirPath, err) } + if len(members) == 0 { + return lastFileName, nil + } // fetch entry meta + var paths []util.FullPath for _, fileName := range members { path := util.NewFullPath(string(dirPath), fileName) - entry, err := store.FindEntry(ctx, path) - lastFileName = fileName - if err != nil { - glog.V(0).Infof("list %s : %v", path, err) - if err == filer_pb.ErrNotFound { - continue - } + paths = append(paths, path) + } + + entries, err := store.FindEntries(ctx, paths) + if err != nil { + glog.V(0).Infof("list %s : %v", dirPath, err) + return lastFileName, err + } + + // Clean expired keys + var expiredEntries []*filer.Entry + for _, entry := range entries { + if entry.TtlSec > 0 && entry.Attr.Crtime.Add(time.Duration(entry.TtlSec)*time.Second).Before(time.Now()) { + expiredEntries = append(expiredEntries, entry) } else { - if entry.TtlSec > 0 { - if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { - store.Client.Del(ctx, string(path)).Result() - store.Client.ZRem(ctx, dirListKey, fileName).Result() - continue - } - } + lastFileName = entry.Name() if !eachEntryFunc(entry) { break } } } + if len(expiredEntries) == 0 { + return lastFileName, nil + } - return lastFileName, err + _, _ = store.Client.Pipelined(ctx, func(pipe redis.Pipeliner) error { + for _, entry := range expiredEntries { + pipe.Del(ctx, string(entry.FullPath)) + pipe.ZRem(ctx, dirListKey, entry.Name()) + } + return nil + }) + return lastFileName, nil +} + +func (store *UniversalRedis2Store) Shutdown() { + _ = store.Client.Close() } func genDirectoryListKey(dir string) (dirList string) { - return dir + DIR_LIST_MARKER + return dir + DirListMarker } -func (store *UniversalRedis2Store) Shutdown() { - store.Client.Close() +func encodeEntry(entry *filer.Entry) ([]byte, error) { + data, err := entry.EncodeAttributesAndChunks() + if err != nil { + return nil, fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + + if len(entry.GetChunks()) > filer.CountEntryChunksForGzip { + data = util.MaybeGzipData(data) + } + return data, nil }