Browse Source

thread_pool.hpp

Antonio SJ Musumeci 1 month ago
parent
commit
44163385fd
  1. 2
      Makefile
  2. 2
      libfuse/Makefile
  3. 346
      libfuse/include/invocable.h
  4. 493
      libfuse/include/thread_pool.hpp
  5. 114
      libfuse/lib/fuse_loop.cpp
  6. 14
      mkdocs/docs/config/func_readdir.md
  7. 32
      mkdocs/docs/config/threads.md
  8. 1
      mkdocs/mkdocs.yml
  9. 2
      src/config.cpp
  10. 2
      src/fuse_readdir_base.hpp
  11. 5
      src/fuse_readdir_cor.cpp
  12. 4
      src/fuse_readdir_cor.hpp
  13. 4
      src/fuse_readdir_cosr.cpp
  14. 73
      src/fuse_readdir_factory.cpp

2
Makefile

@ -75,7 +75,7 @@ CFLAGS := ${CFLAGS} \
CXXFLAGS ?= ${OPT_FLAGS}
CXXFLAGS := \
${CXXFLAGS} \
-std=c++11 \
-std=c++17 \
$(STATIC_FLAGS) \
$(LTO_FLAGS) \
-Wall \

2
libfuse/Makefile

@ -76,7 +76,7 @@ CXXFLAGS ?= \
CXXFLAGS := \
${CXXFLAGS} \
$(LTO_FLAGS) \
-std=c++11 \
-std=c++17 \
-Wall \
-pipe \
-MMD

346
libfuse/include/invocable.h

@ -0,0 +1,346 @@
#ifndef _ANY_INVOKABLE_H_
#define _ANY_INVOKABLE_H_
#include <functional>
#include <memory>
#include <type_traits>
// clang-format off
/*
namespace std {
template<class Sig> class any_invocable; // never defined
template<class R, class... ArgTypes>
class any_invocable<R(ArgTypes...) cv ref noexcept(noex)> {
public:
using result_type = R;
// SECTION.3, construct/copy/destroy
any_invocable() noexcept;
any_invocable(nullptr_t) noexcept;
any_invocable(any_invocable&&) noexcept;
template<class F> any_invocable(F&&);
template<class T, class... Args>
explicit any_invocable(in_place_type_t<T>, Args&&...);
template<class T, class U, class... Args>
explicit any_invocable(in_place_type_t<T>, initializer_list<U>, Args&&...);
any_invocable& operator=(any_invocable&&) noexcept;
any_invocable& operator=(nullptr_t) noexcept;
template<class F> any_invocable& operator=(F&&);
template<class F> any_invocable& operator=(reference_wrapper<F>) noexcept;
~any_invocable();
// SECTION.4, any_invocable modifiers
void swap(any_invocable&) noexcept;
// SECTION.5, any_invocable capacity
explicit operator bool() const noexcept;
// SECTION.6, any_invocable invocation
R operator()(ArgTypes...) cv ref noexcept(noex);
// SECTION.7, null pointer comparisons
friend bool operator==(const any_invocable&, nullptr_t) noexcept;
// SECTION.8, specialized algorithms
friend void swap(any_invocable&, any_invocable&) noexcept;
};
}
*/
// clang-format on
namespace ofats {
namespace any_detail {
// In-place buffer for the small-object optimization: two pointers wide and
// pointer-aligned (enough for e.g. a lambda capturing a pointer plus a size).
using buffer = std::aligned_storage_t<sizeof(void*) * 2, alignof(void*)>;
// A callable is stored inline only if it fits the buffer, is compatibly
// aligned, and is nothrow-move-constructible (required so any_invocable's
// move operations can themselves be noexcept).
template <class T>
inline constexpr bool is_small_object_v =
sizeof(T) <= sizeof(buffer) && alignof(buffer) % alignof(T) == 0 &&
std::is_nothrow_move_constructible_v<T>;
// Either an owning heap pointer (large callables) or the inline buffer
// (small callables). Which member is active is known only to the handler
// selected for the stored type — see handler_traits below.
union storage {
void* ptr_ = nullptr;
buffer buf_;
};
// Type-erased lifetime operations, dispatched through a single stored
// function pointer (handler_base::handle).
enum class action { destroy, move };
// Per-signature traits bundle providing the two storage strategies
// (small_handler: in-buffer, large_handler: heap) behind a uniform
// create/destroy/move/call interface used by any_invocable_impl.
template <class R, class... ArgTypes>
struct handler_traits {
// CRTP base: folds destroy/move into one function pointer so the
// type-erased object only needs to store a single handler address.
template <class Derived>
struct handler_base {
static void handle(action act, storage* current, storage* other = nullptr) {
switch (act) {
case (action::destroy):
Derived::destroy(*current);
break;
case (action::move):
// `other` is the source storage; only valid for action::move.
Derived::move(*current, *other);
break;
}
}
};
// Strategy for callables satisfying is_small_object_v: the object lives
// inside storage::buf_, constructed via placement new and destroyed by an
// explicit destructor call.
template <class T>
struct small_handler : handler_base<small_handler<T>> {
template <class... Args>
static void create(storage& s, Args&&... args) {
new (static_cast<void*>(&s.buf_)) T(std::forward<Args>(args)...);
}
static void destroy(storage& s) noexcept {
T& value = *static_cast<T*>(static_cast<void*>(&s.buf_));
value.~T();
}
static void move(storage& dst, storage& src) noexcept {
// Move-construct into dst's buffer, then end src's lifetime.
create(dst, std::move(*static_cast<T*>(static_cast<void*>(&src.buf_))));
destroy(src);
}
static R call(const storage& s, ArgTypes... args) {
// const_cast is safe: constness of the *wrapper* is enforced by the
// cv-qualified any_invocable specializations, not by the storage.
return std::invoke(
*static_cast<T*>(static_cast<void*>(&const_cast<storage&>(s).buf_)),
std::forward<ArgTypes>(args)...);
}
};
// Strategy for callables too big/aligned/throwing for the buffer: the
// object is heap-allocated and storage::ptr_ owns it.
template <class T>
struct large_handler : handler_base<large_handler<T>> {
template <class... Args>
static void create(storage& s, Args&&... args) {
s.ptr_ = new T(std::forward<Args>(args)...);
}
static void destroy(storage& s) noexcept { delete static_cast<T*>(s.ptr_); }
static void move(storage& dst, storage& src) noexcept {
// Ownership transfer is just a pointer copy; the caller
// (any_invocable_impl) clears src's handler so src is never destroyed.
dst.ptr_ = src.ptr_;
}
static R call(const storage& s, ArgTypes... args) {
return std::invoke(*static_cast<T*>(s.ptr_),
std::forward<ArgTypes>(args)...);
}
};
// Selects the storage strategy for T at compile time.
template <class T>
using handler = std::conditional_t<is_small_object_v<T>, small_handler<T>,
large_handler<T>>;
};
// Detects std::in_place_type_t so the converting constructor can exclude it
// (in-place construction is handled by dedicated constructors instead).
template <class T>
struct is_in_place_type : std::false_type {};
template <class T>
struct is_in_place_type<std::in_place_type_t<T>> : std::true_type {};
template <class T>
inline constexpr auto is_in_place_type_v = is_in_place_type<T>::value;
// Common implementation shared by every any_invocable specialization.
// Owns a type-erased callable in `storage_`. Lifetime operations
// (destroy/move) are dispatched through `handle_`, invocation through
// `call_`. The empty state is represented by `handle_ == nullptr`;
// `call_` is only ever read while `handle_` is non-null.
template <class R, bool is_noexcept, class... ArgTypes>
class any_invocable_impl {
  template <class T>
  using handler =
      typename any_detail::handler_traits<R, ArgTypes...>::template handler<T>;

