diff --git a/libfuse/lib/fuse_loop.cpp b/libfuse/lib/fuse_loop.cpp index 1867dab9..327f945a 100644 --- a/libfuse/lib/fuse_loop.cpp +++ b/libfuse/lib/fuse_loop.cpp @@ -2,12 +2,12 @@ #define _GNU_SOURCE #endif -#include "bounded_thread_pool.hpp" #include "cpu.hpp" #include "fmt/core.h" #include "make_unique.hpp" #include "scope_guard.hpp" #include "syslog.h" +#include "thread_pool.hpp" #include "fuse_i.h" #include "fuse_kernel.h" @@ -70,11 +70,11 @@ struct AsyncWorker { fuse_session *_se; sem_t *_finished; - std::shared_ptr _process_tp; + std::shared_ptr _process_tp; AsyncWorker(fuse_session *se_, sem_t *finished_, - std::shared_ptr process_tp_) + std::shared_ptr process_tp_) : _se(se_), _finished(finished_), _process_tp(process_tp_) @@ -88,7 +88,7 @@ struct AsyncWorker DEFER{ fuse_session_exit(_se); }; DEFER{ sem_post(_finished); }; - moodycamel::ProducerToken ptok(_process_tp->queue()); + moodycamel::ProducerToken ptok(_process_tp->ptoken()); while(!fuse_session_exited(_se)) { int rv; @@ -474,8 +474,8 @@ fuse_session_loop_mt(struct fuse_session *se_, int process_thread_queue_depth; std::vector read_threads; std::vector process_threads; - std::unique_ptr read_tp; - std::shared_ptr process_tp; + std::unique_ptr read_tp; + std::shared_ptr process_tp; sem_init(&finished,0,0); @@ -487,11 +487,11 @@ fuse_session_loop_mt(struct fuse_session *se_, &process_thread_queue_depth); if(process_thread_count > 0) - process_tp = std::make_shared(process_thread_count, - process_thread_queue_depth, - "fuse.process"); + process_tp = std::make_shared(process_thread_count, + process_thread_queue_depth, + "fuse.process"); - read_tp = std::make_unique(read_thread_count,1,"fuse.read"); + read_tp = std::make_unique(read_thread_count,1,"fuse.read"); if(process_tp) { for(auto i = 0; i < read_thread_count; i++) diff --git a/libfuse/lib/thread_pool.hpp b/libfuse/lib/thread_pool.hpp new file mode 100644 index 00000000..33f9af6f --- /dev/null +++ b/libfuse/lib/thread_pool.hpp @@ -0,0 +1,310 @@ +#pragma once + +#include "moodycamel/blockingconcurrentqueue.h" +#include "syslog.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +struct ThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits +{ + static const int MAX_SEMA_SPINS = 1; +}; + + +class ThreadPool +{ +private: + using Func = std::function; + using Queue = moodycamel::BlockingConcurrentQueue; + +public: + explicit + ThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(), + std::size_t const queue_depth_ = 1, + std::string const name_ = {}) + : _queue(queue_depth_,thread_count_,thread_count_), + _name(get_thread_name(name_)) + { + syslog_debug("threadpool: spawning %zu threads of queue depth %zu named '%s'", + thread_count_, + queue_depth_, + _name.c_str()); + + sigset_t oldset; + sigset_t newset; + + sigfillset(&newset); + pthread_sigmask(SIG_BLOCK,&newset,&oldset); + + _threads.reserve(thread_count_); + for(std::size_t i = 0; i < thread_count_; ++i) + { + int rv; + pthread_t t; + + rv = pthread_create(&t,NULL,ThreadPool::start_routine,this); + if(rv != 0) + { + syslog_warning("threadpool: error spawning thread - %d (%s)", + rv, + strerror(rv)); + continue; + } + + if(!_name.empty()) + pthread_setname_np(t,_name.c_str()); + + _threads.push_back(t); + } + + pthread_sigmask(SIG_SETMASK,&oldset,NULL); + + if(_threads.empty()) + throw std::runtime_error("threadpool: failed to spawn any threads"); + } + + ~ThreadPool() + { + syslog_debug("threadpool: destroying %zu threads named '%s'", + _threads.size(), + _name.c_str()); + + for(auto t : _threads) + pthread_cancel(t); + + Func f; + while(_queue.try_dequeue(f)) + continue; + + for(auto t : _threads) + pthread_join(t,NULL); + } + +private: + static + std::string + get_thread_name(std::string const name_) + { + if(!name_.empty()) + return name_; + + char name[16]; + pthread_getname_np(pthread_self(),name,sizeof(name)); + + return name; + } + + static + void* + start_routine(void *arg_) + { + ThreadPool *btp = static_cast(arg_); + ThreadPool::Func func; + ThreadPool::Queue &q = btp->_queue; + moodycamel::ConsumerToken ctok(btp->_queue); + + while(true) + { + q.wait_dequeue(ctok,func); + + func(); + } + + return NULL; + } + +public: + int + add_thread(std::string const name_ = {}) + { + int rv; + pthread_t t; + sigset_t oldset; + sigset_t newset; + std::string name; + + name = (name_.empty() ? _name : name_); + + sigfillset(&newset); + pthread_sigmask(SIG_BLOCK,&newset,&oldset); + rv = pthread_create(&t,NULL,ThreadPool::start_routine,this); + pthread_sigmask(SIG_SETMASK,&oldset,NULL); + + if(rv != 0) + { + syslog_warning("threadpool: error spawning thread - %d (%s)", + rv, + strerror(rv)); + return -rv; + } + + if(!name.empty()) + pthread_setname_np(t,name.c_str()); + + { + std::lock_guard lg(_threads_mutex); + _threads.push_back(t); + } + + syslog_debug("threadpool: 1 thread added to pool '%s' named '%s'", + _name.c_str(), + name.c_str()); + + return 0; + } + + int + remove_thread(void) + { + { + std::lock_guard lg(_threads_mutex); + if(_threads.size() <= 1) + return -EINVAL; + } + + std::promise promise; + auto func = [&]() + { + pthread_t t; + + t = pthread_self(); + promise.set_value(t); + + { + std::lock_guard lg(_threads_mutex); + + for(auto i = _threads.begin(); i != _threads.end(); ++i) + { + if(*i != t) + continue; + + _threads.erase(i); + break; + } + } + + char name[16]; + pthread_getname_np(t,name,sizeof(name)); + syslog_debug("threadpool: 1 thread removed from pool '%s' named '%s'", + _name.c_str(), + name); + + pthread_exit(NULL); + }; + + enqueue_work(func); + pthread_join(promise.get_future().get(),NULL); + + return 0; + } + + int + set_threads(std::size_t const count_) + { + int diff; + { + std::lock_guard lg(_threads_mutex); + + diff = ((int)count_ - (int)_threads.size()); + } + + syslog_debug("diff: %d",diff); + for(auto i = diff; i > 0; --i) + add_thread(); + for(auto i = diff; i < 0; ++i) + remove_thread(); + + return diff; + } + +public: + template + void + enqueue_work(moodycamel::ProducerToken &ptok_, + FuncType &&f_) + { + timespec ts = {0,10}; + while(true) + { + if(_queue.try_enqueue(ptok_,f_)) + return; + ::nanosleep(&ts,NULL); + ts.tv_nsec += 10; + } + } + + template + void + enqueue_work(FuncType &&f_) + { + timespec ts = {0,10}; + while(true) + { + if(_queue.try_enqueue(f_)) + return; + ::nanosleep(&ts,NULL); + ts.tv_nsec += 10; + } + } + + template + [[nodiscard]] + std::future::type> + enqueue_task(FuncType&& f_) + { + using TaskReturnType = typename std::result_of::type; + using Promise = std::promise; + + auto promise = std::make_shared(); + auto future = promise->get_future(); + auto work = [=]() + { + auto rv = f_(); + promise->set_value(rv); + }; + + timespec ts = {0,10}; + while(true) + { + if(_queue.try_enqueue(work)) + break; + ::nanosleep(&ts,NULL); + ts.tv_nsec += 10; + } + + return future; + } + +public: + std::vector + threads() const + { + std::lock_guard lg(_threads_mutex); + + return _threads; + } + + moodycamel::ProducerToken + ptoken() + { + return moodycamel::ProducerToken(_queue); + } + +private: + Queue _queue; + +private: + std::string const _name; + std::vector _threads; + mutable std::mutex _threads_mutex; +}; diff --git a/libfuse/lib/unbounded_queue.hpp b/libfuse/lib/unbounded_queue.hpp deleted file mode 100644 index 1e527b78..00000000 --- a/libfuse/lib/unbounded_queue.hpp +++ /dev/null @@ -1,161 +0,0 @@ -#pragma once - -#include -#include -#include -#include - - -template -class UnboundedQueue -{ -public: - explicit - UnboundedQueue(bool block_ = true) - : _block(block_) - { - } - - void - push(const T& item_) - { - { - std::lock_guard guard(_queue_lock); - _queue.push(item_); - } - _condition.notify_one(); - } - - void - push(T&& item_) - { - { - std::lock_guard guard(_queue_lock); - _queue.push(std::move(item_)); - } - - _condition.notify_one(); - } - - template - void - emplace(Args&&... args_) - { - { - std::lock_guard guard(_queue_lock); - _queue.emplace(std::forward(args_)...); - } - - _condition.notify_one(); - } - - bool - try_push(const T& item_) - { - { - std::unique_lock lock(_queue_lock, std::try_to_lock); - if(!lock) - return false; - _queue.push(item_); - } - - _condition.notify_one(); - - return true; - } - - bool - try_push(T&& item_) - { - { - std::unique_lock lock(_queue_lock, std::try_to_lock); - if(!lock) - return false; - _queue.push(std::move(item_)); - } - - _condition.notify_one(); - - return true; - } - - //TODO: push multiple T at once - - bool - pop(T& item_) - { - std::unique_lock guard(_queue_lock); - - _condition.wait(guard, [&]() { return !_queue.empty() || !_block; }); - if(_queue.empty()) - return false; - - item_ = std::move(_queue.front()); - _queue.pop(); - - return true; - } - - bool - try_pop(T& item_) - { - std::unique_lock lock(_queue_lock, std::try_to_lock); - if(!lock || _queue.empty()) - return false; - - item_ = std::move(_queue.front()); - _queue.pop(); - - return true; - } - - std::size_t - size() const - { - std::lock_guard guard(_queue_lock); - - return _queue.size(); - } - - bool - empty() const - { - std::lock_guard guard(_queue_lock); - - return _queue.empty(); - } - - void - block() - { - std::lock_guard guard(_queue_lock); - _block = true; - } - - void - unblock() - { - { - std::lock_guard guard(_queue_lock); - _block = false; - } - - _condition.notify_all(); - } - - bool - blocking() const - { - std::lock_guard guard(_queue_lock); - - return _block; - } - -private: - mutable std::mutex _queue_lock; - -private: - bool _block; - std::queue _queue; - std::condition_variable _condition; -}; diff --git a/src/fuse_readdir_cor.cpp b/src/fuse_readdir_cor.cpp index 6ef28c94..c6c1526b 100644 --- a/src/fuse_readdir_cor.cpp +++ b/src/fuse_readdir_cor.cpp @@ -32,7 +32,7 @@ FUSE::ReadDirCOR::ReadDirCOR(unsigned concurrency_) - : _tp(concurrency_,"readdir.cor") + : _tp(concurrency_,concurrency_,"readdir.cor") { } @@ -117,7 +117,7 @@ namespace l static std::vector - concurrent_readdir(UnboundedThreadPool &tp_, + concurrent_readdir(ThreadPool &tp_, const Branches::CPtr &branches_, const char *dirname_, fuse_dirents_t *buf_, @@ -171,7 +171,7 @@ namespace l static int - readdir(UnboundedThreadPool &tp_, + readdir(ThreadPool &tp_, const Branches::CPtr &branches_, const char *dirname_, fuse_dirents_t *buf_, diff --git a/src/fuse_readdir_cor.hpp b/src/fuse_readdir_cor.hpp index 8d7f586d..6da212fc 100644 --- a/src/fuse_readdir_cor.hpp +++ b/src/fuse_readdir_cor.hpp @@ -19,7 +19,9 @@ #pragma once #include "fuse_readdir_base.hpp" -#include "unbounded_thread_pool.hpp" + +#include "thread_pool.hpp" + // concurrent open & read namespace FUSE @@ -34,6 +36,6 @@ namespace FUSE fuse_dirents_t *buf); private: - UnboundedThreadPool _tp; + ThreadPool _tp; }; } diff --git a/src/fuse_readdir_cosr.cpp b/src/fuse_readdir_cosr.cpp index 4785f781..92f75ac8 100644 --- a/src/fuse_readdir_cosr.cpp +++ b/src/fuse_readdir_cosr.cpp @@ -34,7 +34,7 @@ FUSE::ReadDirCOSR::ReadDirCOSR(unsigned concurrency_) - : _tp(concurrency_,"readdir.cosr") + : _tp(concurrency_,concurrency_,"readdir.cosr") { } @@ -63,7 +63,7 @@ namespace l static inline std::vector> - opendir(UnboundedThreadPool &tp_, + opendir(ThreadPool &tp_, const Branches::CPtr &branches_, char const *dirname_, uid_t const uid_, @@ -148,7 +148,7 @@ namespace l static inline int - readdir(UnboundedThreadPool &tp_, + readdir(ThreadPool &tp_, const Branches::CPtr &branches_, const char *dirname_, fuse_dirents_t *buf_, diff --git a/src/fuse_readdir_cosr.hpp b/src/fuse_readdir_cosr.hpp index 6783f125..8149d36d 100644 --- a/src/fuse_readdir_cosr.hpp +++ b/src/fuse_readdir_cosr.hpp @@ -19,7 +19,8 @@ #pragma once #include "fuse_readdir_base.hpp" -#include "unbounded_thread_pool.hpp" + +#include "thread_pool.hpp" // concurrent open, sequential read namespace FUSE @@ -34,6 +35,6 @@ namespace FUSE fuse_dirents_t *buf); private: - UnboundedThreadPool _tp; + ThreadPool _tp; }; } diff --git a/libfuse/lib/bounded_thread_pool.hpp b/src/thread_pool.hpp similarity index 51% rename from libfuse/lib/bounded_thread_pool.hpp rename to src/thread_pool.hpp index 167be321..c79d6698 100644 --- a/libfuse/lib/bounded_thread_pool.hpp +++ b/src/thread_pool.hpp @@ -1,39 +1,39 @@ #pragma once #include "moodycamel/blockingconcurrentqueue.h" -#include "syslog.h" +#include "syslog.hpp" #include #include #include #include #include +#include #include #include #include #include -struct BoundedThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits +struct ThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits { static const int MAX_SEMA_SPINS = 1; }; -class BoundedThreadPool +class ThreadPool { private: using Func = std::function; - using Queue = moodycamel::BlockingConcurrentQueue; + using Queue = moodycamel::BlockingConcurrentQueue; public: explicit - BoundedThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(), - std::size_t const queue_depth_ = 1, - std::string const name_ = {}) + ThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(), + std::size_t const queue_depth_ = 1, + std::string const name_ = {}) : _queue(queue_depth_,thread_count_,thread_count_), - _done(false), - _name(name_) + _name(get_thread_name(name_)) { syslog_debug("threadpool: spawning %zu threads of queue depth %zu named '%s'", thread_count_, @@ -52,7 +52,7 @@ public: int rv; pthread_t t; - rv = pthread_create(&t,NULL,BoundedThreadPool::start_routine,this); + rv = pthread_create(&t,NULL,ThreadPool::start_routine,this); if(rv != 0) { syslog_warning("threadpool: error spawning thread - %d (%s)", @@ -73,14 +73,12 @@ public: throw std::runtime_error("threadpool: failed to spawn any threads"); } - ~BoundedThreadPool() + ~ThreadPool() { syslog_debug("threadpool: destroying %zu threads named '%s'", _threads.size(), _name.c_str()); - _done.store(true,std::memory_order_relaxed); - for(auto t : _threads) pthread_cancel(t); @@ -93,17 +91,29 @@ public: } private: + static + std::string + get_thread_name(std::string const name_) + { + if(!name_.empty()) + return name_; + + char name[16]; + pthread_getname_np(pthread_self(),name,sizeof(name)); + + return name; + } + static void* start_routine(void *arg_) { - BoundedThreadPool *btp = static_cast(arg_); - BoundedThreadPool::Func func; - std::atomic &done = btp->_done; - BoundedThreadPool::Queue &q = btp->_queue; + ThreadPool *btp = static_cast(arg_); + ThreadPool::Func func; + ThreadPool::Queue &q = btp->_queue; moodycamel::ConsumerToken ctok(btp->_queue); - while(!done.load(std::memory_order_relaxed)) + while(true) { q.wait_dequeue(ctok,func); @@ -113,6 +123,110 @@ private: return NULL; } +public: + int + add_thread(std::string const name_ = {}) + { + int rv; + pthread_t t; + sigset_t oldset; + sigset_t newset; + std::string name; + + name = (name_.empty() ? _name : name_); + + sigfillset(&newset); + pthread_sigmask(SIG_BLOCK,&newset,&oldset); + rv = pthread_create(&t,NULL,ThreadPool::start_routine,this); + pthread_sigmask(SIG_SETMASK,&oldset,NULL); + + if(rv != 0) + { + syslog_warning("threadpool: error spawning thread - %d (%s)", + rv, + strerror(rv)); + return -rv; + } + + if(!name.empty()) + pthread_setname_np(t,name.c_str()); + + { + std::lock_guard lg(_threads_mutex); + _threads.push_back(t); + } + + syslog_debug("threadpool: 1 thread added to pool '%s' named '%s'", + _name.c_str(), + name.c_str()); + + return 0; + } + + int + remove_thread(void) + { + { + std::lock_guard lg(_threads_mutex); + if(_threads.size() <= 1) + return -EINVAL; + } + + std::promise promise; + auto func = [&]() + { + pthread_t t; + + t = pthread_self(); + promise.set_value(t); + + { + std::lock_guard lg(_threads_mutex); + + for(auto i = _threads.begin(); i != _threads.end(); ++i) + { + if(*i != t) + continue; + + _threads.erase(i); + break; + } + } + + char name[16]; + pthread_getname_np(t,name,sizeof(name)); + syslog_debug("threadpool: 1 thread removed from pool '%s' named '%s'", + _name.c_str(), + name); + + pthread_exit(NULL); + }; + + enqueue_work(func); + pthread_join(promise.get_future().get(),NULL); + + return 0; + } + + int + set_threads(std::size_t const count_) + { + int diff; + { + std::lock_guard lg(_threads_mutex); + + diff = ((int)count_ - (int)_threads.size()); + } + + syslog_debug("diff: %d",diff); + for(auto i = diff; i > 0; --i) + add_thread(); + for(auto i = diff; i < 0; ++i) + remove_thread(); + + return diff; + } + public: template void @@ -175,21 +289,16 @@ public: std::vector threads() const { - return _threads; - } + std::lock_guard lg(_threads_mutex); -public: - Queue& - queue() - { - return _queue; + return _threads; } private: Queue _queue; private: - std::atomic _done; std::string const _name; std::vector _threads; + mutable std::mutex _threads_mutex; }; diff --git a/src/unbounded_thread_pool.hpp b/src/unbounded_thread_pool.hpp deleted file mode 100644 index 84be35a2..00000000 --- a/src/unbounded_thread_pool.hpp +++ /dev/null @@ -1,172 +0,0 @@ -#pragma once - -#include "moodycamel/blockingconcurrentqueue.h" -#include "syslog.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include -#include - - -struct UnboundedThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits -{ - static const int MAX_SEMA_SPINS = 1; -}; - - -class UnboundedThreadPool -{ -private: - using Func = std::function; - using Queue = moodycamel::BlockingConcurrentQueue; - -public: - explicit - UnboundedThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(), - std::string const name_ = {}) - : _queue(thread_count_,thread_count_,thread_count_), - _done(false), - _name(name_) - { - syslog_debug("threadpool: spawning %zu threads named '%s'", - thread_count_, - _name.c_str()); - - sigset_t oldset; - sigset_t newset; - - sigfillset(&newset); - pthread_sigmask(SIG_BLOCK,&newset,&oldset); - - _threads.reserve(thread_count_); - for(std::size_t i = 0; i < thread_count_; ++i) - { - int rv; - pthread_t t; - - rv = pthread_create(&t,NULL,UnboundedThreadPool::start_routine,this); - if(rv != 0) - { - syslog_warning("threadpool: error spawning thread - %d (%s)", - rv, - strerror(rv)); - continue; - } - - if(!_name.empty()) - pthread_setname_np(t,_name.c_str()); - - _threads.push_back(t); - } - - pthread_sigmask(SIG_SETMASK,&oldset,NULL); - - if(_threads.empty()) - throw std::runtime_error("threadpool: failed to spawn any threads"); - } - - ~UnboundedThreadPool() - { - syslog_debug("threadpool: destroying %zu threads named '%s'", - _threads.size(), - _name.c_str()); - - _done.store(true,std::memory_order_relaxed); - - for(auto t : _threads) - pthread_cancel(t); - - Func f; - while(_queue.try_dequeue(f)) - continue; - - for(auto t : _threads) - pthread_join(t,NULL); - } - -private: - static - void* - start_routine(void *arg_) - { - UnboundedThreadPool *btp = static_cast(arg_); - UnboundedThreadPool::Func func; - std::atomic &done = btp->_done; - UnboundedThreadPool::Queue &q = btp->_queue; - moodycamel::ConsumerToken ctok(btp->_queue); - - while(!done.load(std::memory_order_relaxed)) - { - q.wait_dequeue(ctok,func); - - func(); - } - - return NULL; - } - -public: - template - void - enqueue_work(moodycamel::ProducerToken &ptok_, - FuncType &&f_) - { - _queue.enqueue(ptok_,f_); - } - - template - void - enqueue_work(FuncType &&f_) - { - _queue.enqueue(f_); - } - - template - [[nodiscard]] - std::future::type> - enqueue_task(FuncType&& f_) - { - using TaskReturnType = typename std::result_of::type; - using Promise = std::promise; - - auto promise = std::make_shared(); - auto future = promise->get_future(); - auto work = [=]() - { - auto rv = f_(); - promise->set_value(rv); - }; - - _queue.enqueue(work); - - return future; - } - -public: - std::vector - threads() const - { - return _threads; - } - -public: - Queue& - queue() - { - return _queue; - } - -private: - Queue _queue; - -private: - std::atomic _done; - std::string const _name; - std::vector _threads; -};