Browse Source

Misc fixes and improvements to thread pool

fixes
Antonio SJ Musumeci 5 days ago
parent
commit
db2e948ca0
  1. 548
      tests/tests.cpp
  2. 153
      vendored/libfuse/include/bounded_queue.hpp
  3. 9
      vendored/libfuse/include/mutex.hpp
  4. 287
      vendored/libfuse/include/thread_pool.hpp

548
tests/tests.cpp

@ -2,6 +2,12 @@
#include "config.hpp"
#include "str.hpp"
#include "thread_pool.hpp"
#include <atomic>
#include <chrono>
#include <numeric>
#include <thread>
void
test_nop()
@ -481,6 +487,517 @@ test_config()
TEST_CHECK(cfg.set("async-read","true") == 0);
}
// =====================================================================
// ThreadPool tests
// =====================================================================
// A freshly constructed pool must spin up exactly the requested
// number of worker threads.
void
test_tp_construct_default()
{
  ThreadPool tp(2, 4, "test.default");

  const auto ids = tp.threads();
  TEST_CHECK(ids.size() == 2);
}
// Construction with a pool name must still create the requested
// thread count.
void
test_tp_construct_named()
{
  ThreadPool tp(1, 2, "test.named");

  const auto ids = tp.threads();
  TEST_CHECK(ids.size() == 1);
}
// A pool with zero threads is unusable; the constructor is expected
// to reject it by throwing std::runtime_error.
void
test_tp_construct_zero_threads_throws()
{
bool threw = false;
try
{
ThreadPool tp(0, 4, "test.zero");
}
catch(const std::runtime_error &)
{
threw = true;
}
TEST_CHECK(threw);
}
// Work submitted with enqueue_work() must eventually run on the pool.
void
test_tp_enqueue_work()
{
ThreadPool tp(2, 4, "test.ew");
std::atomic<int> counter{0};
for(int i = 0; i < 10; ++i)
tp.enqueue_work([&counter](){ counter.fetch_add(1); });
// wait for completion
// Poll up to ~1s; the pool has no flush/join API, so completion is
// observed through the shared counter.
for(int i = 0; i < 1000 && counter.load() < 10; ++i)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
TEST_CHECK(counter.load() == 10);
}
// Same as test_tp_enqueue_work but submitting through a producer token.
void
test_tp_enqueue_work_with_ptoken()
{
ThreadPool tp(2, 4, "test.ewp");
std::atomic<int> counter{0};
auto ptok = tp.ptoken();
for(int i = 0; i < 10; ++i)
tp.enqueue_work(ptok, [&counter](){ counter.fetch_add(1); });
// Poll up to ~1s for the workers to drain the queue.
for(int i = 0; i < 1000 && counter.load() < 10; ++i)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
TEST_CHECK(counter.load() == 10);
}
// try_enqueue_work() must succeed while the bounded queue has room.
void
test_tp_try_enqueue_work()
{
// Use 1 thread and a small queue depth of 2
ThreadPool tp(1, 2, "test.tew");
std::atomic<int> counter{0};
// These should succeed (queue has room)
bool ok = tp.try_enqueue_work([&counter](){ counter.fetch_add(1); });
TEST_CHECK(ok);
for(int i = 0; i < 1000 && counter.load() < 1; ++i)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
TEST_CHECK(counter.load() == 1);
}
// Token-based variant of try_enqueue_work(); should succeed when the
// queue is not full and the work must eventually execute.
void
test_tp_try_enqueue_work_with_ptoken()
{
ThreadPool tp(1, 2, "test.tewp");
std::atomic<int> counter{0};
auto ptok = tp.ptoken();
bool ok = tp.try_enqueue_work(ptok, [&counter](){ counter.fetch_add(1); });
TEST_CHECK(ok);
for(int i = 0; i < 1000 && counter.load() < 1; ++i)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
TEST_CHECK(counter.load() == 1);
}
// Timed enqueue: with a free slot available the 100ms timeout should
// never be hit and the call must succeed.
void
test_tp_try_enqueue_work_for()
{
ThreadPool tp(1, 2, "test.tewf");
std::atomic<int> counter{0};
bool ok = tp.try_enqueue_work_for(100000, // 100ms
[&counter](){ counter.fetch_add(1); });
TEST_CHECK(ok);
for(int i = 0; i < 1000 && counter.load() < 1; ++i)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
TEST_CHECK(counter.load() == 1);
}
// Token-based timed enqueue; same expectations as the tokenless variant.
void
test_tp_try_enqueue_work_for_with_ptoken()
{
ThreadPool tp(1, 2, "test.tewfp");
std::atomic<int> counter{0};
auto ptok = tp.ptoken();
bool ok = tp.try_enqueue_work_for(ptok,
100000, // 100ms
[&counter](){ counter.fetch_add(1); });
TEST_CHECK(ok);
for(int i = 0; i < 1000 && counter.load() < 1; ++i)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
TEST_CHECK(counter.load() == 1);
}
// enqueue_task() must return a future that yields the callable's
// return value.
void
test_tp_enqueue_task_int()
{
  ThreadPool tp(2, 4, "test.eti");

  std::future<int> fut = tp.enqueue_task([]() -> int { return 42; });
  TEST_CHECK(fut.get() == 42);
}
// A void-returning task must complete the future without throwing
// and its side effects must be visible after get().
void
test_tp_enqueue_task_void()
{
  ThreadPool tp(2, 4, "test.etv");

  std::atomic<bool> executed{false};
  auto fut = tp.enqueue_task([&executed]() { executed.store(true); });

  fut.get(); // should not throw
  TEST_CHECK(executed.load());
}
// Tasks returning non-trivial types (std::string) must round-trip
// through the future intact.
void
test_tp_enqueue_task_string()
{
  ThreadPool tp(2, 4, "test.ets");

  auto fut = tp.enqueue_task([]() -> std::string { return "hello"; });

  const std::string result = fut.get();
  TEST_CHECK(result == "hello");
}
// An exception thrown inside a task must propagate through the future
// and surface from future::get() with its message intact.
void
test_tp_enqueue_task_exception()
{
ThreadPool tp(2, 4, "test.ete");
auto future = tp.enqueue_task([]() -> int
{
throw std::runtime_error("task error");
return 0; // unreachable; present only to satisfy the declared return type
});
bool caught = false;
try
{
future.get();
}
catch(const std::runtime_error &e)
{
caught = true;
TEST_CHECK(std::string(e.what()) == "task error");
}
TEST_CHECK(caught);
}
// threads() must report one id per worker, and no id may be zero.
void
test_tp_threads_returns_correct_count()
{
ThreadPool tp(4, 8, "test.thr");
auto threads = tp.threads();
TEST_CHECK(threads.size() == 4);
// All thread IDs should be non-zero
for(auto t : threads)
TEST_CHECK(t != 0);
}
// Every worker must have a distinct pthread id (pairwise comparison).
void
test_tp_threads_unique_ids()
{
ThreadPool tp(4, 8, "test.uid");
auto threads = tp.threads();
// All thread IDs should be unique
for(std::size_t i = 0; i < threads.size(); ++i)
for(std::size_t j = i + 1; j < threads.size(); ++j)
TEST_CHECK(threads[i] != threads[j]);
}
// add_thread() must grow the pool by one worker and report success.
void
test_tp_add_thread()
{
  ThreadPool tp(2, 4, "test.add");

  TEST_CHECK(tp.threads().size() == 2);
  TEST_CHECK(tp.add_thread() == 0);
  TEST_CHECK(tp.threads().size() == 3);
}
// remove_thread() must shrink the pool by one worker and report success.
void
test_tp_remove_thread()
{
  ThreadPool tp(3, 4, "test.rm");

  TEST_CHECK(tp.threads().size() == 3);
  TEST_CHECK(tp.remove_thread() == 0);
  TEST_CHECK(tp.threads().size() == 2);
}
// The pool must refuse to drop below one worker: removing the last
// thread returns -EINVAL and leaves the pool unchanged.
void
test_tp_remove_thread_refuses_last()
{
  ThreadPool tp(1, 4, "test.rmlast");

  TEST_CHECK(tp.threads().size() == 1);
  TEST_CHECK(tp.remove_thread() == -EINVAL);
  TEST_CHECK(tp.threads().size() == 1);
}
// set_threads() to a larger count must add workers until the pool
// matches the target size.
void
test_tp_set_threads_grow()
{
  ThreadPool tp(2, 8, "test.grow");

  TEST_CHECK(tp.threads().size() == 2);
  TEST_CHECK(tp.set_threads(4) == 0);
  TEST_CHECK(tp.threads().size() == 4);
}
// set_threads() to a smaller count must retire workers until the pool
// matches the target size.
void
test_tp_set_threads_shrink()
{
  ThreadPool tp(4, 8, "test.shrink");

  TEST_CHECK(tp.threads().size() == 4);
  TEST_CHECK(tp.set_threads(2) == 0);
  TEST_CHECK(tp.threads().size() == 2);
}
// A target of zero threads is invalid: set_threads(0) must return
// -EINVAL and leave the pool size untouched.
void
test_tp_set_threads_zero()
{
  ThreadPool tp(2, 4, "test.sz");

  TEST_CHECK(tp.set_threads(0) == -EINVAL);
  TEST_CHECK(tp.threads().size() == 2);
}
// Setting the thread count to its current value is a successful no-op.
void
test_tp_set_threads_same()
{
  ThreadPool tp(3, 4, "test.same");

  TEST_CHECK(tp.set_threads(3) == 0);
  TEST_CHECK(tp.threads().size() == 3);
}
// Workers added after construction must participate in draining work.
void
test_tp_work_after_add_thread()
{
ThreadPool tp(1, 4, "test.waat");
tp.add_thread();
tp.add_thread();
std::atomic<int> counter{0};
const int N = 20;
for(int i = 0; i < N; ++i)
tp.enqueue_work([&counter](){ counter.fetch_add(1); });
// Poll up to ~2s for all N items to complete.
for(int i = 0; i < 2000 && counter.load() < N; ++i)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
TEST_CHECK(counter.load() == N);
}
// The pool must keep processing work after a worker has been removed.
void
test_tp_work_after_remove_thread()
{
ThreadPool tp(3, 4, "test.wart");
tp.remove_thread();
std::atomic<int> counter{0};
const int N = 10;
for(int i = 0; i < N; ++i)
tp.enqueue_work([&counter](){ counter.fetch_add(1); });
// Poll up to ~2s for all N items to complete.
for(int i = 0; i < 2000 && counter.load() < N; ++i)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
TEST_CHECK(counter.load() == N);
}
// An exception escaping a work item must not kill the worker thread;
// subsequent work still has to run.
void
test_tp_worker_exception_no_crash()
{
ThreadPool tp(2, 4, "test.exc");
// Enqueue work that throws -- pool should not crash
tp.enqueue_work([](){
throw std::runtime_error("deliberate error");
});
// Enqueue normal work after the exception
std::atomic<bool> ran{false};
tp.enqueue_work([&ran](){ ran.store(true); });
for(int i = 0; i < 1000 && !ran.load(); ++i)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
TEST_CHECK(ran.load());
}
// Several producer threads, each with its own producer token, enqueue
// concurrently; every submitted item must run exactly once.
void
test_tp_concurrent_enqueue()
{
ThreadPool tp(4, 64, "test.conc");
std::atomic<int> counter{0};
const int PRODUCERS = 4;
const int ITEMS_PER = 50;
const int TOTAL = PRODUCERS * ITEMS_PER;
std::vector<std::thread> producers;
for(int p = 0; p < PRODUCERS; ++p)
{
// ITEMS_PER is a constant expression, so the lambda may read it
// without capturing it.
producers.emplace_back([&tp, &counter]()
{
auto ptok = tp.ptoken();
for(int i = 0; i < ITEMS_PER; ++i)
tp.enqueue_work(ptok, [&counter](){ counter.fetch_add(1); });
});
}
for(auto &t : producers)
t.join();
// Poll up to ~5s for the pool to drain all TOTAL items.
for(int i = 0; i < 5000 && counter.load() < TOTAL; ++i)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
TEST_CHECK(counter.load() == TOTAL);
}
// Many concurrent tasks must each resolve their own future with the
// correct per-task value.
void
test_tp_enqueue_task_multiple()
{
ThreadPool tp(4, 16, "test.etm");
std::vector<std::future<int>> futures;
const int N = 20;
for(int i = 0; i < N; ++i)
futures.push_back(tp.enqueue_task([i]() -> int { return i * i; }));
for(int i = 0; i < N; ++i)
TEST_CHECK(futures[i].get() == i * i);
}
// Multiple producer tokens from the same pool must be usable
// independently of each other.
void
test_tp_ptoken_creation()
{
ThreadPool tp(2, 4, "test.ptok");
// Multiple ptokens should be independently usable
auto ptok1 = tp.ptoken();
auto ptok2 = tp.ptoken();
std::atomic<int> counter{0};
tp.enqueue_work(ptok1, [&counter](){ counter.fetch_add(1); });
tp.enqueue_work(ptok2, [&counter](){ counter.fetch_add(1); });
for(int i = 0; i < 1000 && counter.load() < 2; ++i)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
TEST_CHECK(counter.load() == 2);
}
// The destructor is expected to drain queued work before joining the
// workers -- counter must reach 50 once the pool is gone.
void
test_tp_destruction_drains_queue()
{
std::atomic<int> counter{0};
{
ThreadPool tp(2, 64, "test.drain");
for(int i = 0; i < 50; ++i)
tp.enqueue_work([&counter](){
counter.fetch_add(1);
});
// destructor runs here -- should drain all queued work
}
TEST_CHECK(counter.load() == 50);
}
// enqueue_task must accept move-only callables (here: a lambda owning
// a unique_ptr via init-capture).
void
test_tp_move_only_callable()
{
ThreadPool tp(2, 4, "test.moc");
auto ptr = std::make_unique<int>(99);
auto future = tp.enqueue_task([p = std::move(ptr)]() -> int
{
return *p;
});
TEST_CHECK(future.get() == 99);
}
// With a single thread, work should execute in FIFO order
void
test_tp_work_ordering_single_thread()
{
ThreadPool tp(1, 16, "test.order");
std::vector<int> results;
std::mutex mtx;
const int N = 10;
for(int i = 0; i < N; ++i)
tp.enqueue_work([i, &results, &mtx](){
std::lock_guard<std::mutex> lk(mtx);
results.push_back(i);
});
// Wait for all work to complete
// The lock is taken briefly each poll so the size check does not race
// with the workers' push_back.
for(int i = 0; i < 2000; ++i)
{
{
std::lock_guard<std::mutex> lk(mtx);
if((int)results.size() == N)
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
std::lock_guard<std::mutex> lk(mtx);
TEST_CHECK((int)results.size() == N);
// FIFO: item i must land at index i.
for(int i = 0; i < N; ++i)
TEST_CHECK(results[i] == i);
}
TEST_LIST =
{
{"nop",test_nop},
@ -499,5 +1016,36 @@ TEST_LIST =
{"config_xattr",test_config_xattr},
{"config",test_config},
{"str",test_str_stuff},
{"tp_construct_default",test_tp_construct_default},
{"tp_construct_named",test_tp_construct_named},
{"tp_construct_zero_threads_throws",test_tp_construct_zero_threads_throws},
{"tp_enqueue_work",test_tp_enqueue_work},
{"tp_enqueue_work_with_ptoken",test_tp_enqueue_work_with_ptoken},
{"tp_try_enqueue_work",test_tp_try_enqueue_work},
{"tp_try_enqueue_work_with_ptoken",test_tp_try_enqueue_work_with_ptoken},
{"tp_try_enqueue_work_for",test_tp_try_enqueue_work_for},
{"tp_try_enqueue_work_for_with_ptoken",test_tp_try_enqueue_work_for_with_ptoken},
{"tp_enqueue_task_int",test_tp_enqueue_task_int},
{"tp_enqueue_task_void",test_tp_enqueue_task_void},
{"tp_enqueue_task_string",test_tp_enqueue_task_string},
{"tp_enqueue_task_exception",test_tp_enqueue_task_exception},
{"tp_enqueue_task_multiple",test_tp_enqueue_task_multiple},
{"tp_threads_returns_correct_count",test_tp_threads_returns_correct_count},
{"tp_threads_unique_ids",test_tp_threads_unique_ids},
{"tp_add_thread",test_tp_add_thread},
{"tp_remove_thread",test_tp_remove_thread},
{"tp_remove_thread_refuses_last",test_tp_remove_thread_refuses_last},
{"tp_set_threads_grow",test_tp_set_threads_grow},
{"tp_set_threads_shrink",test_tp_set_threads_shrink},
{"tp_set_threads_zero",test_tp_set_threads_zero},
{"tp_set_threads_same",test_tp_set_threads_same},
{"tp_work_after_add_thread",test_tp_work_after_add_thread},
{"tp_work_after_remove_thread",test_tp_work_after_remove_thread},
{"tp_worker_exception_no_crash",test_tp_worker_exception_no_crash},
{"tp_concurrent_enqueue",test_tp_concurrent_enqueue},
{"tp_ptoken_creation",test_tp_ptoken_creation},
{"tp_destruction_drains_queue",test_tp_destruction_drains_queue},
{"tp_move_only_callable",test_tp_move_only_callable},
{"tp_work_ordering_single_thread",test_tp_work_ordering_single_thread},
{NULL,NULL}
};

153
vendored/libfuse/include/bounded_queue.hpp

@ -0,0 +1,153 @@
/*
ISC License
Copyright (c) 2026, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include "moodycamel/blockingconcurrentqueue.h"
#include "moodycamel/lightweightsemaphore.h"

#include <atomic>
#include <cstdint>
// Wraps an unbounded BlockingConcurrentQueue with a semaphore to enforce
// a maximum queue depth. The semaphore tracks available slots:
// - Producers wait (decrement) before enqueuing, blocking when full.
// - Consumers signal (increment) after dequeuing, freeing a slot.
// This co-locates the invariant so the pairing cannot drift apart.
template<typename T, typename Traits = moodycamel::ConcurrentQueueDefaultTraits>
class BoundedQueue
{
public:
  using PToken = moodycamel::ProducerToken;
  using CToken = moodycamel::ConsumerToken;

private:
  using Queue = moodycamel::BlockingConcurrentQueue<T,Traits>;

  Queue _queue;
  // Counts free slots: producers wait (decrement), consumers signal
  // (increment).
  moodycamel::LightweightSemaphore _slots;
  // Number of enqueue_unbounded() items not yet balanced by a dequeue.
  // A dequeue consumes a credit *instead of* signaling a slot. Without
  // this, every unbounded item would add a slot on dequeue that no
  // producer ever waited for, permanently raising the depth bound.
  std::atomic<std::int64_t> _unbounded_credits{0};

public:
  // max_depth_: maximum number of bounded items in flight at once.
  explicit
  BoundedQueue(std::size_t max_depth_)
    : _queue(),
      _slots(max_depth_)
  {
  }

  BoundedQueue(const BoundedQueue&) = delete;
  BoundedQueue& operator=(const BoundedQueue&) = delete;
  BoundedQueue(BoundedQueue&&) = delete;
  BoundedQueue& operator=(BoundedQueue&&) = delete;

  // -- Blocking enqueue (waits indefinitely for a slot) -------------------
  template<typename U>
  void
  enqueue(U &&item_)
  {
    _slots.wait();
    _queue.enqueue(std::forward<U>(item_));
  }

  template<typename U>
  void
  enqueue(PToken &ptok_, U &&item_)
  {
    _slots.wait();
    _queue.enqueue(ptok_, std::forward<U>(item_));
  }

  // -- Non-blocking enqueue (fails immediately if full) -------------------
  template<typename U>
  bool
  try_enqueue(U &&item_)
  {
    if(!_slots.tryWait())
      return false;
    _queue.enqueue(std::forward<U>(item_));
    return true;
  }

  template<typename U>
  bool
  try_enqueue(PToken &ptok_, U &&item_)
  {
    if(!_slots.tryWait())
      return false;
    _queue.enqueue(ptok_, std::forward<U>(item_));
    return true;
  }

  // -- Timed enqueue (waits up to timeout_usecs_ for a slot) --------------
  template<typename U>
  bool
  try_enqueue_for(std::int64_t timeout_usecs_, U &&item_)
  {
    if(!_slots.wait(timeout_usecs_))
      return false;
    _queue.enqueue(std::forward<U>(item_));
    return true;
  }

  template<typename U>
  bool
  try_enqueue_for(PToken &ptok_, std::int64_t timeout_usecs_, U &&item_)
  {
    if(!_slots.wait(timeout_usecs_))
      return false;
    _queue.enqueue(ptok_, std::forward<U>(item_));
    return true;
  }

  // -- Unbounded enqueue (bypasses backpressure) --------------------------
  // Used for internal control messages (e.g. thread exit signals) that
  // must not be blocked by a full queue. Records a credit so that the
  // dequeue which eventually consumes this item does not signal a slot
  // the producer never waited for.
  template<typename U>
  void
  enqueue_unbounded(U &&item_)
  {
    _unbounded_credits.fetch_add(1, std::memory_order_relaxed);
    _queue.enqueue(std::forward<U>(item_));
  }

  // -- Blocking dequeue ---------------------------------------------------
  // Signals a free slot after consuming, unless an unbounded credit is
  // outstanding. Credits are not matched to specific items, but the
  // totals balance: over time, slots signaled == bounded items dequeued.
  void
  wait_dequeue(CToken &ctok_, T &item_)
  {
    _queue.wait_dequeue(ctok_, item_);

    std::int64_t credits = _unbounded_credits.load(std::memory_order_relaxed);
    while(credits > 0)
      {
        if(_unbounded_credits.compare_exchange_weak(credits, credits - 1,
                                                    std::memory_order_relaxed))
          return;
      }

    _slots.signal();
  }

  // -- Token creation -----------------------------------------------------
  PToken
  make_ptoken()
  {
    return PToken(_queue);
  }

  CToken
  make_ctoken()
  {
    return CToken(_queue);
  }
};

9
vendored/libfuse/include/mutex.hpp

@ -6,6 +6,8 @@
#include "mutex_ndebug.hpp"
#endif
#include "scope_guard.hpp"
typedef pthread_mutex_t mutex_t;
// Simple mutex_t wrapper to provide RAII
@ -50,3 +52,10 @@ public:
mutex_unlock(_mutex);
}
};
// Single-line mutex guard using scope_guard
// Expands to mutex_lock() followed by DEFER mutex_unlock()
// Both lock and unlock capture __FILE__/__func__/__LINE__ from the call site
// NOTE: this deliberately expands to TWO statements -- the DEFER must
// live in the enclosing scope, so it cannot be wrapped in do{}while(0).
// Never use it as the unbraced body of an if/else/loop; always invoke
// it inside a braced block, where the unlock runs at the block's end.
#define mutex_lockguard(m) \
mutex_lock(m); \
DEFER { mutex_unlock(m); }

287
vendored/libfuse/include/thread_pool.hpp

@ -1,41 +1,50 @@
/*
ISC License
Copyright (c) 2026, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include "moodycamel/blockingconcurrentqueue.h"
#include "moodycamel/lightweightsemaphore.h"
#include "bounded_queue.hpp"
#include "invocable.h"
#include "mutex.hpp"
#include <algorithm>
#include <atomic>
#include <cerrno>
#include <csignal>
#include <cstdint>
#include <cstring>
#include <future>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
#include <utility>
#include <vector>
#include <pthread.h>
#include <syslog.h>
#define SEM
//#undef SEM
#ifdef SEM
#define SEMA_WAIT(S) (S.wait())
#define SEMA_SIGNAL(S) (S.signal())
#else
#define SEMA_WAIT(S)
#define SEMA_SIGNAL(S)
#endif
struct ThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits
{
static const int MAX_SEMA_SPINS = 1;
static const int MAX_SEMA_SPINS = 10;
};
@ -47,25 +56,29 @@ public:
private:
using Func = ofats::any_invocable<void()>;
using Queue = moodycamel::BlockingConcurrentQueue<Func,ThreadPoolTraits>;
using Queue = BoundedQueue<Func,ThreadPoolTraits>;
struct ThreadExitSignal {};
public:
explicit
ThreadPool(const unsigned thread_count_ = std::thread::hardware_concurrency(),
const unsigned max_queue_depth_ = std::thread::hardware_concurrency(),
std::string const name_ = {});
ThreadPool(const unsigned thread_count_ = std::thread::hardware_concurrency(),
const unsigned max_queue_depth_ = std::thread::hardware_concurrency() * 2,
const std::string &name_ = {});
~ThreadPool();
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;
private:
static void *start_routine(void *arg_);
public:
int add_thread(std::string const name = {});
int remove_thread(void);
int add_thread(const std::string &name = {});
int remove_thread();
int set_threads(const std::size_t count);
void shutdown(void);
public:
template<typename FuncType>
void
@ -76,6 +89,26 @@ public:
void
enqueue_work(FuncType &&func_);
template<typename FuncType>
bool
try_enqueue_work(ThreadPool::PToken &ptok_,
FuncType &&func_);
template<typename FuncType>
bool
try_enqueue_work(FuncType &&func_);
template<typename FuncType>
bool
try_enqueue_work_for(ThreadPool::PToken &ptok_,
std::int64_t timeout_usecs_,
FuncType &&func_);
template<typename FuncType>
bool
try_enqueue_work_for(std::int64_t timeout_usecs_,
FuncType &&func_);
template<typename FuncType>
[[nodiscard]]
std::future<std::invoke_result_t<FuncType>>
@ -87,7 +120,6 @@ public:
private:
Queue _queue;
moodycamel::LightweightSemaphore _sema;
private:
std::string const _name;
@ -98,11 +130,10 @@ private:
inline
ThreadPool::ThreadPool(const unsigned thread_count_,
const unsigned max_queue_depth_,
const std::string name_)
: _queue(),
_sema(max_queue_depth_),
ThreadPool::ThreadPool(const unsigned thread_count_,
const unsigned max_queue_depth_,
const std::string &name_)
: _queue(max_queue_depth_),
_name(name_)
{
sigset_t oldset;
@ -128,8 +159,11 @@ ThreadPool::ThreadPool(const unsigned thread_count_,
continue;
}
if(!_name.empty())
pthread_setname_np(t,_name.c_str());
if(not _name.empty())
{
std::string name = _name.substr(0, 15);
pthread_setname_np(t, name.c_str());
}
_threads.push_back(t);
}
@ -150,18 +184,25 @@ ThreadPool::ThreadPool(const unsigned thread_count_,
inline
ThreadPool::~ThreadPool()
{
std::vector<pthread_t> threads;
{
mutex_lockguard(_threads_mutex);
threads = _threads;
}
syslog(LOG_DEBUG,
"threadpool (%s): destroying %zu threads",
_name.c_str(),
_threads.size());
threads.size());
for(auto t : _threads)
pthread_cancel(t);
for(auto t : _threads)
for(auto _ : threads)
_queue.enqueue_unbounded([](){ throw ThreadExitSignal{}; });
for(auto t : threads)
pthread_join(t,NULL);
}
// Threads purposefully do not restore default signal handling.
inline
void*
ThreadPool::start_routine(void *arg_)
@ -171,8 +212,7 @@ ThreadPool::start_routine(void *arg_)
bool done;
ThreadPool::Func func;
ThreadPool::Queue &q = btp->_queue;
moodycamel::LightweightSemaphore &sema = btp->_sema;
ThreadPool::CToken ctok(btp->_queue);
ThreadPool::CToken ctok(btp->_queue.make_ctoken());
done = false;
while(!done)
@ -185,12 +225,26 @@ ThreadPool::start_routine(void *arg_)
{
func();
}
catch(std::exception &e)
catch(const ThreadExitSignal&)
{
done = true;
}
catch(std::exception &e)
{
syslog(LOG_CRIT,
"threadpool (%s): uncaught exception caught by worker - %s",
btp->_name.c_str(),
e.what());
}
catch(...)
{
syslog(LOG_CRIT,
"threadpool (%s): uncaught non-standard exception caught by worker",
btp->_name.c_str());
}
SEMA_SIGNAL(sema);
// force destruction to release resources
func = {};
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
}
@ -198,11 +252,9 @@ ThreadPool::start_routine(void *arg_)
return nullptr;
}
inline
int
ThreadPool::add_thread(const std::string name_)
ThreadPool::add_thread(const std::string &name_)
{
int rv;
pthread_t t;
@ -227,117 +279,154 @@ ThreadPool::add_thread(const std::string name_)
return -rv;
}
if(!name.empty())
pthread_setname_np(t,name.c_str());
if(not name.empty())
{
std::string n = name.substr(0, 15);
pthread_setname_np(t,n.c_str());
}
{
LockGuard lg(_threads_mutex);
mutex_lockguard(_threads_mutex);
_threads.push_back(t);
}
return 0;
}
inline
int
ThreadPool::remove_thread(void)
ThreadPool::remove_thread()
{
{
LockGuard lg(_threads_mutex);
mutex_lockguard(_threads_mutex);
if(_threads.size() <= 1)
return -EINVAL;
}
std::promise<pthread_t> promise;
auto promise = std::make_shared<std::promise<std::optional<pthread_t>>>();
auto func =
[this,&promise]()
[this,promise]()
{
pthread_t t;
t = pthread_self();
promise.set_value(t);
{
LockGuard lg(_threads_mutex);
mutex_lockguard(_threads_mutex);
for(auto i = _threads.begin(); i != _threads.end(); ++i)
if(_threads.size() <= 1)
{
if(*i != t)
continue;
_threads.erase(i);
break;
promise->set_value(std::nullopt);
return;
}
auto it = std::find(_threads.begin(),
_threads.end(),
t);
if(it != _threads.end())
_threads.erase(it);
}
pthread_exit(NULL);
promise->set_value(t);
throw ThreadExitSignal{};
};
enqueue_work(std::move(func));
pthread_join(promise.get_future().get(),NULL);
_queue.enqueue_unbounded(std::move(func));
return 0;
}
auto result = promise->get_future().get();
if(result.has_value())
{
pthread_join(result.value(),NULL);
return 0;
}
return -EINVAL;
}
inline
int
ThreadPool::set_threads(std::size_t const count_)
ThreadPool::set_threads(const std::size_t count_)
{
int diff;
if(count_ == 0)
return -EINVAL;
{
LockGuard lg(_threads_mutex);
std::size_t current;
diff = ((int)count_ - (int)_threads.size());
{
mutex_lockguard(_threads_mutex);
current = _threads.size();
}
for(auto i = diff; i > 0; --i)
for(std::size_t i = current; i < count_; ++i)
add_thread();
for(auto i = diff; i < 0; ++i)
for(std::size_t i = count_; i < current; ++i)
remove_thread();
return diff;
return 0;
}
// Blocking enqueue of a work item using a caller-owned producer token.
// Blocks until the bounded queue has a free slot.
template<typename FuncType>
inline
void
ThreadPool::enqueue_work(ThreadPool::PToken &ptok_,
                         FuncType &&func_)
{
  _queue.enqueue(ptok_, std::forward<FuncType>(func_));
}
template<typename FuncType>
inline
void
ThreadPool::shutdown(void)
ThreadPool::enqueue_work(FuncType &&func_)
{
LockGuard lg(_threads_mutex);
_queue.enqueue(std::forward<FuncType>(func_));
}
for(pthread_t tid : _threads)
pthread_cancel(tid);
for(pthread_t tid : _threads)
pthread_join(tid,NULL);
_threads.clear();
// Non-blocking enqueue with a producer token; returns false when the
// queue is already at max depth.
template<typename FuncType>
inline
bool
ThreadPool::try_enqueue_work(ThreadPool::PToken &ptok_,
                             FuncType &&func_)
{
  return _queue.try_enqueue(ptok_, std::forward<FuncType>(func_));
}
template<typename FuncType>
inline
void
ThreadPool::enqueue_work(ThreadPool::PToken &ptok_,
FuncType &&func_)
bool
ThreadPool::try_enqueue_work(FuncType &&func_)
{
SEMA_WAIT(_sema);
_queue.enqueue(ptok_,
std::forward<FuncType>(func_));
return _queue.try_enqueue(std::forward<FuncType>(func_));
}
template<typename FuncType>
inline
void
ThreadPool::enqueue_work(FuncType &&func_)
bool
ThreadPool::try_enqueue_work_for(ThreadPool::PToken &ptok_,
std::int64_t timeout_usecs_,
FuncType &&func_)
{
SEMA_WAIT(_sema);
_queue.enqueue(std::forward<FuncType>(func_));
return _queue.try_enqueue_for(ptok_,
timeout_usecs_,
std::forward<FuncType>(func_));
}
// Timed enqueue: waits up to timeout_usecs_ microseconds for a free
// slot before giving up and returning false.
template<typename FuncType>
inline
bool
ThreadPool::try_enqueue_work_for(std::int64_t timeout_usecs_,
                                 FuncType &&func_)
{
  return _queue.try_enqueue_for(timeout_usecs_,
                                std::forward<FuncType>(func_));
}
@ -359,8 +448,15 @@ ThreadPool::enqueue_task(FuncType&& func_)
{
try
{
auto rv = func_();
promise_.set_value(std::move(rv));
if constexpr (std::is_void_v<TaskReturnType>)
{
func_();
promise_.set_value();
}
else
{
promise_.set_value(func_());
}
}
catch(...)
{
@ -368,7 +464,6 @@ ThreadPool::enqueue_task(FuncType&& func_)
}
};
SEMA_WAIT(_sema);
_queue.enqueue(std::move(work));
return future;
@ -379,7 +474,7 @@ inline
std::vector<pthread_t>
ThreadPool::threads() const
{
LockGuard lg(_threads_mutex);
mutex_lockguard(_threads_mutex);
return _threads;
}
@ -389,5 +484,5 @@ inline
ThreadPool::PToken
ThreadPool::ptoken()
{
return ThreadPool::PToken(_queue);
return _queue.make_ptoken();
}
Loading…
Cancel
Save