  using storage = any_detail::storage;
  using action = any_detail::action;
  using handle_func = void (*)(any_detail::action, any_detail::storage*,
                               any_detail::storage*);
  using call_func = R (*)(const any_detail::storage&, ArgTypes...);

 public:
  using result_type = R;

  any_invocable_impl() noexcept = default;
  any_invocable_impl(std::nullptr_t) noexcept {}
  any_invocable_impl(any_invocable_impl&& rhs) noexcept {
    if (rhs.handle_) {
      handle_ = rhs.handle_;
      handle_(action::move, &storage_, &rhs.storage_);
      call_ = rhs.call_;
      // Clearing the source's handler marks it empty; its storage is never
      // destroyed again (large_handler::move relies on this).
      rhs.handle_ = nullptr;
    }
  }
  // Move-assign via move-construct + swap; safe for self-assignment.
  any_invocable_impl& operator=(any_invocable_impl&& rhs) noexcept {
    any_invocable_impl{std::move(rhs)}.swap(*this);
    return *this;
  }
  any_invocable_impl& operator=(std::nullptr_t) noexcept {
    destroy();
    return *this;
  }

  ~any_invocable_impl() { destroy(); }

  void swap(any_invocable_impl& rhs) noexcept {
    if (handle_) {
      if (rhs.handle_) {
        // Both engaged: three-way move through a temporary storage slot.
        storage tmp;
        handle_(action::move, &tmp, &storage_);
        rhs.handle_(action::move, &storage_, &rhs.storage_);
        handle_(action::move, &rhs.storage_, &tmp);
        std::swap(handle_, rhs.handle_);
        std::swap(call_, rhs.call_);
      } else {
        // Only *this engaged: delegate to the one-sided branch below.
        rhs.swap(*this);
      }
    } else if (rhs.handle_) {
      rhs.handle_(action::move, &storage_, &rhs.storage_);
      handle_ = rhs.handle_;
      call_ = rhs.call_;
      rhs.handle_ = nullptr;
    }
  }

  // True iff a callable is stored.
  explicit operator bool() const noexcept { return handle_ != nullptr; }

 protected:
  // Constructs a callable of type F in storage_ and installs its handler.
  // Precondition: *this is empty (derived classes only call this from
  // constructors).
  template <class F, class... Args>
  void create(Args&&... args) {
    using hdl = handler<F>;
    hdl::create(storage_, std::forward<Args>(args)...);
    handle_ = &hdl::handle;
    call_ = &hdl::call;
  }

  void destroy() noexcept {
    if (handle_) {
      handle_(action::destroy, &storage_, nullptr);
      handle_ = nullptr;
    }
  }

  // Precondition: *this is non-empty. Calling an empty any_invocable is
  // undefined behavior (mirrors std::move_only_function).
  R call(ArgTypes... args) const noexcept(is_noexcept) {
    return call_(storage_, std::forward<ArgTypes>(args)...);
  }

  friend bool operator==(const any_invocable_impl& f, std::nullptr_t) noexcept {
    return !f;
  }
  friend bool operator==(std::nullptr_t, const any_invocable_impl& f) noexcept {
    return !f;
  }
  friend bool operator!=(const any_invocable_impl& f, std::nullptr_t) noexcept {
    return static_cast<bool>(f);
  }
  friend bool operator!=(std::nullptr_t, const any_invocable_impl& f) noexcept {
    return static_cast<bool>(f);
  }

  friend void swap(any_invocable_impl& lhs, any_invocable_impl& rhs) noexcept {
    lhs.swap(rhs);
  }

