Browse Source

Merge pull request #1237 from trapexit/threadfix

Initialize readdir threadpool after daemonizing
pull/1238/head
trapexit 1 year ago
committed by GitHub
parent
commit
a927a15e9c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      Makefile
  2. 15
      man/mergerfs.1
  3. 2
      src/fuse_init.cpp
  4. 32
      src/fuse_readdir.cpp
  5. 9
      src/fuse_readdir.hpp
  6. 20
      src/unbounded_thread_pool.hpp

2
Makefile

@ -42,7 +42,7 @@ UGID_USE_RWLOCK = 0
ifeq ($(DEBUG),1) ifeq ($(DEBUG),1)
OPT_FLAGS := -O0 -g -fsanitize=undefined OPT_FLAGS := -O0 -g -fsanitize=undefined
else else
OPT_FLAGS := -O2
OPT_FLAGS := -O2 -DNDEBUG
endif endif
ifeq ($(STATIC),1) ifeq ($(STATIC),1)

15
man/mergerfs.1

@ -391,7 +391,9 @@ to enable page caching for when \f[C]cache.files=per-process\f[R].
.IP \[bu] 2 .IP \[bu] 2
\f[B]parallel-direct-writes=BOOL\f[R]: Allow the kernel to dispatch \f[B]parallel-direct-writes=BOOL\f[R]: Allow the kernel to dispatch
multiple, parallel (non-extending) write requests for files opened with multiple, parallel (non-extending) write requests for files opened with
\f[C]direct_io=true\f[R] (if supported by the kernel)
\f[C]cache.files=per-process\f[R] (if the process is not in
\f[C]process-names\f[R]) or \f[C]cache.files=off\f[R].
(This requires kernel support, and was added in v6.2)
.IP \[bu] 2 .IP \[bu] 2
\f[B]direct_io\f[R]: deprecated - Bypass page cache. \f[B]direct_io\f[R]: deprecated - Bypass page cache.
Use \f[C]cache.files=off\f[R] instead. Use \f[C]cache.files=off\f[R] instead.
@ -859,8 +861,8 @@ calculations.
The reason is that it doesn\[cq]t really work for non-path preserving The reason is that it doesn\[cq]t really work for non-path preserving
policies and can lead to non-obvious behaviors. policies and can lead to non-obvious behaviors.
.PP .PP
NOTE: While any policy can be assigned to a function or category though
some may not be very useful in practice.
NOTE: While any policy can be assigned to a function or category, some
may not be very useful in practice.
For instance: \f[B]rand\f[R] (random) may be useful for file creation For instance: \f[B]rand\f[R] (random) may be useful for file creation
(create) but could lead to very odd behavior if used for \f[C]chmod\f[R] (create) but could lead to very odd behavior if used for \f[C]chmod\f[R]
if there were more than one copy of the file. if there were more than one copy of the file.
@ -1753,9 +1755,12 @@ size are unchanged since previous open.
cache.files=libfuse: follow traditional libfuse \f[C]direct_io\f[R], cache.files=libfuse: follow traditional libfuse \f[C]direct_io\f[R],
\f[C]kernel_cache\f[R], and \f[C]auto_cache\f[R] arguments. \f[C]kernel_cache\f[R], and \f[C]auto_cache\f[R] arguments.
.IP \[bu] 2 .IP \[bu] 2
cache.files=per-process: Enable page caching only for processes which
`comm' name matches one of the values defined in
cache.files=per-process: Enable page caching (equivalent to
\f[C]cache.files=partial\f[R]) only for processes whose `comm' name
matches one of the values defined in
\f[C]cache.files.process-names\f[R]. \f[C]cache.files.process-names\f[R].
If the name does not match the file open is equivalent to
\f[C]cache.files=off\f[R].
.PP .PP
FUSE, which mergerfs uses, offers a number of page caching modes. FUSE, which mergerfs uses, offers a number of page caching modes.
mergerfs tries to simplify their use via the \f[C]cache.files\f[R] mergerfs tries to simplify their use via the \f[C]cache.files\f[R]

2
src/fuse_init.cpp

@ -137,6 +137,8 @@ namespace FUSE
ugid::init(); ugid::init();
cfg->readdir.initialize();
l::want_if_capable(conn_,FUSE_CAP_ASYNC_DIO); l::want_if_capable(conn_,FUSE_CAP_ASYNC_DIO);
l::want_if_capable(conn_,FUSE_CAP_ASYNC_READ,&cfg->async_read); l::want_if_capable(conn_,FUSE_CAP_ASYNC_READ,&cfg->async_read);
l::want_if_capable(conn_,FUSE_CAP_ATOMIC_O_TRUNC); l::want_if_capable(conn_,FUSE_CAP_ATOMIC_O_TRUNC);

32
src/fuse_readdir.cpp

