diff --git a/tests/tests.cpp b/tests/tests.cpp index 3ef94908..6ca5b514 100644 --- a/tests/tests.cpp +++ b/tests/tests.cpp @@ -2,6 +2,12 @@ #include "config.hpp" #include "str.hpp" +#include "thread_pool.hpp" + +#include +#include +#include +#include void test_nop() @@ -481,6 +487,517 @@ test_config() TEST_CHECK(cfg.set("async-read","true") == 0); } +// ===================================================================== +// ThreadPool tests +// ===================================================================== + +void +test_tp_construct_default() +{ + ThreadPool tp(2, 4, "test.default"); + + auto threads = tp.threads(); + + TEST_CHECK(threads.size() == 2); +} + +void +test_tp_construct_named() +{ + ThreadPool tp(1, 2, "test.named"); + + auto threads = tp.threads(); + + TEST_CHECK(threads.size() == 1); +} + +void +test_tp_construct_zero_threads_throws() +{ + bool threw = false; + + try + { + ThreadPool tp(0, 4, "test.zero"); + } + catch(const std::runtime_error &) + { + threw = true; + } + + TEST_CHECK(threw); +} + +void +test_tp_enqueue_work() +{ + ThreadPool tp(2, 4, "test.ew"); + + std::atomic counter{0}; + + for(int i = 0; i < 10; ++i) + tp.enqueue_work([&counter](){ counter.fetch_add(1); }); + + // wait for completion + for(int i = 0; i < 1000 && counter.load() < 10; ++i) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + TEST_CHECK(counter.load() == 10); +} + +void +test_tp_enqueue_work_with_ptoken() +{ + ThreadPool tp(2, 4, "test.ewp"); + + std::atomic counter{0}; + auto ptok = tp.ptoken(); + + for(int i = 0; i < 10; ++i) + tp.enqueue_work(ptok, [&counter](){ counter.fetch_add(1); }); + + for(int i = 0; i < 1000 && counter.load() < 10; ++i) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + TEST_CHECK(counter.load() == 10); +} + +void +test_tp_try_enqueue_work() +{ + // Use 1 thread and a small queue depth of 2 + ThreadPool tp(1, 2, "test.tew"); + + std::atomic counter{0}; + + // These should succeed (queue has room) + bool ok = tp.try_enqueue_work([&counter](){ counter.fetch_add(1); }); + TEST_CHECK(ok); + + for(int i = 0; i < 1000 && counter.load() < 1; ++i) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + TEST_CHECK(counter.load() == 1); +} + +void +test_tp_try_enqueue_work_with_ptoken() +{ + ThreadPool tp(1, 2, "test.tewp"); + + std::atomic counter{0}; + auto ptok = tp.ptoken(); + + bool ok = tp.try_enqueue_work(ptok, [&counter](){ counter.fetch_add(1); }); + TEST_CHECK(ok); + + for(int i = 0; i < 1000 && counter.load() < 1; ++i) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + TEST_CHECK(counter.load() == 1); +} + +void +test_tp_try_enqueue_work_for() +{ + ThreadPool tp(1, 2, "test.tewf"); + + std::atomic counter{0}; + + bool ok = tp.try_enqueue_work_for(100000, // 100ms + [&counter](){ counter.fetch_add(1); }); + TEST_CHECK(ok); + + for(int i = 0; i < 1000 && counter.load() < 1; ++i) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + TEST_CHECK(counter.load() == 1); +} + +void +test_tp_try_enqueue_work_for_with_ptoken() +{ + ThreadPool tp(1, 2, "test.tewfp"); + + std::atomic counter{0}; + auto ptok = tp.ptoken(); + + bool ok = tp.try_enqueue_work_for(ptok, + 100000, // 100ms + [&counter](){ counter.fetch_add(1); }); + TEST_CHECK(ok); + + for(int i = 0; i < 1000 && counter.load() < 1; ++i) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + TEST_CHECK(counter.load() == 1); +} + +void +test_tp_enqueue_task_int() +{ + ThreadPool tp(2, 4, "test.eti"); + + auto future = tp.enqueue_task([]() -> int { return 42; }); + + TEST_CHECK(future.get() == 42); +} + +void +test_tp_enqueue_task_void() +{ + ThreadPool tp(2, 4, "test.etv"); + + std::atomic ran{false}; + + auto future = tp.enqueue_task([&ran]() { ran.store(true); }); + + future.get(); // should not throw + + TEST_CHECK(ran.load()); +} + +void +test_tp_enqueue_task_string() +{ + ThreadPool tp(2, 4, "test.ets"); + + auto future = tp.enqueue_task([]() -> std::string { return "hello"; }); + + TEST_CHECK(future.get() == "hello"); +} + +void +test_tp_enqueue_task_exception() +{ + ThreadPool tp(2, 4, "test.ete"); + + auto future = tp.enqueue_task([]() -> int + { + throw std::runtime_error("task error"); + return 0; + }); + + bool caught = false; + + try + { + future.get(); + } + catch(const std::runtime_error &e) + { + caught = true; + TEST_CHECK(std::string(e.what()) == "task error"); + } + + TEST_CHECK(caught); +} + +void +test_tp_threads_returns_correct_count() +{ + ThreadPool tp(4, 8, "test.thr"); + + auto threads = tp.threads(); + + TEST_CHECK(threads.size() == 4); + + // All thread IDs should be non-zero + for(auto t : threads) + TEST_CHECK(t != 0); +} + +void +test_tp_threads_unique_ids() +{ + ThreadPool tp(4, 8, "test.uid"); + + auto threads = tp.threads(); + + // All thread IDs should be unique + for(std::size_t i = 0; i < threads.size(); ++i) + for(std::size_t j = i + 1; j < threads.size(); ++j) + TEST_CHECK(threads[i] != threads[j]); +} + +void +test_tp_add_thread() +{ + ThreadPool tp(2, 4, "test.add"); + + TEST_CHECK(tp.threads().size() == 2); + + int rv = tp.add_thread(); + + TEST_CHECK(rv == 0); + TEST_CHECK(tp.threads().size() == 3); +} + +void +test_tp_remove_thread() +{ + ThreadPool tp(3, 4, "test.rm"); + + TEST_CHECK(tp.threads().size() == 3); + + int rv = tp.remove_thread(); + + TEST_CHECK(rv == 0); + TEST_CHECK(tp.threads().size() == 2); +} + +void +test_tp_remove_thread_refuses_last() +{ + ThreadPool tp(1, 4, "test.rmlast"); + + TEST_CHECK(tp.threads().size() == 1); + + int rv = tp.remove_thread(); + + TEST_CHECK(rv == -EINVAL); + TEST_CHECK(tp.threads().size() == 1); +} + +void +test_tp_set_threads_grow() +{ + ThreadPool tp(2, 8, "test.grow"); + + TEST_CHECK(tp.threads().size() == 2); + + int rv = tp.set_threads(4); + + TEST_CHECK(rv == 0); + TEST_CHECK(tp.threads().size() == 4); +} + +void +test_tp_set_threads_shrink() +{ + ThreadPool tp(4, 8, "test.shrink"); + + TEST_CHECK(tp.threads().size() == 4); + + int rv = tp.set_threads(2); + + TEST_CHECK(rv == 0); + TEST_CHECK(tp.threads().size() == 2); +} + +void +test_tp_set_threads_zero() +{ + ThreadPool tp(2, 4, "test.sz"); + + int rv = tp.set_threads(0); + + TEST_CHECK(rv == -EINVAL); + TEST_CHECK(tp.threads().size() == 2); +} + +void +test_tp_set_threads_same() +{ + ThreadPool tp(3, 4, "test.same"); + + int rv = tp.set_threads(3); + + TEST_CHECK(rv == 0); + TEST_CHECK(tp.threads().size() == 3); +} + +void +test_tp_work_after_add_thread() +{ + ThreadPool tp(1, 4, "test.waat"); + + tp.add_thread(); + tp.add_thread(); + + std::atomic counter{0}; + const int N = 20; + + for(int i = 0; i < N; ++i) + tp.enqueue_work([&counter](){ counter.fetch_add(1); }); + + for(int i = 0; i < 2000 && counter.load() < N; ++i) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + TEST_CHECK(counter.load() == N); +} + +void +test_tp_work_after_remove_thread() +{ + ThreadPool tp(3, 4, "test.wart"); + + tp.remove_thread(); + + std::atomic counter{0}; + const int N = 10; + + for(int i = 0; i < N; ++i) + tp.enqueue_work([&counter](){ counter.fetch_add(1); }); + + for(int i = 0; i < 2000 && counter.load() < N; ++i) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + TEST_CHECK(counter.load() == N); +} + +void +test_tp_worker_exception_no_crash() +{ + ThreadPool tp(2, 4, "test.exc"); + + // Enqueue work that throws -- pool should not crash + tp.enqueue_work([](){ + throw std::runtime_error("deliberate error"); + }); + + // Enqueue normal work after the exception + std::atomic ran{false}; + tp.enqueue_work([&ran](){ ran.store(true); }); + + for(int i = 0; i < 1000 && !ran.load(); ++i) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + TEST_CHECK(ran.load()); +} + +void +test_tp_concurrent_enqueue() +{ + ThreadPool tp(4, 64, "test.conc"); + + std::atomic counter{0}; + const int PRODUCERS = 4; + const int ITEMS_PER = 50; + const int TOTAL = PRODUCERS * ITEMS_PER; + + std::vector producers; + for(int p = 0; p < PRODUCERS; ++p) + { + producers.emplace_back([&tp, &counter]() + { + auto ptok = tp.ptoken(); + for(int i = 0; i < ITEMS_PER; ++i) + tp.enqueue_work(ptok, [&counter](){ counter.fetch_add(1); }); + }); + } + + for(auto &t : producers) + t.join(); + + for(int i = 0; i < 5000 && counter.load() < TOTAL; ++i) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + TEST_CHECK(counter.load() == TOTAL); +} + +void +test_tp_enqueue_task_multiple() +{ + ThreadPool tp(4, 16, "test.etm"); + + std::vector> futures; + const int N = 20; + + for(int i = 0; i < N; ++i) + futures.push_back(tp.enqueue_task([i]() -> int { return i * i; })); + + for(int i = 0; i < N; ++i) + TEST_CHECK(futures[i].get() == i * i); +} + +void +test_tp_ptoken_creation() +{ + ThreadPool tp(2, 4, "test.ptok"); + + // Multiple ptokens should be independently usable + auto ptok1 = tp.ptoken(); + auto ptok2 = tp.ptoken(); + + std::atomic counter{0}; + + tp.enqueue_work(ptok1, [&counter](){ counter.fetch_add(1); }); + tp.enqueue_work(ptok2, [&counter](){ counter.fetch_add(1); }); + + for(int i = 0; i < 1000 && counter.load() < 2; ++i) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + TEST_CHECK(counter.load() == 2); +} + +void +test_tp_destruction_drains_queue() +{ + std::atomic counter{0}; + + { + ThreadPool tp(2, 64, "test.drain"); + + for(int i = 0; i < 50; ++i) + tp.enqueue_work([&counter](){ + counter.fetch_add(1); + }); + + // destructor runs here -- should drain all queued work + } + + TEST_CHECK(counter.load() == 50); +} + +void +test_tp_move_only_callable() +{ + ThreadPool tp(2, 4, "test.moc"); + + auto ptr = std::make_unique(99); + + auto future = tp.enqueue_task([p = std::move(ptr)]() -> int + { + return *p; + }); + + TEST_CHECK(future.get() == 99); +} + +void +test_tp_work_ordering_single_thread() +{ + // With a single thread, work should execute in FIFO order + ThreadPool tp(1, 16, "test.order"); + + std::vector results; + std::mutex mtx; + const int N = 10; + + for(int i = 0; i < N; ++i) + tp.enqueue_work([i, &results, &mtx](){ + std::lock_guard lk(mtx); + results.push_back(i); + }); + + // Wait for all work to complete + for(int i = 0; i < 2000; ++i) + { + { + std::lock_guard lk(mtx); + if((int)results.size() == N) + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + std::lock_guard lk(mtx); + TEST_CHECK((int)results.size() == N); + for(int i = 0; i < N; ++i) + TEST_CHECK(results[i] == i); +} + TEST_LIST = { {"nop",test_nop}, @@ -499,5 +1016,36 @@ TEST_LIST = {"config_xattr",test_config_xattr}, {"config",test_config}, {"str",test_str_stuff}, + {"tp_construct_default",test_tp_construct_default}, + {"tp_construct_named",test_tp_construct_named}, + {"tp_construct_zero_threads_throws",test_tp_construct_zero_threads_throws}, + {"tp_enqueue_work",test_tp_enqueue_work}, + {"tp_enqueue_work_with_ptoken",test_tp_enqueue_work_with_ptoken}, + {"tp_try_enqueue_work",test_tp_try_enqueue_work}, + {"tp_try_enqueue_work_with_ptoken",test_tp_try_enqueue_work_with_ptoken}, + {"tp_try_enqueue_work_for",test_tp_try_enqueue_work_for}, + {"tp_try_enqueue_work_for_with_ptoken",test_tp_try_enqueue_work_for_with_ptoken}, + {"tp_enqueue_task_int",test_tp_enqueue_task_int}, + {"tp_enqueue_task_void",test_tp_enqueue_task_void}, + {"tp_enqueue_task_string",test_tp_enqueue_task_string}, + {"tp_enqueue_task_exception",test_tp_enqueue_task_exception}, + {"tp_enqueue_task_multiple",test_tp_enqueue_task_multiple}, + {"tp_threads_returns_correct_count",test_tp_threads_returns_correct_count}, + {"tp_threads_unique_ids",test_tp_threads_unique_ids}, + {"tp_add_thread",test_tp_add_thread}, + {"tp_remove_thread",test_tp_remove_thread}, + {"tp_remove_thread_refuses_last",test_tp_remove_thread_refuses_last}, + {"tp_set_threads_grow",test_tp_set_threads_grow}, + {"tp_set_threads_shrink",test_tp_set_threads_shrink}, + {"tp_set_threads_zero",test_tp_set_threads_zero}, + {"tp_set_threads_same",test_tp_set_threads_same}, + {"tp_work_after_add_thread",test_tp_work_after_add_thread}, + {"tp_work_after_remove_thread",test_tp_work_after_remove_thread}, + {"tp_worker_exception_no_crash",test_tp_worker_exception_no_crash}, + {"tp_concurrent_enqueue",test_tp_concurrent_enqueue}, + {"tp_ptoken_creation",test_tp_ptoken_creation}, + {"tp_destruction_drains_queue",test_tp_destruction_drains_queue}, + {"tp_move_only_callable",test_tp_move_only_callable}, + {"tp_work_ordering_single_thread",test_tp_work_ordering_single_thread}, {NULL,NULL} }; diff --git a/vendored/libfuse/include/bounded_queue.hpp b/vendored/libfuse/include/bounded_queue.hpp new file mode 100644 index 00000000..f2b68459 --- /dev/null +++ b/vendored/libfuse/include/bounded_queue.hpp @@ -0,0 +1,153 @@ +/* + ISC License + + Copyright (c) 2026, Antonio SJ Musumeci + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + +#pragma once + +#include "moodycamel/blockingconcurrentqueue.h" +#include "moodycamel/lightweightsemaphore.h" + +#include + + +// Wraps an unbounded BlockingConcurrentQueue with a semaphore to enforce +// a maximum queue depth. The semaphore tracks available slots: +// - Producers wait (decrement) before enqueuing, blocking when full. +// - Consumers signal (increment) after dequeuing, freeing a slot. +// This co-locates the invariant so the pairing cannot drift apart. +template +class BoundedQueue +{ +public: + using PToken = moodycamel::ProducerToken; + using CToken = moodycamel::ConsumerToken; + +private: + using Queue = moodycamel::BlockingConcurrentQueue; + + Queue _queue; + moodycamel::LightweightSemaphore _slots; + +public: + explicit + BoundedQueue(std::size_t max_depth_) + : _queue(), + _slots(max_depth_) + { + } + + BoundedQueue(const BoundedQueue&) = delete; + BoundedQueue& operator=(const BoundedQueue&) = delete; + BoundedQueue(BoundedQueue&&) = delete; + BoundedQueue& operator=(BoundedQueue&&) = delete; + + // -- Blocking enqueue (waits indefinitely for a slot) ------------------- + + template + void + enqueue(U &&item_) + { + _slots.wait(); + _queue.enqueue(std::forward(item_)); + } + + template + void + enqueue(PToken &ptok_, U &&item_) + { + _slots.wait(); + _queue.enqueue(ptok_, std::forward(item_)); + } + + // -- Non-blocking enqueue (fails immediately if full) ------------------- + + template + bool + try_enqueue(U &&item_) + { + if(!_slots.tryWait()) + return false; + _queue.enqueue(std::forward(item_)); + return true; + } + + template + bool + try_enqueue(PToken &ptok_, U &&item_) + { + if(!_slots.tryWait()) + return false; + _queue.enqueue(ptok_, std::forward(item_)); + return true; + } + + // -- Timed enqueue (waits up to timeout_usecs_ for a slot) -------------- + + template + bool + try_enqueue_for(std::int64_t timeout_usecs_, U &&item_) + { + if(!_slots.wait(timeout_usecs_)) + return false; + _queue.enqueue(std::forward(item_)); + return true; + } + + template + bool + try_enqueue_for(PToken &ptok_, std::int64_t timeout_usecs_, U &&item_) + { + if(!_slots.wait(timeout_usecs_)) + return false; + _queue.enqueue(ptok_, std::forward(item_)); + return true; + } + + // -- Unbounded enqueue (bypasses backpressure) -------------------------- + // Used for internal control messages (e.g. thread exit signals) that + // must not be blocked by a full queue. + + template + void + enqueue_unbounded(U &&item_) + { + _queue.enqueue(std::forward(item_)); + } + + // -- Blocking dequeue (signals a slot after consuming) ------------------ + + void + wait_dequeue(CToken &ctok_, T &item_) + { + _queue.wait_dequeue(ctok_, item_); + _slots.signal(); + } + + // -- Token creation ----------------------------------------------------- + + PToken + make_ptoken() + { + return PToken(_queue); + } + + CToken + make_ctoken() + { + return CToken(_queue); + } +}; diff --git a/vendored/libfuse/include/mutex.hpp b/vendored/libfuse/include/mutex.hpp index be1d4d6a..119b4ce5 100644 --- a/vendored/libfuse/include/mutex.hpp +++ b/vendored/libfuse/include/mutex.hpp @@ -6,6 +6,8 @@ #include "mutex_ndebug.hpp" #endif +#include "scope_guard.hpp" + typedef pthread_mutex_t mutex_t; // Simple mutex_t wrapper to provide RAII @@ -50,3 +52,10 @@ public: mutex_unlock(_mutex); } }; + +// Single-line mutex guard using scope_guard +// Expands to mutex_lock() followed by DEFER mutex_unlock() +// Both lock and unlock capture __FILE__/__func__/__LINE__ from the call site +#define mutex_lockguard(m) \ + mutex_lock(m); \ + DEFER { mutex_unlock(m); } diff --git a/vendored/libfuse/include/thread_pool.hpp b/vendored/libfuse/include/thread_pool.hpp index 86d3c7a3..5ceaa717 100644 --- a/vendored/libfuse/include/thread_pool.hpp +++ b/vendored/libfuse/include/thread_pool.hpp @@ -1,41 +1,50 @@ +/* + ISC License + + Copyright (c) 2026, Antonio SJ Musumeci + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + #pragma once -#include "moodycamel/blockingconcurrentqueue.h" -#include "moodycamel/lightweightsemaphore.h" +#include "bounded_queue.hpp" #include "invocable.h" #include "mutex.hpp" #include #include +#include #include +#include #include #include #include +#include #include #include #include -#include #include +#include #include #include -#define SEM -//#undef SEM - -#ifdef SEM -#define SEMA_WAIT(S) (S.wait()) -#define SEMA_SIGNAL(S) (S.signal()) -#else -#define SEMA_WAIT(S) -#define SEMA_SIGNAL(S) -#endif - struct ThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits { - static const int MAX_SEMA_SPINS = 1; + static const int MAX_SEMA_SPINS = 10; }; @@ -47,25 +56,29 @@ public: private: using Func = ofats::any_invocable; - using Queue = moodycamel::BlockingConcurrentQueue; + using Queue = BoundedQueue; + struct ThreadExitSignal {}; public: explicit - ThreadPool(const unsigned thread_count_ = std::thread::hardware_concurrency(), - const unsigned max_queue_depth_ = std::thread::hardware_concurrency(), - std::string const name_ = {}); + ThreadPool(const unsigned thread_count_ = std::thread::hardware_concurrency(), + const unsigned max_queue_depth_ = std::thread::hardware_concurrency() * 2, + const std::string &name_ = {}); ~ThreadPool(); + ThreadPool(const ThreadPool&) = delete; + ThreadPool& operator=(const ThreadPool&) = delete; + ThreadPool(ThreadPool&&) = delete; + ThreadPool& operator=(ThreadPool&&) = delete; + private: static void *start_routine(void *arg_); public: - int add_thread(std::string const name = {}); - int remove_thread(void); + int add_thread(const std::string &name = {}); + int remove_thread(); int set_threads(const std::size_t count); - void shutdown(void); - public: template void @@ -76,6 +89,26 @@ public: void enqueue_work(FuncType &&func_); + template + bool + try_enqueue_work(ThreadPool::PToken &ptok_, + FuncType &&func_); + + template + bool + try_enqueue_work(FuncType &&func_); + + template + bool + try_enqueue_work_for(ThreadPool::PToken &ptok_, + std::int64_t timeout_usecs_, + FuncType &&func_); + + template + bool + try_enqueue_work_for(std::int64_t timeout_usecs_, + FuncType &&func_); + template [[nodiscard]] std::future> @@ -87,7 +120,6 @@ public: private: Queue _queue; - moodycamel::LightweightSemaphore _sema; private: std::string const _name; @@ -98,11 +130,10 @@ private: inline -ThreadPool::ThreadPool(const unsigned thread_count_, - const unsigned max_queue_depth_, - const std::string name_) - : _queue(), - _sema(max_queue_depth_), +ThreadPool::ThreadPool(const unsigned thread_count_, + const unsigned max_queue_depth_, + const std::string &name_) + : _queue(max_queue_depth_), _name(name_) { sigset_t oldset; @@ -128,8 +159,11 @@ ThreadPool::ThreadPool(const unsigned thread_count_, continue; } - if(!_name.empty()) - pthread_setname_np(t,_name.c_str()); + if(not _name.empty()) + { + std::string name = _name.substr(0, 15); + pthread_setname_np(t, name.c_str()); + } _threads.push_back(t); } @@ -150,18 +184,25 @@ ThreadPool::ThreadPool(const unsigned thread_count_, inline ThreadPool::~ThreadPool() { + std::vector threads; + + { + mutex_lockguard(_threads_mutex); + threads = _threads; + } + syslog(LOG_DEBUG, "threadpool (%s): destroying %zu threads", _name.c_str(), - _threads.size()); + threads.size()); - for(auto t : _threads) - pthread_cancel(t); - for(auto t : _threads) + for(auto _ : threads) + _queue.enqueue_unbounded([](){ throw ThreadExitSignal{}; }); + for(auto t : threads) pthread_join(t,NULL); } - +// Threads purposefully do not restore default signal handling. inline void* ThreadPool::start_routine(void *arg_) @@ -171,8 +212,7 @@ ThreadPool::start_routine(void *arg_) bool done; ThreadPool::Func func; ThreadPool::Queue &q = btp->_queue; - moodycamel::LightweightSemaphore &sema = btp->_sema; - ThreadPool::CToken ctok(btp->_queue); + ThreadPool::CToken ctok(btp->_queue.make_ctoken()); done = false; while(!done) @@ -185,12 +225,26 @@ ThreadPool::start_routine(void *arg_) { func(); } - catch(std::exception &e) + catch(const ThreadExitSignal&) { done = true; } + catch(std::exception &e) + { + syslog(LOG_CRIT, + "threadpool (%s): uncaught exception caught by worker - %s", + btp->_name.c_str(), + e.what()); + } + catch(...) + { + syslog(LOG_CRIT, + "threadpool (%s): uncaught non-standard exception caught by worker", + btp->_name.c_str()); + } - SEMA_SIGNAL(sema); + // force destruction to release resources + func = {}; pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL); } @@ -198,11 +252,9 @@ ThreadPool::start_routine(void *arg_) return nullptr; } - - inline int -ThreadPool::add_thread(const std::string name_) +ThreadPool::add_thread(const std::string &name_) { int rv; pthread_t t; @@ -227,117 +279,154 @@ ThreadPool::add_thread(const std::string name_) return -rv; } - if(!name.empty()) - pthread_setname_np(t,name.c_str()); + if(not name.empty()) + { + std::string n = name.substr(0, 15); + pthread_setname_np(t,n.c_str()); + } { - LockGuard lg(_threads_mutex); - + mutex_lockguard(_threads_mutex); _threads.push_back(t); } return 0; } - inline int -ThreadPool::remove_thread(void) +ThreadPool::remove_thread() { { - LockGuard lg(_threads_mutex); - + mutex_lockguard(_threads_mutex); if(_threads.size() <= 1) return -EINVAL; } - std::promise promise; + auto promise = std::make_shared>>(); + auto func = - [this,&promise]() + [this,promise]() { pthread_t t; t = pthread_self(); - promise.set_value(t); { - LockGuard lg(_threads_mutex); + mutex_lockguard(_threads_mutex); - for(auto i = _threads.begin(); i != _threads.end(); ++i) + if(_threads.size() <= 1) { - if(*i != t) - continue; - - _threads.erase(i); - break; + promise->set_value(std::nullopt); + return; } + + auto it = std::find(_threads.begin(), + _threads.end(), + t); + if(it != _threads.end()) + _threads.erase(it); } - pthread_exit(NULL); + promise->set_value(t); + + throw ThreadExitSignal{}; }; - enqueue_work(std::move(func)); - pthread_join(promise.get_future().get(),NULL); + _queue.enqueue_unbounded(std::move(func)); - return 0; -} + auto result = promise->get_future().get(); + if(result.has_value()) + { + pthread_join(result.value(),NULL); + return 0; + } + return -EINVAL; +} inline int -ThreadPool::set_threads(std::size_t const count_) +ThreadPool::set_threads(const std::size_t count_) { - int diff; + if(count_ == 0) + return -EINVAL; - { - LockGuard lg(_threads_mutex); + std::size_t current; - diff = ((int)count_ - (int)_threads.size()); + { + mutex_lockguard(_threads_mutex); + current = _threads.size(); } - for(auto i = diff; i > 0; --i) + for(std::size_t i = current; i < count_; ++i) add_thread(); - for(auto i = diff; i < 0; ++i) + for(std::size_t i = count_; i < current; ++i) remove_thread(); - return diff; + return 0; } +template +inline +void +ThreadPool::enqueue_work(ThreadPool::PToken &ptok_, + FuncType &&func_) +{ + _queue.enqueue(ptok_, + std::forward(func_)); +} +template inline void -ThreadPool::shutdown(void) +ThreadPool::enqueue_work(FuncType &&func_) { - LockGuard lg(_threads_mutex); + _queue.enqueue(std::forward(func_)); +} - for(pthread_t tid : _threads) - pthread_cancel(tid); - for(pthread_t tid : _threads) - pthread_join(tid,NULL); - _threads.clear(); +template +inline +bool +ThreadPool::try_enqueue_work(ThreadPool::PToken &ptok_, + FuncType &&func_) +{ + return _queue.try_enqueue(ptok_, + std::forward(func_)); } template inline -void -ThreadPool::enqueue_work(ThreadPool::PToken &ptok_, - FuncType &&func_) +bool +ThreadPool::try_enqueue_work(FuncType &&func_) { - SEMA_WAIT(_sema); - _queue.enqueue(ptok_, - std::forward(func_)); + return _queue.try_enqueue(std::forward(func_)); } template inline -void -ThreadPool::enqueue_work(FuncType &&func_) +bool +ThreadPool::try_enqueue_work_for(ThreadPool::PToken &ptok_, + std::int64_t timeout_usecs_, + FuncType &&func_) { - SEMA_WAIT(_sema); - _queue.enqueue(std::forward(func_)); + return _queue.try_enqueue_for(ptok_, + timeout_usecs_, + std::forward(func_)); +} + + +template +inline +bool +ThreadPool::try_enqueue_work_for(std::int64_t timeout_usecs_, + FuncType &&func_) +{ + return _queue.try_enqueue_for(timeout_usecs_, + std::forward(func_)); } @@ -359,8 +448,15 @@ ThreadPool::enqueue_task(FuncType&& func_) { try { - auto rv = func_(); - promise_.set_value(std::move(rv)); + if constexpr (std::is_void_v) + { + func_(); + promise_.set_value(); + } + else + { + promise_.set_value(func_()); + } } catch(...) { @@ -368,7 +464,6 @@ ThreadPool::enqueue_task(FuncType&& func_) } }; - SEMA_WAIT(_sema); _queue.enqueue(std::move(work)); return future; @@ -379,7 +474,7 @@ inline std::vector ThreadPool::threads() const { - LockGuard lg(_threads_mutex); + mutex_lockguard(_threads_mutex); return _threads; } @@ -389,5 +484,5 @@ inline ThreadPool::PToken ThreadPool::ptoken() { - return ThreadPool::PToken(_queue); + return _queue.make_ptoken(); }