 private:
  storage storage_;
  handle_func handle_ = nullptr;
  // Fix: was left uninitialized (indeterminate) in the empty state; now
  // value-initialized like handle_ so the object never holds a wild pointer.
  call_func call_ = nullptr;
};
// C++17 stand-in for C++20's std::remove_cvref_t.
template <class T>
using remove_cvref_t = std::remove_cv_t<std::remove_reference_t<T>>;
// Constraint for the converting constructor any_invocable(F&&):
//  - F is not any_invocable itself (don't hijack copy/move),
//  - F is not an in_place_type_t tag (handled by in-place constructors),
//  - F is invocable with the signature's arguments yielding R,
//  - if the signature is noexcept, the invocation must be nothrow,
//  - the decayed F must be constructible from F (storable).
template <class AI, class F, bool noex, class R, class FCall, class... ArgTypes>
using can_convert = std::conjunction<
std::negation<std::is_same<remove_cvref_t<F>, AI>>,
std::negation<any_detail::is_in_place_type<remove_cvref_t<F>>>,
std::is_invocable_r<R, FCall, ArgTypes...>,
std::bool_constant<(!noex ||
std::is_nothrow_invocable_r_v<R, FCall, ArgTypes...>)>,
std::is_constructible<std::decay_t<F>, F>>;
} // namespace any_detail
// Primary template; only the macro-generated specializations below are
// ever defined.
template <class Signature>
class any_invocable;
// Stamps out one any_invocable specialization per cv/ref/noexcept
// combination of the function signature. Macro parameters:
//   cv        : `const` or empty
//   ref       : `&`, `&&` or empty
//   noex      : true/false, fed to noexcept(...)
//   inv_quals : qualifiers applied to the stored callable when invoked
#define __OFATS_ANY_INVOCABLE(cv, ref, noex, inv_quals) \
template <class R, class... ArgTypes> \
class any_invocable<R(ArgTypes...) cv ref noexcept(noex)> final \
: public any_detail::any_invocable_impl<R, noex, ArgTypes...> { \
using base_type = any_detail::any_invocable_impl<R, noex, ArgTypes...>; \
\
public: \
using base_type::base_type; \
\
template < \
class F, \
class = std::enable_if_t<any_detail::can_convert< \
any_invocable, F, noex, R, F inv_quals, ArgTypes...>::value>> \
any_invocable(F&& f) { \
base_type::template create<std::decay_t<F>>(std::forward<F>(f)); \
} \
\
template <class T, class... Args, class VT = std::decay_t<T>, \
class = std::enable_if_t< \
std::is_move_constructible_v<VT> && \
std::is_constructible_v<VT, Args...> && \
std::is_invocable_r_v<R, VT inv_quals, ArgTypes...> && \
(!noex || std::is_nothrow_invocable_r_v<R, VT inv_quals, \
ArgTypes...>)>> \
explicit any_invocable(std::in_place_type_t<T>, Args&&... args) { \
base_type::template create<VT>(std::forward<Args>(args)...); \
} \
\
template < \
class T, class U, class... Args, class VT = std::decay_t<T>, \
class = std::enable_if_t< \
std::is_move_constructible_v<VT> && \
std::is_constructible_v<VT, std::initializer_list<U>&, Args...> && \
std::is_invocable_r_v<R, VT inv_quals, ArgTypes...> && \
(!noex || \
std::is_nothrow_invocable_r_v<R, VT inv_quals, ArgTypes...>)>> \
explicit any_invocable(std::in_place_type_t<T>, \
std::initializer_list<U> il, Args&&... args) { \
base_type::template create<VT>(il, std::forward<Args>(args)...); \
} \
\
template <class F, class FDec = std::decay_t<F>> \
std::enable_if_t<!std::is_same_v<FDec, any_invocable> && \
std::is_move_constructible_v<FDec>, \
any_invocable&> \
operator=(F&& f) { \
any_invocable{std::forward<F>(f)}.swap(*this); \
return *this; \
} \
template <class F> \
any_invocable& operator=(std::reference_wrapper<F> f) { \
any_invocable{f}.swap(*this); \
return *this; \
} \
\
R operator()(ArgTypes... args) cv ref noexcept(noex) { \
return base_type::call(std::forward<ArgTypes>(args)...); \
} \
}
// cv -> {`empty`, const}
// ref -> {`empty`, &, &&}
// noex -> {true, false}
// inv_quals -> (is_empty(ref) ? & : ref)
// All 12 cv/ref/noexcept combinations. The digit comments encode
// (cv, ref, noex) as described in the legend above.
__OFATS_ANY_INVOCABLE(, , false, &); // 000
__OFATS_ANY_INVOCABLE(, , true, &); // 001
__OFATS_ANY_INVOCABLE(, &, false, &); // 010
__OFATS_ANY_INVOCABLE(, &, true, &); // 011
__OFATS_ANY_INVOCABLE(, &&, false, &&); // 020
__OFATS_ANY_INVOCABLE(, &&, true, &&); // 021
__OFATS_ANY_INVOCABLE(const, , false, const&); // 100
__OFATS_ANY_INVOCABLE(const, , true, const&); // 101
__OFATS_ANY_INVOCABLE(const, &, false, const&); // 110
__OFATS_ANY_INVOCABLE(const, &, true, const&); // 111
__OFATS_ANY_INVOCABLE(const, &&, false, const&&); // 120
__OFATS_ANY_INVOCABLE(const, &&, true, const&&); // 121
#undef __OFATS_ANY_INVOCABLE
} // namespace ofats
#endif // _ANY_INVOKABLE_H_

493
libfuse/include/thread_pool.hpp

