You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

427 lines
12 KiB

  1. // Provides an efficient implementation of a semaphore (LightweightSemaphore).
  2. // This is an extension of Jeff Preshing's semaphore implementation (licensed
  3. // under the terms of its separate zlib license) that has been adapted and
  4. // extended by Cameron Desrochers.
  5. #pragma once
#include <atomic>
#include <cassert>     // For assert
#include <cerrno>      // For errno / EINTR (POSIX semaphore wrapper)
#include <cstddef>     // For std::size_t
#include <cstdint>     // For std::int64_t / std::uint64_t
#include <type_traits> // For std::make_signed<T>
  9. #if defined(_WIN32)
  10. // Avoid including windows.h in a header; we only need a handful of
  11. // items, so we'll redeclare them here (this is relatively safe since
  12. // the API generally has to remain stable between Windows versions).
  13. // I know this is an ugly hack but it still beats polluting the global
  14. // namespace with thousands of generic names or adding a .cpp for nothing.
  15. extern "C" {
  16. struct _SECURITY_ATTRIBUTES;
  17. __declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes, long lInitialCount, long lMaximumCount, const wchar_t* lpName);
  18. __declspec(dllimport) int __stdcall CloseHandle(void* hObject);
  19. __declspec(dllimport) unsigned long __stdcall WaitForSingleObject(void* hHandle, unsigned long dwMilliseconds);
  20. __declspec(dllimport) int __stdcall ReleaseSemaphore(void* hSemaphore, long lReleaseCount, long* lpPreviousCount);
  21. }
  22. #elif defined(__MACH__)
  23. #include <mach/mach.h>
  24. #elif defined(__MVS__)
  25. #include <zos-semaphore.h>
  26. #elif defined(__unix__)
  27. #include <semaphore.h>
  28. #if defined(__GLIBC_PREREQ) && defined(_GNU_SOURCE)
  29. #if __GLIBC_PREREQ(2,30)
  30. #define MOODYCAMEL_LIGHTWEIGHTSEMAPHORE_MONOTONIC
  31. #endif
  32. #endif
  33. #endif
  34. namespace moodycamel
  35. {
  36. namespace details
  37. {
  38. // Code in the details namespace below is an adaptation of Jeff Preshing's
  39. // portable + lightweight semaphore implementations, originally from
  40. // https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h
  41. // LICENSE:
  42. // Copyright (c) 2015 Jeff Preshing
  43. //
  44. // This software is provided 'as-is', without any express or implied
  45. // warranty. In no event will the authors be held liable for any damages
  46. // arising from the use of this software.
  47. //
  48. // Permission is granted to anyone to use this software for any purpose,
  49. // including commercial applications, and to alter it and redistribute it
  50. // freely, subject to the following restrictions:
  51. //
  52. // 1. The origin of this software must not be misrepresented; you must not
  53. // claim that you wrote the original software. If you use this software
  54. // in a product, an acknowledgement in the product documentation would be
  55. // appreciated but is not required.
  56. // 2. Altered source versions must be plainly marked as such, and must not be
  57. // misrepresented as being the original software.
  58. // 3. This notice may not be removed or altered from any source distribution.
  59. #if defined(_WIN32)
  60. class Semaphore
  61. {
  62. private:
  63. void* m_hSema;
  64. Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
  65. Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
  66. public:
  67. Semaphore(int initialCount = 0)
  68. {
  69. assert(initialCount >= 0);
  70. const long maxLong = 0x7fffffff;
  71. m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr);
  72. assert(m_hSema);
  73. }
  74. ~Semaphore()
  75. {
  76. CloseHandle(m_hSema);
  77. }
  78. bool wait()
  79. {
  80. const unsigned long infinite = 0xffffffff;
  81. return WaitForSingleObject(m_hSema, infinite) == 0;
  82. }
  83. bool try_wait()
  84. {
  85. return WaitForSingleObject(m_hSema, 0) == 0;
  86. }
  87. bool timed_wait(std::uint64_t usecs)
  88. {
  89. return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) == 0;
  90. }
  91. void signal(int count = 1)
  92. {
  93. while (!ReleaseSemaphore(m_hSema, count, nullptr));
  94. }
  95. };
  96. #elif defined(__MACH__)
  97. //---------------------------------------------------------
  98. // Semaphore (Apple iOS and OSX)
  99. // Can't use POSIX semaphores due to http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html
  100. //---------------------------------------------------------
  101. class Semaphore
  102. {
  103. private:
  104. semaphore_t m_sema;
  105. Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
  106. Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
  107. public:
  108. Semaphore(int initialCount = 0)
  109. {
  110. assert(initialCount >= 0);
  111. kern_return_t rc = semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
  112. assert(rc == KERN_SUCCESS);
  113. (void)rc;
  114. }
  115. ~Semaphore()
  116. {
  117. semaphore_destroy(mach_task_self(), m_sema);
  118. }
  119. bool wait()
  120. {
  121. return semaphore_wait(m_sema) == KERN_SUCCESS;
  122. }
  123. bool try_wait()
  124. {
  125. return timed_wait(0);
  126. }
  127. bool timed_wait(std::uint64_t timeout_usecs)
  128. {
  129. mach_timespec_t ts;
  130. ts.tv_sec = static_cast<unsigned int>(timeout_usecs / 1000000);
  131. ts.tv_nsec = static_cast<int>((timeout_usecs % 1000000) * 1000);
  132. // added in OSX 10.10: https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html
  133. kern_return_t rc = semaphore_timedwait(m_sema, ts);
  134. return rc == KERN_SUCCESS;
  135. }
  136. void signal()
  137. {
  138. while (semaphore_signal(m_sema) != KERN_SUCCESS);
  139. }
  140. void signal(int count)
  141. {
  142. while (count-- > 0)
  143. {
  144. while (semaphore_signal(m_sema) != KERN_SUCCESS);
  145. }
  146. }
  147. };
  148. #elif defined(__unix__) || defined(__MVS__)
  149. //---------------------------------------------------------
  150. // Semaphore (POSIX, Linux, zOS)
  151. //---------------------------------------------------------
  152. class Semaphore
  153. {
  154. private:
  155. sem_t m_sema;
  156. Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
  157. Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
  158. public:
  159. Semaphore(int initialCount = 0)
  160. {
  161. assert(initialCount >= 0);
  162. int rc = sem_init(&m_sema, 0, static_cast<unsigned int>(initialCount));
  163. assert(rc == 0);
  164. (void)rc;
  165. }
  166. ~Semaphore()
  167. {
  168. sem_destroy(&m_sema);
  169. }
  170. bool wait()
  171. {
  172. // http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error
  173. int rc;
  174. do {
  175. rc = sem_wait(&m_sema);
  176. } while (rc == -1 && errno == EINTR);
  177. return rc == 0;
  178. }
  179. bool try_wait()
  180. {
  181. int rc;
  182. do {
  183. rc = sem_trywait(&m_sema);
  184. } while (rc == -1 && errno == EINTR);
  185. return rc == 0;
  186. }
  187. bool timed_wait(std::uint64_t usecs)
  188. {
  189. struct timespec ts;
  190. const int usecs_in_1_sec = 1000000;
  191. const int nsecs_in_1_sec = 1000000000;
  192. #ifdef MOODYCAMEL_LIGHTWEIGHTSEMAPHORE_MONOTONIC
  193. clock_gettime(CLOCK_MONOTONIC, &ts);
  194. #else
  195. clock_gettime(CLOCK_REALTIME, &ts);
  196. #endif
  197. ts.tv_sec += (time_t)(usecs / usecs_in_1_sec);
  198. ts.tv_nsec += (long)(usecs % usecs_in_1_sec) * 1000;
  199. // sem_timedwait bombs if you have more than 1e9 in tv_nsec
  200. // so we have to clean things up before passing it in
  201. if (ts.tv_nsec >= nsecs_in_1_sec) {
  202. ts.tv_nsec -= nsecs_in_1_sec;
  203. ++ts.tv_sec;
  204. }
  205. int rc;
  206. do {
  207. #ifdef MOODYCAMEL_LIGHTWEIGHTSEMAPHORE_MONOTONIC
  208. rc = sem_clockwait(&m_sema, CLOCK_MONOTONIC, &ts);
  209. #else
  210. rc = sem_timedwait(&m_sema, &ts);
  211. #endif
  212. } while (rc == -1 && errno == EINTR);
  213. return rc == 0;
  214. }
  215. void signal()
  216. {
  217. while (sem_post(&m_sema) == -1);
  218. }
  219. void signal(int count)
  220. {
  221. while (count-- > 0)
  222. {
  223. while (sem_post(&m_sema) == -1);
  224. }
  225. }
  226. };
  227. #else
  228. #error Unsupported platform! (No semaphore wrapper available)
  229. #endif
  230. } // end namespace details
  231. //---------------------------------------------------------
  232. // LightweightSemaphore
  233. //---------------------------------------------------------
  234. class LightweightSemaphore
  235. {
  236. public:
  237. typedef std::make_signed<std::size_t>::type ssize_t;
  238. private:
  239. std::atomic<ssize_t> m_count;
  240. details::Semaphore m_sema;
  241. int m_maxSpins;
  242. bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1)
  243. {
  244. ssize_t oldCount;
  245. int spin = m_maxSpins;
  246. while (--spin >= 0)
  247. {
  248. oldCount = m_count.load(std::memory_order_relaxed);
  249. if ((oldCount > 0) && m_count.compare_exchange_strong(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
  250. return true;
  251. std::atomic_signal_fence(std::memory_order_acquire); // Prevent the compiler from collapsing the loop.
  252. }
  253. oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
  254. if (oldCount > 0)
  255. return true;
  256. if (timeout_usecs < 0)
  257. {
  258. if (m_sema.wait())
  259. return true;
  260. }
  261. if (timeout_usecs > 0 && m_sema.timed_wait((std::uint64_t)timeout_usecs))
  262. return true;
  263. // At this point, we've timed out waiting for the semaphore, but the
  264. // count is still decremented indicating we may still be waiting on
  265. // it. So we have to re-adjust the count, but only if the semaphore
  266. // wasn't signaled enough times for us too since then. If it was, we
  267. // need to release the semaphore too.
  268. while (true)
  269. {
  270. oldCount = m_count.load(std::memory_order_acquire);
  271. if (oldCount >= 0 && m_sema.try_wait())
  272. return true;
  273. if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
  274. return false;
  275. }
  276. }
  277. ssize_t waitManyWithPartialSpinning(ssize_t max, std::int64_t timeout_usecs = -1)
  278. {
  279. assert(max > 0);
  280. ssize_t oldCount;
  281. int spin = m_maxSpins;
  282. while (--spin >= 0)
  283. {
  284. oldCount = m_count.load(std::memory_order_relaxed);
  285. if (oldCount > 0)
  286. {
  287. ssize_t newCount = oldCount > max ? oldCount - max : 0;
  288. if (m_count.compare_exchange_strong(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
  289. return oldCount - newCount;
  290. }
  291. std::atomic_signal_fence(std::memory_order_acquire);
  292. }
  293. oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
  294. if (oldCount <= 0)
  295. {
  296. if ((timeout_usecs == 0) || (timeout_usecs < 0 && !m_sema.wait()) || (timeout_usecs > 0 && !m_sema.timed_wait((std::uint64_t)timeout_usecs)))
  297. {
  298. while (true)
  299. {
  300. oldCount = m_count.load(std::memory_order_acquire);
  301. if (oldCount >= 0 && m_sema.try_wait())
  302. break;
  303. if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
  304. return 0;
  305. }
  306. }
  307. }
  308. if (max > 1)
  309. return 1 + tryWaitMany(max - 1);
  310. return 1;
  311. }
  312. public:
  313. LightweightSemaphore(ssize_t initialCount = 0, int maxSpins = 10000) : m_count(initialCount), m_maxSpins(maxSpins)
  314. {
  315. assert(initialCount >= 0);
  316. assert(maxSpins >= 0);
  317. }
  318. bool tryWait()
  319. {
  320. ssize_t oldCount = m_count.load(std::memory_order_relaxed);
  321. while (oldCount > 0)
  322. {
  323. if (m_count.compare_exchange_weak(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
  324. return true;
  325. }
  326. return false;
  327. }
  328. bool wait()
  329. {
  330. return tryWait() || waitWithPartialSpinning();
  331. }
  332. bool wait(std::int64_t timeout_usecs)
  333. {
  334. return tryWait() || waitWithPartialSpinning(timeout_usecs);
  335. }
  336. // Acquires between 0 and (greedily) max, inclusive
  337. ssize_t tryWaitMany(ssize_t max)
  338. {
  339. assert(max >= 0);
  340. ssize_t oldCount = m_count.load(std::memory_order_relaxed);
  341. while (oldCount > 0)
  342. {
  343. ssize_t newCount = oldCount > max ? oldCount - max : 0;
  344. if (m_count.compare_exchange_weak(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
  345. return oldCount - newCount;
  346. }
  347. return 0;
  348. }
  349. // Acquires at least one, and (greedily) at most max
  350. ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs)
  351. {
  352. assert(max >= 0);
  353. ssize_t result = tryWaitMany(max);
  354. if (result == 0 && max > 0)
  355. result = waitManyWithPartialSpinning(max, timeout_usecs);
  356. return result;
  357. }
  358. ssize_t waitMany(ssize_t max)
  359. {
  360. ssize_t result = waitMany(max, -1);
  361. assert(result > 0);
  362. return result;
  363. }
  364. void signal(ssize_t count = 1)
  365. {
  366. assert(count >= 0);
  367. ssize_t oldCount = m_count.fetch_add(count, std::memory_order_release);
  368. ssize_t toRelease = -oldCount < count ? -oldCount : count;
  369. if (toRelease > 0)
  370. {
  371. m_sema.signal((int)toRelease);
  372. }
  373. }
  374. std::size_t availableApprox() const
  375. {
  376. ssize_t count = m_count.load(std::memory_order_relaxed);
  377. return count > 0 ? static_cast<std::size_t>(count) : 0;
  378. }
  379. };
  380. } // end namespace moodycamel