Browse Source

Improve thread pool

fixes
Antonio SJ Musumeci 3 days ago
parent
commit
f8c306de30
  1. 536
      tests/tests.cpp
  2. 2
      vendored/libfuse/include/thread_pool.hpp

536
tests/tests.cpp

@ -6,9 +6,27 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <future>
#include <mutex>
#include <numeric> #include <numeric>
#include <thread> #include <thread>
// Poll `pred_` until it returns true or `timeout_ms_` milliseconds of wall
// time have elapsed.  Returns the predicate's final value.
//
// The previous version counted sleep iterations rather than elapsed time:
// a slow predicate stretched the effective timeout unboundedly, and sleep
// overshoot made each "millisecond" longer than one.  Using a steady_clock
// deadline bounds the wait by real elapsed time.
template<typename Predicate>
bool
wait_until(Predicate pred_,
           const int timeout_ms_ = 2000)
{
  const auto deadline = (std::chrono::steady_clock::now() +
                         std::chrono::milliseconds(timeout_ms_));

  while(std::chrono::steady_clock::now() < deadline)
    {
      if(pred_())
        return true;
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

  // Final check so a predicate that became true right at the deadline is
  // not misreported as a timeout.
  return pred_();
}
void void
test_nop() test_nop()
{ {
@ -998,6 +1016,511 @@ test_tp_work_ordering_single_thread()
TEST_CHECK(results[i] == i); TEST_CHECK(results[i] == i);
} }
// try_enqueue_work() must fail immediately (without blocking) when both the
// single worker and the single queue slot are occupied, and the rejected
// work must never execute.
void
test_tp_try_enqueue_work_fails_when_full()
{
  // Pool with one worker thread and a queue capacity of one.
  ThreadPool tp(1, 1, "test.tewfull");
  std::atomic<bool> release{false};
  std::atomic<bool> worker_started{false};
  std::atomic<int> ran{0};
  // First job parks the lone worker until `release` is set.
  tp.enqueue_work([&release, &worker_started, &ran]()
  {
    worker_started.store(true);
    while(!release.load())
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    ran.fetch_add(1);
  });
  TEST_CHECK(wait_until([&worker_started](){ return worker_started.load(); }));
  // Second job fills the single queue slot; the pool is now saturated.
  tp.enqueue_work([&ran](){ ran.fetch_add(1); });
  // A non-blocking submit against a full pool must be rejected.
  bool ok = tp.try_enqueue_work([&ran](){ ran.fetch_add(1); });
  TEST_CHECK(!ok);
  release.store(true);
  // Only the two accepted jobs may ever run.
  TEST_CHECK(wait_until([&ran](){ return ran.load() == 2; }));
  TEST_CHECK(ran.load() == 2);
}
// When the pool is saturated, try_enqueue_work_for() must block roughly for
// the requested timeout, then give up and return false without running the
// rejected work.
void
test_tp_try_enqueue_work_for_times_out_when_full()
{
  // One worker, one queue slot.
  ThreadPool tp(1, 1, "test.tewftimeout");
  std::atomic<bool> release{false};
  std::atomic<bool> worker_started{false};
  std::atomic<int> ran{0};
  // Park the lone worker until `release` is set.
  tp.enqueue_work([&release, &worker_started, &ran]()
  {
    worker_started.store(true);
    while(!release.load())
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    ran.fetch_add(1);
  });
  TEST_CHECK(wait_until([&worker_started](){ return worker_started.load(); }));
  // Fill the single queue slot so the pool is completely full.
  tp.enqueue_work([&ran](){ ran.fetch_add(1); });
  auto start = std::chrono::steady_clock::now();
  // NOTE(review): 20000 is presumably the timeout in microseconds, given the
  // elapsed_us >= 10000 check below -- confirm against the
  // try_enqueue_work_for() signature.
  bool ok = tp.try_enqueue_work_for(20000, [&ran](){ ran.fetch_add(1); });
  auto end = std::chrono::steady_clock::now();
  TEST_CHECK(!ok);
  auto elapsed_us =
    std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
  // Half the requested timeout as the lower bound: tolerant of scheduler
  // slop while still proving the call actually waited.
  TEST_CHECK(elapsed_us >= 10000);
  release.store(true);
  // Only the two accepted jobs may ever run.
  TEST_CHECK(wait_until([&ran](){ return ran.load() == 2; }));
  TEST_CHECK(ran.load() == 2);
}
// The blocking enqueue_work() must not return while the queue is full, and
// must unblock as soon as the worker frees a slot.
void
test_tp_enqueue_work_blocks_until_slot_available()
{
  // One worker, one queue slot.
  ThreadPool tp(1, 1, "test.ewblocks");
  std::atomic<bool> release{false};
  std::atomic<bool> worker_started{false};
  std::atomic<bool> producer_returned{false};
  std::atomic<int> ran{0};
  // Park the lone worker until `release` is set.
  tp.enqueue_work([&release, &worker_started, &ran]()
  {
    worker_started.store(true);
    while(!release.load())
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    ran.fetch_add(1);
  });
  TEST_CHECK(wait_until([&worker_started](){ return worker_started.load(); }));
  // Fill the single queue slot.
  tp.enqueue_work([&ran](){ ran.fetch_add(1); });
  // This producer must block inside enqueue_work() until a slot opens.
  std::thread producer([&tp, &ran, &producer_returned]()
  {
    tp.enqueue_work([&ran](){ ran.fetch_add(1); });
    producer_returned.store(true);
  });
  // Give the producer time to (incorrectly) return if it were non-blocking.
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  TEST_CHECK(!producer_returned.load());
  release.store(true);
  TEST_CHECK(wait_until([&producer_returned](){ return producer_returned.load(); }));
  producer.join();
  // All three jobs (blocker, queued, blocked producer's) must have run.
  TEST_CHECK(wait_until([&ran](){ return ran.load() == 3; }));
  TEST_CHECK(ran.load() == 3);
}
// A worker callable that throws a value not derived from std::exception
// (here a plain int) must not take the pool down; work submitted afterwards
// still executes.
void
test_tp_worker_nonstd_exception_no_crash()
{
  ThreadPool tp(2, 4, "test.nsexc");

  // Throw a non-std::exception value from a worker.
  tp.enqueue_work([](){ throw 7; });

  std::atomic<bool> follow_up_ran{false};
  tp.enqueue_work([&follow_up_ran]() { follow_up_ran.store(true); });

  auto follow_up_done = [&follow_up_ran]() { return follow_up_ran.load(); };
  TEST_CHECK(wait_until(follow_up_done));
  TEST_CHECK(follow_up_ran.load());
}
// remove_thread() must succeed while work is queued, shrink the pool, and
// leave the remaining threads able to finish all outstanding and new work.
void
test_tp_remove_thread_under_load()
{
  ThreadPool tp(4, 64, "test.rmuload");
  std::atomic<int> counter{0};
  const int N = 300;
  // Queue enough slow jobs that removal happens while workers are busy.
  for(int i = 0; i < N; ++i)
    tp.enqueue_work([&counter]()
    {
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
      counter.fetch_add(1);
    });
  int rv1 = -1;
  int rv2 = -1;
  // Remove from a separate thread to exercise cross-thread pool management.
  std::thread remover([&tp, &rv1, &rv2]()
  {
    rv1 = tp.remove_thread();
    rv2 = tp.remove_thread();
  });
  remover.join();
  TEST_CHECK(rv1 == 0);
  TEST_CHECK(rv2 == 0);
  // 4 initial threads minus the 2 removed.
  TEST_CHECK(tp.threads().size() == 2);
  TEST_CHECK(wait_until([&counter, N](){ return counter.load() == N; }, 10000));
  TEST_CHECK(counter.load() == N);
  // The shrunken pool must still accept and run new work.
  std::atomic<bool> post_ok{false};
  tp.enqueue_work([&post_ok](){ post_ok.store(true); });
  TEST_CHECK(wait_until([&post_ok](){ return post_ok.load(); }));
}
// set_threads() racing with concurrent producers must neither deadlock nor
// drop work: every enqueued item eventually executes exactly once.
void
test_tp_set_threads_concurrent_with_enqueue()
{
  ThreadPool tp(3, 128, "test.stcwe");
  const int PRODUCERS = 4;
  const int PER_PRODUCER = 150;
  const int TOTAL = PRODUCERS * PER_PRODUCER;
  std::atomic<int> counter{0};
  std::atomic<bool> producers_done{false};
  std::atomic<int> resize_calls{0};
  // Continuously flip the pool between 2 and 6 threads while producers run.
  std::thread resizer([&tp, &producers_done, &resize_calls]()
  {
    int i = 0;
    while(!producers_done.load())
      {
        std::size_t count = ((i % 2) == 0 ? 2 : 6);
        int rv = tp.set_threads(count);
        if(rv == 0)
          resize_calls.fetch_add(1);
        ++i;
      }
  });
  std::vector<std::thread> producers;
  for(int p = 0; p < PRODUCERS; ++p)
    {
      producers.emplace_back([&tp, &counter]()
      {
        // Per-producer token; presumably selects the queue's per-producer
        // fast path -- see ThreadPool::ptoken().
        auto ptok = tp.ptoken();
        for(int i = 0; i < PER_PRODUCER; ++i)
          tp.enqueue_work(ptok, [&counter](){ counter.fetch_add(1); });
      });
    }
  for(auto &t : producers)
    t.join();
  producers_done.store(true);
  resizer.join();
  // The resizer must have managed at least one successful resize.
  TEST_CHECK(resize_calls.load() > 0);
  TEST_CHECK(wait_until([&counter, TOTAL](){ return counter.load() == TOTAL; }, 10000));
  TEST_CHECK(counter.load() == TOTAL);
}
// Resize the pool between rounds of enqueues; every queued item must still
// run exactly once.
void
test_tp_repeated_resize_stress()
{
  ThreadPool tp(2, 64, "test.resize");
  const int ROUNDS = 30;
  const int PER_ROUND = 20;
  const int TOTAL = ROUNDS * PER_ROUND;
  std::atomic<int> executed{0};

  for(int round = 0; round < ROUNDS; ++round)
    {
      // Cycle the thread count through 1..5.
      const std::size_t thread_target = 1 + (round % 5);
      const int rc = tp.set_threads(thread_target);
      TEST_CHECK(rc == 0);
      for(int j = 0; j < PER_ROUND; ++j)
        tp.enqueue_work([&executed](){ executed.fetch_add(1); });
    }

  auto all_done = [&executed, TOTAL]() { return executed.load() == TOTAL; };
  TEST_CHECK(wait_until(all_done, 10000));
  TEST_CHECK(executed.load() == TOTAL);
}
void
test_tp_many_producers_many_tasks_stress()
{
ThreadPool tp(8, 256, "test.mpmt");
const int PRODUCERS = 8;
const int ITEMS_PER = 200;
const int TOTAL = PRODUCERS * ITEMS_PER;
std::atomic<int> counter{0};
std::vector<std::thread> producers;
for(int p = 0; p < PRODUCERS; ++p)
{
producers.emplace_back([&tp, &counter]()
{
auto ptok = tp.ptoken();
for(int i = 0; i < ITEMS_PER; ++i)
tp.enqueue_work(ptok, [&counter](){ counter.fetch_add(1); });
});
}
for(auto &t : producers)
t.join();
TEST_CHECK(wait_until([&counter, TOTAL](){ return counter.load() == TOTAL; }, 10000));
TEST_CHECK(counter.load() == TOTAL);
}
// Heavy stress: 10 producers enqueue 2500 items each while a resizer thread
// constantly moves the pool between 2 and 11 threads.  No work may be lost.
void
test_tp_heavy_mixed_resize_and_enqueue_stress()
{
  ThreadPool tp(6, 512, "test.hmix");
  const int PRODUCERS = 10;
  const int PER_PRODUCER = 2500;
  const int TOTAL = PRODUCERS * PER_PRODUCER;
  std::atomic<int> executed{0};
  std::atomic<bool> producers_done{false};
  std::atomic<int> resize_ops{0};
  // Resize as fast as possible until the producers finish.
  std::thread resizer([&tp, &producers_done, &resize_ops]()
  {
    int i = 0;
    while(!producers_done.load())
      {
        const std::size_t target = 2 + (i % 10);
        int rv = tp.set_threads(target);
        if(rv == 0)
          resize_ops.fetch_add(1);
        ++i;
      }
  });
  std::vector<std::thread> producers;
  producers.reserve(PRODUCERS);
  for(int p = 0; p < PRODUCERS; ++p)
    {
      producers.emplace_back([&tp, &executed]()
      {
        auto ptok = tp.ptoken();
        for(int i = 0; i < PER_PRODUCER; ++i)
          tp.enqueue_work(ptok, [&executed](){ executed.fetch_add(1); });
      });
    }
  for(auto &t : producers)
    t.join();
  producers_done.store(true);
  resizer.join();
  // At least one resize must have succeeded during the run.
  TEST_CHECK(resize_ops.load() > 0);
  // Generous 30s budget: constant thread churn can slow the drain.
  TEST_CHECK(wait_until([&executed, TOTAL](){ return executed.load() == TOTAL; }, 30000));
  TEST_CHECK(executed.load() == TOTAL);
}
// Saturate a 1-thread/1-slot pool with try_enqueue_work_for() from six
// producers: some submissions must succeed, some must time out, and exactly
// the accepted ones (no more, no fewer) must execute.
void
test_tp_heavy_try_enqueue_pressure()
{
  ThreadPool tp(1, 1, "test.hpress");
  const int PRODUCERS = 6;
  const int ATTEMPTS_PER = 2500;
  std::atomic<int> accepted{0};
  std::atomic<int> rejected{0};
  std::atomic<int> executed{0};
  std::vector<std::thread> producers;
  producers.reserve(PRODUCERS);
  for(int p = 0; p < PRODUCERS; ++p)
    {
      producers.emplace_back([&tp, &accepted, &rejected, &executed]()
      {
        auto ptok = tp.ptoken();
        for(int i = 0; i < ATTEMPTS_PER; ++i)
          {
            // NOTE(review): 200 is presumably the timeout in microseconds,
            // consistent with the other try_enqueue_work_for() call -- confirm.
            bool ok = tp.try_enqueue_work_for(ptok,
                                              200,
                                              [&executed]()
                                              {
                                                // Slow the worker slightly so the queue stays
                                                // full and rejections actually occur.
                                                std::this_thread::sleep_for(std::chrono::microseconds(50));
                                                executed.fetch_add(1);
                                              });
            if(ok)
              accepted.fetch_add(1);
            else
              rejected.fetch_add(1);
          }
      });
    }
  for(auto &t : producers)
    t.join();
  // Timing-dependent, but with 15000 attempts against a single slot both
  // outcomes are effectively certain to occur.
  TEST_CHECK(accepted.load() > 0);
  TEST_CHECK(rejected.load() > 0);
  // Every accepted item -- and nothing else -- must eventually run.
  TEST_CHECK(wait_until([&executed, &accepted]()
                        {
                          return executed.load() == accepted.load();
                        },
                        30000));
  TEST_CHECK(executed.load() == accepted.load());
}
// Construct and destroy pools in a tight loop; each destructor must drain
// all queued work before returning.
void
test_tp_heavy_repeated_construct_destroy()
{
  const int ROUNDS = 150;
  for(int round = 0; round < ROUNDS; ++round)
    {
      std::atomic<int> completed{0};
      {
        ThreadPool tp(4, 128, "test.hctor");
        for(int job = 0; job < 400; ++job)
          tp.enqueue_work([&completed]()
          {
            std::this_thread::sleep_for(std::chrono::microseconds(25));
            completed.fetch_add(1);
          });
      } // ~ThreadPool() runs here and must wait for all 400 jobs.
      TEST_CHECK(completed.load() == 400);
    }
}
void
test_tp_heavy_enqueue_task_mixed_outcomes()
{
ThreadPool tp(8, 512, "test.htask");
const int N = 12000;
std::vector<std::future<int>> futures;
futures.reserve(N);
for(int i = 0; i < N; ++i)
{
futures.emplace_back(tp.enqueue_task([i]() -> int
{
if((i % 11) == 0)
throw std::runtime_error("heavy task error");
return i;
}));
}
long long expected_sum = 0;
int expected_success = 0;
int expected_errors = 0;
for(int i = 0; i < N; ++i)
{
if((i % 11) == 0)
{
expected_errors += 1;
}
else
{
expected_success += 1;
expected_sum += i;
}
}
int actual_success = 0;
int actual_errors = 0;
long long actual_sum = 0;
for(auto &f : futures)
{
try
{
actual_sum += f.get();
actual_success += 1;
}
catch(const std::runtime_error &e)
{
actual_errors += 1;
TEST_CHECK(std::string(e.what()) == "heavy task error");
}
}
TEST_CHECK(actual_success == expected_success);
TEST_CHECK(actual_errors == expected_errors);
TEST_CHECK(actual_sum == expected_sum);
}
// Churn add_thread()/remove_thread() while six producers enqueue: both
// operations must succeed at least once and no enqueued work may be lost.
void
test_tp_heavy_add_remove_churn_under_enqueue()
{
  ThreadPool tp(6, 512, "test.hchurn");
  const int PRODUCERS = 6;
  const int PER_PRODUCER = 2200;
  const int TOTAL = PRODUCERS * PER_PRODUCER;
  const int CHURN_OPS = 700;
  std::atomic<int> executed{0};
  std::atomic<bool> producers_done{false};
  std::atomic<int> add_ok{0};
  std::atomic<int> remove_ok{0};
  // Alternate add/remove up to CHURN_OPS times, or until producers finish.
  // CHURN_OPS needs no capture: reading a const int that has a constant
  // initializer does not odr-use it.
  std::thread churner([&tp, &producers_done, &add_ok, &remove_ok]()
  {
    int i = 0;
    while(!producers_done.load() && i < CHURN_OPS)
      {
        int rv_add = tp.add_thread();
        if(rv_add == 0)
          add_ok.fetch_add(1);
        int rv_remove = tp.remove_thread();
        if(rv_remove == 0)
          remove_ok.fetch_add(1);
        ++i;
      }
  });
  std::vector<std::thread> producers;
  producers.reserve(PRODUCERS);
  for(int p = 0; p < PRODUCERS; ++p)
    {
      producers.emplace_back([&tp, &executed]()
      {
        auto ptok = tp.ptoken();
        for(int i = 0; i < PER_PRODUCER; ++i)
          tp.enqueue_work(ptok,
                          [&executed]()
                          {
                            executed.fetch_add(1);
                          });
      });
    }
  for(auto &t : producers)
    t.join();
  producers_done.store(true);
  churner.join();
  // Both add and remove must have succeeded at least once during the churn.
  TEST_CHECK(add_ok.load() > 0);
  TEST_CHECK(remove_ok.load() > 0);
  // Generous 30s budget: thread churn can slow the drain.
  TEST_CHECK(wait_until([&executed, TOTAL](){ return executed.load() == TOTAL; }, 30000));
  TEST_CHECK(executed.load() == TOTAL);
}
TEST_LIST = TEST_LIST =
{ {
{"nop",test_nop}, {"nop",test_nop},
@ -1047,5 +1570,18 @@ TEST_LIST =
{"tp_destruction_drains_queue",test_tp_destruction_drains_queue}, {"tp_destruction_drains_queue",test_tp_destruction_drains_queue},
{"tp_move_only_callable",test_tp_move_only_callable}, {"tp_move_only_callable",test_tp_move_only_callable},
{"tp_work_ordering_single_thread",test_tp_work_ordering_single_thread}, {"tp_work_ordering_single_thread",test_tp_work_ordering_single_thread},
{"tp_try_enqueue_work_fails_when_full",test_tp_try_enqueue_work_fails_when_full},
{"tp_try_enqueue_work_for_times_out_when_full",test_tp_try_enqueue_work_for_times_out_when_full},
{"tp_enqueue_work_blocks_until_slot_available",test_tp_enqueue_work_blocks_until_slot_available},
{"tp_worker_nonstd_exception_no_crash",test_tp_worker_nonstd_exception_no_crash},
{"tp_remove_thread_under_load",test_tp_remove_thread_under_load},
{"tp_set_threads_concurrent_with_enqueue",test_tp_set_threads_concurrent_with_enqueue},
{"tp_repeated_resize_stress",test_tp_repeated_resize_stress},
{"tp_many_producers_many_tasks_stress",test_tp_many_producers_many_tasks_stress},
{"tp_heavy_mixed_resize_and_enqueue_stress",test_tp_heavy_mixed_resize_and_enqueue_stress},
{"tp_heavy_try_enqueue_pressure",test_tp_heavy_try_enqueue_pressure},
{"tp_heavy_repeated_construct_destroy",test_tp_heavy_repeated_construct_destroy},
{"tp_heavy_enqueue_task_mixed_outcomes",test_tp_heavy_enqueue_task_mixed_outcomes},
{"tp_heavy_add_remove_churn_under_enqueue",test_tp_heavy_add_remove_churn_under_enqueue},
{NULL,NULL} {NULL,NULL}
}; };

2
vendored/libfuse/include/thread_pool.hpp

@ -229,7 +229,7 @@ ThreadPool::start_routine(void *arg_)
{ {
done = true; done = true;
} }
catch(std::exception &e)
catch(const std::exception &e)
{ {
syslog(LOG_CRIT, syslog(LOG_CRIT,
"threadpool (%s): uncaught exception caught by worker - %s", "threadpool (%s): uncaught exception caught by worker - %s",

Loading…
Cancel
Save