@ -1,6 +1,8 @@
#pragma once
#include "moodycamel/blockingconcurrentqueue.h"
#include "moodycamel/lightweightsemaphore.h"
#include "invocable.h"
#include <algorithm>
#include <atomic>
@ -13,9 +15,22 @@
#include <string>
#include <thread>
#include <vector>
#include <utility>
#include <pthread.h>
#include <syslog.h>
#define SEM
//#undef SEM
#ifdef SEM
#define SEMA_WAIT(S) (S.wait())
#define SEMA_SIGNAL(S) (S.signal())
#else
#define SEMA_WAIT(S)
#define SEMA_SIGNAL(S)
#endif
struct ThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits
{
@ -25,157 +40,217 @@ struct ThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits
class ThreadPool
{
public:
using PToken = moodycamel::ProducerToken;
using CToken = moodycamel::ConsumerToken;
private:
using Func = std::function<void(void)>;
using Func = ofats::any_invocable<void()>;
using Queue = moodycamel::BlockingConcurrentQueue<Func,ThreadPoolTraits>;
public:
explicit
ThreadPool(unsigned const thread_count_ = std::thread::hardware_concurrency(),
unsigned const max_queue_depth_ = std::thread::hardware_concurrency(),
std::string const name_ = {})
: _queue(),
_queue_depth(0),
_max_queue_depth(std::max(thread_count_,max_queue_depth_)),
_name(name_)
{
syslog(LOG_DEBUG,
"threadpool (%s): spawning %u threads w/ max queue depth %u%s",
_name.c_str(),
thread_count_,
_max_queue_depth,
((_max_queue_depth != max_queue_depth_) ? " (adjusted)" : ""));
ThreadPool(const unsigned thread_count_ = std::thread::hardware_concurrency(),
const unsigned max_queue_depth_ = std::thread::hardware_concurrency(),
std::string const name_ = {});
~ThreadPool();
sigset_t oldset;
sigset_t newset;
private:
static void *start_routine(void *arg_);
sigfillset(&newset);
pthread_sigmask(SIG_BLOCK,&newset,&oldset);
public:
int add_thread(std::string const name = {});
int remove_thread(void);
int set_threads(const std::size_t count);
_threads.reserve(thread_count_);
for(std::size_t i = 0; i < thread_count_; ++i)
{
int rv;
pthread_t t;
void shutdown(void);
rv = pthread_create(&t,NULL,ThreadPool::start_routine,this);
if(rv != 0)
{
syslog(LOG_WARNING,
"threadpool (%s): error spawning thread - %d (%s)",
_name.c_str(),
rv,
strerror(rv));
continue;
}
public:
template<typename FuncType>
void
enqueue_work(ThreadPool::PToken &ptok_,
FuncType &&func_);
if(!_name.empty())
pthread_setname_np(t,_name.c_str());
template<typename FuncType>
void
enqueue_work(FuncType &&func_);
_threads.push_back(t);
}
template<typename FuncType>
[[nodiscard]]
std::future<typename std::result_of<FuncType()>::type>
enqueue_task(FuncType&& func_);
pthread_sigmask(SIG_SETMASK,&oldset,NULL);
public:
std::vector<pthread_t> threads() const;
ThreadPool::PToken ptoken();
if(_threads.empty())
throw std::runtime_error("threadpool: failed to spawn any threads");
}
private:
Queue _queue;
moodycamel::LightweightSemaphore _sema;
~ThreadPool()
{
syslog(LOG_DEBUG,
"threadpool (%s): destroying %lu threads",
_name.c_str(),
_threads.size());
private:
std::string const _name;
std::vector<pthread_t> _threads;
mutable std::mutex _threads_mutex;
};
auto func = []() { pthread_exit(NULL); };
for(std::size_t i = 0; i < _threads.size(); i++)
_queue.enqueue(func);
for(auto t : _threads)
pthread_cancel(t);
for(auto t : _threads)
pthread_join(t,NULL);
}
inline
ThreadPool::ThreadPool(const unsigned thread_count_,
const unsigned max_queue_depth_,
const std::string name_)
: _queue(),
_sema(max_queue_depth_),
_name(name_)
{
sigset_t oldset;
sigset_t newset;
private:
static
void*
start_routine(void *arg_)
{
ThreadPool *btp = static_cast<ThreadPool*>(arg_);
ThreadPool::Func func;
ThreadPool::Queue &q = btp->_queue;
std::atomic<unsigned> &queue_depth = btp->_queue_depth;
moodycamel::ConsumerToken ctok(btp->_queue);
sigfillset(&newset);
pthread_sigmask(SIG_BLOCK,&newset,&oldset);
while(true)
{
q.wait_dequeue(ctok,func);
_threads.reserve(thread_count_);
for(std::size_t i = 0; i < thread_count_; ++i)
{
int rv;
pthread_t t;
func();
rv = pthread_create(&t,NULL,ThreadPool::start_routine,this);
if(rv != 0)
{
syslog(LOG_WARNING,
"threadpool (%s): error spawning thread - %d (%s)",
_name.c_str(),
rv,
strerror(rv));
continue;
}
queue_depth.fetch_sub(1,std::memory_order_release);
}
if(!_name.empty())
pthread_setname_np(t,_name.c_str());
return NULL;
}
_threads.push_back(t);
}
public:
int
add_thread(std::string const name_ = {})
{
int rv;
pthread_t t;
sigset_t oldset;
sigset_t newset;
std::string name;
pthread_sigmask(SIG_SETMASK,&oldset,NULL);
name = (name_.empty() ? _name : name_);
if(_threads.empty())
throw std::runtime_error("threadpool: failed to spawn any threads");
sigfillset(&newset);
pthread_sigmask(SIG_BLOCK,&newset,&oldset);
rv = pthread_create(&t,NULL,ThreadPool::start_routine,this);
pthread_sigmask(SIG_SETMASK,&oldset,NULL);
syslog(LOG_DEBUG,
"threadpool (%s): spawned %zu threads w/ max queue depth %u",
_name.c_str(),
_threads.size(),
max_queue_depth_);
}
inline
ThreadPool::~ThreadPool()
{
syslog(LOG_DEBUG,
"threadpool (%s): destroying %zu threads",
_name.c_str(),
_threads.size());
for(auto t : _threads)
pthread_cancel(t);
for(auto t : _threads)
pthread_join(t,NULL);
}
if(rv != 0)
{
syslog(LOG_WARNING,
"threadpool (%s): error spawning thread - %d (%s)",
_name.c_str(),
rv,
strerror(rv));
return -rv;
}
if(!name.empty())
pthread_setname_np(t,name.c_str());
inline
void*
ThreadPool::start_routine(void *arg_)
{
ThreadPool *btp = static_cast<ThreadPool*>(arg_);
bool done;
ThreadPool::Func func;
ThreadPool::Queue &q = btp->_queue;
moodycamel::LightweightSemaphore &sema = btp->_sema;
ThreadPool::CToken ctok(btp->_queue);
done = false;
while(!done)
{
std::lock_guard<std::mutex> lg(_threads_mutex);
_threads.push_back(t);
q.wait_dequeue(ctok,func);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
try
{
func();
}
catch(std::exception &e)
{
done = true;
}
SEMA_SIGNAL(sema);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
}
syslog(LOG_DEBUG,
"threadpool (%s): 1 thread added named '%s'",
_name.c_str(),
name.c_str());
return nullptr;
}
return 0;
}
int
remove_thread(void)
{
inline
int
ThreadPool::add_thread(const std::string name_)
{
int rv;
pthread_t t;
sigset_t oldset;
sigset_t newset;
std::string name;
name = (name_.empty() ? _name : name_);
sigfillset(&newset);
pthread_sigmask(SIG_BLOCK,&newset,&oldset);
rv = pthread_create(&t,NULL,ThreadPool::start_routine,this);
pthread_sigmask(SIG_SETMASK,&oldset,NULL);
if(rv != 0)
{
std::lock_guard<std::mutex> lg(_threads_mutex);
if(_threads.size() <= 1)
return -EINVAL;
syslog(LOG_WARNING,
"threadpool (%s): error spawning thread - %d (%s)",
_name.c_str(),
rv,
strerror(rv));
return -rv;
}
std::promise<pthread_t> promise;
auto func = [&]()
if(!name.empty())
pthread_setname_np(t,name.c_str());
{
std::lock_guard<std::mutex> lg(_threads_mutex);
_threads.push_back(t);
}
return 0;
}
inline
int
ThreadPool::remove_thread(void)
{
{
std::lock_guard<std::mutex> lg(_threads_mutex);
if(_threads.size() <= 1)
return -EINVAL;
}
std::promise<pthread_t> promise;
auto func =
[this,&promise]()
{
pthread_t t;
@ -195,124 +270,120 @@ public:
}
}
syslog(LOG_DEBUG,
"threadpool (%s): 1 thread removed",
_name.c_str());
pthread_exit(NULL);
};
enqueue_work(func);
pthread_join(promise.get_future().get(),NULL);
enqueue_work(std::move(func));
pthread_join(promise.get_future().get(),NULL);
return 0;
}
return 0;
}
int
set_threads(std::size_t const count_)
inline
int
ThreadPool::set_threads(std::size_t const count_)
{
int diff;
{
int diff;
{
std::lock_guard<std::mutex> lg(_threads_mutex);
std::lock_guard<std::mutex> lg(_threads_mutex);
diff = ((int)count_ - (int)_threads.size());
}
diff = ((int)count_ - (int)_threads.size());
}
for(auto i = diff; i > 0; --i)
add_thread();
for(auto i = diff; i < 0; ++i)
remove_thread();
for(auto i = diff; i > 0; --i)
add_thread();
for(auto i = diff; i < 0; ++i)
remove_thread();
return diff;
}
return diff;
}
public:
template<typename FuncType>
void
enqueue_work(moodycamel::ProducerToken &ptok_,
FuncType &&f_)
{
timespec ts = {0,1000};
for(unsigned i = 0; i < 1000000; i++)
{
if(_queue_depth.load(std::memory_order_acquire) < _max_queue_depth)
break;
::nanosleep(&ts,NULL);
}
_queue.enqueue(ptok_,f_);
_queue_depth.fetch_add(1,std::memory_order_release);
}
inline
void
ThreadPool::shutdown(void)
{
std::lock_guard<std::mutex> lg(_threads_mutex);
template<typename FuncType>
void
enqueue_work(FuncType &&f_)
{
timespec ts = {0,1000};
for(unsigned i = 0; i < 1000000; i++)
{
if(_queue_depth.load(std::memory_order_acquire) < _max_queue_depth)
break;
::nanosleep(&ts,NULL);
}
for(pthread_t tid : _threads)
pthread_cancel(tid);
for(pthread_t tid : _threads)
pthread_join(tid,NULL);
_queue.enqueue(f_);
_queue_depth.fetch_add(1,std::memory_order_release);
}
_threads.clear();
}
template<typename FuncType>
inline
void
ThreadPool::enqueue_work(ThreadPool::PToken &ptok_,
FuncType &&func_)
{
SEMA_WAIT(_sema);
_queue.enqueue(ptok_,
std::forward<FuncType>(func_));
}
template<typename FuncType>
[[nodiscard]]
std::future<typename std::result_of<FuncType()>::type>
enqueue_task(FuncType&& f_)
{
using TaskReturnType = typename std::result_of<FuncType()>::type;
using Promise = std::promise<TaskReturnType>;
auto promise = std::make_shared<Promise>();
auto future = promise->get_future();
template<typename FuncType>
inline
void
ThreadPool::enqueue_work(FuncType &&func_)
{
SEMA_WAIT(_sema);
_queue.enqueue(std::forward<FuncType>(func_));
}
template<typename FuncType>
[[nodiscard]]
inline
std::future<typename std::result_of<FuncType()>::type>
ThreadPool::enqueue_task(FuncType&& func_)
{
using TaskReturnType = typename std::result_of<FuncType()>::type;
using Promise = std::promise<TaskReturnType>;
auto work = [=]()
Promise promise;
auto future = promise.get_future();
auto work =
[promise_ = std::move(promise),
func_ = std::forward<FuncType>(func_)]() mutable
{
auto rv = f_();
promise->set_value(rv);
try
{
auto rv = func_();
promise_.set_value(std::move(rv));
}
catch(...)
{
promise_.set_exception(std::current_exception());
}
};
timespec ts = {0,1000};
for(unsigned i = 0; i < 1000000; i++)
{
if(_queue_depth.load(std::memory_order_acquire) < _max_queue_depth)
break;
::nanosleep(&ts,NULL);
}
_queue.enqueue(work);
_queue_depth.fetch_add(1,std::memory_order_release);
SEMA_WAIT(_sema);
_queue.enqueue(std::move(work));
return future;
}
return future;
}
public:
std::vector<pthread_t>
threads() const
{
std::lock_guard<std::mutex> lg(_threads_mutex);
return _threads;
}
inline
std::vector<pthread_t>
ThreadPool::threads() const
{
std::lock_guard<std::mutex> lg(_threads_mutex);
moodycamel::ProducerToken
ptoken()
{
return moodycamel::ProducerToken(_queue);
}
return _threads;
}
private:
Queue _queue;
std::atomic<unsigned> _queue_depth;
unsigned const _max_queue_depth;
private:
std::string const _name;
std::vector<pthread_t> _threads;
mutable std::mutex _threads_mutex;
};
inline
ThreadPool::PToken
ThreadPool::ptoken()
{
return ThreadPool::PToken(_queue);
}

