You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

195 lines
4.1 KiB

  #pragma once

  #include "moodycamel/blockingconcurrentqueue.h"
  #include "syslog.h"

  #include <atomic>
  #include <csignal>
  #include <cstring>
  #include <functional>
  #include <future>
  #include <memory>
  #include <stdexcept>
  #include <string>
  #include <thread>
  #include <type_traits>
  #include <utility>
  #include <vector>

  #include <pthread.h>
  #include <time.h>
  13. struct BoundedThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits
  14. {
  15. static const int MAX_SEMA_SPINS = 1;
  16. };
  17. class BoundedThreadPool
  18. {
  19. private:
  20. using Func = std::function<void(void)>;
  21. using Queue = moodycamel::BlockingConcurrentQueue<Func,BoundedThreadPoolTraits>;
  22. public:
  23. explicit
  24. BoundedThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(),
  25. std::size_t const queue_depth_ = 1,
  26. std::string const name_ = {})
  27. : _queue(queue_depth_,thread_count_,thread_count_),
  28. _done(false),
  29. _name(name_)
  30. {
  31. syslog_debug("threadpool: spawning %zu threads of queue depth %zu named '%s'",
  32. thread_count_,
  33. queue_depth_,
  34. _name.c_str());
  35. sigset_t oldset;
  36. sigset_t newset;
  37. sigfillset(&newset);
  38. pthread_sigmask(SIG_BLOCK,&newset,&oldset);
  39. _threads.reserve(thread_count_);
  40. for(std::size_t i = 0; i < thread_count_; ++i)
  41. {
  42. int rv;
  43. pthread_t t;
  44. rv = pthread_create(&t,NULL,BoundedThreadPool::start_routine,this);
  45. if(rv != 0)
  46. {
  47. syslog_warning("threadpool: error spawning thread - %d (%s)",
  48. rv,
  49. strerror(rv));
  50. continue;
  51. }
  52. if(!_name.empty())
  53. pthread_setname_np(t,_name.c_str());
  54. _threads.push_back(t);
  55. }
  56. pthread_sigmask(SIG_SETMASK,&oldset,NULL);
  57. if(_threads.empty())
  58. throw std::runtime_error("threadpool: failed to spawn any threads");
  59. }
  60. ~BoundedThreadPool()
  61. {
  62. syslog_debug("threadpool: destroying %zu threads named '%s'",
  63. _threads.size(),
  64. _name.c_str());
  65. _done.store(true,std::memory_order_relaxed);
  66. for(auto t : _threads)
  67. pthread_cancel(t);
  68. Func f;
  69. while(_queue.try_dequeue(f))
  70. continue;
  71. for(auto t : _threads)
  72. pthread_join(t,NULL);
  73. }
  74. private:
  75. static
  76. void*
  77. start_routine(void *arg_)
  78. {
  79. BoundedThreadPool *btp = static_cast<BoundedThreadPool*>(arg_);
  80. BoundedThreadPool::Func func;
  81. std::atomic<bool> &done = btp->_done;
  82. BoundedThreadPool::Queue &q = btp->_queue;
  83. moodycamel::ConsumerToken ctok(btp->_queue);
  84. while(!done.load(std::memory_order_relaxed))
  85. {
  86. q.wait_dequeue(ctok,func);
  87. func();
  88. }
  89. return NULL;
  90. }
  91. public:
  92. template<typename FuncType>
  93. void
  94. enqueue_work(moodycamel::ProducerToken &ptok_,
  95. FuncType &&f_)
  96. {
  97. timespec ts = {0,10};
  98. while(true)
  99. {
  100. if(_queue.try_enqueue(ptok_,f_))
  101. return;
  102. ::nanosleep(&ts,NULL);
  103. ts.tv_nsec += 10;
  104. }
  105. }
  106. template<typename FuncType>
  107. void
  108. enqueue_work(FuncType &&f_)
  109. {
  110. timespec ts = {0,10};
  111. while(true)
  112. {
  113. if(_queue.try_enqueue(f_))
  114. return;
  115. ::nanosleep(&ts,NULL);
  116. ts.tv_nsec += 10;
  117. }
  118. }
  119. template<typename FuncType>
  120. [[nodiscard]]
  121. std::future<typename std::result_of<FuncType()>::type>
  122. enqueue_task(FuncType&& f_)
  123. {
  124. using TaskReturnType = typename std::result_of<FuncType()>::type;
  125. using Promise = std::promise<TaskReturnType>;
  126. auto promise = std::make_shared<Promise>();
  127. auto future = promise->get_future();
  128. auto work = [=]()
  129. {
  130. auto rv = f_();
  131. promise->set_value(rv);
  132. };
  133. timespec ts = {0,10};
  134. while(true)
  135. {
  136. if(_queue.try_enqueue(work))
  137. break;
  138. ::nanosleep(&ts,NULL);
  139. ts.tv_nsec += 10;
  140. }
  141. return future;
  142. }
  143. public:
  144. std::vector<pthread_t>
  145. threads() const
  146. {
  147. return _threads;
  148. }
  149. public:
  150. Queue&
  151. queue()
  152. {
  153. return _queue;
  154. }
  155. private:
  156. Queue _queue;
  157. private:
  158. std::atomic<bool> _done;
  159. std::string const _name;
  160. std::vector<pthread_t> _threads;
  161. };