You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

130 lines
2.5 KiB

2 years ago
  1. #pragma once
  #include "bounded_queue.hpp"
  #include <atomic>
  #include <cstddef>
  #include <cstdio>
  #include <exception>
  #include <functional>
  #include <future>
  #include <memory>
  #include <thread>
  #include <tuple>
  #include <type_traits>
  #include <utility>
  #include <vector>
  12. class BoundedThreadPool
  13. {
  14. private:
  15. using Proc = std::function<void(void)>;
  16. using Queue = BoundedQueue<Proc>;
  17. using Queues = std::vector<std::shared_ptr<Queue>>;
  18. public:
  19. explicit
  20. BoundedThreadPool(const std::size_t thread_count_ = std::thread::hardware_concurrency())
  21. : _queues(),
  22. _count(thread_count_)
  23. {
  24. printf("threads: %d\n",thread_count_);
  25. for(std::size_t i = 0; i < thread_count_; i++)
  26. _queues.emplace_back(std::make_shared<Queue>(1));
  27. auto worker = [this](std::size_t i)
  28. {
  29. while(true)
  30. {
  31. Proc f;
  32. for(std::size_t n = 0; n < (_count * K); ++n)
  33. {
  34. if(_queues[(i + n) % _count]->pop(f))
  35. break;
  36. }
  37. if(!f && !_queues[i]->pop(f))
  38. break;
  39. f();
  40. }
  41. };
  42. _threads.reserve(thread_count_);
  43. for(std::size_t i = 0; i < thread_count_; ++i)
  44. _threads.emplace_back(worker, i);
  45. }
  46. ~BoundedThreadPool()
  47. {
  48. for(auto& queue : _queues)
  49. queue->unblock();
  50. for(auto& thread : _threads)
  51. thread.join();
  52. }
  53. template<typename F>
  54. void
  55. enqueue_work(F&& f_)
  56. {
  57. auto i = _index++;
  58. for(std::size_t n = 0; n < (_count * K); ++n)
  59. {
  60. if(_queues[(i + n) % _count]->push(f_))
  61. return;
  62. }
  63. _queues[i % _count]->push(std::move(f_));
  64. }
  65. template<typename F>
  66. [[nodiscard]]
  67. std::future<typename std::result_of<F()>::type>
  68. enqueue_task(F&& f_)
  69. {
  70. using TaskReturnType = typename std::result_of<F()>::type;
  71. using Promise = std::promise<TaskReturnType>;
  72. auto i = _index++;
  73. auto promise = std::make_shared<Promise>();
  74. auto future = promise->get_future();
  75. auto work = [=]() {
  76. auto rv = f_();
  77. promise->set_value(rv);
  78. };
  79. for(std::size_t n = 0; n < (_count * K); ++n)
  80. {
  81. if(_queues[(i + n) % _count]->push(work))
  82. return future;
  83. }
  84. _queues[i % _count]->push(std::move(work));
  85. return future;
  86. }
  87. public:
  88. std::vector<pthread_t>
  89. threads()
  90. {
  91. std::vector<pthread_t> rv;
  92. for(auto &thread : _threads)
  93. rv.push_back(thread.native_handle());
  94. return rv;
  95. }
  96. private:
  97. Queues _queues;
  98. private:
  99. std::vector<std::thread> _threads;
  100. private:
  101. const std::size_t _count;
  102. std::atomic_uint _index;
  103. static const unsigned int K = 2;
  104. };