Browse Source

Add readdir policies

pull/1224/head
Antonio SJ Musumeci 10 months ago
parent
commit
c92a10021e
  1. 49
      README.md
  2. 86
      man/mergerfs.1
  3. 4
      src/config.cpp
  4. 6
      src/config.hpp
  5. 13
      src/fs_devid.hpp
  6. 4
      src/fs_dirfd.hpp
  7. 13
      src/fs_inode.cpp
  8. 6
      src/fs_inode.hpp
  9. 75
      src/fuse_readdir.cpp
  10. 33
      src/fuse_readdir.hpp
  11. 34
      src/fuse_readdir_base.hpp
  12. 194
      src/fuse_readdir_cor.cpp
  13. 48
      src/fuse_readdir_cor.hpp
  14. 173
      src/fuse_readdir_cosr.cpp
  15. 39
      src/fuse_readdir_cosr.hpp
  16. 75
      src/fuse_readdir_factory.cpp
  17. 18
      src/fuse_readdir_factory.hpp
  18. 30
      src/fuse_readdir_plus.cpp
  19. 43
      src/fuse_readdir_seq.cpp
  20. 19
      src/fuse_readdir_seq.hpp
  21. 161
      src/unbounded_queue.hpp
  22. 124
      src/unbounded_thread_pool.hpp

49
README.md

