From f8c306de305a72dbf8a4a9ad386b4816fcbc9922 Mon Sep 17 00:00:00 2001
From: Antonio SJ Musumeci <trapexit@spawn.link>
Date: Fri, 13 Mar 2026 14:15:40 -0500
Subject: [PATCH] Improve thread pool

---
 tests/tests.cpp                          | 536 +++++++++++++++++++++++
 vendored/libfuse/include/thread_pool.hpp |   2 +-
 2 files changed, 537 insertions(+), 1 deletion(-)

diff --git a/tests/tests.cpp b/tests/tests.cpp
index 6ca5b514..714da80e 100644
--- a/tests/tests.cpp
+++ b/tests/tests.cpp
@@ -6,9 +6,27 @@
 #include
 #include
+#include <chrono>
+#include <thread>
 #include
 #include
 
+template<typename Predicate>
+bool
+wait_until(Predicate pred_,
+           const int timeout_ms_ = 2000)
+{
+  for(int i = 0; i < timeout_ms_; ++i)
+    {
+      if(pred_())
+        return true;
+
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+
+  return pred_();
+}
+
 void
 test_nop()
 {
@@ -998,6 +1016,511 @@ test_tp_work_ordering_single_thread()
     TEST_CHECK(results[i] == i);
 }
 
+void
+test_tp_try_enqueue_work_fails_when_full()
+{
+  ThreadPool tp(1, 1, "test.tewfull");
+
+  std::atomic<bool> release{false};
+  std::atomic<bool> worker_started{false};
+  std::atomic<int> ran{0};
+
+  tp.enqueue_work([&release, &worker_started, &ran]()
+  {
+    worker_started.store(true);
+    while(!release.load())
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    ran.fetch_add(1);
+  });
+
+  TEST_CHECK(wait_until([&worker_started](){ return worker_started.load(); }));
+
+  tp.enqueue_work([&ran](){ ran.fetch_add(1); });
+
+  bool ok = tp.try_enqueue_work([&ran](){ ran.fetch_add(1); });
+  TEST_CHECK(!ok);
+
+  release.store(true);
+
+  TEST_CHECK(wait_until([&ran](){ return ran.load() == 2; }));
+  TEST_CHECK(ran.load() == 2);
+}
+
+void
+test_tp_try_enqueue_work_for_times_out_when_full()
+{
+  ThreadPool tp(1, 1, "test.tewftimeout");
+
+  std::atomic<bool> release{false};
+  std::atomic<bool> worker_started{false};
+  std::atomic<int> ran{0};
+
+  tp.enqueue_work([&release, &worker_started, &ran]()
+  {
+    worker_started.store(true);
+    while(!release.load())
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    ran.fetch_add(1);
+  });
+
+  TEST_CHECK(wait_until([&worker_started](){ return worker_started.load(); }));
+
+  tp.enqueue_work([&ran](){ ran.fetch_add(1); });
+
+  auto start = std::chrono::steady_clock::now();
+  bool ok = tp.try_enqueue_work_for(20000, [&ran](){ ran.fetch_add(1); });
+  auto end = std::chrono::steady_clock::now();
+
+  TEST_CHECK(!ok);
+
+  auto elapsed_us =
+    std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
+  TEST_CHECK(elapsed_us >= 10000);
+
+  release.store(true);
+
+  TEST_CHECK(wait_until([&ran](){ return ran.load() == 2; }));
+  TEST_CHECK(ran.load() == 2);
+}
+
+void
+test_tp_enqueue_work_blocks_until_slot_available()
+{
+  ThreadPool tp(1, 1, "test.ewblocks");
+
+  std::atomic<bool> release{false};
+  std::atomic<bool> worker_started{false};
+  std::atomic<bool> producer_returned{false};
+  std::atomic<int> ran{0};
+
+  tp.enqueue_work([&release, &worker_started, &ran]()
+  {
+    worker_started.store(true);
+    while(!release.load())
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    ran.fetch_add(1);
+  });
+
+  TEST_CHECK(wait_until([&worker_started](){ return worker_started.load(); }));
+
+  tp.enqueue_work([&ran](){ ran.fetch_add(1); });
+
+  std::thread producer([&tp, &ran, &producer_returned]()
+  {
+    tp.enqueue_work([&ran](){ ran.fetch_add(1); });
+    producer_returned.store(true);
+  });
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(20));
+  TEST_CHECK(!producer_returned.load());
+
+  release.store(true);
+
+  TEST_CHECK(wait_until([&producer_returned](){ return producer_returned.load(); }));
+  producer.join();
+
+  TEST_CHECK(wait_until([&ran](){ return ran.load() == 3; }));
+  TEST_CHECK(ran.load() == 3);
+}
+
+void
+test_tp_worker_nonstd_exception_no_crash()
+{
+  ThreadPool tp(2, 4, "test.nsexc");
+
+  tp.enqueue_work([](){ throw 7; });
+
+  std::atomic<bool> ran{false};
+  tp.enqueue_work([&ran](){ ran.store(true); });
+
+  TEST_CHECK(wait_until([&ran](){ return ran.load(); }));
+  TEST_CHECK(ran.load());
+}
+
+void
+test_tp_remove_thread_under_load()
+{
+  ThreadPool tp(4, 64, "test.rmuload");
+
+  std::atomic<int> counter{0};
+  const int N = 300;
+
+  for(int i = 0; i < N; ++i)
+    tp.enqueue_work([&counter]()
+    {
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
+      counter.fetch_add(1);
+    });
+
+  int rv1 = -1;
+  int rv2 = -1;
+
+  std::thread remover([&tp, &rv1, &rv2]()
+  {
+    rv1 = tp.remove_thread();
+    rv2 = tp.remove_thread();
+  });
+
+  remover.join();
+
+  TEST_CHECK(rv1 == 0);
+  TEST_CHECK(rv2 == 0);
+  TEST_CHECK(tp.threads().size() == 2);
+
+  TEST_CHECK(wait_until([&counter, N](){ return counter.load() == N; }, 10000));
+  TEST_CHECK(counter.load() == N);
+
+  std::atomic<bool> post_ok{false};
+  tp.enqueue_work([&post_ok](){ post_ok.store(true); });
+  TEST_CHECK(wait_until([&post_ok](){ return post_ok.load(); }));
+}
+
+void
+test_tp_set_threads_concurrent_with_enqueue()
+{
+  ThreadPool tp(3, 128, "test.stcwe");
+
+  const int PRODUCERS = 4;
+  const int PER_PRODUCER = 150;
+  const int TOTAL = PRODUCERS * PER_PRODUCER;
+  std::atomic<int> counter{0};
+
+  std::atomic<bool> producers_done{false};
+  std::atomic<int> resize_calls{0};
+
+  std::thread resizer([&tp, &producers_done, &resize_calls]()
+  {
+    int i = 0;
+    while(!producers_done.load())
+      {
+        std::size_t count = ((i % 2) == 0 ?
+                             2 : 6);
+        int rv = tp.set_threads(count);
+        if(rv == 0)
+          resize_calls.fetch_add(1);
+        ++i;
+      }
+  });
+
+  std::vector<std::thread> producers;
+  for(int p = 0; p < PRODUCERS; ++p)
+    {
+      producers.emplace_back([&tp, &counter]()
+      {
+        auto ptok = tp.ptoken();
+        for(int i = 0; i < PER_PRODUCER; ++i)
+          tp.enqueue_work(ptok, [&counter](){ counter.fetch_add(1); });
+      });
+    }
+
+  for(auto &t : producers)
+    t.join();
+
+  producers_done.store(true);
+  resizer.join();
+
+  TEST_CHECK(resize_calls.load() > 0);
+  TEST_CHECK(wait_until([&counter, TOTAL](){ return counter.load() == TOTAL; }, 10000));
+  TEST_CHECK(counter.load() == TOTAL);
+}
+
+void
+test_tp_repeated_resize_stress()
+{
+  ThreadPool tp(2, 64, "test.resize");
+
+  const int ROUNDS = 30;
+  const int PER_ROUND = 20;
+  const int TOTAL = ROUNDS * PER_ROUND;
+  std::atomic<int> counter{0};
+
+  for(int r = 0; r < ROUNDS; ++r)
+    {
+      std::size_t target = 1 + (r % 5);
+      int rv = tp.set_threads(target);
+      TEST_CHECK(rv == 0);
+
+      for(int i = 0; i < PER_ROUND; ++i)
+        tp.enqueue_work([&counter](){ counter.fetch_add(1); });
+    }
+
+  TEST_CHECK(wait_until([&counter, TOTAL](){ return counter.load() == TOTAL; }, 10000));
+  TEST_CHECK(counter.load() == TOTAL);
+}
+
+void
+test_tp_many_producers_many_tasks_stress()
+{
+  ThreadPool tp(8, 256, "test.mpmt");
+
+  const int PRODUCERS = 8;
+  const int ITEMS_PER = 200;
+  const int TOTAL = PRODUCERS * ITEMS_PER;
+  std::atomic<int> counter{0};
+
+  std::vector<std::thread> producers;
+  for(int p = 0; p < PRODUCERS; ++p)
+    {
+      producers.emplace_back([&tp, &counter]()
+      {
+        auto ptok = tp.ptoken();
+        for(int i = 0; i < ITEMS_PER; ++i)
+          tp.enqueue_work(ptok, [&counter](){ counter.fetch_add(1); });
+      });
+    }
+
+  for(auto &t : producers)
+    t.join();
+
+  TEST_CHECK(wait_until([&counter, TOTAL](){ return counter.load() == TOTAL; }, 10000));
+  TEST_CHECK(counter.load() == TOTAL);
+}
+
+void
+test_tp_heavy_mixed_resize_and_enqueue_stress()
+{
+  ThreadPool tp(6, 512, "test.hmix");
+
+  const int PRODUCERS = 10;
+  const int PER_PRODUCER =
+    2500;
+  const int TOTAL = PRODUCERS * PER_PRODUCER;
+
+  std::atomic<int> executed{0};
+  std::atomic<bool> producers_done{false};
+  std::atomic<int> resize_ops{0};
+
+  std::thread resizer([&tp, &producers_done, &resize_ops]()
+  {
+    int i = 0;
+    while(!producers_done.load())
+      {
+        const std::size_t target = 2 + (i % 10);
+        int rv = tp.set_threads(target);
+        if(rv == 0)
+          resize_ops.fetch_add(1);
+        ++i;
+      }
+  });
+
+  std::vector<std::thread> producers;
+  producers.reserve(PRODUCERS);
+  for(int p = 0; p < PRODUCERS; ++p)
+    {
+      producers.emplace_back([&tp, &executed]()
+      {
+        auto ptok = tp.ptoken();
+
+        for(int i = 0; i < PER_PRODUCER; ++i)
+          tp.enqueue_work(ptok, [&executed](){ executed.fetch_add(1); });
+      });
+    }
+
+  for(auto &t : producers)
+    t.join();
+
+  producers_done.store(true);
+  resizer.join();
+
+  TEST_CHECK(resize_ops.load() > 0);
+  TEST_CHECK(wait_until([&executed, TOTAL](){ return executed.load() == TOTAL; }, 30000));
+  TEST_CHECK(executed.load() == TOTAL);
+}
+
+void
+test_tp_heavy_try_enqueue_pressure()
+{
+  ThreadPool tp(1, 1, "test.hpress");
+
+  const int PRODUCERS = 6;
+  const int ATTEMPTS_PER = 2500;
+
+  std::atomic<int> accepted{0};
+  std::atomic<int> rejected{0};
+  std::atomic<int> executed{0};
+
+  std::vector<std::thread> producers;
+  producers.reserve(PRODUCERS);
+  for(int p = 0; p < PRODUCERS; ++p)
+    {
+      producers.emplace_back([&tp, &accepted, &rejected, &executed]()
+      {
+        auto ptok = tp.ptoken();
+
+        for(int i = 0; i < ATTEMPTS_PER; ++i)
+          {
+            bool ok = tp.try_enqueue_work_for(ptok,
+                                              200,
+                                              [&executed]()
+                                              {
+                                                std::this_thread::sleep_for(std::chrono::microseconds(50));
+                                                executed.fetch_add(1);
+                                              });
+            if(ok)
+              accepted.fetch_add(1);
+            else
+              rejected.fetch_add(1);
+          }
+      });
+    }
+
+  for(auto &t : producers)
+    t.join();
+
+  TEST_CHECK(accepted.load() > 0);
+  TEST_CHECK(rejected.load() > 0);
+
+  TEST_CHECK(wait_until([&executed, &accepted]()
+                        {
+                          return executed.load() == accepted.load();
+                        },
+                        30000));
+  TEST_CHECK(executed.load() == accepted.load());
+}
+
+void
+test_tp_heavy_repeated_construct_destroy()
+{
+  const int ROUNDS = 150;
+
+  for(int r = 0; r < ROUNDS; ++r)
+    {
+      std::atomic<int> counter{0};
+
+      {
+        ThreadPool tp(4, 128, "test.hctor");
+
+        for(int i = 0; i < 400; ++i)
+          tp.enqueue_work([&counter]()
+          {
+            std::this_thread::sleep_for(std::chrono::microseconds(25));
+            counter.fetch_add(1);
+          });
+      }
+
+      TEST_CHECK(counter.load() == 400);
+    }
+}
+
+void
+test_tp_heavy_enqueue_task_mixed_outcomes()
+{
+  ThreadPool tp(8, 512, "test.htask");
+
+  const int N = 12000;
+
+  std::vector<std::future<int>> futures;
+  futures.reserve(N);
+
+  for(int i = 0; i < N; ++i)
+    {
+      futures.emplace_back(tp.enqueue_task([i]() -> int
+      {
+        if((i % 11) == 0)
+          throw std::runtime_error("heavy task error");
+
+        return i;
+      }));
+    }
+
+  long long expected_sum = 0;
+  int expected_success = 0;
+  int expected_errors = 0;
+
+  for(int i = 0; i < N; ++i)
+    {
+      if((i % 11) == 0)
+        {
+          expected_errors += 1;
+        }
+      else
+        {
+          expected_success += 1;
+          expected_sum += i;
+        }
+    }
+
+  int actual_success = 0;
+  int actual_errors = 0;
+  long long actual_sum = 0;
+
+  for(auto &f : futures)
+    {
+      try
+        {
+          actual_sum += f.get();
+          actual_success += 1;
+        }
+      catch(const std::runtime_error &e)
+        {
+          actual_errors += 1;
+          TEST_CHECK(std::string(e.what()) == "heavy task error");
+        }
+    }
+
+  TEST_CHECK(actual_success == expected_success);
+  TEST_CHECK(actual_errors == expected_errors);
+  TEST_CHECK(actual_sum == expected_sum);
+}
+
+void
+test_tp_heavy_add_remove_churn_under_enqueue()
+{
+  ThreadPool tp(6, 512, "test.hchurn");
+
+  const int PRODUCERS = 6;
+  const int PER_PRODUCER = 2200;
+  const int TOTAL = PRODUCERS * PER_PRODUCER;
+  const int CHURN_OPS = 700;
+
+  std::atomic<int> executed{0};
+  std::atomic<bool> producers_done{false};
+  std::atomic<int> add_ok{0};
+  std::atomic<int> remove_ok{0};
+
+  std::thread churner([&tp, &producers_done, &add_ok, &remove_ok]()
+  {
+    int i = 0;
+    while(!producers_done.load() && i < CHURN_OPS)
+      {
+        int rv_add = tp.add_thread();
+        if(rv_add == 0)
+          add_ok.fetch_add(1);
+
+        int rv_remove = tp.remove_thread();
+        if(rv_remove == 0)
+          remove_ok.fetch_add(1);
+
+        ++i;
+      }
+  });
+
+  std::vector<std::thread> producers;
+  producers.reserve(PRODUCERS);
+  for(int p = 0; p < PRODUCERS; ++p)
+    {
+      producers.emplace_back([&tp, &executed]()
+      {
+        auto ptok = tp.ptoken();
+        for(int i = 0; i < PER_PRODUCER; ++i)
+          tp.enqueue_work(ptok,
+                          [&executed]()
+                          {
+                            executed.fetch_add(1);
+                          });
+      });
+    }
+
+  for(auto &t : producers)
+    t.join();
+
+  producers_done.store(true);
+  churner.join();
+
+  TEST_CHECK(add_ok.load() > 0);
+  TEST_CHECK(remove_ok.load() > 0);
+  TEST_CHECK(wait_until([&executed, TOTAL](){ return executed.load() == TOTAL; }, 30000));
+  TEST_CHECK(executed.load() == TOTAL);
+}
+
 TEST_LIST =
   {
     {"nop",test_nop},
@@ -1047,5 +1570,18 @@ TEST_LIST =
     {"tp_destruction_drains_queue",test_tp_destruction_drains_queue},
     {"tp_move_only_callable",test_tp_move_only_callable},
     {"tp_work_ordering_single_thread",test_tp_work_ordering_single_thread},
+    {"tp_try_enqueue_work_fails_when_full",test_tp_try_enqueue_work_fails_when_full},
+    {"tp_try_enqueue_work_for_times_out_when_full",test_tp_try_enqueue_work_for_times_out_when_full},
+    {"tp_enqueue_work_blocks_until_slot_available",test_tp_enqueue_work_blocks_until_slot_available},
+    {"tp_worker_nonstd_exception_no_crash",test_tp_worker_nonstd_exception_no_crash},
+    {"tp_remove_thread_under_load",test_tp_remove_thread_under_load},
+    {"tp_set_threads_concurrent_with_enqueue",test_tp_set_threads_concurrent_with_enqueue},
+    {"tp_repeated_resize_stress",test_tp_repeated_resize_stress},
+    {"tp_many_producers_many_tasks_stress",test_tp_many_producers_many_tasks_stress},
+    {"tp_heavy_mixed_resize_and_enqueue_stress",test_tp_heavy_mixed_resize_and_enqueue_stress},
+    {"tp_heavy_try_enqueue_pressure",test_tp_heavy_try_enqueue_pressure},
+    {"tp_heavy_repeated_construct_destroy",test_tp_heavy_repeated_construct_destroy},
+    {"tp_heavy_enqueue_task_mixed_outcomes",test_tp_heavy_enqueue_task_mixed_outcomes},
+    {"tp_heavy_add_remove_churn_under_enqueue",test_tp_heavy_add_remove_churn_under_enqueue},
     {NULL,NULL}
   };
diff --git a/vendored/libfuse/include/thread_pool.hpp b/vendored/libfuse/include/thread_pool.hpp
index 5ceaa717..a37cf8ae 100644
--- a/vendored/libfuse/include/thread_pool.hpp
+++ b/vendored/libfuse/include/thread_pool.hpp
@@ -229,7 +229,7 @@ ThreadPool::start_routine(void *arg_)
   {
     done = true;
   }
-  catch(std::exception &e)
+  catch(const std::exception &e)
   {
     syslog(LOG_CRIT,
            "threadpool (%s): uncaught exception caught by worker - %s",