582 lines
21 KiB

  1. // Provides an efficient blocking version of moodycamel::ConcurrentQueue.
  2. // ©2015-2020 Cameron Desrochers. Distributed under the terms of the simplified
  3. // BSD license, available at the top of concurrentqueue.h.
  4. // Also dual-licensed under the Boost Software License (see LICENSE.md)
  5. // Uses Jeff Preshing's semaphore implementation (under the terms of its
  6. // separate zlib license, see lightweightsemaphore.h).
  7. #pragma once
  8. #include "concurrentqueue.h"
  9. #include "lightweightsemaphore.h"
  10. #include <type_traits>
  11. #include <cerrno>
  12. #include <memory>
  13. #include <chrono>
  14. #include <ctime>
  15. namespace moodycamel
  16. {
  17. // This is a blocking version of the queue. It has an almost identical interface to
  18. // the normal non-blocking version, with the addition of various wait_dequeue() methods
  19. // and the removal of producer-specific dequeue methods.
  20. template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
  21. class BlockingConcurrentQueue
  22. {
  23. private:
  24. typedef ::moodycamel::ConcurrentQueue<T, Traits> ConcurrentQueue;
  25. typedef ::moodycamel::LightweightSemaphore LightweightSemaphore;
  26. public:
  27. typedef typename ConcurrentQueue::producer_token_t producer_token_t;
  28. typedef typename ConcurrentQueue::consumer_token_t consumer_token_t;
  29. typedef typename ConcurrentQueue::index_t index_t;
  30. typedef typename ConcurrentQueue::size_t size_t;
  31. typedef typename std::make_signed<size_t>::type ssize_t;
  32. static const size_t BLOCK_SIZE = ConcurrentQueue::BLOCK_SIZE;
  33. static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = ConcurrentQueue::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD;
  34. static const size_t EXPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::EXPLICIT_INITIAL_INDEX_SIZE;
  35. static const size_t IMPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::IMPLICIT_INITIAL_INDEX_SIZE;
  36. static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = ConcurrentQueue::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
  37. static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = ConcurrentQueue::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE;
  38. static const size_t MAX_SUBQUEUE_SIZE = ConcurrentQueue::MAX_SUBQUEUE_SIZE;
  39. public:
  40. // Creates a queue with at least `capacity` element slots; note that the
  41. // actual number of elements that can be inserted without additional memory
  42. // allocation depends on the number of producers and the block size (e.g. if
  43. // the block size is equal to `capacity`, only a single block will be allocated
  44. // up-front, which means only a single producer will be able to enqueue elements
  45. // without an extra allocation -- blocks aren't shared between producers).
  46. // This method is not thread safe -- it is up to the user to ensure that the
  47. // queue is fully constructed before it starts being used by other threads (this
  48. // includes making the memory effects of construction visible, possibly with a
  49. // memory barrier).
  50. explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
  51. : inner(capacity), sema(create<LightweightSemaphore, ssize_t, int>(0, (int)Traits::MAX_SEMA_SPINS), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
  52. {
  53. assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
  54. if (!sema) {
  55. MOODYCAMEL_THROW(std::bad_alloc());
  56. }
  57. }
  58. BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
  59. : inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create<LightweightSemaphore, ssize_t, int>(0, (int)Traits::MAX_SEMA_SPINS), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
  60. {
  61. assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
  62. if (!sema) {
  63. MOODYCAMEL_THROW(std::bad_alloc());
  64. }
  65. }
  66. // Disable copying and copy assignment
  67. BlockingConcurrentQueue(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
  68. BlockingConcurrentQueue& operator=(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
  69. // Moving is supported, but note that it is *not* a thread-safe operation.
  70. // Nobody can use the queue while it's being moved, and the memory effects
  71. // of that move must be propagated to other threads before they can use it.
  72. // Note: When a queue is moved, its tokens are still valid but can only be
  73. // used with the destination queue (i.e. semantically they are moved along
  74. // with the queue itself).
  75. BlockingConcurrentQueue(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
  76. : inner(std::move(other.inner)), sema(std::move(other.sema))
  77. { }
  78. inline BlockingConcurrentQueue& operator=(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
  79. {
  80. return swap_internal(other);
  81. }
  82. // Swaps this queue's state with the other's. Not thread-safe.
  83. // Swapping two queues does not invalidate their tokens, however
  84. // the tokens that were created for one queue must be used with
  85. // only the swapped queue (i.e. the tokens are tied to the
  86. // queue's movable state, not the object itself).
  87. inline void swap(BlockingConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
  88. {
  89. swap_internal(other);
  90. }
  91. private:
  92. BlockingConcurrentQueue& swap_internal(BlockingConcurrentQueue& other)
  93. {
  94. if (this == &other) {
  95. return *this;
  96. }
  97. inner.swap(other.inner);
  98. sema.swap(other.sema);
  99. return *this;
  100. }
  101. public:
  102. // Enqueues a single item (by copying it).
  103. // Allocates memory if required. Only fails if memory allocation fails (or implicit
  104. // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
  105. // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
  106. // Thread-safe.
  107. inline bool enqueue(T const& item)
  108. {
  109. if ((details::likely)(inner.enqueue(item))) {
  110. sema->signal();
  111. return true;
  112. }
  113. return false;
  114. }
  115. // Enqueues a single item (by moving it, if possible).
  116. // Allocates memory if required. Only fails if memory allocation fails (or implicit
  117. // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
  118. // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
  119. // Thread-safe.
  120. inline bool enqueue(T&& item)
  121. {
  122. if ((details::likely)(inner.enqueue(std::move(item)))) {
  123. sema->signal();
  124. return true;
  125. }
  126. return false;
  127. }
  128. // Enqueues a single item (by copying it) using an explicit producer token.
  129. // Allocates memory if required. Only fails if memory allocation fails (or
  130. // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
  131. // Thread-safe.
  132. inline bool enqueue(producer_token_t const& token, T const& item)
  133. {
  134. if ((details::likely)(inner.enqueue(token, item))) {
  135. sema->signal();
  136. return true;
  137. }
  138. return false;
  139. }
  140. // Enqueues a single item (by moving it, if possible) using an explicit producer token.
  141. // Allocates memory if required. Only fails if memory allocation fails (or
  142. // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
  143. // Thread-safe.
  144. inline bool enqueue(producer_token_t const& token, T&& item)
  145. {
  146. if ((details::likely)(inner.enqueue(token, std::move(item)))) {
  147. sema->signal();
  148. return true;
  149. }
  150. return false;
  151. }
  152. // Enqueues several items.
  153. // Allocates memory if required. Only fails if memory allocation fails (or
  154. // implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
  155. // is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
  156. // Note: Use std::make_move_iterator if the elements should be moved instead of copied.
  157. // Thread-safe.
  158. template<typename It>
  159. inline bool enqueue_bulk(It itemFirst, size_t count)
  160. {
  161. if ((details::likely)(inner.enqueue_bulk(std::forward<It>(itemFirst), count))) {
  162. sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
  163. return true;
  164. }
  165. return false;
  166. }
  167. // Enqueues several items using an explicit producer token.
  168. // Allocates memory if required. Only fails if memory allocation fails
  169. // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
  170. // Note: Use std::make_move_iterator if the elements should be moved
  171. // instead of copied.
  172. // Thread-safe.
  173. template<typename It>
  174. inline bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
  175. {
  176. if ((details::likely)(inner.enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
  177. sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
  178. return true;
  179. }
  180. return false;
  181. }
  182. // Enqueues a single item (by copying it).
  183. // Does not allocate memory. Fails if not enough room to enqueue (or implicit
  184. // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
  185. // is 0).
  186. // Thread-safe.
  187. inline bool try_enqueue(T const& item)
  188. {
  189. if (inner.try_enqueue(item)) {
  190. sema->signal();
  191. return true;
  192. }
  193. return false;
  194. }
  195. // Enqueues a single item (by moving it, if possible).
  196. // Does not allocate memory (except for one-time implicit producer).
  197. // Fails if not enough room to enqueue (or implicit production is
  198. // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
  199. // Thread-safe.
  200. inline bool try_enqueue(T&& item)
  201. {
  202. if (inner.try_enqueue(std::move(item))) {
  203. sema->signal();
  204. return true;
  205. }
  206. return false;
  207. }
  208. // Enqueues a single item (by copying it) using an explicit producer token.
  209. // Does not allocate memory. Fails if not enough room to enqueue.
  210. // Thread-safe.
  211. inline bool try_enqueue(producer_token_t const& token, T const& item)
  212. {
  213. if (inner.try_enqueue(token, item)) {
  214. sema->signal();
  215. return true;
  216. }
  217. return false;
  218. }
  219. // Enqueues a single item (by moving it, if possible) using an explicit producer token.
  220. // Does not allocate memory. Fails if not enough room to enqueue.
  221. // Thread-safe.
  222. inline bool try_enqueue(producer_token_t const& token, T&& item)
  223. {
  224. if (inner.try_enqueue(token, std::move(item))) {
  225. sema->signal();
  226. return true;
  227. }
  228. return false;
  229. }
  230. // Enqueues several items.
  231. // Does not allocate memory (except for one-time implicit producer).
  232. // Fails if not enough room to enqueue (or implicit production is
  233. // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
  234. // Note: Use std::make_move_iterator if the elements should be moved
  235. // instead of copied.
  236. // Thread-safe.
  237. template<typename It>
  238. inline bool try_enqueue_bulk(It itemFirst, size_t count)
  239. {
  240. if (inner.try_enqueue_bulk(std::forward<It>(itemFirst), count)) {
  241. sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
  242. return true;
  243. }
  244. return false;
  245. }
  246. // Enqueues several items using an explicit producer token.
  247. // Does not allocate memory. Fails if not enough room to enqueue.
  248. // Note: Use std::make_move_iterator if the elements should be moved
  249. // instead of copied.
  250. // Thread-safe.
  251. template<typename It>
  252. inline bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
  253. {
  254. if (inner.try_enqueue_bulk(token, std::forward<It>(itemFirst), count)) {
  255. sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
  256. return true;
  257. }
  258. return false;
  259. }
  260. // Attempts to dequeue from the queue.
  261. // Returns false if all producer streams appeared empty at the time they
  262. // were checked (so, the queue is likely but not guaranteed to be empty).
  263. // Never allocates. Thread-safe.
  264. template<typename U>
  265. inline bool try_dequeue(U& item)
  266. {
  267. if (sema->tryWait()) {
  268. while (!inner.try_dequeue(item)) {
  269. continue;
  270. }
  271. return true;
  272. }
  273. return false;
  274. }
  275. // Attempts to dequeue from the queue using an explicit consumer token.
  276. // Returns false if all producer streams appeared empty at the time they
  277. // were checked (so, the queue is likely but not guaranteed to be empty).
  278. // Never allocates. Thread-safe.
  279. template<typename U>
  280. inline bool try_dequeue(consumer_token_t& token, U& item)
  281. {
  282. if (sema->tryWait()) {
  283. while (!inner.try_dequeue(token, item)) {
  284. continue;
  285. }
  286. return true;
  287. }
  288. return false;
  289. }
  290. // Attempts to dequeue several elements from the queue.
  291. // Returns the number of items actually dequeued.
  292. // Returns 0 if all producer streams appeared empty at the time they
  293. // were checked (so, the queue is likely but not guaranteed to be empty).
  294. // Never allocates. Thread-safe.
  295. template<typename It>
  296. inline size_t try_dequeue_bulk(It itemFirst, size_t max)
  297. {
  298. size_t count = 0;
  299. max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
  300. while (count != max) {
  301. count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
  302. }
  303. return count;
  304. }
  305. // Attempts to dequeue several elements from the queue using an explicit consumer token.
  306. // Returns the number of items actually dequeued.
  307. // Returns 0 if all producer streams appeared empty at the time they
  308. // were checked (so, the queue is likely but not guaranteed to be empty).
  309. // Never allocates. Thread-safe.
  310. template<typename It>
  311. inline size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
  312. {
  313. size_t count = 0;
  314. max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
  315. while (count != max) {
  316. count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
  317. }
  318. return count;
  319. }
  320. // Blocks the current thread until there's something to dequeue, then
  321. // dequeues it.
  322. // Never allocates. Thread-safe.
  323. template<typename U>
  324. inline void wait_dequeue(U& item)
  325. {
  326. while (!sema->wait()) {
  327. continue;
  328. }
  329. while (!inner.try_dequeue(item)) {
  330. continue;
  331. }
  332. }
  333. // Blocks the current thread until either there's something to dequeue
  334. // or the timeout (specified in microseconds) expires. Returns false
  335. // without setting `item` if the timeout expires, otherwise assigns
  336. // to `item` and returns true.
  337. // Using a negative timeout indicates an indefinite timeout,
  338. // and is thus functionally equivalent to calling wait_dequeue.
  339. // Never allocates. Thread-safe.
  340. template<typename U>
  341. inline bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs)
  342. {
  343. if (!sema->wait(timeout_usecs)) {
  344. return false;
  345. }
  346. while (!inner.try_dequeue(item)) {
  347. continue;
  348. }
  349. return true;
  350. }
  351. // Blocks the current thread until either there's something to dequeue
  352. // or the timeout expires. Returns false without setting `item` if the
  353. // timeout expires, otherwise assigns to `item` and returns true.
  354. // Never allocates. Thread-safe.
  355. template<typename U, typename Rep, typename Period>
  356. inline bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period> const& timeout)
  357. {
  358. return wait_dequeue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
  359. }
  360. // Blocks the current thread until there's something to dequeue, then
  361. // dequeues it using an explicit consumer token.
  362. // Never allocates. Thread-safe.
  363. template<typename U>
  364. inline void wait_dequeue(consumer_token_t& token, U& item)
  365. {
  366. while (!sema->wait()) {
  367. continue;
  368. }
  369. while (!inner.try_dequeue(token, item)) {
  370. continue;
  371. }
  372. }
  373. // Blocks the current thread until either there's something to dequeue
  374. // or the timeout (specified in microseconds) expires. Returns false
  375. // without setting `item` if the timeout expires, otherwise assigns
  376. // to `item` and returns true.
  377. // Using a negative timeout indicates an indefinite timeout,
  378. // and is thus functionally equivalent to calling wait_dequeue.
  379. // Never allocates. Thread-safe.
  380. template<typename U>
  381. inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::int64_t timeout_usecs)
  382. {
  383. if (!sema->wait(timeout_usecs)) {
  384. return false;
  385. }
  386. while (!inner.try_dequeue(token, item)) {
  387. continue;
  388. }
  389. return true;
  390. }
  391. // Blocks the current thread until either there's something to dequeue
  392. // or the timeout expires. Returns false without setting `item` if the
  393. // timeout expires, otherwise assigns to `item` and returns true.
  394. // Never allocates. Thread-safe.
  395. template<typename U, typename Rep, typename Period>
  396. inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::chrono::duration<Rep, Period> const& timeout)
  397. {
  398. return wait_dequeue_timed(token, item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
  399. }
  400. // Attempts to dequeue several elements from the queue.
  401. // Returns the number of items actually dequeued, which will
  402. // always be at least one (this method blocks until the queue
  403. // is non-empty) and at most max.
  404. // Never allocates. Thread-safe.
  405. template<typename It>
  406. inline size_t wait_dequeue_bulk(It itemFirst, size_t max)
  407. {
  408. size_t count = 0;
  409. max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
  410. while (count != max) {
  411. count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
  412. }
  413. return count;
  414. }
  415. // Attempts to dequeue several elements from the queue.
  416. // Returns the number of items actually dequeued, which can
  417. // be 0 if the timeout expires while waiting for elements,
  418. // and at most max.
  419. // Using a negative timeout indicates an indefinite timeout,
  420. // and is thus functionally equivalent to calling wait_dequeue_bulk.
  421. // Never allocates. Thread-safe.
  422. template<typename It>
  423. inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::int64_t timeout_usecs)
  424. {
  425. size_t count = 0;
  426. max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
  427. while (count != max) {
  428. count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
  429. }
  430. return count;
  431. }
  432. // Attempts to dequeue several elements from the queue.
  433. // Returns the number of items actually dequeued, which can
  434. // be 0 if the timeout expires while waiting for elements,
  435. // and at most max.
  436. // Never allocates. Thread-safe.
  437. template<typename It, typename Rep, typename Period>
  438. inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
  439. {
  440. return wait_dequeue_bulk_timed<It&>(itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
  441. }
  442. // Attempts to dequeue several elements from the queue using an explicit consumer token.
  443. // Returns the number of items actually dequeued, which will
  444. // always be at least one (this method blocks until the queue
  445. // is non-empty) and at most max.
  446. // Never allocates. Thread-safe.
  447. template<typename It>
  448. inline size_t wait_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
  449. {
  450. size_t count = 0;
  451. max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
  452. while (count != max) {
  453. count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
  454. }
  455. return count;
  456. }
  457. // Attempts to dequeue several elements from the queue using an explicit consumer token.
  458. // Returns the number of items actually dequeued, which can
  459. // be 0 if the timeout expires while waiting for elements,
  460. // and at most max.
  461. // Using a negative timeout indicates an indefinite timeout,
  462. // and is thus functionally equivalent to calling wait_dequeue_bulk.
  463. // Never allocates. Thread-safe.
  464. template<typename It>
  465. inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::int64_t timeout_usecs)
  466. {
  467. size_t count = 0;
  468. max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
  469. while (count != max) {
  470. count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
  471. }
  472. return count;
  473. }
  474. // Attempts to dequeue several elements from the queue using an explicit consumer token.
  475. // Returns the number of items actually dequeued, which can
  476. // be 0 if the timeout expires while waiting for elements,
  477. // and at most max.
  478. // Never allocates. Thread-safe.
  479. template<typename It, typename Rep, typename Period>
  480. inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
  481. {
  482. return wait_dequeue_bulk_timed<It&>(token, itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
  483. }
  484. // Returns an estimate of the total number of elements currently in the queue. This
  485. // estimate is only accurate if the queue has completely stabilized before it is called
  486. // (i.e. all enqueue and dequeue operations have completed and their memory effects are
  487. // visible on the calling thread, and no further operations start while this method is
  488. // being called).
  489. // Thread-safe.
  490. inline size_t size_approx() const
  491. {
  492. return (size_t)sema->availableApprox();
  493. }
  494. // Returns true if the underlying atomic variables used by
  495. // the queue are lock-free (they should be on most platforms).
  496. // Thread-safe.
  497. static constexpr bool is_lock_free()
  498. {
  499. return ConcurrentQueue::is_lock_free();
  500. }
  501. private:
  502. template<typename U, typename A1, typename A2>
  503. static inline U* create(A1&& a1, A2&& a2)
  504. {
  505. void* p = (Traits::malloc)(sizeof(U));
  506. return p != nullptr ? new (p) U(std::forward<A1>(a1), std::forward<A2>(a2)) : nullptr;
  507. }
  508. template<typename U>
  509. static inline void destroy(U* p)
  510. {
  511. if (p != nullptr) {
  512. p->~U();
  513. }
  514. (Traits::free)(p);
  515. }
  516. private:
  517. ConcurrentQueue inner;
  518. std::unique_ptr<LightweightSemaphore, void (*)(LightweightSemaphore*)> sema;
  519. };
  520. template<typename T, typename Traits>
  521. inline void swap(BlockingConcurrentQueue<T, Traits>& a, BlockingConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
  522. {
  523. a.swap(b);
  524. }
  525. } // end namespace moodycamel