@ -17,10 +17,15 @@
*/ */
#include "fuse_readdir.hpp" #include "fuse_readdir.hpp"
#include "fuse_readdir_factory.hpp"
#include "config.hpp" #include "config.hpp"
#include "fuse_readdir_factory.hpp"
/*
The _initialized stuff is not pretty but easiest way to deal with
the fact that mergerfs is doing arg parsing and setting up of things
(including thread pools) before the daemonizing
*/
int int
FUSE::readdir(const fuse_file_info_t *ffi_, FUSE::readdir(const fuse_file_info_t *ffi_,
@ -32,8 +37,10 @@ FUSE::readdir(const fuse_file_info_t *ffi_,
} }
FUSE::ReadDir::ReadDir(std::string const s_) FUSE::ReadDir::ReadDir(std::string const s_)
: _initialized(false)
{ {
from_string(s_); from_string(s_);
if(_initialized)
assert(_readdir); assert(_readdir);
} }
@ -45,9 +52,18 @@ FUSE::ReadDir::to_string() const
return _type; return _type;
} }
void
FUSE::ReadDir::initialize()
{
_initialized = true;
from_string(_type);
}
int int
FUSE::ReadDir::from_string(std::string const &str_) FUSE::ReadDir::from_string(std::string const &str_)
{ {
if(_initialized)
{
std::shared_ptr<FUSE::ReadDirBase> tmp; std::shared_ptr<FUSE::ReadDirBase> tmp;
tmp = FUSE::ReadDirFactory::make(str_); tmp = FUSE::ReadDirFactory::make(str_);
@ -58,12 +74,24 @@ FUSE::ReadDir::from_string(std::string const &str_)
std::lock_guard<std::mutex> lg(_mutex); std::lock_guard<std::mutex> lg(_mutex);
_type = str_; _type = str_;
_readdir = tmp;
std::swap(_readdir,tmp);
}
}
else
{
std::lock_guard<std::mutex> lg(_mutex);
_type = str_;
} }
return 0; return 0;
} }
/*
Yeah... if not initialized it will crash... ¯\_()_/¯
This will be resolved once initialization of internal objects and
handling of input is better seperated.
*/
int int
FUSE::ReadDir::operator()(fuse_file_info_t const *ffi_, FUSE::ReadDir::operator()(fuse_file_info_t const *ffi_,
fuse_dirents_t *buf_) fuse_dirents_t *buf_)

9
src/fuse_readdir.hpp

@ -25,7 +25,10 @@
#include <assert.h> #include <assert.h>
// The initialization behavior is not pretty but for the moment
// needed due to the daemonizing function of the libfuse library when
// not using foreground mode. The threads need to be created after the
// fork, not before.
namespace FUSE namespace FUSE
{ {
int readdir(fuse_file_info_t const *ffi, int readdir(fuse_file_info_t const *ffi,
@ -44,10 +47,14 @@ namespace FUSE
int operator()(fuse_file_info_t const *ffi, int operator()(fuse_file_info_t const *ffi,
fuse_dirents_t *buf); fuse_dirents_t *buf);
public:
void initialize();
private: private:
mutable std::mutex _mutex; mutable std::mutex _mutex;
private: private:
bool _initialized;
std::string _type; std::string _type;
std::shared_ptr<FUSE::ReadDirBase> _readdir; std::shared_ptr<FUSE::ReadDirBase> _readdir;
}; };

20
src/unbounded_thread_pool.hpp

@ -2,6 +2,8 @@
#include "unbounded_queue.hpp" #include "unbounded_queue.hpp"
#include "syslog.hpp"
#include <atomic> #include <atomic>
#include <csignal> #include <csignal>
#include <functional> #include <functional>
@ -21,8 +23,13 @@ public:
ThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(), ThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(),
std::string const name_ = {}) std::string const name_ = {})
: _queues(thread_count_), : _queues(thread_count_),
_count(thread_count_)
_count(thread_count_),
_name(name_)
{ {
syslog_info("threadpool: spawning %zu threads named '%s'",
_count,
_name.c_str());
auto worker = [this](std::size_t i) auto worker = [this](std::size_t i)
{ {
while(true) while(true)
@ -51,10 +58,10 @@ public:
_threads.reserve(thread_count_); _threads.reserve(thread_count_);
for(std::size_t i = 0; i < thread_count_; ++i) for(std::size_t i = 0; i < thread_count_; ++i)
_threads.emplace_back(worker, i); _threads.emplace_back(worker, i);
if(!name_.empty())
if(!_name.empty())
{ {
for(auto &t : _threads) for(auto &t : _threads)
pthread_setname_np(t.native_handle(),name_.c_str());
pthread_setname_np(t.native_handle(),_name.c_str());
} }
pthread_sigmask(SIG_SETMASK,&oldset,NULL); pthread_sigmask(SIG_SETMASK,&oldset,NULL);
@ -62,6 +69,10 @@ public:
~ThreadPool() ~ThreadPool()
{ {
syslog_info("threadpool: destroying %zu threads named '%s'",
_count,
_name.c_str());
for(auto& queue : _queues) for(auto& queue : _queues)
queue.unblock(); queue.unblock();
for(auto& thread : _threads) for(auto& thread : _threads)
@ -133,8 +144,9 @@ private:
std::vector<std::thread> _threads; std::vector<std::thread> _threads;
private: private:
const std::size_t _count;
std::size_t const _count;
std::atomic_uint _index; std::atomic_uint _index;
std::string const _name;
static const unsigned int K = 2; static const unsigned int K = 2;
}; };
Loading…
Cancel
Save