diff --git a/Makefile b/Makefile index 1fd74910..a12854b2 100644 --- a/Makefile +++ b/Makefile @@ -75,7 +75,7 @@ CFLAGS := ${CFLAGS} \ CXXFLAGS ?= ${OPT_FLAGS} CXXFLAGS := \ ${CXXFLAGS} \ - -std=c++11 \ + -std=c++17 \ $(STATIC_FLAGS) \ $(LTO_FLAGS) \ -Wall \ diff --git a/libfuse/Makefile b/libfuse/Makefile index 9913a664..ea2defe7 100644 --- a/libfuse/Makefile +++ b/libfuse/Makefile @@ -76,7 +76,7 @@ CXXFLAGS ?= \ CXXFLAGS := \ ${CXXFLAGS} \ $(LTO_FLAGS) \ - -std=c++11 \ + -std=c++17 \ -Wall \ -pipe \ -MMD diff --git a/libfuse/include/invocable.h b/libfuse/include/invocable.h new file mode 100644 index 00000000..cbb70464 --- /dev/null +++ b/libfuse/include/invocable.h @@ -0,0 +1,346 @@ +#ifndef _ANY_INVOKABLE_H_ +#define _ANY_INVOKABLE_H_ + +#include +#include +#include + +// clang-format off +/* +namespace std { + template class any_invocable; // never defined + + template + class any_invocable { + public: + using result_type = R; + + // SECTION.3, construct/copy/destroy + any_invocable() noexcept; + any_invocable(nullptr_t) noexcept; + any_invocable(any_invocable&&) noexcept; + template any_invocable(F&&); + + template + explicit any_invocable(in_place_type_t, Args&&...); + template + explicit any_invocable(in_place_type_t, initializer_list, Args&&...); + + any_invocable& operator=(any_invocable&&) noexcept; + any_invocable& operator=(nullptr_t) noexcept; + template any_invocable& operator=(F&&); + template any_invocable& operator=(reference_wrapper) noexcept; + + ~any_invocable(); + + // SECTION.4, any_invocable modifiers + void swap(any_invocable&) noexcept; + + // SECTION.5, any_invocable capacity + explicit operator bool() const noexcept; + + // SECTION.6, any_invocable invocation + R operator()(ArgTypes...) cv ref noexcept(noex); + + // SECTION.7, null pointer comparisons + friend bool operator==(const any_invocable&, nullptr_t) noexcept; + + // SECTION.8, specialized algorithms + friend void swap(any_invocable&, any_invocable&) noexcept; + }; +} +*/ +// clang-format on + +namespace ofats { + +namespace any_detail { + +using buffer = std::aligned_storage_t; + +template +inline constexpr bool is_small_object_v = + sizeof(T) <= sizeof(buffer) && alignof(buffer) % alignof(T) == 0 && + std::is_nothrow_move_constructible_v; + +union storage { + void* ptr_ = nullptr; + buffer buf_; +}; + +enum class action { destroy, move }; + +template +struct handler_traits { + template + struct handler_base { + static void handle(action act, storage* current, storage* other = nullptr) { + switch (act) { + case (action::destroy): + Derived::destroy(*current); + break; + case (action::move): + Derived::move(*current, *other); + break; + } + } + }; + + template + struct small_handler : handler_base> { + template + static void create(storage& s, Args&&... args) { + new (static_cast(&s.buf_)) T(std::forward(args)...); + } + + static void destroy(storage& s) noexcept { + T& value = *static_cast(static_cast(&s.buf_)); + value.~T(); + } + + static void move(storage& dst, storage& src) noexcept { + create(dst, std::move(*static_cast(static_cast(&src.buf_)))); + destroy(src); + } + + static R call(const storage& s, ArgTypes... args) { + return std::invoke( + *static_cast(static_cast(&const_cast(s).buf_)), + std::forward(args)...); + } + }; + + template + struct large_handler : handler_base> { + template + static void create(storage& s, Args&&... 
args) { + s.ptr_ = new T(std::forward(args)...); + } + + static void destroy(storage& s) noexcept { delete static_cast(s.ptr_); } + + static void move(storage& dst, storage& src) noexcept { + dst.ptr_ = src.ptr_; + } + + static R call(const storage& s, ArgTypes... args) { + return std::invoke(*static_cast(s.ptr_), + std::forward(args)...); + } + }; + + template + using handler = std::conditional_t, small_handler, + large_handler>; +}; + +template +struct is_in_place_type : std::false_type {}; + +template +struct is_in_place_type> : std::true_type {}; + +template +inline constexpr auto is_in_place_type_v = is_in_place_type::value; + +template +class any_invocable_impl { + template + using handler = + typename any_detail::handler_traits::template handler; + + using storage = any_detail::storage; + using action = any_detail::action; + using handle_func = void (*)(any_detail::action, any_detail::storage*, + any_detail::storage*); + using call_func = R (*)(const any_detail::storage&, ArgTypes...); + + public: + using result_type = R; + + any_invocable_impl() noexcept = default; + any_invocable_impl(std::nullptr_t) noexcept {} + any_invocable_impl(any_invocable_impl&& rhs) noexcept { + if (rhs.handle_) { + handle_ = rhs.handle_; + handle_(action::move, &storage_, &rhs.storage_); + call_ = rhs.call_; + rhs.handle_ = nullptr; + } + } + + any_invocable_impl& operator=(any_invocable_impl&& rhs) noexcept { + any_invocable_impl{std::move(rhs)}.swap(*this); + return *this; + } + any_invocable_impl& operator=(std::nullptr_t) noexcept { + destroy(); + return *this; + } + + ~any_invocable_impl() { destroy(); } + + void swap(any_invocable_impl& rhs) noexcept { + if (handle_) { + if (rhs.handle_) { + storage tmp; + handle_(action::move, &tmp, &storage_); + rhs.handle_(action::move, &storage_, &rhs.storage_); + handle_(action::move, &rhs.storage_, &tmp); + std::swap(handle_, rhs.handle_); + std::swap(call_, rhs.call_); + } else { + rhs.swap(*this); + } + } else if (rhs.handle_) { + rhs.handle_(action::move, &storage_, &rhs.storage_); + handle_ = rhs.handle_; + call_ = rhs.call_; + rhs.handle_ = nullptr; + } + } + + explicit operator bool() const noexcept { return handle_ != nullptr; } + + protected: + template + void create(Args&&... args) { + using hdl = handler; + hdl::create(storage_, std::forward(args)...); + handle_ = &hdl::handle; + call_ = &hdl::call; + } + + void destroy() noexcept { + if (handle_) { + handle_(action::destroy, &storage_, nullptr); + handle_ = nullptr; + } + } + + R call(ArgTypes... 
args) const noexcept(is_noexcept) { + return call_(storage_, std::forward(args)...); + } + + friend bool operator==(const any_invocable_impl& f, std::nullptr_t) noexcept { + return !f; + } + friend bool operator==(std::nullptr_t, const any_invocable_impl& f) noexcept { + return !f; + } + friend bool operator!=(const any_invocable_impl& f, std::nullptr_t) noexcept { + return static_cast(f); + } + friend bool operator!=(std::nullptr_t, const any_invocable_impl& f) noexcept { + return static_cast(f); + } + + friend void swap(any_invocable_impl& lhs, any_invocable_impl& rhs) noexcept { + lhs.swap(rhs); + } + + private: + storage storage_; + handle_func handle_ = nullptr; + call_func call_; +}; + +template +using remove_cvref_t = std::remove_cv_t>; + +template +using can_convert = std::conjunction< + std::negation, AI>>, + std::negation>>, + std::is_invocable_r, + std::bool_constant<(!noex || + std::is_nothrow_invocable_r_v)>, + std::is_constructible, F>>; + +} // namespace any_detail + +template +class any_invocable; + +#define __OFATS_ANY_INVOCABLE(cv, ref, noex, inv_quals) \ + template \ + class any_invocable final \ + : public any_detail::any_invocable_impl { \ + using base_type = any_detail::any_invocable_impl; \ + \ + public: \ + using base_type::base_type; \ + \ + template < \ + class F, \ + class = std::enable_if_t::value>> \ + any_invocable(F&& f) { \ + base_type::template create>(std::forward(f)); \ + } \ + \ + template , \ + class = std::enable_if_t< \ + std::is_move_constructible_v && \ + std::is_constructible_v && \ + std::is_invocable_r_v && \ + (!noex || std::is_nothrow_invocable_r_v)>> \ + explicit any_invocable(std::in_place_type_t, Args&&... args) { \ + base_type::template create(std::forward(args)...); \ + } \ + \ + template < \ + class T, class U, class... Args, class VT = std::decay_t, \ + class = std::enable_if_t< \ + std::is_move_constructible_v && \ + std::is_constructible_v&, Args...> && \ + std::is_invocable_r_v && \ + (!noex || \ + std::is_nothrow_invocable_r_v)>> \ + explicit any_invocable(std::in_place_type_t, \ + std::initializer_list il, Args&&... args) { \ + base_type::template create(il, std::forward(args)...); \ + } \ + \ + template > \ + std::enable_if_t && \ + std::is_move_constructible_v, \ + any_invocable&> \ + operator=(F&& f) { \ + any_invocable{std::forward(f)}.swap(*this); \ + return *this; \ + } \ + template \ + any_invocable& operator=(std::reference_wrapper f) { \ + any_invocable{f}.swap(*this); \ + return *this; \ + } \ + \ + R operator()(ArgTypes... args) cv ref noexcept(noex) { \ + return base_type::call(std::forward(args)...); \ + } \ + } + +// cv -> {`empty`, const} +// ref -> {`empty`, &, &&} +// noex -> {true, false} +// inv_quals -> (is_empty(ref) ? 
& : ref) +__OFATS_ANY_INVOCABLE(, , false, &); // 000 +__OFATS_ANY_INVOCABLE(, , true, &); // 001 +__OFATS_ANY_INVOCABLE(, &, false, &); // 010 +__OFATS_ANY_INVOCABLE(, &, true, &); // 011 +__OFATS_ANY_INVOCABLE(, &&, false, &&); // 020 +__OFATS_ANY_INVOCABLE(, &&, true, &&); // 021 +__OFATS_ANY_INVOCABLE(const, , false, const&); // 100 +__OFATS_ANY_INVOCABLE(const, , true, const&); // 101 +__OFATS_ANY_INVOCABLE(const, &, false, const&); // 110 +__OFATS_ANY_INVOCABLE(const, &, true, const&); // 111 +__OFATS_ANY_INVOCABLE(const, &&, false, const&&); // 120 +__OFATS_ANY_INVOCABLE(const, &&, true, const&&); // 121 + +#undef __OFATS_ANY_INVOCABLE + +} // namespace ofats + +#endif // _ANY_INVOKABLE_H_ diff --git a/libfuse/include/thread_pool.hpp b/libfuse/include/thread_pool.hpp index e20d710e..3ffed9b7 100644 --- a/libfuse/include/thread_pool.hpp +++ b/libfuse/include/thread_pool.hpp @@ -1,6 +1,8 @@ #pragma once #include "moodycamel/blockingconcurrentqueue.h" +#include "moodycamel/lightweightsemaphore.h" +#include "invocable.h" #include #include @@ -13,9 +15,22 @@ #include #include #include +#include +#include #include +#define SEM +//#undef SEM + +#ifdef SEM +#define SEMA_WAIT(S) (S.wait()) +#define SEMA_SIGNAL(S) (S.signal()) +#else +#define SEMA_WAIT(S) +#define SEMA_SIGNAL(S) +#endif + struct ThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits { @@ -25,157 +40,217 @@ struct ThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits class ThreadPool { +public: + using PToken = moodycamel::ProducerToken; + using CToken = moodycamel::ConsumerToken; + private: - using Func = std::function; + using Func = ofats::any_invocable; using Queue = moodycamel::BlockingConcurrentQueue; public: explicit - ThreadPool(unsigned const thread_count_ = std::thread::hardware_concurrency(), - unsigned const max_queue_depth_ = std::thread::hardware_concurrency(), - std::string const name_ = {}) - : _queue(), - _queue_depth(0), - _max_queue_depth(std::max(thread_count_,max_queue_depth_)), - _name(name_) - { - syslog(LOG_DEBUG, - "threadpool (%s): spawning %u threads w/ max queue depth %u%s", - _name.c_str(), - thread_count_, - _max_queue_depth, - ((_max_queue_depth != max_queue_depth_) ? 
" (adjusted)" : "")); + ThreadPool(const unsigned thread_count_ = std::thread::hardware_concurrency(), + const unsigned max_queue_depth_ = std::thread::hardware_concurrency(), + std::string const name_ = {}); + ~ThreadPool(); - sigset_t oldset; - sigset_t newset; +private: + static void *start_routine(void *arg_); - sigfillset(&newset); - pthread_sigmask(SIG_BLOCK,&newset,&oldset); +public: + int add_thread(std::string const name = {}); + int remove_thread(void); + int set_threads(const std::size_t count); - _threads.reserve(thread_count_); - for(std::size_t i = 0; i < thread_count_; ++i) - { - int rv; - pthread_t t; + void shutdown(void); - rv = pthread_create(&t,NULL,ThreadPool::start_routine,this); - if(rv != 0) - { - syslog(LOG_WARNING, - "threadpool (%s): error spawning thread - %d (%s)", - _name.c_str(), - rv, - strerror(rv)); - continue; - } +public: + template + void + enqueue_work(ThreadPool::PToken &ptok_, + FuncType &&func_); - if(!_name.empty()) - pthread_setname_np(t,_name.c_str()); + template + void + enqueue_work(FuncType &&func_); - _threads.push_back(t); - } + template + [[nodiscard]] + std::future::type> + enqueue_task(FuncType&& func_); - pthread_sigmask(SIG_SETMASK,&oldset,NULL); +public: + std::vector threads() const; + ThreadPool::PToken ptoken(); - if(_threads.empty()) - throw std::runtime_error("threadpool: failed to spawn any threads"); - } +private: + Queue _queue; + moodycamel::LightweightSemaphore _sema; - ~ThreadPool() - { - syslog(LOG_DEBUG, - "threadpool (%s): destroying %lu threads", - _name.c_str(), - _threads.size()); +private: + std::string const _name; + std::vector _threads; + mutable std::mutex _threads_mutex; +}; - auto func = []() { pthread_exit(NULL); }; - for(std::size_t i = 0; i < _threads.size(); i++) - _queue.enqueue(func); - for(auto t : _threads) - pthread_cancel(t); - for(auto t : _threads) - pthread_join(t,NULL); - } +inline +ThreadPool::ThreadPool(const unsigned thread_count_, + const unsigned max_queue_depth_, + const std::string name_) + : _queue(), + _sema(max_queue_depth_), + _name(name_) +{ + sigset_t oldset; + sigset_t newset; -private: - static - void* - start_routine(void *arg_) - { - ThreadPool *btp = static_cast(arg_); - ThreadPool::Func func; - ThreadPool::Queue &q = btp->_queue; - std::atomic &queue_depth = btp->_queue_depth; - moodycamel::ConsumerToken ctok(btp->_queue); + sigfillset(&newset); + pthread_sigmask(SIG_BLOCK,&newset,&oldset); - while(true) - { - q.wait_dequeue(ctok,func); + _threads.reserve(thread_count_); + for(std::size_t i = 0; i < thread_count_; ++i) + { + int rv; + pthread_t t; - func(); + rv = pthread_create(&t,NULL,ThreadPool::start_routine,this); + if(rv != 0) + { + syslog(LOG_WARNING, + "threadpool (%s): error spawning thread - %d (%s)", + _name.c_str(), + rv, + strerror(rv)); + continue; + } - queue_depth.fetch_sub(1,std::memory_order_release); - } + if(!_name.empty()) + pthread_setname_np(t,_name.c_str()); - return NULL; - } + _threads.push_back(t); + } -public: - int - add_thread(std::string const name_ = {}) - { - int rv; - pthread_t t; - sigset_t oldset; - sigset_t newset; - std::string name; + pthread_sigmask(SIG_SETMASK,&oldset,NULL); - name = (name_.empty() ? 
_name : name_); + if(_threads.empty()) + throw std::runtime_error("threadpool: failed to spawn any threads"); - sigfillset(&newset); - pthread_sigmask(SIG_BLOCK,&newset,&oldset); - rv = pthread_create(&t,NULL,ThreadPool::start_routine,this); - pthread_sigmask(SIG_SETMASK,&oldset,NULL); + syslog(LOG_DEBUG, + "threadpool (%s): spawned %zu threads w/ max queue depth %u", + _name.c_str(), + _threads.size(), + max_queue_depth_); +} + + +inline +ThreadPool::~ThreadPool() +{ + syslog(LOG_DEBUG, + "threadpool (%s): destroying %zu threads", + _name.c_str(), + _threads.size()); + + for(auto t : _threads) + pthread_cancel(t); + for(auto t : _threads) + pthread_join(t,NULL); +} - if(rv != 0) - { - syslog(LOG_WARNING, - "threadpool (%s): error spawning thread - %d (%s)", - _name.c_str(), - rv, - strerror(rv)); - return -rv; - } - if(!name.empty()) - pthread_setname_np(t,name.c_str()); +inline +void* +ThreadPool::start_routine(void *arg_) +{ + ThreadPool *btp = static_cast(arg_); + + bool done; + ThreadPool::Func func; + ThreadPool::Queue &q = btp->_queue; + moodycamel::LightweightSemaphore &sema = btp->_sema; + ThreadPool::CToken ctok(btp->_queue); + done = false; + while(!done) { - std::lock_guard lg(_threads_mutex); - _threads.push_back(t); + q.wait_dequeue(ctok,func); + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL); + + try + { + func(); + } + catch(std::exception &e) + { + done = true; + } + + SEMA_SIGNAL(sema); + + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL); } - syslog(LOG_DEBUG, - "threadpool (%s): 1 thread added named '%s'", - _name.c_str(), - name.c_str()); + return nullptr; +} - return 0; - } - int - remove_thread(void) - { + +inline +int +ThreadPool::add_thread(const std::string name_) +{ + int rv; + pthread_t t; + sigset_t oldset; + sigset_t newset; + std::string name; + + name = (name_.empty() ? 
_name : name_); + + sigfillset(&newset); + pthread_sigmask(SIG_BLOCK,&newset,&oldset); + rv = pthread_create(&t,NULL,ThreadPool::start_routine,this); + pthread_sigmask(SIG_SETMASK,&oldset,NULL); + + if(rv != 0) { - std::lock_guard lg(_threads_mutex); - if(_threads.size() <= 1) - return -EINVAL; + syslog(LOG_WARNING, + "threadpool (%s): error spawning thread - %d (%s)", + _name.c_str(), + rv, + strerror(rv)); + return -rv; } - std::promise promise; - auto func = [&]() + if(!name.empty()) + pthread_setname_np(t,name.c_str()); + + { + std::lock_guard lg(_threads_mutex); + _threads.push_back(t); + } + + return 0; +} + + +inline +int +ThreadPool::remove_thread(void) +{ + { + std::lock_guard lg(_threads_mutex); + if(_threads.size() <= 1) + return -EINVAL; + } + + std::promise promise; + auto func = + [this,&promise]() { pthread_t t; @@ -195,124 +270,120 @@ public: } } - syslog(LOG_DEBUG, - "threadpool (%s): 1 thread removed", - _name.c_str()); - pthread_exit(NULL); }; - enqueue_work(func); - pthread_join(promise.get_future().get(),NULL); + enqueue_work(std::move(func)); + pthread_join(promise.get_future().get(),NULL); - return 0; - } + return 0; +} - int - set_threads(std::size_t const count_) + +inline +int +ThreadPool::set_threads(std::size_t const count_) +{ + int diff; { - int diff; - { - std::lock_guard lg(_threads_mutex); + std::lock_guard lg(_threads_mutex); - diff = ((int)count_ - (int)_threads.size()); - } + diff = ((int)count_ - (int)_threads.size()); + } - for(auto i = diff; i > 0; --i) - add_thread(); - for(auto i = diff; i < 0; ++i) - remove_thread(); + for(auto i = diff; i > 0; --i) + add_thread(); + for(auto i = diff; i < 0; ++i) + remove_thread(); - return diff; - } + return diff; +} -public: - template - void - enqueue_work(moodycamel::ProducerToken &ptok_, - FuncType &&f_) - { - timespec ts = {0,1000}; - for(unsigned i = 0; i < 1000000; i++) - { - if(_queue_depth.load(std::memory_order_acquire) < _max_queue_depth) - break; - ::nanosleep(&ts,NULL); - } - _queue.enqueue(ptok_,f_); - _queue_depth.fetch_add(1,std::memory_order_release); - } +inline +void +ThreadPool::shutdown(void) +{ + std::lock_guard lg(_threads_mutex); - template - void - enqueue_work(FuncType &&f_) - { - timespec ts = {0,1000}; - for(unsigned i = 0; i < 1000000; i++) - { - if(_queue_depth.load(std::memory_order_acquire) < _max_queue_depth) - break; - ::nanosleep(&ts,NULL); - } + for(pthread_t tid : _threads) + pthread_cancel(tid); + for(pthread_t tid : _threads) + pthread_join(tid,NULL); - _queue.enqueue(f_); - _queue_depth.fetch_add(1,std::memory_order_release); - } + _threads.clear(); +} + + +template +inline +void +ThreadPool::enqueue_work(ThreadPool::PToken &ptok_, + FuncType &&func_) +{ + SEMA_WAIT(_sema); + _queue.enqueue(ptok_, + std::forward(func_)); +} - template - [[nodiscard]] - std::future::type> - enqueue_task(FuncType&& f_) - { - using TaskReturnType = typename std::result_of::type; - using Promise = std::promise; - auto promise = std::make_shared(); - auto future = promise->get_future(); +template +inline +void +ThreadPool::enqueue_work(FuncType &&func_) +{ + SEMA_WAIT(_sema); + _queue.enqueue(std::forward(func_)); +} + + +template +[[nodiscard]] +inline +std::future::type> +ThreadPool::enqueue_task(FuncType&& func_) +{ + using TaskReturnType = typename std::result_of::type; + using Promise = std::promise; - auto work = [=]() + Promise promise; + auto future = promise.get_future(); + + auto work = + [promise_ = std::move(promise), + func_ = std::forward(func_)]() mutable { - auto rv = f_(); - 
promise->set_value(rv); + try + { + auto rv = func_(); + promise_.set_value(std::move(rv)); + } + catch(...) + { + promise_.set_exception(std::current_exception()); + } }; - timespec ts = {0,1000}; - for(unsigned i = 0; i < 1000000; i++) - { - if(_queue_depth.load(std::memory_order_acquire) < _max_queue_depth) - break; - ::nanosleep(&ts,NULL); - } - - _queue.enqueue(work); - _queue_depth.fetch_add(1,std::memory_order_release); + SEMA_WAIT(_sema); + _queue.enqueue(std::move(work)); - return future; - } + return future; +} -public: - std::vector - threads() const - { - std::lock_guard lg(_threads_mutex); - return _threads; - } +inline +std::vector +ThreadPool::threads() const +{ + std::lock_guard lg(_threads_mutex); - moodycamel::ProducerToken - ptoken() - { - return moodycamel::ProducerToken(_queue); - } + return _threads; +} -private: - Queue _queue; - std::atomic _queue_depth; - unsigned const _max_queue_depth; -private: - std::string const _name; - std::vector _threads; - mutable std::mutex _threads_mutex; -}; +inline +ThreadPool::PToken +ThreadPool::ptoken() +{ + return ThreadPool::PToken(_queue); +} diff --git a/libfuse/lib/fuse_loop.cpp b/libfuse/lib/fuse_loop.cpp index 4245bbee..a377bf60 100644 --- a/libfuse/lib/fuse_loop.cpp +++ b/libfuse/lib/fuse_loop.cpp @@ -4,7 +4,6 @@ #include "cpu.hpp" #include "fmt/core.h" -#include "make_unique.hpp" #include "scope_guard.hpp" #include "thread_pool.hpp" @@ -17,6 +16,10 @@ #include "fuse_msgbuf.hpp" #include "fuse_ll.hpp" +#include +#include +#include + #include #include #include @@ -28,12 +31,10 @@ #include #include -#include -#include static bool -retriable_receive_error(const int err_) +_retriable_receive_error(const int err_) { switch(err_) { @@ -46,20 +47,10 @@ retriable_receive_error(const int err_) } } -static -bool -fatal_receive_error(const int err_) -{ - return (err_ < 0); -} - static void -handle_receive_error(const int rv_, - fuse_msgbuf_t *msgbuf_) +_print_error(int rv_) { - msgbuf_free(msgbuf_); - fmt::print(stderr, "mergerfs: error reading from /dev/fuse - {} ({})\n", strerror(-rv_), @@ -72,8 +63,8 @@ struct AsyncWorker sem_t *_finished; std::shared_ptr _process_tp; - AsyncWorker(fuse_session *se_, - sem_t *finished_, + AsyncWorker(fuse_session *se_, + sem_t *finished_, std::shared_ptr process_tp_) : _se(se_), _finished(finished_), @@ -88,7 +79,7 @@ struct AsyncWorker DEFER{ fuse_session_exit(_se); }; DEFER{ sem_post(_finished); }; - moodycamel::ProducerToken ptok(_process_tp->ptoken()); + ThreadPool::PToken ptok(_process_tp->ptoken()); while(!fuse_session_exited(_se)) { int rv; @@ -96,26 +87,30 @@ struct AsyncWorker msgbuf = msgbuf_alloc(); - do + while(true) { pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL); rv = _se->receive_buf(_se,msgbuf); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL); - if(rv == 0) - return; - if(retriable_receive_error(rv)) + + if(rv > 0) + break; + if(::_retriable_receive_error(rv)) continue; - if(fatal_receive_error(rv)) - return handle_receive_error(rv,msgbuf); - } while(false); - auto const func = [=] - { - _se->process_buf(_se,msgbuf); - msgbuf_free(msgbuf); - }; + msgbuf_free(msgbuf); + if((rv == 0) || (rv == -ENODEV)) + return; + + return ::_print_error(rv); + } - _process_tp->enqueue_work(ptok,func); + _process_tp->enqueue_work(ptok, + [=]() + { + _se->process_buf(_se,msgbuf); + msgbuf_free(msgbuf); + }); } } }; @@ -147,20 +142,25 @@ struct SyncWorker msgbuf = msgbuf_alloc(); - do + while(true) { pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL); rv = _se->receive_buf(_se,msgbuf); 
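          // Cancellation is only enabled while blocked in receive_buf()
          // above; from here on it is disabled again so that a request
          // which has already been read from /dev/fuse is processed (or
          // its msgbuf freed) rather than abandoned if the thread is
          // cancelled during teardown.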
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL); - if(rv == 0) - return; - if(retriable_receive_error(rv)) + if(rv > 0) + break; + if(::_retriable_receive_error(rv)) continue; - if(fatal_receive_error(rv)) - return handle_receive_error(rv,msgbuf); - } while(false); + + msgbuf_free(msgbuf); + if((rv == 0) || (rv == -ENODEV)) + return; + + return ::_print_error(rv); + } _se->process_buf(_se,msgbuf); + msgbuf_free(msgbuf); } } @@ -193,7 +193,7 @@ fuse_start_thread(pthread_t *thread_id, static int -calculate_thread_count(const int raw_thread_count_) +_calculate_thread_count(const int raw_thread_count_) { int thread_count; @@ -213,10 +213,12 @@ calculate_thread_count(const int raw_thread_count_) static void -calculate_thread_counts(int *read_thread_count_, - int *process_thread_count_, - int *process_thread_queue_depth_) +_calculate_thread_counts(int *read_thread_count_, + int *process_thread_count_, + int *process_thread_queue_depth_) { + constexpr int DEFAULT_QUEUE_DEPTH = 3; + if((*read_thread_count_ == -1) && (*process_thread_count_ == -1)) { int nproc; @@ -225,15 +227,22 @@ calculate_thread_counts(int *read_thread_count_, *read_thread_count_ = 2; *process_thread_count_ = std::max(2,(nproc - 2)); } - else + else if((*read_thread_count_ == 0) && (*process_thread_count_ != -1)) { - *read_thread_count_ = ::calculate_thread_count(*read_thread_count_); + *read_thread_count_ = 2; + *process_thread_count_ = ::_calculate_thread_count(*process_thread_count_); + } + else + { + *read_thread_count_ = ::_calculate_thread_count(*read_thread_count_); if(*process_thread_count_ != -1) - *process_thread_count_ = ::calculate_thread_count(*process_thread_count_); + *process_thread_count_ = ::_calculate_thread_count(*process_thread_count_); } if(*process_thread_queue_depth_ <= 0) - *process_thread_queue_depth_ = *process_thread_count_; + *process_thread_queue_depth_ = DEFAULT_QUEUE_DEPTH; + + *process_thread_queue_depth_ *= *process_thread_count_; } static @@ -456,8 +465,8 @@ pin_threads(const std::vector read_threads_, static void -wait(fuse_session *se_, - sem_t *finished_sem_) +_wait(fuse_session *se_, + sem_t *finished_sem_) { while(!fuse_session_exited(se_)) sem_wait(finished_sem_); @@ -484,14 +493,13 @@ fuse_session_loop_mt(struct fuse_session *se_, read_thread_count = raw_read_thread_count_; process_thread_count = raw_process_thread_count_; process_thread_queue_depth = raw_process_thread_queue_depth_; - ::calculate_thread_counts(&read_thread_count, - &process_thread_count, - &process_thread_queue_depth); + ::_calculate_thread_counts(&read_thread_count, + &process_thread_count, + &process_thread_queue_depth); if(process_thread_count > 0) process_tp = std::make_shared(process_thread_count, - (process_thread_count * - process_thread_queue_depth), + process_thread_queue_depth, "fuse.process"); read_tp = std::make_unique(read_thread_count, @@ -526,7 +534,7 @@ fuse_session_loop_mt(struct fuse_session *se_, process_thread_queue_depth, pin_threads_type_.c_str()); - ::wait(se_,&finished); + ::_wait(se_,&finished); sem_destroy(&finished); diff --git a/mkdocs/docs/config/func_readdir.md b/mkdocs/docs/config/func_readdir.md index 58a1cd43..d218fd7b 100644 --- a/mkdocs/docs/config/func_readdir.md +++ b/mkdocs/docs/config/func_readdir.md @@ -1,14 +1,20 @@ # func.readdir -examples: `func.readdir=seq`, `func.readdir=cor:4` +Defaults to `seq` + +Examples: `func.readdir=seq`, `func.readdir=cor:4` `readdir` has policies to control how it reads directory content. 
 | Policy | Description |
 | ------ | ----------- |
-| seq | "sequential" : Iterate sequentially over branches in the order defined in `branches`. This is the default and traditional behavior found prior to the readdir policy introduction. This will be increasingly slower as more branches are added to the pool. Especially if needing to wait for drives to spin up or network filesystems to respond. |
-| cosr | "concurrent open, sequential read" : Concurrently open branch directories using a thread pool and process them in the order defined in `branches`. This keeps memory and CPU usage low while also reducing the time spent waiting on branches to respond. Number of threads defaults to the number of logical cores. Can be overwritten via the syntax `func.readdir=cosr:N` where `N` is the number of threads. |
-| cor | "concurrent open and read" : Concurrently open branch directories and immediately start reading their contents using a thread pool. This will result in slightly higher memory and CPU usage but reduced latency. Particularly when using higher latency / slower speed network filesystem branches. Unlike `seq` and `cosr` the order of files could change due the async nature of the thread pool. This should not be a problem since the order of files listed in not guaranteed. Number of threads defaults to the number of logical cores. Can be overwritten via the syntax `func.readdir=cor:N` where `N` is the number of threads. |
+| seq | "sequential" : Iterate sequentially over branches in the order defined in `branches`. This is the default and traditional behavior found prior to the introduction of readdir policies. It becomes increasingly slower as more branches are added to the pool, especially if it must wait for drives to spin up or network filesystems to respond. |
+| cosr:N:M | "concurrent open, sequential read" : Concurrently open branch directories using a thread pool and process them in the order defined in `branches`. This keeps memory and CPU usage low while also reducing the time spent waiting on branches to respond. `N` is the number of threads. If negative it will be the core count divided by `abs(N)`. `M` is the queue depth. If either value is `0` it will be decided based on system configuration. |
+| cosr:N | cosr:N:M with M = 0 |
+| cosr | cosr:N:M with N = 0 and M = 0 |
+| cor:N:M | "concurrent open and read" : Concurrently open branch directories and immediately start reading their contents using a thread pool. This will result in slightly higher memory and CPU usage but reduced latency, particularly when using higher latency / slower network filesystem branches. Unlike `seq` and `cosr` the order of files could change due to the async nature of the thread pool. This should not be a problem since the order of files listed is not guaranteed. `N` is the number of threads. If negative it will be the core count divided by `abs(N)`. `M` is the queue depth. If either value is `0` it will be decided based on system configuration. |
+| cor:N | cor:N:M with M = 0 |
+| cor | cor:N:M with N = 0 and M = 0 |
 
 Keep in mind that `readdir` mostly just provides a list of file
 names in a directory and possibly some basic metadata about said
 files. To
diff --git a/mkdocs/docs/config/threads.md b/mkdocs/docs/config/threads.md
index 2f728d49..ac00050a 100644
--- a/mkdocs/docs/config/threads.md
+++ b/mkdocs/docs/config/threads.md
@@ -6,6 +6,8 @@ parallel behaviors.
 
 ## read-thread-count
 
+* Defaults to `0`
+
 The number of threads used to read (and possibly process) messages
 from the kernel.
@@ -14,9 +16,10 @@ from the kernel.
 
 * `read-thread-count=N` where `N>0`: Create a thread pool of `N` threads.
 * `read-thread-count=N` where `N<0`: Create a thread pool of `CPUCount / -N` threads.
+* `read-thread-count=0` where `process-thread-count!=-1`: Creates `2`
+  read threads and the number of process threads as defined below.
 * `read-thread-count=-1` where `process-thread-count=-1`: Creates `2`
   read threads and `max(2,CPUCount-2)` process threads.
-* Defaults to `0`.
 
 When `process-thread-count=-1` (the default) this option sets the
 number of threads which read and then process requests from the
@@ -26,11 +29,14 @@ When `process-thread-count` is set to anything else mergerfs will
 create two thread pools. A "read" thread pool which just reads from
 the kernel and hands off requests to the "process" thread pool.
 
-Generally, only 1 or 2 "read" threads are necessary.
+Generally, only 1 or 2 "read" threads are necessary when used with a
+"process" thread pool.
 
 ## process-thread-count
 
+* Defaults to `-1`
+
 When enabled this sets the number of threads in the message processing pool.
 
 * `process-thread-count=-1`: Process thread pool is disabled.
@@ -39,15 +45,21 @@ When enabled this sets the number of threads in the message processing pool.
 * `process-thread-count=N` where `N>0`: Create a thread pool of `N` threads.
 * `process-thread-count=N` where `N<-1`: Create a thread pool of `CPUCount / -N` threads.
-* Defaults to `-1`.
 
 ## process-thread-queue-depth
 
-* `process-thread-queue-depth=N` where `N>0`: Sets the number of outstanding
-  requests that a process thread can have to N. If requests come in
-  faster than can be processed and the max queue depth hit then
-  queuing the request will block in order to limit memory growth.
-* `process-thread-queue-depth=0`: Sets the queue depth to the thread
-  pool count.
-* Defaults to `0`.
+* Defaults to `3`
+
+Sets the queue depth, per thread, for the processing thread pool. If
+the read threads receive requests faster than they can be processed
+they will be queued, up to the queue depth. While the depth is
+specified per thread, the resulting limit is shared across all
+threads in the pool.
+
+* `process-thread-queue-depth=N` where `N>0`: Sets the number of
+  outstanding requests that the process thread pool can have to `N *
+  process-thread-count`. If requests come in faster than they can be
+  processed and the max queue depth is hit then enqueuing will block
+  in order to limit memory growth.
+* `process-thread-queue-depth<=0`: Sets the queue depth to 3.
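To make the sizing above concrete, here is a minimal, self-contained sketch of how the per-thread depth scales into the shared limit. The `effective_queue_depth()` helper is hypothetical; it only mirrors the documented behavior and the `*process_thread_queue_depth_ *= *process_thread_count_;` step in `_calculate_thread_counts()` earlier in this diff:

```cpp
#include <algorithm>
#include <cstdio>

// Hypothetical helper: values <= 0 fall back to the default per-thread
// depth of 3, and the shared queue limit is the per-thread depth
// multiplied by the number of process threads.
static int
effective_queue_depth(int queue_depth, int process_thread_count)
{
  constexpr int DEFAULT_QUEUE_DEPTH = 3;

  if(queue_depth <= 0)
    queue_depth = DEFAULT_QUEUE_DEPTH;

  return queue_depth * std::max(1, process_thread_count);
}

int
main(void)
{
  // e.g. the default depth of 3 with 6 process threads allows up to
  // 18 outstanding requests before enqueuing blocks.
  std::printf("%d\n", effective_queue_depth(0, 6)); // 18
  std::printf("%d\n", effective_queue_depth(2, 4)); // 8
  return 0;
}
```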
diff --git a/mkdocs/mkdocs.yml b/mkdocs/mkdocs.yml index e41c4c6a..a88f3e9a 100644 --- a/mkdocs/mkdocs.yml +++ b/mkdocs/mkdocs.yml @@ -11,6 +11,7 @@ plugins: theme: name: material logo: logo.png + favicon: logo.png features: - content.action.edit - content.action.view diff --git a/src/config.cpp b/src/config.cpp index 8fe82788..54421054 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -131,7 +131,7 @@ Config::Config() symlinkify_timeout(3600), fuse_read_thread_count(0), fuse_process_thread_count(-1), - fuse_process_thread_queue_depth(0), + fuse_process_thread_queue_depth(3), fuse_pin_threads("false"), version(MERGERFS_VERSION), writeback_cache(false), diff --git a/src/fuse_readdir_base.hpp b/src/fuse_readdir_base.hpp index dbdffba5..5c60df76 100644 --- a/src/fuse_readdir_base.hpp +++ b/src/fuse_readdir_base.hpp @@ -28,7 +28,7 @@ namespace FUSE virtual ~ReadDirBase() {}; public: - virtual int operator()(fuse_file_info_t const *ffi, + virtual int operator()(const fuse_file_info_t *ffi, fuse_dirents_t *buf) = 0; }; } diff --git a/src/fuse_readdir_cor.cpp b/src/fuse_readdir_cor.cpp index 5aad10c3..5912b002 100644 --- a/src/fuse_readdir_cor.cpp +++ b/src/fuse_readdir_cor.cpp @@ -139,6 +139,7 @@ namespace l } static + inline int concurrent_readdir(ThreadPool &tp_, const Branches::CPtr &branches_, @@ -168,7 +169,7 @@ namespace l mutex); }; - auto rv = tp_.enqueue_task(func); + auto rv = tp_.enqueue_task(std::move(func)); futures.emplace_back(std::move(rv)); } @@ -182,7 +183,7 @@ namespace l } int -FUSE::ReadDirCOR::operator()(fuse_file_info_t const *ffi_, +FUSE::ReadDirCOR::operator()(const fuse_file_info_t *ffi_, fuse_dirents_t *buf_) { Config::Read cfg; diff --git a/src/fuse_readdir_cor.hpp b/src/fuse_readdir_cor.hpp index 0f075cad..39ec3c6a 100644 --- a/src/fuse_readdir_cor.hpp +++ b/src/fuse_readdir_cor.hpp @@ -33,10 +33,10 @@ namespace FUSE unsigned max_queue_depth); ~ReadDirCOR(); - int operator()(fuse_file_info_t const *ffi, + int operator()(const fuse_file_info_t *ffi, fuse_dirents_t *buf); private: - ThreadPool _tp; + mutable ThreadPool _tp; }; } diff --git a/src/fuse_readdir_cosr.cpp b/src/fuse_readdir_cosr.cpp index 85e02422..297c16be 100644 --- a/src/fuse_readdir_cosr.cpp +++ b/src/fuse_readdir_cosr.cpp @@ -107,7 +107,7 @@ namespace l futures.reserve(branches_->size()); - for(auto const &branch : *branches_) + for(const auto &branch : *branches_) { auto func = [&branch,&rel_dirpath_,uid_,gid_]() @@ -124,7 +124,7 @@ namespace l return DirRV{&branch.path,dir,errno}; }; - auto rv = tp_.enqueue_task(func); + auto rv = tp_.enqueue_task(std::move(func)); futures.emplace_back(std::move(rv)); } diff --git a/src/fuse_readdir_factory.cpp b/src/fuse_readdir_factory.cpp index 38609930..3942f53d 100644 --- a/src/fuse_readdir_factory.cpp +++ b/src/fuse_readdir_factory.cpp @@ -28,50 +28,53 @@ #include #include -#define DEFAULT_MAX_QUEUE_DEPTH 3 -namespace l +static +void +_read_cfg(std::string const str_, + std::string &type_, + unsigned &concurrency_, + unsigned &max_queue_depth_) { - static - void - read_cfg(std::string const str_, - std::string &type_, - unsigned &concurrency_, - unsigned &max_queue_depth_) - { - char type[16]; - int concurrency; - int max_queue_depth; - - concurrency = 0; - max_queue_depth = 0; - std::sscanf(str_.c_str(), - "%15[a-z]:%d:%d", - type, - &concurrency, - &max_queue_depth); - - if(concurrency == 0) + char type[16]; + int concurrency; + int max_queue_depth; + + concurrency = 0; + max_queue_depth = 0; + std::sscanf(str_.c_str(), + "%15[a-z]:%d:%d", + type, + 
&concurrency, + &max_queue_depth); + + if(concurrency == 0) + { concurrency = std::thread::hardware_concurrency(); - else if(concurrency < 0) + if(concurrency > 8) + concurrency = 8; + } + else if(concurrency < 0) + { concurrency = (std::thread::hardware_concurrency() / std::abs(concurrency)); + } - if(concurrency == 0) - concurrency = 1; + if(concurrency <= 0) + concurrency = 1; - if(max_queue_depth == 0) - max_queue_depth = DEFAULT_MAX_QUEUE_DEPTH; + if(max_queue_depth <= 0) + max_queue_depth = 2; - max_queue_depth *= concurrency; + max_queue_depth *= concurrency; - type_ = type; - concurrency_ = concurrency; - max_queue_depth_ = max_queue_depth; - } + type_ = type; + concurrency_ = concurrency; + max_queue_depth_ = max_queue_depth; } + bool -FUSE::ReadDirFactory::valid(std::string const str_) +FUSE::ReadDirFactory::valid(const std::string str_) { unsigned concurrency; unsigned max_queue_depth; @@ -81,7 +84,7 @@ FUSE::ReadDirFactory::valid(std::string const str_) "seq", "cosr", "cor" }; - l::read_cfg(str_,type,concurrency,max_queue_depth); + ::_read_cfg(str_,type,concurrency,max_queue_depth); if(types.find(type) == types.end()) return false; @@ -101,7 +104,7 @@ FUSE::ReadDirFactory::make(std::string const str_) if(!valid(str_)) return {}; - l::read_cfg(str_,type,concurrency,max_queue_depth); + ::_read_cfg(str_,type,concurrency,max_queue_depth); if(type == "seq") return std::make_shared();
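For reference, a minimal usage sketch of the `ThreadPool` interface introduced above in `thread_pool.hpp`; the pool name, sizes, and work items here are illustrative, not part of the change. The switch from `std::function` to `ofats::any_invocable` is what allows the move-only lambda below to be enqueued, and `enqueue_task()` now delivers exceptions through the returned future:

```cpp
#include "thread_pool.hpp"

#include <cstdio>
#include <future>
#include <memory>

int
main(void)
{
  // 4 worker threads, shared queue depth of 8, named "example".
  ThreadPool tp(4, 8, "example");

  // Fire-and-forget work; a producer token can be reused by a thread
  // that enqueues many items to reduce queue contention.
  ThreadPool::PToken ptok = tp.ptoken();
  tp.enqueue_work(ptok, []() { std::printf("work item\n"); });

  // A move-only callable: storable in ofats::any_invocable but not in
  // the std::function the pool used previously.
  auto data = std::make_unique<int>(42);
  tp.enqueue_work([data = std::move(data)]() { std::printf("%d\n", *data); });

  // enqueue_task() returns a future carrying the result or any
  // exception thrown by the callable.
  std::future<int> f = tp.enqueue_task([]() { return 6 * 7; });
  std::printf("%d\n", f.get());

  return 0;
}
```

Passing a producer token to `enqueue_work()` is how `AsyncWorker` in `fuse_loop.cpp` batches its enqueues against the MPMC queue from a single read thread.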