@ -256,6 +256,9 @@ These options are the same regardless of whether you use them with the
concatenated together with the longest common prefix removed.
* **func.FUNC=POLICY**: Sets the specific FUSE function's policy. See
below for the list of value types. Example: **func.getattr=newest**
* **func.readdir=seq|cosr|cor|cosr:INT|cor:INT**: Sets `readdir`
policy. INT value sets the number of threads to use for
concurrency. (default: seq)
* **category.action=POLICY**: Sets policy of all FUSE functions in the
action category. (default: epall)
* **category.create=POLICY**: Sets policy of all FUSE functions in the
@ -682,9 +685,8 @@ rather than file paths, which were created by `open` or `create`. That
said many times the current FUSE kernel driver will not always provide
the file handle when a client calls `fgetattr`, `fchown`, `fchmod`,
`futimens`, `ftruncate`, etc. This means it will call the regular,
path based, versions. `readdir` has no real need for a policy given
the purpose is merely to return a list of entries in a
directory. `statfs`'s behavior can be modified via other options.
path based, versions. `statfs`'s behavior can be modified via other
options.
When using policies which are based on a branch's available space the
base path provided is used. Not the full path to the file in
@ -715,8 +717,7 @@ In cases where something may be searched for (such as a path to clone)
### Policies
A policy is the algorithm used to choose a branch or branches for a
function to work on. Think of them as ways to filter and sort
branches.
function to work on or generally how the function behaves.
Any function in the `create` category will clone the relative path if
needed. Some other functions (`rename`,`link`,`ioctl`) have special
@ -725,12 +726,11 @@ requirements or behaviors which you can read more about below.
#### Filtering
Policies basically search branches and create a list of files / paths
Most policies basically search branches and create a list of files / paths
for functions to work on. The policy is responsible for filtering and
sorting the branches. Filters include **minfreespace**, whether or not
a branch is mounted read-only, and the branch tagging
(RO,NC,RW). These filters are applied across all policies unless
otherwise noted.
(RO,NC,RW). These filters are applied across most policies.
* No **search** function policies filter.
* All **action** function policies filter out branches which are
@ -823,6 +823,26 @@ policies is not appropriate.
| search | ff |
#### func.readdir
examples: `fuse.readdir=seq`, `fuse.readdir=cor:4`
`readdir` has policies to control how it manages reading directory
content.
| Policy | Description |
|--------|-------------|
| seq | "sequential" : Iterate over branches in the order defined. This is the default and traditional behavior found prior to the readdir policy introduction. |
| cosr | "concurrent open, sequential read" : Concurrently open branch directories using a thread pool and process them in order of definition. This keeps memory and CPU usage low while also reducing the time spent waiting on branches to respond. Number of threads defaults to the number of logical cores. Can be overwritten via the syntax `fuse.readdir=cosr:N` where `N` is the number of threads. |
| cor | "concurrent open and read" : Concurrently open branch directories and immediately start reading their contents using a thread pool. This will result in slightly higher memory and CPU usage but reduced latency. Particularly when using higher latency / slower speed network filesystem branches. Unlike `seq` and `cosr` the order of files could change due the async nature of the thread pool. Number of threads defaults to the number of logical cores. Can be overwritten via the syntax `fuse.readdir=cor:N` where `N` is the number of threads.
Keep in mind that `readdir` mostly just provides a list of file names
in a directory and possibly some basic metadata about said files. To
know details about the files, as one would see from commands like
`find` or `ls`, it is required to call `stat` on the file which is
controlled by `fuse.getattr`.
#### ioctl
When `ioctl` is used with an open file then it will use the file
@ -891,19 +911,6 @@ returned but it will still be possible.
**link** uses the same strategy but without the removals.
#### readdir ####
[readdir](http://linux.die.net/man/3/readdir) is different from all
other filesystem functions. While it could have its own set of
policies to tweak its behavior at this time it provides a simple union
of files and directories found. Remember that any action or
information queried about these files and directories come from the
respective function. For instance: an **ls** is a **readdir** and for
each file/directory returned **getattr** is called. Meaning the policy
of **getattr** is responsible for choosing the file/directory which is
the source of the metadata you see in an **ls**.
#### statfs / statvfs ####
[statvfs](http://linux.die.net/man/2/statvfs) normalizes the source

86
man/mergerfs.1

@ -337,6 +337,11 @@ policy.
See below for the list of value types.
Example: \f[B]func.getattr=newest\f[R]
.IP \[bu] 2
\f[B]func.readdir=seq|cosr|cor|cosr:INT|cor:INT\f[R]: Sets
\f[C]readdir\f[R] policy.
INT value sets the number of threads to use for concurrency.
(default: seq)
.IP \[bu] 2
\f[B]category.action=POLICY\f[R]: Sets policy of all FUSE functions in
the action category.
(default: epall)
@ -844,8 +849,6 @@ provide the file handle when a client calls \f[C]fgetattr\f[R],
\f[C]fchown\f[R], \f[C]fchmod\f[R], \f[C]futimens\f[R],
\f[C]ftruncate\f[R], etc.
This means it will call the regular, path based, versions.
\f[C]readdir\f[R] has no real need for a policy given the purpose is
merely to return a list of entries in a directory.
\f[C]statfs\f[R]\[cq]s behavior can be modified via other options.
.PP
When using policies which are based on a branch\[cq]s available space
@ -902,8 +905,7 @@ In cases where something may be searched for (such as a path to clone)
.SS Policies
.PP
A policy is the algorithm used to choose a branch or branches for a
function to work on.
Think of them as ways to filter and sort branches.
function to work on or generally how the function behaves.
.PP
Any function in the \f[C]create\f[R] category will clone the relative
path if needed.
@ -912,12 +914,12 @@ have special requirements or behaviors which you can read more about
below.
.SS Filtering
.PP
Policies basically search branches and create a list of files / paths
for functions to work on.
Most policies basically search branches and create a list of files /
paths for functions to work on.
The policy is responsible for filtering and sorting the branches.
Filters include \f[B]minfreespace\f[R], whether or not a branch is
mounted read-only, and the branch tagging (RO,NC,RW).
These filters are applied across all policies unless otherwise noted.
These filters are applied across most policies.
.IP \[bu] 2
No \f[B]search\f[R] function policies filter.
.IP \[bu] 2
@ -1134,6 +1136,63 @@ T}@T{
ff
T}
.TE
.SS func.readdir
.PP
examples: \f[C]fuse.readdir=seq\f[R], \f[C]fuse.readdir=cor:4\f[R]
.PP
\f[C]readdir\f[R] has policies to control how it manages reading
directory content.
.PP
.TS
tab(@);
lw(26.7n) lw(43.3n).
T{
Policy
T}@T{
Description
T}
_
T{
seq
T}@T{
\[lq]sequential\[rq] : Iterate over branches in the order defined.
This is the default and traditional behavior found prior to the readdir
policy introduction.
T}
T{
cosr
T}@T{
\[lq]concurrent open, sequential read\[rq] : Concurrently open branch
directories using a thread pool and process them in order of definition.
This keeps memory and CPU usage low while also reducing the time spent
waiting on branches to respond.
Number of threads defaults to the number of logical cores.
Can be overwritten via the syntax \f[C]fuse.readdir=cosr:N\f[R] where
\f[C]N\f[R] is the number of threads.
T}
T{
cor
T}@T{
\[lq]concurrent open and read\[rq] : Concurrently open branch
directories and immediately start reading their contents using a thread
pool.
This will result in slightly higher memory and CPU usage but reduced
latency.
Particularly when using higher latency / slower speed network filesystem
branches.
Unlike \f[C]seq\f[R] and \f[C]cosr\f[R] the order of files could change
due the async nature of the thread pool.
Number of threads defaults to the number of logical cores.
Can be overwritten via the syntax \f[C]fuse.readdir=cor:N\f[R] where
\f[C]N\f[R] is the number of threads.
T}
.TE
.PP
Keep in mind that \f[C]readdir\f[R] mostly just provides a list of file
names in a directory and possibly some basic metadata about said files.
To know details about the files, as one would see from commands like
\f[C]find\f[R] or \f[C]ls\f[R], it is required to call \f[C]stat\f[R] on
the file which is controlled by \f[C]fuse.getattr\f[R].
.SS ioctl
.PP
When \f[C]ioctl\f[R] is used with an open file then it will use the file
@ -1246,19 +1305,6 @@ The above behavior will help minimize the likelihood of EXDEV being
returned but it will still be possible.
.PP
\f[B]link\f[R] uses the same strategy but without the removals.
.SS readdir
.PP
readdir (http://linux.die.net/man/3/readdir) is different from all other
filesystem functions.
While it could have its own set of policies to tweak its behavior at
this time it provides a simple union of files and directories found.
Remember that any action or information queried about these files and
directories come from the respective function.
For instance: an \f[B]ls\f[R] is a \f[B]readdir\f[R] and for each
file/directory returned \f[B]getattr\f[R] is called.
Meaning the policy of \f[B]getattr\f[R] is responsible for choosing the
file/directory which is the source of the metadata you see in an
\f[B]ls\f[R].
.SS statfs / statvfs
.PP
statvfs (http://linux.die.net/man/2/statvfs) normalizes the source

4
src/config.cpp

@ -112,7 +112,7 @@ Config::Config()
pid(::getpid()),
posix_acl(false),
readahead(0),
readdir(ReadDir::ENUM::POSIX),
readdir("seq"),
readdirplus(false),
rename_exdev(RenameEXDEV::ENUM::PASSTHROUGH),
scheduling_priority(-10),
@ -162,6 +162,7 @@ Config::Config()
_map["func.mkdir"] = &func.mkdir;
_map["func.mknod"] = &func.mknod;
_map["func.open"] = &func.open;
_map["func.readdir"] = &readdir;
_map["func.readlink"] = &func.readlink;
_map["func.removexattr"] = &func.removexattr;
_map["func.rename"] = &func.rename;
@ -189,7 +190,6 @@ Config::Config()
_map["pin-threads"] = &fuse_pin_threads;
_map["posix_acl"] = &posix_acl;
_map["readahead"] = &readahead;
// _map["readdir"] = &readdir;
_map["readdirplus"] = &readdirplus;
_map["rename-exdev"] = &rename_exdev;
_map["scheduling-priority"] = &scheduling_priority;

6
src/config.hpp

@ -25,15 +25,15 @@
#include "config_log_metrics.hpp"
#include "config_moveonenospc.hpp"
#include "config_nfsopenhack.hpp"
#include "config_readdir.hpp"
#include "config_rename_exdev.hpp"
#include "config_set.hpp"
#include "config_statfs.hpp"
#include "config_statfsignore.hpp"
#include "config_xattr.hpp"
#include "config_set.hpp"
#include "enum.hpp"
#include "errno.hpp"
#include "funcs.hpp"
#include "fuse_readdir.hpp"
#include "policy.hpp"
#include "rwlock.hpp"
#include "tofrom_wrapper.hpp"
@ -135,7 +135,7 @@ public:
ConfigUINT64 pid;
ConfigBOOL posix_acl;
ConfigUINT64 readahead;
ReadDir readdir;
FUSE::ReadDir readdir;
ConfigBOOL readdirplus;
RenameEXDEV rename_exdev;
ConfigINT scheduling_priority;

13
src/fs_devid.hpp

@ -19,6 +19,7 @@
#pragma once
#include "fs_fstat.hpp"
#include "fs_dirfd.hpp"
namespace fs
@ -37,4 +38,16 @@ namespace fs
return st.st_dev;
}
static
inline
dev_t
devid(DIR *dh_)
{
int dirfd;
dirfd = fs::dirfd(dh_);
return fs::devid(dirfd);
}
}

4
src/fs_dirfd.hpp

@ -27,8 +27,8 @@ namespace fs
static
inline
int
dirfd(DIR *dirp_)
dirfd(DIR *dh_)
{
return ::dirfd(dirp_);
return ::dirfd(dh_);
}
}

