You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

309 lines
6.2 KiB

  #pragma once

  #include "moodycamel/blockingconcurrentqueue.h"
  #include "syslog.h"

  #include <pthread.h>
  #include <time.h>

  #include <atomic>
  #include <cerrno>
  #include <csignal>
  #include <cstddef>
  #include <cstring>
  #include <functional>
  #include <future>
  #include <memory>
  #include <mutex>
  #include <stdexcept>
  #include <string>
  #include <thread>
  #include <type_traits>
  #include <utility>
  #include <vector>
  14. struct ThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits
  15. {
  16. static const int MAX_SEMA_SPINS = 1;
  17. };
  18. class ThreadPool
  19. {
  20. private:
  21. using Func = std::function<void(void)>;
  22. using Queue = moodycamel::BlockingConcurrentQueue<Func,ThreadPoolTraits>;
  23. public:
  24. explicit
  25. ThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(),
  26. std::size_t const queue_depth_ = 1,
  27. std::string const name_ = {})
  28. : _queue(queue_depth_,thread_count_,thread_count_),
  29. _name(get_thread_name(name_))
  30. {
  31. syslog_debug("threadpool: spawning %zu threads of queue depth %zu named '%s'",
  32. thread_count_,
  33. queue_depth_,
  34. _name.c_str());
  35. sigset_t oldset;
  36. sigset_t newset;
  37. sigfillset(&newset);
  38. pthread_sigmask(SIG_BLOCK,&newset,&oldset);
  39. _threads.reserve(thread_count_);
  40. for(std::size_t i = 0; i < thread_count_; ++i)
  41. {
  42. int rv;
  43. pthread_t t;
  44. rv = pthread_create(&t,NULL,ThreadPool::start_routine,this);
  45. if(rv != 0)
  46. {
  47. syslog_warning("threadpool: error spawning thread - %d (%s)",
  48. rv,
  49. strerror(rv));
  50. continue;
  51. }
  52. if(!_name.empty())
  53. pthread_setname_np(t,_name.c_str());
  54. _threads.push_back(t);
  55. }
  56. pthread_sigmask(SIG_SETMASK,&oldset,NULL);
  57. if(_threads.empty())
  58. throw std::runtime_error("threadpool: failed to spawn any threads");
  59. }
  60. ~ThreadPool()
  61. {
  62. syslog_debug("threadpool: destroying %zu threads named '%s'",
  63. _threads.size(),
  64. _name.c_str());
  65. for(auto t : _threads)
  66. pthread_cancel(t);
  67. Func f;
  68. while(_queue.try_dequeue(f))
  69. continue;
  70. for(auto t : _threads)
  71. pthread_join(t,NULL);
  72. }
  73. private:
  74. static
  75. std::string
  76. get_thread_name(std::string const name_)
  77. {
  78. if(!name_.empty())
  79. return name_;
  80. char name[16];
  81. pthread_getname_np(pthread_self(),name,sizeof(name));
  82. return name;
  83. }
  84. static
  85. void*
  86. start_routine(void *arg_)
  87. {
  88. ThreadPool *btp = static_cast<ThreadPool*>(arg_);
  89. ThreadPool::Func func;
  90. ThreadPool::Queue &q = btp->_queue;
  91. moodycamel::ConsumerToken ctok(btp->_queue);
  92. while(true)
  93. {
  94. q.wait_dequeue(ctok,func);
  95. func();
  96. }
  97. return NULL;
  98. }
  99. public:
  100. int
  101. add_thread(std::string const name_ = {})
  102. {
  103. int rv;
  104. pthread_t t;
  105. sigset_t oldset;
  106. sigset_t newset;
  107. std::string name;
  108. name = (name_.empty() ? _name : name_);
  109. sigfillset(&newset);
  110. pthread_sigmask(SIG_BLOCK,&newset,&oldset);
  111. rv = pthread_create(&t,NULL,ThreadPool::start_routine,this);
  112. pthread_sigmask(SIG_SETMASK,&oldset,NULL);
  113. if(rv != 0)
  114. {
  115. syslog_warning("threadpool: error spawning thread - %d (%s)",
  116. rv,
  117. strerror(rv));
  118. return -rv;
  119. }
  120. if(!name.empty())
  121. pthread_setname_np(t,name.c_str());
  122. {
  123. std::lock_guard<std::mutex> lg(_threads_mutex);
  124. _threads.push_back(t);
  125. }
  126. syslog_debug("threadpool: 1 thread added to pool '%s' named '%s'",
  127. _name.c_str(),
  128. name.c_str());
  129. return 0;
  130. }
  131. int
  132. remove_thread(void)
  133. {
  134. {
  135. std::lock_guard<std::mutex> lg(_threads_mutex);
  136. if(_threads.size() <= 1)
  137. return -EINVAL;
  138. }
  139. std::promise<pthread_t> promise;
  140. auto func = [&]()
  141. {
  142. pthread_t t;
  143. t = pthread_self();
  144. promise.set_value(t);
  145. {
  146. std::lock_guard<std::mutex> lg(_threads_mutex);
  147. for(auto i = _threads.begin(); i != _threads.end(); ++i)
  148. {
  149. if(*i != t)
  150. continue;
  151. _threads.erase(i);
  152. break;
  153. }
  154. }
  155. char name[16];
  156. pthread_getname_np(t,name,sizeof(name));
  157. syslog_debug("threadpool: 1 thread removed from pool '%s' named '%s'",
  158. _name.c_str(),
  159. name);
  160. pthread_exit(NULL);
  161. };
  162. enqueue_work(func);
  163. pthread_join(promise.get_future().get(),NULL);
  164. return 0;
  165. }
  166. int
  167. set_threads(std::size_t const count_)
  168. {
  169. int diff;
  170. {
  171. std::lock_guard<std::mutex> lg(_threads_mutex);
  172. diff = ((int)count_ - (int)_threads.size());
  173. }
  174. for(auto i = diff; i > 0; --i)
  175. add_thread();
  176. for(auto i = diff; i < 0; ++i)
  177. remove_thread();
  178. return diff;
  179. }
  180. public:
  181. template<typename FuncType>
  182. void
  183. enqueue_work(moodycamel::ProducerToken &ptok_,
  184. FuncType &&f_)
  185. {
  186. timespec ts = {0,10};
  187. while(true)
  188. {
  189. if(_queue.try_enqueue(ptok_,f_))
  190. return;
  191. ::nanosleep(&ts,NULL);
  192. ts.tv_nsec += 10;
  193. }
  194. }
  195. template<typename FuncType>
  196. void
  197. enqueue_work(FuncType &&f_)
  198. {
  199. timespec ts = {0,10};
  200. while(true)
  201. {
  202. if(_queue.try_enqueue(f_))
  203. return;
  204. ::nanosleep(&ts,NULL);
  205. ts.tv_nsec += 10;
  206. }
  207. }
  208. template<typename FuncType>
  209. [[nodiscard]]
  210. std::future<typename std::result_of<FuncType()>::type>
  211. enqueue_task(FuncType&& f_)
  212. {
  213. using TaskReturnType = typename std::result_of<FuncType()>::type;
  214. using Promise = std::promise<TaskReturnType>;
  215. auto promise = std::make_shared<Promise>();
  216. auto future = promise->get_future();
  217. auto work = [=]()
  218. {
  219. auto rv = f_();
  220. promise->set_value(rv);
  221. };
  222. timespec ts = {0,10};
  223. while(true)
  224. {
  225. if(_queue.try_enqueue(work))
  226. break;
  227. ::nanosleep(&ts,NULL);
  228. ts.tv_nsec += 10;
  229. }
  230. return future;
  231. }
  232. public:
  233. std::vector<pthread_t>
  234. threads() const
  235. {
  236. std::lock_guard<std::mutex> lg(_threads_mutex);
  237. return _threads;
  238. }
  239. moodycamel::ProducerToken
  240. ptoken()
  241. {
  242. return moodycamel::ProducerToken(_queue);
  243. }
  244. private:
  245. Queue _queue;
  246. private:
  247. std::string const _name;
  248. std::vector<pthread_t> _threads;
  249. mutable std::mutex _threads_mutex;
  250. };