114
libfuse/lib/fuse_loop.cpp

@ -4,7 +4,6 @@
#include "cpu.hpp"
#include "fmt/core.h"
#include "make_unique.hpp"
#include "scope_guard.hpp"
#include "thread_pool.hpp"
@ -17,6 +16,10 @@
#include "fuse_msgbuf.hpp"
#include "fuse_ll.hpp"
#include <cassert>
#include <memory>
#include <vector>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
@ -28,12 +31,10 @@
#include <syslog.h>
#include <unistd.h>
#include <cassert>
#include <vector>
static
bool
retriable_receive_error(const int err_)
_retriable_receive_error(const int err_)
{
switch(err_)
{
@ -46,20 +47,10 @@ retriable_receive_error(const int err_)
}
}
static
bool
fatal_receive_error(const int err_)
{
return (err_ < 0);
}
static
void
handle_receive_error(const int rv_,
fuse_msgbuf_t *msgbuf_)
_print_error(int rv_)
{
msgbuf_free(msgbuf_);
fmt::print(stderr,
"mergerfs: error reading from /dev/fuse - {} ({})\n",
strerror(-rv_),
@ -72,8 +63,8 @@ struct AsyncWorker
sem_t *_finished;
std::shared_ptr<ThreadPool> _process_tp;
AsyncWorker(fuse_session *se_,
sem_t *finished_,
AsyncWorker(fuse_session *se_,
sem_t *finished_,
std::shared_ptr<ThreadPool> process_tp_)
: _se(se_),
_finished(finished_),
@ -88,7 +79,7 @@ struct AsyncWorker
DEFER{ fuse_session_exit(_se); };
DEFER{ sem_post(_finished); };
moodycamel::ProducerToken ptok(_process_tp->ptoken());
ThreadPool::PToken ptok(_process_tp->ptoken());
while(!fuse_session_exited(_se))
{
int rv;
@ -96,26 +87,30 @@ struct AsyncWorker
msgbuf = msgbuf_alloc();
do
while(true)
{
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
rv = _se->receive_buf(_se,msgbuf);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
if(rv == 0)
return;
if(retriable_receive_error(rv))
if(rv > 0)
break;
if(::_retriable_receive_error(rv))
continue;
if(fatal_receive_error(rv))
return handle_receive_error(rv,msgbuf);
} while(false);
auto const func = [=]
{
_se->process_buf(_se,msgbuf);
msgbuf_free(msgbuf);
};
msgbuf_free(msgbuf);
if((rv == 0) || (rv == -ENODEV))
return;
return ::_print_error(rv);
}
_process_tp->enqueue_work(ptok,func);
_process_tp->enqueue_work(ptok,
[=]()
{
_se->process_buf(_se,msgbuf);
msgbuf_free(msgbuf);
});
}
}
};
@ -147,20 +142,25 @@ struct SyncWorker
msgbuf = msgbuf_alloc();
do
while(true)
{
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
rv = _se->receive_buf(_se,msgbuf);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
if(rv == 0)
return;
if(retriable_receive_error(rv))
if(rv > 0)
break;
if(::_retriable_receive_error(rv))
continue;
if(fatal_receive_error(rv))
return handle_receive_error(rv,msgbuf);
} while(false);
msgbuf_free(msgbuf);
if((rv == 0) || (rv == -ENODEV))
return;
return ::_print_error(rv);
}
_se->process_buf(_se,msgbuf);
msgbuf_free(msgbuf);
}
}
@ -193,7 +193,7 @@ fuse_start_thread(pthread_t *thread_id,
static
int
calculate_thread_count(const int raw_thread_count_)
_calculate_thread_count(const int raw_thread_count_)
{
int thread_count;
@ -213,10 +213,12 @@ calculate_thread_count(const int raw_thread_count_)
static
void
calculate_thread_counts(int *read_thread_count_,
int *process_thread_count_,
int *process_thread_queue_depth_)
_calculate_thread_counts(int *read_thread_count_,
int *process_thread_count_,
int *process_thread_queue_depth_)
{
constexpr int DEFAULT_QUEUE_DEPTH = 3;
if((*read_thread_count_ == -1) && (*process_thread_count_ == -1))
{
int nproc;
@ -225,15 +227,22 @@ calculate_thread_counts(int *read_thread_count_,
*read_thread_count_ = 2;
*process_thread_count_ = std::max(2,(nproc - 2));
}
else if((*read_thread_count_ == 0) && (*process_thread_count_ != -1))
{
*read_thread_count_ = 2;
*process_thread_count_ = ::_calculate_thread_count(*process_thread_count_);
}
else
{
*read_thread_count_ = ::calculate_thread_count(*read_thread_count_);
*read_thread_count_ = ::_calculate_thread_count(*read_thread_count_);
if(*process_thread_count_ != -1)
*process_thread_count_ = ::calculate_thread_count(*process_thread_count_);
*process_thread_count_ = ::_calculate_thread_count(*process_thread_count_);
}
if(*process_thread_queue_depth_ <= 0)
*process_thread_queue_depth_ = *process_thread_count_;
*process_thread_queue_depth_ = DEFAULT_QUEUE_DEPTH;
*process_thread_queue_depth_ *= *process_thread_count_;
}
static
@ -456,8 +465,8 @@ pin_threads(const std::vector<pthread_t> read_threads_,
static
void
wait(fuse_session *se_,
sem_t *finished_sem_)
_wait(fuse_session *se_,
sem_t *finished_sem_)
{
while(!fuse_session_exited(se_))
sem_wait(finished_sem_);
@ -484,14 +493,13 @@ fuse_session_loop_mt(struct fuse_session *se_,
read_thread_count = raw_read_thread_count_;
process_thread_count = raw_process_thread_count_;
process_thread_queue_depth = raw_process_thread_queue_depth_;
::calculate_thread_counts(&read_thread_count,
&process_thread_count,
&process_thread_queue_depth);
::_calculate_thread_counts(&read_thread_count,
&process_thread_count,
&process_thread_queue_depth);
if(process_thread_count > 0)
process_tp = std::make_shared<ThreadPool>(process_thread_count,
(process_thread_count *
process_thread_queue_depth),
process_thread_queue_depth,
"fuse.process");
read_tp = std::make_unique<ThreadPool>(read_thread_count,
@ -526,7 +534,7 @@ fuse_session_loop_mt(struct fuse_session *se_,
process_thread_queue_depth,
pin_threads_type_.c_str());
::wait(se_,&finished);
::_wait(se_,&finished);
sem_destroy(&finished);

14
mkdocs/docs/config/func_readdir.md

@ -1,14 +1,20 @@
# func.readdir
examples: `func.readdir=seq`, `func.readdir=cor:4`
Defaults to `seq`
Examples: `func.readdir=seq`, `func.readdir=cor:4`
`readdir` has policies to control how it reads directory content.
| Policy | Description |
| ------ | ----------- |
| seq | "sequential" : Iterate sequentially over branches in the order defined in `branches`. This is the default and traditional behavior found prior to the readdir policy introduction. This will be increasingly slower as more branches are added to the pool. Especially if needing to wait for drives to spin up or network filesystems to respond. |
| cosr | "concurrent open, sequential read" : Concurrently open branch directories using a thread pool and process them in the order defined in `branches`. This keeps memory and CPU usage low while also reducing the time spent waiting on branches to respond. Number of threads defaults to the number of logical cores. Can be overwritten via the syntax `func.readdir=cosr:N` where `N` is the number of threads. |
| cor | "concurrent open and read" : Concurrently open branch directories and immediately start reading their contents using a thread pool. This will result in slightly higher memory and CPU usage but reduced latency. Particularly when using higher latency / slower speed network filesystem branches. Unlike `seq` and `cosr` the order of files could change due the async nature of the thread pool. This should not be a problem since the order of files listed in not guaranteed. Number of threads defaults to the number of logical cores. Can be overwritten via the syntax `func.readdir=cor:N` where `N` is the number of threads. |
| seq | "sequential" : Iterate sequentially over branches in the order defined in `branches`. This is the default and traditional behavior found prior to the readdir policy introduction. This will be increasingly slower as more branches are added to the pool. Especially if needing to wait for drives to spin up or network filesystems to respond. |
| cosr:N:M | "concurrent open, sequential read" : Concurrently open branch directories using a thread pool and process them in the order defined in `branches`. This keeps memory and CPU usage low while also reducing the time spent waiting on branches to respond. `N` is the number of threads. If negative it will be the core count divided by `abs(N)`. `M` is the queue depth. If either value is `0` it will be decided based on system configuration. |
| cosr:N | cosr:N:M with M = 0 |
| cosr | cosr:N:M with N = 0 and M = 0 |
| cor:N:M | "concurrent open and read" : Concurrently open branch directories and immediately start reading their contents using a thread pool. This will result in slightly higher memory and CPU usage but reduced latency. Particularly when using higher latency / slower speed network filesystem branches. Unlike `seq` and `cosr` the order of files could change due to the async nature of the thread pool. This should not be a problem since the order of files listed is not guaranteed. `N` is the number of threads. If negative it will be the core count divided by `abs(N)`. `M` is the queue depth. If either value is `0` it will be decided based on system configuration. |
| cor:N | cor:N:M with M = 0 |
| cor | cor:N:M with N = 0 and M = 0 |
Keep in mind that `readdir` mostly just provides a list of file names
in a directory and possibly some basic metadata about said files. To

32
mkdocs/docs/config/threads.md

@ -6,6 +6,8 @@ parallel behaviors.
## read-thread-count
* Defaults to `0`
The number of threads used to read (and possibly process) messages
from the kernel.
@ -14,9 +16,10 @@ from the kernel.
* `read-thread-count=N` where `N>0`: Create a thread pool of `N` threads.
* `read-thread-count=N` where `N<0`: Create a thread pool of `CPUCount /
-N` threads.
* `read-thread-count=0` where `process-thread-count!=-1`: Creates `2`
read threads and number of process threads as defined below.
* `read-thread-count=-1` where `process-thread-count=-1`: Creates `2`
read threads and `max(2,CPUCount-2)` process threads.
* Defaults to `0`.
When `process-thread-count=-1` (the default) this option sets the
number of threads which read and then process requests from the
@ -26,11 +29,14 @@ When `process-thread-count` is set to anything else mergerfs will
create two thread pools. A "read" thread pool which just reads from
the kernel and hands off requests to the "process" thread pool.
Generally, only 1 or 2 "read" threads are necessary.
Generally, only 1 or 2 "read" threads are necessary when used with a
"process" thread pool.
## process-thread-count
* Defaults to `-1`
When enabled this sets the number of threads in the message processing pool.
* `process-thread-count=-1`: Process thread pool is disabled.
@ -39,15 +45,21 @@ When enabled this sets the number of threads in the message processing pool.
* `process-thread-count=N` where `N>0`: Create a thread pool of `N` threads.
* `process-thread-count=N` where `N<-1`: Create a thread pool of `CPUCount /
-N` threads.
* Defaults to `-1`.
## process-thread-queue-depth
* `process-thread-queue-depth=N` where `N>0`: Sets the number of outstanding
requests that a process thread can have to N. If requests come in
faster than can be processed and the max queue depth hit then
queuing the request will block in order to limit memory growth.
* `process-thread-queue-depth=0`: Sets the queue depth to the thread
pool count.
* Defaults to `0`.
* Defaults to `3`
Sets the queue depth for the processing thread pool, per
thread. Meaning if the read threads are receiving requests faster than
they can be processed, the requests will be queued up, up to the queue
depth. Despite the calculation being per thread, the queue depth is
shared across all threads in the pool.
* `process-thread-queue-depth=N` where `N>0`: Sets the number of
outstanding requests that the process thread pool can have to `N *
process-thread-count`. If requests come in faster than can be
processed and the max queue depth hit then queuing the request will
block in order to limit memory growth.
* `process-thread-queue-depth<=0`: Sets the queue depth to 3.

1
mkdocs/mkdocs.yml

@ -11,6 +11,7 @@ plugins:
theme:
name: material
logo: logo.png
favicon: logo.png
features:
- content.action.edit
- content.action.view

2
src/config.cpp

@ -131,7 +131,7 @@ Config::Config()
symlinkify_timeout(3600),
fuse_read_thread_count(0),
fuse_process_thread_count(-1),
fuse_process_thread_queue_depth(0),
fuse_process_thread_queue_depth(3),
fuse_pin_threads("false"),
version(MERGERFS_VERSION),
writeback_cache(false),

2
src/fuse_readdir_base.hpp

@ -28,7 +28,7 @@ namespace FUSE
virtual ~ReadDirBase() {};
public:
virtual int operator()(fuse_file_info_t const *ffi,
virtual int operator()(const fuse_file_info_t *ffi,
fuse_dirents_t *buf) = 0;
};
}

5
src/fuse_readdir_cor.cpp

@ -139,6 +139,7 @@ namespace l
}
static
inline
int
concurrent_readdir(ThreadPool &tp_,
const Branches::CPtr &branches_,
@ -168,7 +169,7 @@ namespace l
mutex);
};
auto rv = tp_.enqueue_task(func);
auto rv = tp_.enqueue_task(std::move(func));
futures.emplace_back(std::move(rv));
}
@ -182,7 +183,7 @@ namespace l
}
int
FUSE::ReadDirCOR::operator()(fuse_file_info_t const *ffi_,
FUSE::ReadDirCOR::operator()(const fuse_file_info_t *ffi_,
fuse_dirents_t *buf_)
{
Config::Read cfg;

4
src/fuse_readdir_cor.hpp

@ -33,10 +33,10 @@ namespace FUSE
unsigned max_queue_depth);
~ReadDirCOR();
int operator()(fuse_file_info_t const *ffi,
int operator()(const fuse_file_info_t *ffi,
fuse_dirents_t *buf);
private:
ThreadPool _tp;
mutable ThreadPool _tp;
};
}

4
src/fuse_readdir_cosr.cpp

@ -107,7 +107,7 @@ namespace l
futures.reserve(branches_->size());
for(auto const &branch : *branches_)
for(const auto &branch : *branches_)
{
auto func =
[&branch,&rel_dirpath_,uid_,gid_]()
@ -124,7 +124,7 @@ namespace l
return DirRV{&branch.path,dir,errno};
};
auto rv = tp_.enqueue_task(func);
auto rv = tp_.enqueue_task(std::move(func));
futures.emplace_back(std::move(rv));
}

73
src/fuse_readdir_factory.cpp

@ -28,50 +28,53 @@
#include <cstdlib>
#include <set>
#define DEFAULT_MAX_QUEUE_DEPTH 3
namespace l
static
void
_read_cfg(std::string const str_,
std::string &type_,
unsigned &concurrency_,
unsigned &max_queue_depth_)
{
static
void
read_cfg(std::string const str_,
std::string &type_,
unsigned &concurrency_,
unsigned &max_queue_depth_)
{
char type[16];
int concurrency;
int max_queue_depth;
concurrency = 0;
max_queue_depth = 0;
std::sscanf(str_.c_str(),
"%15[a-z]:%d:%d",
type,
&concurrency,
&max_queue_depth);
if(concurrency == 0)
char type[16];
int concurrency;
int max_queue_depth;
concurrency = 0;
max_queue_depth = 0;
std::sscanf(str_.c_str(),
"%15[a-z]:%d:%d",
type,
&concurrency,
&max_queue_depth);
if(concurrency == 0)
{
concurrency = std::thread::hardware_concurrency();
else if(concurrency < 0)
if(concurrency > 8)
concurrency = 8;
}
else if(concurrency < 0)
{
concurrency = (std::thread::hardware_concurrency() / std::abs(concurrency));
}
if(concurrency == 0)
concurrency = 1;
if(concurrency <= 0)
concurrency = 1;
if(max_queue_depth == 0)
max_queue_depth = DEFAULT_MAX_QUEUE_DEPTH;
if(max_queue_depth <= 0)
max_queue_depth = 2;
max_queue_depth *= concurrency;
max_queue_depth *= concurrency;
type_ = type;
concurrency_ = concurrency;
max_queue_depth_ = max_queue_depth;
}
type_ = type;
concurrency_ = concurrency;
max_queue_depth_ = max_queue_depth;
}
bool
FUSE::ReadDirFactory::valid(std::string const str_)
FUSE::ReadDirFactory::valid(const std::string str_)
{
unsigned concurrency;
unsigned max_queue_depth;
@ -81,7 +84,7 @@ FUSE::ReadDirFactory::valid(std::string const str_)
"seq", "cosr", "cor"
};
l::read_cfg(str_,type,concurrency,max_queue_depth);
::_read_cfg(str_,type,concurrency,max_queue_depth);
if(types.find(type) == types.end())
return false;
@ -101,7 +104,7 @@ FUSE::ReadDirFactory::make(std::string const str_)
if(!valid(str_))
return {};
l::read_cfg(str_,type,concurrency,max_queue_depth);
::_read_cfg(str_,type,concurrency,max_queue_depth);
if(type == "seq")
return std::make_shared<FUSE::ReadDirSeq>();

Loading…
Cancel
Save