You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

310 lines
6.3 KiB

  1. #pragma once
  #include "moodycamel/blockingconcurrentqueue.h"
  #include "syslog.h"

  #include <pthread.h>
  #include <time.h>

  #include <atomic>
  #include <cerrno>
  #include <csignal>
  #include <cstring>
  #include <exception>
  #include <functional>
  #include <future>
  #include <memory>
  #include <mutex>
  #include <stdexcept>
  #include <string>
  #include <thread>
  #include <type_traits>
  #include <utility>
  #include <vector>
  14. struct ThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits
  15. {
  16. static const int MAX_SEMA_SPINS = 1;
  17. };
  18. class ThreadPool
  19. {
  20. private:
  21. using Func = std::function<void(void)>;
  22. using Queue = moodycamel::BlockingConcurrentQueue<Func,ThreadPoolTraits>;
  23. public:
  24. explicit
  25. ThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(),
  26. std::size_t const queue_depth_ = 1,
  27. std::string const name_ = {})
  28. : _queue(queue_depth_,thread_count_,thread_count_),
  29. _name(get_thread_name(name_))
  30. {
  31. syslog_debug("threadpool: spawning %zu threads of queue depth %zu named '%s'",
  32. thread_count_,
  33. queue_depth_,
  34. _name.c_str());
  35. sigset_t oldset;
  36. sigset_t newset;
  37. sigfillset(&newset);
  38. pthread_sigmask(SIG_BLOCK,&newset,&oldset);
  39. _threads.reserve(thread_count_);
  40. for(std::size_t i = 0; i < thread_count_; ++i)
  41. {
  42. int rv;
  43. pthread_t t;
  44. rv = pthread_create(&t,NULL,ThreadPool::start_routine,this);
  45. if(rv != 0)
  46. {
  47. syslog_warning("threadpool: error spawning thread - %d (%s)",
  48. rv,
  49. strerror(rv));
  50. continue;
  51. }
  52. if(!_name.empty())
  53. pthread_setname_np(t,_name.c_str());
  54. _threads.push_back(t);
  55. }
  56. pthread_sigmask(SIG_SETMASK,&oldset,NULL);
  57. if(_threads.empty())
  58. throw std::runtime_error("threadpool: failed to spawn any threads");
  59. }
  60. ~ThreadPool()
  61. {
  62. syslog_debug("threadpool: destroying %zu threads named '%s'",
  63. _threads.size(),
  64. _name.c_str());
  65. for(auto t : _threads)
  66. pthread_cancel(t);
  67. Func f;
  68. while(_queue.try_dequeue(f))
  69. continue;
  70. for(auto t : _threads)
  71. pthread_join(t,NULL);
  72. }
  73. private:
  74. static
  75. std::string
  76. get_thread_name(std::string const name_)
  77. {
  78. if(!name_.empty())
  79. return name_;
  80. char name[16];
  81. pthread_getname_np(pthread_self(),name,sizeof(name));
  82. return name;
  83. }
  84. static
  85. void*
  86. start_routine(void *arg_)
  87. {
  88. ThreadPool *btp = static_cast<ThreadPool*>(arg_);
  89. ThreadPool::Func func;
  90. ThreadPool::Queue &q = btp->_queue;
  91. moodycamel::ConsumerToken ctok(btp->_queue);
  92. while(true)
  93. {
  94. q.wait_dequeue(ctok,func);
  95. func();
  96. }
  97. return NULL;
  98. }
  99. public:
  100. int
  101. add_thread(std::string const name_ = {})
  102. {
  103. int rv;
  104. pthread_t t;
  105. sigset_t oldset;
  106. sigset_t newset;
  107. std::string name;
  108. name = (name_.empty() ? _name : name_);
  109. sigfillset(&newset);
  110. pthread_sigmask(SIG_BLOCK,&newset,&oldset);
  111. rv = pthread_create(&t,NULL,ThreadPool::start_routine,this);
  112. pthread_sigmask(SIG_SETMASK,&oldset,NULL);
  113. if(rv != 0)
  114. {
  115. syslog_warning("threadpool: error spawning thread - %d (%s)",
  116. rv,
  117. strerror(rv));
  118. return -rv;
  119. }
  120. if(!name.empty())
  121. pthread_setname_np(t,name.c_str());
  122. {
  123. std::lock_guard<std::mutex> lg(_threads_mutex);
  124. _threads.push_back(t);
  125. }
  126. syslog_debug("threadpool: 1 thread added to pool '%s' named '%s'",
  127. _name.c_str(),
  128. name.c_str());
  129. return 0;
  130. }
  131. int
  132. remove_thread(void)
  133. {
  134. {
  135. std::lock_guard<std::mutex> lg(_threads_mutex);
  136. if(_threads.size() <= 1)
  137. return -EINVAL;
  138. }
  139. std::promise<pthread_t> promise;
  140. auto func = [&]()
  141. {
  142. pthread_t t;
  143. t = pthread_self();
  144. promise.set_value(t);
  145. {
  146. std::lock_guard<std::mutex> lg(_threads_mutex);
  147. for(auto i = _threads.begin(); i != _threads.end(); ++i)
  148. {
  149. if(*i != t)
  150. continue;
  151. _threads.erase(i);
  152. break;
  153. }
  154. }
  155. char name[16];
  156. pthread_getname_np(t,name,sizeof(name));
  157. syslog_debug("threadpool: 1 thread removed from pool '%s' named '%s'",
  158. _name.c_str(),
  159. name);
  160. pthread_exit(NULL);
  161. };
  162. enqueue_work(func);
  163. pthread_join(promise.get_future().get(),NULL);
  164. return 0;
  165. }
  166. int
  167. set_threads(std::size_t const count_)
  168. {
  169. int diff;
  170. {
  171. std::lock_guard<std::mutex> lg(_threads_mutex);
  172. diff = ((int)count_ - (int)_threads.size());
  173. }
  174. syslog_debug("diff: %d",diff);
  175. for(auto i = diff; i > 0; --i)
  176. add_thread();
  177. for(auto i = diff; i < 0; ++i)
  178. remove_thread();
  179. return diff;
  180. }
  181. public:
  182. template<typename FuncType>
  183. void
  184. enqueue_work(moodycamel::ProducerToken &ptok_,
  185. FuncType &&f_)
  186. {
  187. timespec ts = {0,10};
  188. while(true)
  189. {
  190. if(_queue.try_enqueue(ptok_,f_))
  191. return;
  192. ::nanosleep(&ts,NULL);
  193. ts.tv_nsec += 10;
  194. }
  195. }
  196. template<typename FuncType>
  197. void
  198. enqueue_work(FuncType &&f_)
  199. {
  200. timespec ts = {0,10};
  201. while(true)
  202. {
  203. if(_queue.try_enqueue(f_))
  204. return;
  205. ::nanosleep(&ts,NULL);
  206. ts.tv_nsec += 10;
  207. }
  208. }
  209. template<typename FuncType>
  210. [[nodiscard]]
  211. std::future<typename std::result_of<FuncType()>::type>
  212. enqueue_task(FuncType&& f_)
  213. {
  214. using TaskReturnType = typename std::result_of<FuncType()>::type;
  215. using Promise = std::promise<TaskReturnType>;
  216. auto promise = std::make_shared<Promise>();
  217. auto future = promise->get_future();
  218. auto work = [=]()
  219. {
  220. auto rv = f_();
  221. promise->set_value(rv);
  222. };
  223. timespec ts = {0,10};
  224. while(true)
  225. {
  226. if(_queue.try_enqueue(work))
  227. break;
  228. ::nanosleep(&ts,NULL);
  229. ts.tv_nsec += 10;
  230. }
  231. return future;
  232. }
  233. public:
  234. std::vector<pthread_t>
  235. threads() const
  236. {
  237. std::lock_guard<std::mutex> lg(_threads_mutex);
  238. return _threads;
  239. }
  240. moodycamel::ProducerToken
  241. ptoken()
  242. {
  243. return moodycamel::ProducerToken(_queue);
  244. }
  245. private:
  246. Queue _queue;
  247. private:
  248. std::string const _name;
  249. std::vector<pthread_t> _threads;
  250. mutable std::mutex _threads_mutex;
  251. };