From 0c555e71a02314dc2ef9449163f65ba895940709 Mon Sep 17 00:00:00 2001 From: Antonio SJ Musumeci Date: Mon, 28 Aug 2023 00:24:25 -0500 Subject: [PATCH] Initialize readdir threadpool after daemonizing --- Makefile | 2 +- man/mergerfs.1 | 15 +++++++---- src/fuse_init.cpp | 2 ++ src/fuse_readdir.cpp | 50 +++++++++++++++++++++++++++-------- src/fuse_readdir.hpp | 9 ++++++- src/unbounded_thread_pool.hpp | 20 +++++++++++--- 6 files changed, 76 insertions(+), 22 deletions(-) diff --git a/Makefile b/Makefile index 14b551fa..784f41d8 100644 --- a/Makefile +++ b/Makefile @@ -42,7 +42,7 @@ UGID_USE_RWLOCK = 0 ifeq ($(DEBUG),1) OPT_FLAGS := -O0 -g -fsanitize=undefined else -OPT_FLAGS := -O2 +OPT_FLAGS := -O2 -DNDEBUG endif ifeq ($(STATIC),1) diff --git a/man/mergerfs.1 b/man/mergerfs.1 index 9a448ec9..3ba7e879 100644 --- a/man/mergerfs.1 +++ b/man/mergerfs.1 @@ -391,7 +391,9 @@ to enable page caching for when \f[C]cache.files=per-process\f[R]. .IP \[bu] 2 \f[B]parallel-direct-writes=BOOL\f[R]: Allow the kernel to dispatch multiple, parallel (non-extending) write requests for files opened with -\f[C]direct_io=true\f[R] (if supported by the kernel) +\f[C]cache.files=per-process\f[R] (if the process is not in +\f[C]process-names\f[R]) or \f[C]cache.files=off\f[R]. +(This requires kernel support, and was added in v6.2) .IP \[bu] 2 \f[B]direct_io\f[R]: deprecated - Bypass page cache. Use \f[C]cache.files=off\f[R] instead. @@ -859,8 +861,8 @@ calculations. The reason is that it doesn\[cq]t really work for non-path preserving policies and can lead to non-obvious behaviors. .PP -NOTE: While any policy can be assigned to a function or category though -some may not be very useful in practice. +NOTE: While any policy can be assigned to a function or category, some +may not be very useful in practice. For instance: \f[B]rand\f[R] (random) may be useful for file creation (create) but could lead to very odd behavior if used for \f[C]chmod\f[R] if there were more than one copy of the file. @@ -1753,9 +1755,12 @@ size are unchanged since previous open. cache.files=libfuse: follow traditional libfuse \f[C]direct_io\f[R], \f[C]kernel_cache\f[R], and \f[C]auto_cache\f[R] arguments. .IP \[bu] 2 -cache.files=per-process: Enable page caching only for processes which -`comm' name matches one of the values defined in +cache.files=per-process: Enable page caching (equivalent to +\f[C]cache.files=partial\f[R]) only for processes whose `comm' name +matches one of the values defined in \f[C]cache.files.process-names\f[R]. +If the name does not match the file open is equivalent to +\f[C]cache.files=off\f[R]. .PP FUSE, which mergerfs uses, offers a number of page caching modes. mergerfs tries to simplify their use via the \f[C]cache.files\f[R] diff --git a/src/fuse_init.cpp b/src/fuse_init.cpp index 4fc7864e..27e7ab77 100644 --- a/src/fuse_init.cpp +++ b/src/fuse_init.cpp @@ -137,6 +137,8 @@ namespace FUSE ugid::init(); + cfg->readdir.initialize(); + l::want_if_capable(conn_,FUSE_CAP_ASYNC_DIO); l::want_if_capable(conn_,FUSE_CAP_ASYNC_READ,&cfg->async_read); l::want_if_capable(conn_,FUSE_CAP_ATOMIC_O_TRUNC); diff --git a/src/fuse_readdir.cpp b/src/fuse_readdir.cpp index 47392db5..9ca76fd2 100644 --- a/src/fuse_readdir.cpp +++ b/src/fuse_readdir.cpp @@ -17,10 +17,15 @@ */ #include "fuse_readdir.hpp" -#include "fuse_readdir_factory.hpp" #include "config.hpp" +#include "fuse_readdir_factory.hpp" +/* + The _initialized stuff is not pretty but easiest way to deal with + the fact that mergerfs is doing arg parsing and setting up of things + (including thread pools) before the daemonizing + */ int FUSE::readdir(const fuse_file_info_t *ffi_, @@ -32,9 +37,11 @@ FUSE::readdir(const fuse_file_info_t *ffi_, } FUSE::ReadDir::ReadDir(std::string const s_) + : _initialized(false) { from_string(s_); - assert(_readdir); + if(_initialized) + assert(_readdir); } std::string @@ -45,25 +52,46 @@ FUSE::ReadDir::to_string() const return _type; } +void +FUSE::ReadDir::initialize() +{ + _initialized = true; + from_string(_type); +} + int FUSE::ReadDir::from_string(std::string const &str_) { - std::shared_ptr tmp; + if(_initialized) + { + std::shared_ptr tmp; - tmp = FUSE::ReadDirFactory::make(str_); - if(!tmp) - return -EINVAL; + tmp = FUSE::ReadDirFactory::make(str_); + if(!tmp) + return -EINVAL; - { - std::lock_guard lg(_mutex); + { + std::lock_guard lg(_mutex); - _type = str_; - _readdir = tmp; - } + _type = str_; + std::swap(_readdir,tmp); + } + } + else + { + std::lock_guard lg(_mutex); + + _type = str_; + } return 0; } +/* + Yeah... if not initialized it will crash... ¯\_(ツ)_/¯ + This will be resolved once initialization of internal objects and + handling of input is better seperated. +*/ int FUSE::ReadDir::operator()(fuse_file_info_t const *ffi_, fuse_dirents_t *buf_) diff --git a/src/fuse_readdir.hpp b/src/fuse_readdir.hpp index 20cc1b30..452fbffe 100644 --- a/src/fuse_readdir.hpp +++ b/src/fuse_readdir.hpp @@ -25,7 +25,10 @@ #include - +// The initialization behavior is not pretty but for the moment +// needed due to the daemonizing function of the libfuse library when +// not using foreground mode. The threads need to be created after the +// fork, not before. namespace FUSE { int readdir(fuse_file_info_t const *ffi, @@ -44,10 +47,14 @@ namespace FUSE int operator()(fuse_file_info_t const *ffi, fuse_dirents_t *buf); + public: + void initialize(); + private: mutable std::mutex _mutex; private: + bool _initialized; std::string _type; std::shared_ptr _readdir; }; diff --git a/src/unbounded_thread_pool.hpp b/src/unbounded_thread_pool.hpp index 1bfa1dcd..5020bd85 100644 --- a/src/unbounded_thread_pool.hpp +++ b/src/unbounded_thread_pool.hpp @@ -2,6 +2,8 @@ #include "unbounded_queue.hpp" +#include "syslog.hpp" + #include #include #include @@ -21,8 +23,13 @@ public: ThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(), std::string const name_ = {}) : _queues(thread_count_), - _count(thread_count_) + _count(thread_count_), + _name(name_) { + syslog_info("threadpool: spawning %zu threads named '%s'", + _count, + _name.c_str()); + auto worker = [this](std::size_t i) { while(true) @@ -51,10 +58,10 @@ public: _threads.reserve(thread_count_); for(std::size_t i = 0; i < thread_count_; ++i) _threads.emplace_back(worker, i); - if(!name_.empty()) + if(!_name.empty()) { for(auto &t : _threads) - pthread_setname_np(t.native_handle(),name_.c_str()); + pthread_setname_np(t.native_handle(),_name.c_str()); } pthread_sigmask(SIG_SETMASK,&oldset,NULL); @@ -62,6 +69,10 @@ public: ~ThreadPool() { + syslog_info("threadpool: destroying %zu threads named '%s'", + _count, + _name.c_str()); + for(auto& queue : _queues) queue.unblock(); for(auto& thread : _threads) @@ -133,8 +144,9 @@ private: std::vector _threads; private: - const std::size_t _count; + std::size_t const _count; std::atomic_uint _index; + std::string const _name; static const unsigned int K = 2; };