Browse Source

Rework thread pool, add ability to add/remove threads at runtime

pull/1240/head
Antonio SJ Musumeci 9 months ago
parent
commit
0a94bd7cfc
  1. 20
      libfuse/lib/fuse_loop.cpp
  2. 310
      libfuse/lib/thread_pool.hpp
  3. 161
      libfuse/lib/unbounded_queue.hpp
  4. 6
      src/fuse_readdir_cor.cpp
  5. 6
      src/fuse_readdir_cor.hpp
  6. 6
      src/fuse_readdir_cosr.cpp
  7. 5
      src/fuse_readdir_cosr.hpp
  8. 161
      src/thread_pool.hpp
  9. 172
      src/unbounded_thread_pool.hpp

20
libfuse/lib/fuse_loop.cpp

@ -2,12 +2,12 @@
#define _GNU_SOURCE
#endif
#include "bounded_thread_pool.hpp"
#include "cpu.hpp"
#include "fmt/core.h"
#include "make_unique.hpp"
#include "scope_guard.hpp"
#include "syslog.h"
#include "thread_pool.hpp"
#include "fuse_i.h"
#include "fuse_kernel.h"
@ -70,11 +70,11 @@ struct AsyncWorker
{
fuse_session *_se;
sem_t *_finished;
std::shared_ptr<BoundedThreadPool> _process_tp;
std::shared_ptr<ThreadPool> _process_tp;
AsyncWorker(fuse_session *se_,
sem_t *finished_,
std::shared_ptr<BoundedThreadPool> process_tp_)
std::shared_ptr<ThreadPool> process_tp_)
: _se(se_),
_finished(finished_),
_process_tp(process_tp_)
@ -88,7 +88,7 @@ struct AsyncWorker
DEFER{ fuse_session_exit(_se); };
DEFER{ sem_post(_finished); };
moodycamel::ProducerToken ptok(_process_tp->queue());
moodycamel::ProducerToken ptok(_process_tp->ptoken());
while(!fuse_session_exited(_se))
{
int rv;
@ -474,8 +474,8 @@ fuse_session_loop_mt(struct fuse_session *se_,
int process_thread_queue_depth;
std::vector<pthread_t> read_threads;
std::vector<pthread_t> process_threads;
std::unique_ptr<BoundedThreadPool> read_tp;
std::shared_ptr<BoundedThreadPool> process_tp;
std::unique_ptr<ThreadPool> read_tp;
std::shared_ptr<ThreadPool> process_tp;
sem_init(&finished,0,0);
@ -487,11 +487,11 @@ fuse_session_loop_mt(struct fuse_session *se_,
&process_thread_queue_depth);
if(process_thread_count > 0)
process_tp = std::make_shared<BoundedThreadPool>(process_thread_count,
process_thread_queue_depth,
"fuse.process");
process_tp = std::make_shared<ThreadPool>(process_thread_count,
process_thread_queue_depth,
"fuse.process");
read_tp = std::make_unique<BoundedThreadPool>(read_thread_count,1,"fuse.read");
read_tp = std::make_unique<ThreadPool>(read_thread_count,1,"fuse.read");
if(process_tp)
{
for(auto i = 0; i < read_thread_count; i++)

310
libfuse/lib/thread_pool.hpp

@ -0,0 +1,310 @@
#pragma once
#include "moodycamel/blockingconcurrentqueue.h"
#include "syslog.h"
#include <atomic>
#include <csignal>
#include <cstring>
#include <future>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
struct ThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits
{
static const int MAX_SEMA_SPINS = 1;
};
class ThreadPool
{
private:
using Func = std::function<void(void)>;
using Queue = moodycamel::BlockingConcurrentQueue<Func,ThreadPoolTraits>;
public:
explicit
ThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(),
std::size_t const queue_depth_ = 1,
std::string const name_ = {})
: _queue(queue_depth_,thread_count_,thread_count_),
_name(get_thread_name(name_))
{
syslog_debug("threadpool: spawning %zu threads of queue depth %zu named '%s'",
thread_count_,
queue_depth_,
_name.c_str());
sigset_t oldset;
sigset_t newset;
sigfillset(&newset);
pthread_sigmask(SIG_BLOCK,&newset,&oldset);
_threads.reserve(thread_count_);
for(std::size_t i = 0; i < thread_count_; ++i)
{
int rv;
pthread_t t;
rv = pthread_create(&t,NULL,ThreadPool::start_routine,this);
if(rv != 0)
{
syslog_warning("threadpool: error spawning thread - %d (%s)",
rv,
strerror(rv));
continue;
}
if(!_name.empty())
pthread_setname_np(t,_name.c_str());
_threads.push_back(t);
}
pthread_sigmask(SIG_SETMASK,&oldset,NULL);
if(_threads.empty())
throw std::runtime_error("threadpool: failed to spawn any threads");
}
~ThreadPool()
{
syslog_debug("threadpool: destroying %zu threads named '%s'",
_threads.size(),
_name.c_str());
for(auto t : _threads)
pthread_cancel(t);
Func f;
while(_queue.try_dequeue(f))
continue;
for(auto t : _threads)
pthread_join(t,NULL);
}
private:
static
std::string
get_thread_name(std::string const name_)
{
if(!name_.empty())
return name_;
char name[16];
pthread_getname_np(pthread_self(),name,sizeof(name));
return name;
}
static
void*
start_routine(void *arg_)
{
ThreadPool *btp = static_cast<ThreadPool*>(arg_);
ThreadPool::Func func;
ThreadPool::Queue &q = btp->_queue;
moodycamel::ConsumerToken ctok(btp->_queue);
while(true)
{
q.wait_dequeue(ctok,func);
func();
}
return NULL;
}
public:
int
add_thread(std::string const name_ = {})
{
int rv;
pthread_t t;
sigset_t oldset;
sigset_t newset;
std::string name;
name = (name_.empty() ? _name : name_);
sigfillset(&newset);
pthread_sigmask(SIG_BLOCK,&newset,&oldset);
rv = pthread_create(&t,NULL,ThreadPool::start_routine,this);
pthread_sigmask(SIG_SETMASK,&oldset,NULL);
if(rv != 0)
{
syslog_warning("threadpool: error spawning thread - %d (%s)",
rv,
strerror(rv));
return -rv;
}
if(!name.empty())
pthread_setname_np(t,name.c_str());
{
std::lock_guard<std::mutex> lg(_threads_mutex);
_threads.push_back(t);
}
syslog_debug("threadpool: 1 thread added to pool '%s' named '%s'",
_name.c_str(),
name.c_str());
return 0;
}
int
remove_thread(void)
{
{
std::lock_guard<std::mutex> lg(_threads_mutex);
if(_threads.size() <= 1)
return -EINVAL;
}
std::promise<pthread_t> promise;
auto func = [&]()
{
pthread_t t;
t = pthread_self();
promise.set_value(t);
{
std::lock_guard<std::mutex> lg(_threads_mutex);
for(auto i = _threads.begin(); i != _threads.end(); ++i)
{
if(*i != t)
continue;
_threads.erase(i);
break;
}
}
char name[16];
pthread_getname_np(t,name,sizeof(name));
syslog_debug("threadpool: 1 thread removed from pool '%s' named '%s'",
_name.c_str(),
name);
pthread_exit(NULL);
};
enqueue_work(func);
pthread_join(promise.get_future().get(),NULL);
return 0;
}
int
set_threads(std::size_t const count_)
{
int diff;
{
std::lock_guard<std::mutex> lg(_threads_mutex);
diff = ((int)count_ - (int)_threads.size());
}
syslog_debug("diff: %d",diff);
for(auto i = diff; i > 0; --i)
add_thread();
for(auto i = diff; i < 0; ++i)
remove_thread();
return diff;
}
public:
template<typename FuncType>
void
enqueue_work(moodycamel::ProducerToken &ptok_,
FuncType &&f_)
{
timespec ts = {0,10};
while(true)
{
if(_queue.try_enqueue(ptok_,f_))
return;
::nanosleep(&ts,NULL);
ts.tv_nsec += 10;
}
}
template<typename FuncType>
void
enqueue_work(FuncType &&f_)
{
timespec ts = {0,10};
while(true)
{
if(_queue.try_enqueue(f_))
return;
::nanosleep(&ts,NULL);
ts.tv_nsec += 10;
}
}
template<typename FuncType>
[[nodiscard]]
std::future<typename std::result_of<FuncType()>::type>
enqueue_task(FuncType&& f_)
{
using TaskReturnType = typename std::result_of<FuncType()>::type;
using Promise = std::promise<TaskReturnType>;
auto promise = std::make_shared<Promise>();
auto future = promise->get_future();
auto work = [=]()
{
auto rv = f_();
promise->set_value(rv);
};
timespec ts = {0,10};
while(true)
{
if(_queue.try_enqueue(work))
break;
::nanosleep(&ts,NULL);
ts.tv_nsec += 10;
}
return future;
}
public:
std::vector<pthread_t>
threads() const
{
std::lock_guard<std::mutex> lg(_threads_mutex);
return _threads;
}
moodycamel::ProducerToken
ptoken()
{
return moodycamel::ProducerToken(_queue);
}
private:
Queue _queue;
private:
std::string const _name;
std::vector<pthread_t> _threads;
mutable std::mutex _threads_mutex;
};

161
libfuse/lib/unbounded_queue.hpp

@ -1,161 +0,0 @@
#pragma once
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>
template<typename T>
class UnboundedQueue
{
public:
explicit
UnboundedQueue(bool block_ = true)
: _block(block_)
{
}
void
push(const T& item_)
{
{
std::lock_guard<std::mutex> guard(_queue_lock);
_queue.push(item_);
}
_condition.notify_one();
}
void
push(T&& item_)
{
{
std::lock_guard<std::mutex> guard(_queue_lock);
_queue.push(std::move(item_));
}
_condition.notify_one();
}
template<typename... Args>
void
emplace(Args&&... args_)
{
{
std::lock_guard<std::mutex> guard(_queue_lock);
_queue.emplace(std::forward<Args>(args_)...);
}
_condition.notify_one();
}
bool
try_push(const T& item_)
{
{
std::unique_lock<std::mutex> lock(_queue_lock, std::try_to_lock);
if(!lock)
return false;
_queue.push(item_);
}
_condition.notify_one();
return true;
}
bool
try_push(T&& item_)
{
{
std::unique_lock<std::mutex> lock(_queue_lock, std::try_to_lock);
if(!lock)
return false;
_queue.push(std::move(item_));
}
_condition.notify_one();
return true;
}
//TODO: push multiple T at once
bool
pop(T& item_)
{
std::unique_lock<std::mutex> guard(_queue_lock);
_condition.wait(guard, [&]() { return !_queue.empty() || !_block; });
if(_queue.empty())
return false;
item_ = std::move(_queue.front());
_queue.pop();
return true;
}
bool
try_pop(T& item_)
{
std::unique_lock<std::mutex> lock(_queue_lock, std::try_to_lock);
if(!lock || _queue.empty())
return false;
item_ = std::move(_queue.front());
_queue.pop();
return true;
}
std::size_t
size() const
{
std::lock_guard<std::mutex> guard(_queue_lock);
return _queue.size();
}
bool
empty() const
{
std::lock_guard<std::mutex> guard(_queue_lock);
return _queue.empty();
}
void
block()
{
std::lock_guard<std::mutex> guard(_queue_lock);
_block = true;
}
void
unblock()
{
{
std::lock_guard<std::mutex> guard(_queue_lock);
_block = false;
}
_condition.notify_all();
}
bool
blocking() const
{
std::lock_guard<std::mutex> guard(_queue_lock);
return _block;
}
private:
mutable std::mutex _queue_lock;
private:
bool _block;
std::queue<T> _queue;
std::condition_variable _condition;
};

6
src/fuse_readdir_cor.cpp

@ -32,7 +32,7 @@
FUSE::ReadDirCOR::ReadDirCOR(unsigned concurrency_)
: _tp(concurrency_,"readdir.cor")
: _tp(concurrency_,concurrency_,"readdir.cor")
{
}
@ -117,7 +117,7 @@ namespace l
static
std::vector<int>
concurrent_readdir(UnboundedThreadPool &tp_,
concurrent_readdir(ThreadPool &tp_,
const Branches::CPtr &branches_,
const char *dirname_,
fuse_dirents_t *buf_,
@ -171,7 +171,7 @@ namespace l
static
int
readdir(UnboundedThreadPool &tp_,
readdir(ThreadPool &tp_,
const Branches::CPtr &branches_,
const char *dirname_,
fuse_dirents_t *buf_,

6
src/fuse_readdir_cor.hpp

@ -19,7 +19,9 @@
#pragma once
#include "fuse_readdir_base.hpp"
#include "unbounded_thread_pool.hpp"
#include "thread_pool.hpp"
// concurrent open & read
namespace FUSE
@ -34,6 +36,6 @@ namespace FUSE
fuse_dirents_t *buf);
private:
UnboundedThreadPool _tp;
ThreadPool _tp;
};
}

6
src/fuse_readdir_cosr.cpp

@ -34,7 +34,7 @@
FUSE::ReadDirCOSR::ReadDirCOSR(unsigned concurrency_)
: _tp(concurrency_,"readdir.cosr")
: _tp(concurrency_,concurrency_,"readdir.cosr")
{
}
@ -63,7 +63,7 @@ namespace l
static
inline
std::vector<std::future<DIR*>>
opendir(UnboundedThreadPool &tp_,
opendir(ThreadPool &tp_,
const Branches::CPtr &branches_,
char const *dirname_,
uid_t const uid_,
@ -148,7 +148,7 @@ namespace l
static
inline
int
readdir(UnboundedThreadPool &tp_,
readdir(ThreadPool &tp_,
const Branches::CPtr &branches_,
const char *dirname_,
fuse_dirents_t *buf_,

5
src/fuse_readdir_cosr.hpp

@ -19,7 +19,8 @@
#pragma once
#include "fuse_readdir_base.hpp"
#include "unbounded_thread_pool.hpp"
#include "thread_pool.hpp"
// concurrent open, sequential read
namespace FUSE
@ -34,6 +35,6 @@ namespace FUSE
fuse_dirents_t *buf);
private:
UnboundedThreadPool _tp;
ThreadPool _tp;
};
}

161
libfuse/lib/bounded_thread_pool.hpp → src/thread_pool.hpp

@ -1,39 +1,39 @@
#pragma once
#include "moodycamel/blockingconcurrentqueue.h"
#include "syslog.h"
#include "syslog.hpp"
#include <atomic>
#include <csignal>
#include <cstring>
#include <future>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
struct BoundedThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits
struct ThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits
{
static const int MAX_SEMA_SPINS = 1;
};
class BoundedThreadPool
class ThreadPool
{
private:
using Func = std::function<void(void)>;
using Queue = moodycamel::BlockingConcurrentQueue<Func,BoundedThreadPoolTraits>;
using Queue = moodycamel::BlockingConcurrentQueue<Func,ThreadPoolTraits>;
public:
explicit
BoundedThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(),
std::size_t const queue_depth_ = 1,
std::string const name_ = {})
ThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(),
std::size_t const queue_depth_ = 1,
std::string const name_ = {})
: _queue(queue_depth_,thread_count_,thread_count_),
_done(false),
_name(name_)
_name(get_thread_name(name_))
{
syslog_debug("threadpool: spawning %zu threads of queue depth %zu named '%s'",
thread_count_,
@ -52,7 +52,7 @@ public:
int rv;
pthread_t t;
rv = pthread_create(&t,NULL,BoundedThreadPool::start_routine,this);
rv = pthread_create(&t,NULL,ThreadPool::start_routine,this);
if(rv != 0)
{
syslog_warning("threadpool: error spawning thread - %d (%s)",
@ -73,14 +73,12 @@ public:
throw std::runtime_error("threadpool: failed to spawn any threads");
}
~BoundedThreadPool()
~ThreadPool()
{
syslog_debug("threadpool: destroying %zu threads named '%s'",
_threads.size(),
_name.c_str());
_done.store(true,std::memory_order_relaxed);
for(auto t : _threads)
pthread_cancel(t);
@ -93,17 +91,29 @@ public:
}
private:
static
std::string
get_thread_name(std::string const name_)
{
if(!name_.empty())
return name_;
char name[16];
pthread_getname_np(pthread_self(),name,sizeof(name));
return name;
}
static
void*
start_routine(void *arg_)
{
BoundedThreadPool *btp = static_cast<BoundedThreadPool*>(arg_);
BoundedThreadPool::Func func;
std::atomic<bool> &done = btp->_done;
BoundedThreadPool::Queue &q = btp->_queue;
ThreadPool *btp = static_cast<ThreadPool*>(arg_);
ThreadPool::Func func;
ThreadPool::Queue &q = btp->_queue;
moodycamel::ConsumerToken ctok(btp->_queue);
while(!done.load(std::memory_order_relaxed))
while(true)
{
q.wait_dequeue(ctok,func);
@ -113,6 +123,110 @@ private:
return NULL;
}
public:
int
add_thread(std::string const name_ = {})
{
int rv;
pthread_t t;
sigset_t oldset;
sigset_t newset;
std::string name;
name = (name_.empty() ? _name : name_);
sigfillset(&newset);
pthread_sigmask(SIG_BLOCK,&newset,&oldset);
rv = pthread_create(&t,NULL,ThreadPool::start_routine,this);
pthread_sigmask(SIG_SETMASK,&oldset,NULL);
if(rv != 0)
{
syslog_warning("threadpool: error spawning thread - %d (%s)",
rv,
strerror(rv));
return -rv;
}
if(!name.empty())
pthread_setname_np(t,name.c_str());
{
std::lock_guard<std::mutex> lg(_threads_mutex);
_threads.push_back(t);
}
syslog_debug("threadpool: 1 thread added to pool '%s' named '%s'",
_name.c_str(),
name.c_str());
return 0;
}
int
remove_thread(void)
{
{
std::lock_guard<std::mutex> lg(_threads_mutex);
if(_threads.size() <= 1)
return -EINVAL;
}
std::promise<pthread_t> promise;
auto func = [&]()
{
pthread_t t;
t = pthread_self();
promise.set_value(t);
{
std::lock_guard<std::mutex> lg(_threads_mutex);
for(auto i = _threads.begin(); i != _threads.end(); ++i)
{
if(*i != t)
continue;
_threads.erase(i);
break;
}
}
char name[16];
pthread_getname_np(t,name,sizeof(name));
syslog_debug("threadpool: 1 thread removed from pool '%s' named '%s'",
_name.c_str(),
name);
pthread_exit(NULL);
};
enqueue_work(func);
pthread_join(promise.get_future().get(),NULL);
return 0;
}
int
set_threads(std::size_t const count_)
{
int diff;
{
std::lock_guard<std::mutex> lg(_threads_mutex);
diff = ((int)count_ - (int)_threads.size());
}
syslog_debug("diff: %d",diff);
for(auto i = diff; i > 0; --i)
add_thread();
for(auto i = diff; i < 0; ++i)
remove_thread();
return diff;
}
public:
template<typename FuncType>
void
@ -175,21 +289,16 @@ public:
std::vector<pthread_t>
threads() const
{
return _threads;
}
std::lock_guard<std::mutex> lg(_threads_mutex);
public:
Queue&
queue()
{
return _queue;
return _threads;
}
private:
Queue _queue;
private:
std::atomic<bool> _done;
std::string const _name;
std::vector<pthread_t> _threads;
mutable std::mutex _threads_mutex;
};

172
src/unbounded_thread_pool.hpp

@ -1,172 +0,0 @@
#pragma once
#include "moodycamel/blockingconcurrentqueue.h"
#include "syslog.hpp"
#include <atomic>
#include <csignal>
#include <cstring>
#include <future>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
struct UnboundedThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits
{
static const int MAX_SEMA_SPINS = 1;
};
class UnboundedThreadPool
{
private:
using Func = std::function<void(void)>;
using Queue = moodycamel::BlockingConcurrentQueue<Func,UnboundedThreadPoolTraits>;
public:
explicit
UnboundedThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(),
std::string const name_ = {})
: _queue(thread_count_,thread_count_,thread_count_),
_done(false),
_name(name_)
{
syslog_debug("threadpool: spawning %zu threads named '%s'",
thread_count_,
_name.c_str());
sigset_t oldset;
sigset_t newset;
sigfillset(&newset);
pthread_sigmask(SIG_BLOCK,&newset,&oldset);
_threads.reserve(thread_count_);
for(std::size_t i = 0; i < thread_count_; ++i)
{
int rv;
pthread_t t;
rv = pthread_create(&t,NULL,UnboundedThreadPool::start_routine,this);
if(rv != 0)
{
syslog_warning("threadpool: error spawning thread - %d (%s)",
rv,
strerror(rv));
continue;
}
if(!_name.empty())
pthread_setname_np(t,_name.c_str());
_threads.push_back(t);
}
pthread_sigmask(SIG_SETMASK,&oldset,NULL);
if(_threads.empty())
throw std::runtime_error("threadpool: failed to spawn any threads");
}
~UnboundedThreadPool()
{
syslog_debug("threadpool: destroying %zu threads named '%s'",
_threads.size(),
_name.c_str());
_done.store(true,std::memory_order_relaxed);
for(auto t : _threads)
pthread_cancel(t);
Func f;
while(_queue.try_dequeue(f))
continue;
for(auto t : _threads)
pthread_join(t,NULL);
}
private:
static
void*
start_routine(void *arg_)
{
UnboundedThreadPool *btp = static_cast<UnboundedThreadPool*>(arg_);
UnboundedThreadPool::Func func;
std::atomic<bool> &done = btp->_done;
UnboundedThreadPool::Queue &q = btp->_queue;
moodycamel::ConsumerToken ctok(btp->_queue);
while(!done.load(std::memory_order_relaxed))
{
q.wait_dequeue(ctok,func);
func();
}
return NULL;
}
public:
template<typename FuncType>
void
enqueue_work(moodycamel::ProducerToken &ptok_,
FuncType &&f_)
{
_queue.enqueue(ptok_,f_);
}
template<typename FuncType>
void
enqueue_work(FuncType &&f_)
{
_queue.enqueue(f_);
}
template<typename FuncType>
[[nodiscard]]
std::future<typename std::result_of<FuncType()>::type>
enqueue_task(FuncType&& f_)
{
using TaskReturnType = typename std::result_of<FuncType()>::type;
using Promise = std::promise<TaskReturnType>;
auto promise = std::make_shared<Promise>();
auto future = promise->get_future();
auto work = [=]()
{
auto rv = f_();
promise->set_value(rv);
};
_queue.enqueue(work);
return future;
}
public:
std::vector<pthread_t>
threads() const
{
return _threads;
}
public:
Queue&
queue()
{
return _queue;
}
private:
Queue _queue;
private:
std::atomic<bool> _done;
std::string const _name;
std::vector<pthread_t> _threads;
};
Loading…
Cancel
Save