From 0ed03a153549c75142332cd44641ba0328df350e Mon Sep 17 00:00:00 2001 From: Antonio SJ Musumeci Date: Mon, 21 Aug 2023 13:33:58 -0500 Subject: [PATCH] Use relaxed memory order for atomic counters --- libfuse/lib/bounded_thread_pool.hpp | 7 +- libfuse/lib/fuse_msgbuf.cpp | 8 +- libfuse/lib/thread_pool.hpp | 125 ---------------------------- src/unbounded_thread_pool.hpp | 7 +- 4 files changed, 12 insertions(+), 135 deletions(-) delete mode 100644 libfuse/lib/thread_pool.hpp diff --git a/libfuse/lib/bounded_thread_pool.hpp b/libfuse/lib/bounded_thread_pool.hpp index 707fa1d8..d0e3bf2d 100644 --- a/libfuse/lib/bounded_thread_pool.hpp +++ b/libfuse/lib/bounded_thread_pool.hpp @@ -85,7 +85,7 @@ public: void enqueue_work(F&& f_) { - auto i = _index++; + auto i = _index.fetch_add(1,std::memory_order_relaxed); for(std::size_t n = 0; n < (_count * K); ++n) { @@ -104,10 +104,11 @@ public: using TaskReturnType = typename std::result_of::type; using Promise = std::promise; - auto i = _index++; + auto i = _index.fetch_add(1,std::memory_order_relaxed); auto promise = std::make_shared(); auto future = promise->get_future(); - auto work = [=]() { + auto work = [=]() + { auto rv = f_(); promise->set_value(rv); }; diff --git a/libfuse/lib/fuse_msgbuf.cpp b/libfuse/lib/fuse_msgbuf.cpp index e683874d..828a4c44 100644 --- a/libfuse/lib/fuse_msgbuf.cpp +++ b/libfuse/lib/fuse_msgbuf.cpp @@ -115,7 +115,7 @@ _msgbuf_alloc(msgbuf_setup_func_t setup_func_) if(msgbuf == NULL) return NULL; - g_MSGBUF_ALLOC_COUNT++; + g_MSGBUF_ALLOC_COUNT.fetch_add(1,std::memory_order_relaxed); } else { @@ -160,7 +160,7 @@ msgbuf_free(fuse_msgbuf_t *msgbuf_) if(msgbuf_->size != (g_BUFSIZE - g_PAGESIZE)) { msgbuf_destroy(msgbuf_); - g_MSGBUF_ALLOC_COUNT--; + g_MSGBUF_ALLOC_COUNT.fetch_sub(1,std::memory_order_relaxed); return; } @@ -206,7 +206,7 @@ msgbuf_gc_10percent() for(auto msgbuf : togc) { msgbuf_destroy(msgbuf); - g_MSGBUF_ALLOC_COUNT--; + g_MSGBUF_ALLOC_COUNT.fetch_sub(1,std::memory_order_relaxed); } } @@ -223,6 +223,6 @@ msgbuf_gc() for(auto msgbuf: oldstack) { msgbuf_destroy(msgbuf); - g_MSGBUF_ALLOC_COUNT--; + g_MSGBUF_ALLOC_COUNT.fetch_sub(1,std::memory_order_relaxed); } } diff --git a/libfuse/lib/thread_pool.hpp b/libfuse/lib/thread_pool.hpp deleted file mode 100644 index 3737a186..00000000 --- a/libfuse/lib/thread_pool.hpp +++ /dev/null @@ -1,125 +0,0 @@ -#pragma once - -#include "unbounded_queue.hpp" -#include "bounded_queue.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include -#include - - -class ThreadPool -{ -public: - explicit - ThreadPool(const std::size_t thread_count_ = std::thread::hardware_concurrency()) - : _queues(thread_count_), - _count(thread_count_) - { - auto worker = [this](std::size_t i) - { - while(true) - { - Proc f; - - for(std::size_t n = 0; n < (_count * K); ++n) - { - if(_queues[(i + n) % _count].try_pop(f)) - break; - } - - if(!f && !_queues[i].pop(f)) - break; - - f(); - } - }; - - _threads.reserve(thread_count_); - for(std::size_t i = 0; i < thread_count_; ++i) - _threads.emplace_back(worker, i); - } - - ~ThreadPool() - { - for(auto& queue : _queues) - queue.unblock(); - for(auto& thread : _threads) - thread.join(); - } - - template - void - enqueue_work(F&& f_) - { - auto i = _index++; - - for(std::size_t n = 0; n < (_count * K); ++n) - { - if(_queues[(i + n) % _count].try_push(f_)) - return; - } - - _queues[i % _count].push(std::move(f_)); - } - - template - [[nodiscard]] - std::future::type> - enqueue_task(F&& f_) - { - using TaskReturnType = typename std::result_of::type; - using Promise = std::promise; - - auto i = _index++; - auto promise = std::make_shared(); - auto future = promise->get_future(); - auto work = [=]() { - auto rv = f_(); - promise->set_value(rv); - }; - - for(std::size_t n = 0; n < (_count * K); ++n) - { - if(_queues[(i + n) % _count].try_push(work)) - return future; - } - - _queues[i % _count].push(std::move(work)); - - return future; - } - -public: - std::vector - threads() - { - std::vector rv; - - for(auto &thread : _threads) - rv.push_back(thread.native_handle()); - - return rv; - } - -private: - using Proc = std::function; - using Queue = UnboundedQueue; - using Queues = std::vector; - Queues _queues; - -private: - std::vector _threads; - -private: - const std::size_t _count; - std::atomic_uint _index; - - static const unsigned int K = 2; -}; diff --git a/src/unbounded_thread_pool.hpp b/src/unbounded_thread_pool.hpp index 6f02e1b3..1bfa1dcd 100644 --- a/src/unbounded_thread_pool.hpp +++ b/src/unbounded_thread_pool.hpp @@ -72,7 +72,7 @@ public: void enqueue_work(F&& f_) { - auto i = _index++; + auto i = _index.fetch_add(1,std::memory_order_relaxed); for(std::size_t n = 0; n < (_count * K); ++n) { @@ -91,10 +91,11 @@ public: using TaskReturnType = typename std::result_of::type; using Promise = std::promise; - auto i = _index++; + auto i = _index.fetch_add(1,std::memory_order_relaxed); auto promise = std::make_shared(); auto future = promise->get_future(); - auto work = [=]() { + auto work = [=]() + { auto rv = f_(); promise->set_value(rv); };