13
src/fs_inode.cpp

@ -208,6 +208,19 @@ namespace fs
return g_func(fusepath_,fusepath_len_,mode_,dev_,ino_);
}
uint64_t
calc(std::string const &fusepath_,
const mode_t mode_,
const dev_t dev_,
const ino_t ino_)
{
return calc(fusepath_.c_str(),
fusepath_.size(),
mode_,
dev_,
ino_);
}
void
calc(const char *fusepath_,
const uint64_t fusepath_len_,

6
src/fs_inode.hpp

@ -37,7 +37,11 @@ namespace fs
const uint64_t fusepath_len,
const mode_t mode,
const dev_t dev,
const ino_t ion);
const ino_t ino);
uint64_t calc(std::string const &fusepath,
mode_t const mode,
dev_t const dev,
ino_t ino);
void calc(const char *fusepath,
const uint64_t fusepath_len,
struct stat *st);

75
src/fuse_readdir.cpp

@ -16,35 +16,64 @@
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include "fuse_readdir_posix.hpp"
#include "fuse_readdir_linux.hpp"
#include "fuse_readdir.hpp"
#include "fuse_readdir_factory.hpp"
#include "config.hpp"
#include "dirinfo.hpp"
#include "rwlock.hpp"
#include "ugid.hpp"
#include "fuse.h"
int
FUSE::readdir(const fuse_file_info_t *ffi_,
fuse_dirents_t *buf_)
{
Config::Write cfg;
return cfg->readdir(ffi_,buf_);
}
FUSE::ReadDir::ReadDir(std::string const s_)
{
from_string(s_);
assert(_readdir);
}
namespace FUSE
std::string
FUSE::ReadDir::to_string() const
{
int
readdir(const fuse_file_info_t *ffi_,
fuse_dirents_t *buf_)
std::lock_guard<std::mutex> lg(_mutex);
return _type;
}
int
FUSE::ReadDir::from_string(std::string const &str_)
{
std::shared_ptr<FUSE::ReadDirBase> tmp;
tmp = FUSE::ReadDirFactory::make(str_);
if(!tmp)
return -EINVAL;
{
std::lock_guard<std::mutex> lg(_mutex);
_type = str_;
_readdir = tmp;
}
return 0;
}
int
FUSE::ReadDir::operator()(fuse_file_info_t const *ffi_,
fuse_dirents_t *buf_)
{
std::shared_ptr<FUSE::ReadDirBase> readdir;
{
Config::Read cfg;
DirInfo *di = reinterpret_cast<DirInfo*>(ffi_->fh);
const fuse_context *fc = fuse_get_context();
const ugid::Set ugid(fc->uid,fc->gid);
switch(cfg->readdir)
{
case ReadDir::ENUM::LINUX:
return FUSE::readdir_linux(cfg->branches,di->fusepath.c_str(),buf_);
default:
case ReadDir::ENUM::POSIX:
return FUSE::readdir_posix(cfg->branches,di->fusepath.c_str(),buf_);
}
std::lock_guard<std::mutex> lg(_mutex);
readdir = _readdir;
}
return (*readdir)(ffi_,buf_);
}

