You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

315 lines
6.3 KiB

  1. #pragma once
  2. #include "moodycamel/blockingconcurrentqueue.h"
  3. #include <atomic>
  4. #include <csignal>
  5. #include <cstring>
  6. #include <future>
  7. #include <memory>
  8. #include <mutex>
  9. #include <stdexcept>
  10. #include <string>
  11. #include <thread>
  12. #include <vector>
  13. #include <syslog.h>
  14. struct ThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits
  15. {
  16. static const int MAX_SEMA_SPINS = 1;
  17. };
  18. class ThreadPool
  19. {
  20. private:
  21. using Func = std::function<void(void)>;
  22. using Queue = moodycamel::BlockingConcurrentQueue<Func,ThreadPoolTraits>;
  23. public:
  24. explicit
  25. ThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(),
  26. std::size_t const queue_depth_ = 1,
  27. std::string const name_ = {})
  28. : _queue(queue_depth_,thread_count_,thread_count_),
  29. _name(get_thread_name(name_))
  30. {
  31. syslog(LOG_DEBUG,
  32. "threadpool: spawning %zu threads of queue depth %zu named '%s'",
  33. thread_count_,
  34. queue_depth_,
  35. _name.c_str());
  36. sigset_t oldset;
  37. sigset_t newset;
  38. sigfillset(&newset);
  39. pthread_sigmask(SIG_BLOCK,&newset,&oldset);
  40. _threads.reserve(thread_count_);
  41. for(std::size_t i = 0; i < thread_count_; ++i)
  42. {
  43. int rv;
  44. pthread_t t;
  45. rv = pthread_create(&t,NULL,ThreadPool::start_routine,this);
  46. if(rv != 0)
  47. {
  48. syslog(LOG_WARNING,
  49. "threadpool: error spawning thread - %d (%s)",
  50. rv,
  51. strerror(rv));
  52. continue;
  53. }
  54. if(!_name.empty())
  55. pthread_setname_np(t,_name.c_str());
  56. _threads.push_back(t);
  57. }
  58. pthread_sigmask(SIG_SETMASK,&oldset,NULL);
  59. if(_threads.empty())
  60. throw std::runtime_error("threadpool: failed to spawn any threads");
  61. }
  62. ~ThreadPool()
  63. {
  64. syslog(LOG_DEBUG,
  65. "threadpool: destroying %zu threads named '%s'",
  66. _threads.size(),
  67. _name.c_str());
  68. auto func = []() { pthread_exit(NULL); };
  69. for(std::size_t i = 0; i < _threads.size(); i++)
  70. _queue.enqueue(func);
  71. for(auto t : _threads)
  72. pthread_cancel(t);
  73. for(auto t : _threads)
  74. pthread_join(t,NULL);
  75. }
  76. private:
  77. static
  78. std::string
  79. get_thread_name(std::string const name_)
  80. {
  81. if(!name_.empty())
  82. return name_;
  83. char name[16];
  84. pthread_getname_np(pthread_self(),name,sizeof(name));
  85. return name;
  86. }
  87. static
  88. void*
  89. start_routine(void *arg_)
  90. {
  91. ThreadPool *btp = static_cast<ThreadPool*>(arg_);
  92. ThreadPool::Func func;
  93. ThreadPool::Queue &q = btp->_queue;
  94. moodycamel::ConsumerToken ctok(btp->_queue);
  95. while(true)
  96. {
  97. q.wait_dequeue(ctok,func);
  98. func();
  99. }
  100. return NULL;
  101. }
  102. public:
  103. int
  104. add_thread(std::string const name_ = {})
  105. {
  106. int rv;
  107. pthread_t t;
  108. sigset_t oldset;
  109. sigset_t newset;
  110. std::string name;
  111. name = (name_.empty() ? _name : name_);
  112. sigfillset(&newset);
  113. pthread_sigmask(SIG_BLOCK,&newset,&oldset);
  114. rv = pthread_create(&t,NULL,ThreadPool::start_routine,this);
  115. pthread_sigmask(SIG_SETMASK,&oldset,NULL);
  116. if(rv != 0)
  117. {
  118. syslog(LOG_WARNING,
  119. "threadpool: error spawning thread - %d (%s)",
  120. rv,
  121. strerror(rv));
  122. return -rv;
  123. }
  124. if(!name.empty())
  125. pthread_setname_np(t,name.c_str());
  126. {
  127. std::lock_guard<std::mutex> lg(_threads_mutex);
  128. _threads.push_back(t);
  129. }
  130. syslog(LOG_DEBUG,
  131. "threadpool: 1 thread added to pool '%s' named '%s'",
  132. _name.c_str(),
  133. name.c_str());
  134. return 0;
  135. }
  136. int
  137. remove_thread(void)
  138. {
  139. {
  140. std::lock_guard<std::mutex> lg(_threads_mutex);
  141. if(_threads.size() <= 1)
  142. return -EINVAL;
  143. }
  144. std::promise<pthread_t> promise;
  145. auto func = [&]()
  146. {
  147. pthread_t t;
  148. t = pthread_self();
  149. promise.set_value(t);
  150. {
  151. std::lock_guard<std::mutex> lg(_threads_mutex);
  152. for(auto i = _threads.begin(); i != _threads.end(); ++i)
  153. {
  154. if(*i != t)
  155. continue;
  156. _threads.erase(i);
  157. break;
  158. }
  159. }
  160. char name[16];
  161. pthread_getname_np(t,name,sizeof(name));
  162. syslog(LOG_DEBUG,
  163. "threadpool: 1 thread removed from pool '%s' named '%s'",
  164. _name.c_str(),
  165. name);
  166. pthread_exit(NULL);
  167. };
  168. enqueue_work(func);
  169. pthread_join(promise.get_future().get(),NULL);
  170. return 0;
  171. }
  172. int
  173. set_threads(std::size_t const count_)
  174. {
  175. int diff;
  176. {
  177. std::lock_guard<std::mutex> lg(_threads_mutex);
  178. diff = ((int)count_ - (int)_threads.size());
  179. }
  180. for(auto i = diff; i > 0; --i)
  181. add_thread();
  182. for(auto i = diff; i < 0; ++i)
  183. remove_thread();
  184. return diff;
  185. }
  186. public:
  187. template<typename FuncType>
  188. void
  189. enqueue_work(moodycamel::ProducerToken &ptok_,
  190. FuncType &&f_)
  191. {
  192. timespec ts = {0,10};
  193. while(true)
  194. {
  195. if(_queue.try_enqueue(ptok_,f_))
  196. return;
  197. ::nanosleep(&ts,NULL);
  198. ts.tv_nsec += 10;
  199. }
  200. }
  201. template<typename FuncType>
  202. void
  203. enqueue_work(FuncType &&f_)
  204. {
  205. timespec ts = {0,10};
  206. while(true)
  207. {
  208. if(_queue.try_enqueue(f_))
  209. return;
  210. ::nanosleep(&ts,NULL);
  211. ts.tv_nsec += 10;
  212. }
  213. }
  214. template<typename FuncType>
  215. [[nodiscard]]
  216. std::future<typename std::result_of<FuncType()>::type>
  217. enqueue_task(FuncType&& f_)
  218. {
  219. using TaskReturnType = typename std::result_of<FuncType()>::type;
  220. using Promise = std::promise<TaskReturnType>;
  221. auto promise = std::make_shared<Promise>();
  222. auto future = promise->get_future();
  223. auto work = [=]()
  224. {
  225. auto rv = f_();
  226. promise->set_value(rv);
  227. };
  228. timespec ts = {0,10};
  229. while(true)
  230. {
  231. if(_queue.try_enqueue(work))
  232. break;
  233. ::nanosleep(&ts,NULL);
  234. ts.tv_nsec += 10;
  235. }
  236. return future;
  237. }
  238. public:
  239. std::vector<pthread_t>
  240. threads() const
  241. {
  242. std::lock_guard<std::mutex> lg(_threads_mutex);
  243. return _threads;
  244. }
  245. moodycamel::ProducerToken
  246. ptoken()
  247. {
  248. return moodycamel::ProducerToken(_queue);
  249. }
  250. private:
  251. Queue _queue;
  252. private:
  253. std::string const _name;
  254. std::vector<pthread_t> _threads;
  255. mutable std::mutex _threads_mutex;
  256. };