#pragma once

#include "moodycamel/blockingconcurrentqueue.h"

#include <algorithm>
#include <atomic>
#include <cerrno>
#include <csignal>
#include <cstring>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

#include <pthread.h>
#include <syslog.h>
#include <time.h>

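// Queue traits: cap semaphore spinning so an idle worker falls back to
// blocking on the semaphore almost immediately instead of busy-waiting.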
struct ThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits
{
  static const int MAX_SEMA_SPINS = 1;
};

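// Thread pool backed by a moodycamel::BlockingConcurrentQueue of
// std::function jobs. Workers are raw pthreads so they can be named,
// cancelled, and signal-masked; producers are throttled once the
// approximate queue depth reaches _max_queue_depth.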
class ThreadPool
{
private:
  using Func  = std::function<void(void)>;
  using Queue = moodycamel::BlockingConcurrentQueue<Func,ThreadPoolTraits>;

public:
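  // Spawn thread_count_ workers. All signals are blocked around
  // pthread_create() so the workers inherit a fully blocked signal mask;
  // the effective max queue depth is clamped to at least the thread count.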
  explicit
  ThreadPool(unsigned const thread_count_    = std::thread::hardware_concurrency(),
             unsigned const max_queue_depth_ = std::thread::hardware_concurrency(),
             std::string const name_         = {})
    : _queue(),
      _queue_depth(0),
      _max_queue_depth(std::max(thread_count_,max_queue_depth_)),
      _name(name_)
  {
    syslog(LOG_DEBUG,
           "threadpool (%s): spawning %u threads w/ max queue depth %u%s",
           _name.c_str(),
           thread_count_,
           _max_queue_depth,
           ((_max_queue_depth != max_queue_depth_) ? " (adjusted)" : ""));

    sigset_t oldset;
    sigset_t newset;

    sigfillset(&newset);
    pthread_sigmask(SIG_BLOCK,&newset,&oldset);

    _threads.reserve(thread_count_);
    for(std::size_t i = 0; i < thread_count_; ++i)
      {
        int rv;
        pthread_t t;

        rv = pthread_create(&t,NULL,ThreadPool::start_routine,this);
        if(rv != 0)
          {
            syslog(LOG_WARNING,
                   "threadpool (%s): error spawning thread - %d (%s)",
                   _name.c_str(),
                   rv,
                   strerror(rv));
            continue;
          }

        if(!_name.empty())
          pthread_setname_np(t,_name.c_str());

        _threads.push_back(t);
      }

    pthread_sigmask(SIG_SETMASK,&oldset,NULL);

    if(_threads.empty())
      throw std::runtime_error("threadpool: failed to spawn any threads");
  }

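  // Shutdown: enqueue one pthread_exit() job per worker, cancel any
  // thread still blocked outside the queue, then join them all.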
  ~ThreadPool()
  {
    syslog(LOG_DEBUG,
           "threadpool (%s): destroying %zu threads",
           _name.c_str(),
           _threads.size());

    auto func = []() { pthread_exit(NULL); };
    for(std::size_t i = 0; i < _threads.size(); i++)
      _queue.enqueue(func);

    for(auto t : _threads)
      pthread_cancel(t);

    for(auto t : _threads)
      pthread_join(t,NULL);
  }

private:
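  // Worker loop: block on the queue, run each job, and decrement the
  // approximate depth counter. The loop only ends via a pthread_exit()
  // job or pthread_cancel().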
  static
  void*
  start_routine(void *arg_)
  {
    ThreadPool                *btp = static_cast<ThreadPool*>(arg_);
    ThreadPool::Func           func;
    ThreadPool::Queue         &q           = btp->_queue;
    std::atomic<unsigned>     &queue_depth = btp->_queue_depth;
    moodycamel::ConsumerToken  ctok(btp->_queue);

    while(true)
      {
        q.wait_dequeue(ctok,func);

        func();

        queue_depth.fetch_sub(1,std::memory_order_release);
      }

    return NULL;
  }

public:
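  // Add a single worker at runtime, optionally with its own name
  // (falls back to the pool name).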
  int
  add_thread(std::string const name_ = {})
  {
    int rv;
    pthread_t t;
    sigset_t oldset;
    sigset_t newset;
    std::string name;

    name = (name_.empty() ? _name : name_);

    sigfillset(&newset);
    pthread_sigmask(SIG_BLOCK,&newset,&oldset);
    rv = pthread_create(&t,NULL,ThreadPool::start_routine,this);
    pthread_sigmask(SIG_SETMASK,&oldset,NULL);
    if(rv != 0)
      {
        syslog(LOG_WARNING,
               "threadpool (%s): error spawning thread - %d (%s)",
               _name.c_str(),
               rv,
               strerror(rv));
        return -rv;
      }

    if(!name.empty())
      pthread_setname_np(t,name.c_str());

    {
      std::lock_guard<std::mutex> lg(_threads_mutex);
      _threads.push_back(t);
    }

    syslog(LOG_DEBUG,
           "threadpool (%s): 1 thread added named '%s'",
           _name.c_str(),
           name.c_str());

    return 0;
  }

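  // Remove a single worker by enqueueing a job that unregisters the
  // executing thread and calls pthread_exit(); refuses to drop below one
  // thread and blocks until the exiting thread has been joined.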
  int
  remove_thread(void)
  {
    {
      std::lock_guard<std::mutex> lg(_threads_mutex);
      if(_threads.size() <= 1)
        return -EINVAL;
    }

    std::promise<pthread_t> promise;
    auto func = [&]()
      {
        pthread_t t;

        t = pthread_self();
        promise.set_value(t);

        {
          std::lock_guard<std::mutex> lg(_threads_mutex);
          for(auto i = _threads.begin(); i != _threads.end(); ++i)
            {
              if(*i != t)
                continue;

              _threads.erase(i);
              break;
            }
        }

        syslog(LOG_DEBUG,
               "threadpool (%s): 1 thread removed",
               _name.c_str());

        pthread_exit(NULL);
      };

    enqueue_work(func);
    pthread_join(promise.get_future().get(),NULL);

    return 0;
  }

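  // Grow or shrink the pool to count_ threads; returns the signed
  // difference that was applied.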
  int
  set_threads(std::size_t const count_)
  {
    int diff;

    {
      std::lock_guard<std::mutex> lg(_threads_mutex);
      diff = ((int)count_ - (int)_threads.size());
    }

    for(auto i = diff; i > 0; --i)
      add_thread();
    for(auto i = diff; i < 0; ++i)
      remove_thread();

    return diff;
  }

public:
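  // Enqueue fire-and-forget work using a caller-held producer token.
  // While the queue appears full, back off in bounded 1000ns sleeps,
  // then enqueue regardless.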
  template<typename FuncType>
  void
  enqueue_work(moodycamel::ProducerToken &ptok_,
               FuncType                  &&f_)
  {
    timespec ts = {0,1000};
    for(unsigned i = 0; i < 1000000; i++)
      {
        if(_queue_depth.load(std::memory_order_acquire) < _max_queue_depth)
          break;
        ::nanosleep(&ts,NULL);
      }

    _queue.enqueue(ptok_,f_);
    _queue_depth.fetch_add(1,std::memory_order_release);
  }

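  // Enqueue fire-and-forget work without a producer token.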
  template<typename FuncType>
  void
  enqueue_work(FuncType &&f_)
  {
    timespec ts = {0,1000};
    for(unsigned i = 0; i < 1000000; i++)
      {
        if(_queue_depth.load(std::memory_order_acquire) < _max_queue_depth)
          break;
        ::nanosleep(&ts,NULL);
      }

    _queue.enqueue(f_);
    _queue_depth.fetch_add(1,std::memory_order_release);
  }

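  // Enqueue work that produces a value; the result is delivered through
  // the returned std::future.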
  template<typename FuncType>
  [[nodiscard]]
  std::future<typename std::result_of<FuncType()>::type>
  enqueue_task(FuncType&& f_)
  {
    using TaskReturnType = typename std::result_of<FuncType()>::type;
    using Promise        = std::promise<TaskReturnType>;

    auto promise = std::make_shared<Promise>();
    auto future  = promise->get_future();
    auto work    = [=]()
      {
        auto rv = f_();
        promise->set_value(rv);
      };

    timespec ts = {0,1000};
    for(unsigned i = 0; i < 1000000; i++)
      {
        if(_queue_depth.load(std::memory_order_acquire) < _max_queue_depth)
          break;
        ::nanosleep(&ts,NULL);
      }

    _queue.enqueue(work);
    _queue_depth.fetch_add(1,std::memory_order_release);

    return future;
  }

public:
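  // Accessors: a snapshot of the current worker handles and a producer
  // token bound to the internal queue for use with enqueue_work().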
  std::vector<pthread_t>
  threads() const
  {
    std::lock_guard<std::mutex> lg(_threads_mutex);

    return _threads;
  }

  moodycamel::ProducerToken
  ptoken()
  {
    return moodycamel::ProducerToken(_queue);
  }

private:
  Queue                 _queue;
  std::atomic<unsigned> _queue_depth;
  unsigned const        _max_queue_depth;

private:
  std::string const      _name;
  std::vector<pthread_t> _threads;
  mutable std::mutex     _threads_mutex;
};
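
// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of the original header). Assumes
// this file is saved as "thread_pool.hpp"; the filename and the demo values
// are assumptions.
//
//   #include "thread_pool.hpp"
//   #include <cstdio>
//
//   int
//   main()
//   {
//     ThreadPool tp(4,16,"demo");
//
//     // fire-and-forget job
//     tp.enqueue_work([]() { printf("hello from a worker\n"); });
//
//     // job with a result, delivered through a std::future
//     auto fut = tp.enqueue_task([]() { return 6 * 7; });
//     printf("answer = %d\n",fut.get());
//
//     return 0;
//   }
// ---------------------------------------------------------------------------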