33
src/fuse_readdir.hpp

@ -17,11 +17,38 @@
#pragma once
#include "fuse.h"
#include "tofrom_string.hpp"
#include "fuse_readdir_base.hpp"
#include <memory>
#include <mutex>
#include <assert.h>
namespace FUSE
{
int
readdir(const fuse_file_info_t *ffi,
fuse_dirents_t *buf);
int readdir(fuse_file_info_t const *ffi,
fuse_dirents_t *buf);
class ReadDir : public ToFromString
{
public:
ReadDir(std::string const s_);
public:
std::string to_string() const;
int from_string(const std::string &);
public:
int operator()(fuse_file_info_t const *ffi,
fuse_dirents_t *buf);
private:
mutable std::mutex _mutex;
private:
std::string _type;
std::shared_ptr<FUSE::ReadDirBase> _readdir;
};
}

34
src/fuse_readdir_base.hpp

@ -0,0 +1,34 @@
/*
Copyright (c) 2023, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include "fuse.h"
namespace FUSE
{
class ReadDirBase
{
public:
ReadDirBase() {};
virtual ~ReadDirBase() {};
public:
virtual int operator()(fuse_file_info_t const *ffi,
fuse_dirents_t *buf) = 0;
};
}

194
src/fuse_readdir_cor.cpp

@ -0,0 +1,194 @@
/*
Copyright (c) 2023, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include "fuse_readdir_cor.hpp"
#include "config.hpp"
#include "dirinfo.hpp"
#include "errno.hpp"
#include "fs_closedir.hpp"
#include "fs_devid.hpp"
#include "fs_inode.hpp"
#include "fs_opendir.hpp"
#include "fs_path.hpp"
#include "fs_readdir.hpp"
#include "hashset.hpp"
#include "ugid.hpp"
#include "fuse_dirents.h"
FUSE::ReadDirCOR::ReadDirCOR(unsigned concurrency_)
: _tp(concurrency_)
{
}
FUSE::ReadDirCOR::~ReadDirCOR()
{
}
namespace l
{
static
inline
uint64_t
dirent_exact_namelen(const struct dirent *d_)
{
#ifdef _D_EXACT_NAMLEN
return _D_EXACT_NAMLEN(d_);
#elif defined _DIRENT_HAVE_D_NAMLEN
return d_->d_namlen;
#else
return strlen(d_->d_name);
#endif
}
static
inline
int
readdir(std::string basepath_,
HashSet &names_,
std::mutex &names_mutex_,
fuse_dirents_t *buf_,
std::mutex &dirents_mutex_)
{
int rv;
int err;
DIR *dh;
dev_t dev;
std::string filepath;
dh = fs::opendir(basepath_);
if(dh == NULL)
return -errno;
dev = fs::devid(dh);
rv = 0;
err = 0;
for(struct dirent *de = fs::readdir(dh); de && !rv; de = fs::readdir(dh))
{
std::uint64_t namelen;
namelen = l::dirent_exact_namelen(de);
{
std::lock_guard<std::mutex> lk(names_mutex_);
rv = names_.put(de->d_name,namelen);
if(rv == 0)
continue;
}
filepath = fs::path::make(basepath_,de->d_name);
de->d_ino = fs::inode::calc(filepath,
DTTOIF(de->d_type),
dev,
de->d_ino);
{
std::lock_guard<std::mutex> lk(dirents_mutex_);
rv = fuse_dirents_add(buf_,de,namelen);
if(rv == 0)
continue;
}
err = -ENOMEM;
}
fs::closedir(dh);
return err;
}
static
std::vector<int>
concurrent_readdir(ThreadPool &tp_,
const Branches::CPtr &branches_,
const char *dirname_,
fuse_dirents_t *buf_)
{
HashSet names;
std::mutex names_mutex;
std::mutex dirents_mutex;
std::vector<int> rv;
std::vector<std::future<int>> futures;
for(auto const &branch : *branches_)
{
auto func = [&]()
{
std::string basepath = fs::path::make(branch.path,dirname_);
return l::readdir(basepath,names,names_mutex,buf_,dirents_mutex);
};
auto rv = tp_.enqueue_task(func);
futures.emplace_back(std::move(rv));
}
for(auto &future : futures)
rv.push_back(future.get());
return rv;
}
static
int
calc_rv(std::vector<int> rvs_)
{
for(auto rv : rvs_)
{
if(rv == 0)
return 0;
}
if(rvs_.empty())
return -ENOENT;
return rvs_[0];
}
static
int
readdir(ThreadPool &tp_,
const Branches::CPtr &branches_,
const char *dirname_,
fuse_dirents_t *buf_)
{
std::vector<int> rvs;
fuse_dirents_reset(buf_);
rvs = l::concurrent_readdir(tp_,branches_,dirname_,buf_);
return l::calc_rv(rvs);
}
}
int
FUSE::ReadDirCOR::operator()(fuse_file_info_t const *ffi_,
fuse_dirents_t *buf_)
{
Config::Read cfg;
DirInfo *di = reinterpret_cast<DirInfo*>(ffi_->fh);
const fuse_context *fc = fuse_get_context();
const ugid::Set ugid(fc->uid,fc->gid);
return l::readdir(_tp,cfg->branches,di->fusepath.c_str(),buf_);
}

48
src/config_readdir.cpp → src/fuse_readdir_cor.hpp

@ -1,7 +1,7 @@
/*
ISC License
Copyright (c) 2020, Antonio SJ Musumeci <trapexit@spawn.link>
Copyright (c) 2023, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
@ -16,36 +16,24 @@
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include "config_readdir.hpp"
#include "ef.hpp"
#include "errno.hpp"
#pragma once
#include "fuse_readdir_base.hpp"
#include "unbounded_thread_pool.hpp"
template<>
int
ReadDir::from_string(const std::string &s_)
// concurrent open & read
namespace FUSE
{
if(s_ == "posix")
_data = ReadDir::ENUM::POSIX;
ef(s_ == "linux")
_data = ReadDir::ENUM::LINUX;
else
return -EINVAL;
return 0;
}
template<>
std::string
ReadDir::to_string(void) const
{
switch(_data)
{
case ReadDir::ENUM::POSIX:
return "posix";
case ReadDir::ENUM::LINUX:
return "linux";
}
return "invalid";
class ReadDirCOR final : public FUSE::ReadDirBase
{
public:
ReadDirCOR(unsigned concurrency);
~ReadDirCOR();
int operator()(fuse_file_info_t const *ffi,
fuse_dirents_t *buf);
private:
ThreadPool _tp;
};
}

173
src/fuse_readdir_cosr.cpp

@ -0,0 +1,173 @@
/*
Copyright (c) 2023, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include "fuse_readdir_cosr.hpp"
#include "config.hpp"
#include "dirinfo.hpp"
#include "errno.hpp"
#include "fs_closedir.hpp"
#include "fs_devid.hpp"
#include "fs_dirfd.hpp"
#include "fs_inode.hpp"
#include "fs_opendir.hpp"
#include "fs_path.hpp"
#include "fs_readdir.hpp"
#include "fs_stat.hpp"
#include "hashset.hpp"
#include "ugid.hpp"
#include "fuse_dirents.h"
FUSE::ReadDirCOSR::ReadDirCOSR(unsigned concurrency_)
: _tp(concurrency_)
{
}
FUSE::ReadDirCOSR::~ReadDirCOSR()
{
}
namespace l
{
static
inline
uint64_t
dirent_exact_namelen(const struct dirent *d_)
{
#ifdef _D_EXACT_NAMLEN
return _D_EXACT_NAMLEN(d_);
#elif defined _DIRENT_HAVE_D_NAMLEN
return d_->d_namlen;
#else
return strlen(d_->d_name);
#endif
}
static
inline
std::vector<std::future<DIR*>>
opendir(ThreadPool &tp_,
const Branches::CPtr &branches_,
char const *dirname_)
{
std::vector<std::future<DIR*>> futures;
for(auto const &branch : *branches_)
{
auto func = [&branch,dirname_]()
{
std::string basepath = fs::path::make(branch.path,dirname_);
return fs::opendir(basepath);
};
auto rv = tp_.enqueue_task(func);
futures.emplace_back(std::move(rv));
}
return futures;
}
static
inline
int
readdir(std::vector<std::future<DIR*>> &dh_futures_,
char const *dirname_,
fuse_dirents_t *buf_)
{
int err;
HashSet names;
std::string fullpath;
err = 0;
for(auto &dh_future : dh_futures_)
{
int rv;
DIR *dh;
dev_t dev;
dh = dh_future.get();
if(dh == NULL)
continue;
dev = fs::devid(dh);
rv = 0;
for(struct dirent *de = fs::readdir(dh); de && !rv; de = fs::readdir(dh))
{
std::uint64_t namelen;
namelen = l::dirent_exact_namelen(de);
rv = names.put(de->d_name,namelen);
if(rv == 0)
continue;
fullpath = fs::path::make(dirname_,de->d_name);
de->d_ino = fs::inode::calc(fullpath,
DTTOIF(de->d_type),
dev,
de->d_ino);
rv = fuse_dirents_add(buf_,de,namelen);
if(rv == 0)
continue;
err = -ENOMEM;
}
fs::closedir(dh);
}
return err;
}
static
inline
int
readdir(ThreadPool &tp_,
const Branches::CPtr &branches_,
const char *dirname_,
fuse_dirents_t *buf_)
{
int rv;
std::vector<std::future<DIR*>> dh_futures;
fuse_dirents_reset(buf_);
dh_futures = l::opendir(tp_,branches_,dirname_);
rv = l::readdir(dh_futures,dirname_,buf_);
return rv;
}
}
int
FUSE::ReadDirCOSR::operator()(fuse_file_info_t const *ffi_,
fuse_dirents_t *buf_)
{
Config::Read cfg;
DirInfo *di = reinterpret_cast<DirInfo*>(ffi_->fh);
const fuse_context *fc = fuse_get_context();
const ugid::Set ugid(fc->uid,fc->gid);
return l::readdir(_tp,cfg->branches,di->fusepath.c_str(),buf_);
}

39
src/fuse_readdir_cosr.hpp

@ -0,0 +1,39 @@
/*
ISC License
Copyright (c) 2023, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include "fuse_readdir_base.hpp"
#include "unbounded_thread_pool.hpp"
// concurrent open, sequential read
namespace FUSE
{
class ReadDirCOSR final : public FUSE::ReadDirBase
{
public:
ReadDirCOSR(unsigned concurrency);
~ReadDirCOSR();
int operator()(fuse_file_info_t const *ffi,
fuse_dirents_t *buf);
private:
ThreadPool _tp;
};
}

75
src/fuse_readdir_factory.cpp

@ -0,0 +1,75 @@
/*
ISC License
Copyright (c) 2023, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include "fuse_readdir_factory.hpp"
#include "fuse_readdir_cor.hpp"
#include "fuse_readdir_cosr.hpp"
#include "fuse_readdir_seq.hpp"
#include <cassert>
#include <cmath>
#include <cstdio>
#include <cstdlib>
namespace l
{
static
void
read_cfg(std::string const cfgstr_,
std::string &type_,
int &concurrency_)
{
char type[16];
int concurrency;
concurrency = 0;
std::sscanf(cfgstr_.c_str(),"%15[a-z]:%u",type,&concurrency);
if(concurrency == 0)
concurrency = std::thread::hardware_concurrency();
else if(concurrency < 0)
concurrency = (std::thread::hardware_concurrency() / std::abs(concurrency));
if(concurrency == 0)
concurrency = 1;
type_ = type;
concurrency_ = concurrency;
}
}
std::shared_ptr<FUSE::ReadDirBase>
FUSE::ReadDirFactory::make(std::string const cfgstr_)
{
int concurrency;
std::string type;
l::read_cfg(cfgstr_,type,concurrency);
assert(concurrency);
if(type == "seq")
return std::make_shared<FUSE::ReadDirSeq>();
if(type == "cosr")
return std::make_shared<FUSE::ReadDirCOSR>(concurrency);
if(type == "cor")
return std::make_shared<FUSE::ReadDirCOR>(concurrency);
return {};
}

18
src/config_readdir.hpp → src/fuse_readdir_factory.hpp

@ -1,7 +1,7 @@
/*
ISC License
Copyright (c) 2020, Antonio SJ Musumeci <trapexit@spawn.link>
Copyright (c) 2023, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
@ -18,13 +18,17 @@
#pragma once
#include "enum.hpp"
#include "fuse_readdir_base.hpp"
#include <string>
#include <memory>
enum class ReadDirEnum
namespace FUSE
{
class ReadDirFactory
{
POSIX,
LINUX
public:
static std::shared_ptr<ReadDirBase> make(std::string const cfgstr);
};
typedef Enum<ReadDirEnum> ReadDir;
}

30
src/fuse_readdir_plus.cpp

@ -16,13 +16,7 @@
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include "fuse_readdir_plus_linux.hpp"
#include "fuse_readdir_plus_posix.hpp"
#include "config.hpp"
#include "dirinfo.hpp"
#include "rwlock.hpp"
#include "ugid.hpp"
#include "errno.hpp"
#include "fuse.h"
@ -33,26 +27,6 @@ namespace FUSE
readdir_plus(const fuse_file_info_t *ffi_,
fuse_dirents_t *buf_)
{
Config::Read cfg;
DirInfo *di = reinterpret_cast<DirInfo*>(ffi_->fh);
const fuse_context *fc = fuse_get_context();
const ugid::Set ugid(fc->uid,fc->gid);
switch(cfg->readdir)
{
case ReadDir::ENUM::LINUX:
return FUSE::readdir_plus_linux(cfg->branches,
di->fusepath.c_str(),
cfg->cache_entry,
cfg->cache_attr,
buf_);
default:
case ReadDir::ENUM::POSIX:
return FUSE::readdir_plus_posix(cfg->branches,
di->fusepath.c_str(),
cfg->cache_entry,
cfg->cache_attr,
buf_);
}
return -ENOTSUP;
}
}

43
src/fuse_readdir_posix.cpp → src/fuse_readdir_seq.cpp

@ -16,7 +16,11 @@
#define _DEFAULT_SOURCE
#include "fuse_readdir_seq.hpp"
#include "branches.hpp"
#include "config.hpp"
#include "dirinfo.hpp"
#include "errno.hpp"
#include "fs_closedir.hpp"
#include "fs_devid.hpp"
@ -27,6 +31,7 @@
#include "fs_readdir.hpp"
#include "fs_stat.hpp"
#include "hashset.hpp"
#include "ugid.hpp"
#include "fuse.h"
#include "fuse_dirents.h"
@ -34,11 +39,6 @@
#include <string>
#include <vector>
#include <dirent.h>
using std::string;
using std::vector;
namespace l
{
@ -61,19 +61,17 @@ namespace l
const char *dirname_,
fuse_dirents_t *buf_)
{
dev_t dev;
HashSet names;
string basepath;
string fullpath;
uint64_t namelen;
std::string basepath;
std::string fullpath;
fuse_dirents_reset(buf_);
for(const auto &branch : *branches_)
{
int rv;
int dirfd;
DIR *dh;
dev_t dev;
basepath = fs::path::make(branch.path,dirname_);
@ -81,12 +79,13 @@ namespace l
if(!dh)
continue;
dirfd = fs::dirfd(dh);
dev = fs::devid(dirfd);
dev = fs::devid(dh);
rv = 0;
for(struct dirent *de = fs::readdir(dh); de && !rv; de = fs::readdir(dh))
{
std::uint64_t namelen;
namelen = l::dirent_exact_namelen(de);
rv = names.put(de->d_name,namelen);
@ -94,8 +93,7 @@ namespace l
continue;
fullpath = fs::path::make(dirname_,de->d_name);
de->d_ino = fs::inode::calc(fullpath.c_str(),
fullpath.size(),
de->d_ino = fs::inode::calc(fullpath,
DTTOIF(de->d_type),
dev,
de->d_ino);
@ -112,13 +110,14 @@ namespace l
}
}
namespace FUSE
int
FUSE::ReadDirSeq::operator()(fuse_file_info_t const *ffi_,
fuse_dirents_t *buf_)
{
int
readdir_posix(const Branches::CPtr &branches_,
const char *dirname_,
fuse_dirents_t *buf_)
{
return l::readdir(branches_,dirname_,buf_);
}
Config::Read cfg;
DirInfo *di = reinterpret_cast<DirInfo*>(ffi_->fh);
const fuse_context *fc = fuse_get_context();
const ugid::Set ugid(fc->uid,fc->gid);
return l::readdir(cfg->branches,di->fusepath.c_str(),buf_);
}

19
src/fuse_readdir_posix.hpp → src/fuse_readdir_seq.hpp

@ -18,17 +18,18 @@
#pragma once
#include "branches.hpp"
#include "fuse.h"
#include <cstdint>
#include "fuse_readdir_base.hpp"
namespace FUSE
{
int
readdir_posix(const Branches::CPtr &branches,
const char *dirname,
fuse_dirents_t *buf);
class ReadDirSeq final : public FUSE::ReadDirBase
{
public:
ReadDirSeq() {}
~ReadDirSeq() {}
int operator()(fuse_file_info_t const *ffi,
fuse_dirents_t *buf);
};
}

161
src/unbounded_queue.hpp

@ -0,0 +1,161 @@
#pragma once
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>
template<typename T>
class UnboundedQueue
{
public:
explicit
UnboundedQueue(bool block_ = true)
: _block(block_)
{
}
void
push(const T& item_)
{
{
std::lock_guard<std::mutex> guard(_queue_lock);
_queue.push(item_);
}
_condition.notify_one();
}
void
push(T&& item_)
{
{
std::lock_guard<std::mutex> guard(_queue_lock);
_queue.push(std::move(item_));
}
_condition.notify_one();
}
template<typename... Args>
void
emplace(Args&&... args_)
{
{
std::lock_guard<std::mutex> guard(_queue_lock);
_queue.emplace(std::forward<Args>(args_)...);
}
_condition.notify_one();
}
bool
try_push(const T& item_)
{
{
std::unique_lock<std::mutex> lock(_queue_lock, std::try_to_lock);
if(!lock)
return false;
_queue.push(item_);
}
_condition.notify_one();
return true;
}
bool
try_push(T&& item_)
{
{
std::unique_lock<std::mutex> lock(_queue_lock, std::try_to_lock);
if(!lock)
return false;
_queue.push(std::move(item_));
}
_condition.notify_one();
return true;
}
//TODO: push multiple T at once
bool
pop(T& item_)
{
std::unique_lock<std::mutex> guard(_queue_lock);
_condition.wait(guard, [&]() { return !_queue.empty() || !_block; });
if(_queue.empty())
return false;
item_ = std::move(_queue.front());
_queue.pop();
return true;
}
bool
try_pop(T& item_)
{
std::unique_lock<std::mutex> lock(_queue_lock, std::try_to_lock);
if(!lock || _queue.empty())
return false;
item_ = std::move(_queue.front());
_queue.pop();
return true;
}
std::size_t
size() const
{
std::lock_guard<std::mutex> guard(_queue_lock);
return _queue.size();
}
bool
empty() const
{
std::lock_guard<std::mutex> guard(_queue_lock);
return _queue.empty();
}
void
block()
{
std::lock_guard<std::mutex> guard(_queue_lock);
_block = true;
}
void
unblock()
{
{
std::lock_guard<std::mutex> guard(_queue_lock);
_block = false;
}
_condition.notify_all();
}
bool
blocking() const
{
std::lock_guard<std::mutex> guard(_queue_lock);
return _block;
}
private:
mutable std::mutex _queue_lock;
private:
bool _block;
std::queue<T> _queue;
std::condition_variable _condition;
};

124
src/unbounded_thread_pool.hpp

@ -0,0 +1,124 @@
#pragma once
#include "unbounded_queue.hpp"
#include <tuple>
#include <atomic>
#include <vector>
#include <thread>
#include <memory>
#include <future>
#include <utility>
#include <functional>
#include <type_traits>
class ThreadPool
{
public:
explicit
ThreadPool(const std::size_t thread_count_ = std::thread::hardware_concurrency())
: _queues(thread_count_),
_count(thread_count_)
{
auto worker = [this](std::size_t i)
{
while(true)
{
Proc f;
for(std::size_t n = 0; n < (_count * K); ++n)
{
if(_queues[(i + n) % _count].try_pop(f))
break;
}
if(!f && !_queues[i].pop(f))
break;
f();
}
};
_threads.reserve(thread_count_);
for(std::size_t i = 0; i < thread_count_; ++i)
_threads.emplace_back(worker, i);
}
~ThreadPool()
{
for(auto& queue : _queues)
queue.unblock();
for(auto& thread : _threads)
thread.join();
}
template<typename F>
void
enqueue_work(F&& f_)
{
auto i = _index++;
for(std::size_t n = 0; n < (_count * K); ++n)
{
if(_queues[(i + n) % _count].try_push(f_))
return;
}
_queues[i % _count].push(std::move(f_));
}
template<typename F>
[[nodiscard]]
std::future<typename std::result_of<F()>::type>
enqueue_task(F&& f_)
{
using TaskReturnType = typename std::result_of<F()>::type;
using Promise = std::promise<TaskReturnType>;
auto i = _index++;
auto promise = std::make_shared<Promise>();
auto future = promise->get_future();
auto work = [=]() {
auto rv = f_();
promise->set_value(rv);
};
for(std::size_t n = 0; n < (_count * K); ++n)
{
if(_queues[(i + n) % _count].try_push(work))
return future;
}
_queues[i % _count].push(std::move(work));
return future;
}
public:
std::vector<pthread_t>
threads()
{
std::vector<pthread_t> rv;
for(auto &thread : _threads)
rv.push_back(thread.native_handle());
return rv;
}
private:
using Proc = std::function<void(void)>;
using Queue = UnboundedQueue<Proc>;
using Queues = std::vector<Queue>;
Queues _queues;
private:
std::vector<std::thread> _threads;
private:
const std::size_t _count;
std::atomic_uint _index;
static const unsigned int K = 2;
};
Loading…
Cancel
Save