Browse Source

Use relaxed memory order for atomic counters

pull/1233/head
Antonio SJ Musumeci 10 months ago
parent
commit
0ed03a1535
  1. 7
      libfuse/lib/bounded_thread_pool.hpp
  2. 8
      libfuse/lib/fuse_msgbuf.cpp
  3. 125
      libfuse/lib/thread_pool.hpp
  4. 7
      src/unbounded_thread_pool.hpp

7
libfuse/lib/bounded_thread_pool.hpp

@ -85,7 +85,7 @@ public:
void
enqueue_work(F&& f_)
{
auto i = _index++;
auto i = _index.fetch_add(1,std::memory_order_relaxed);
for(std::size_t n = 0; n < (_count * K); ++n)
{
@ -104,10 +104,11 @@ public:
using TaskReturnType = typename std::result_of<F()>::type;
using Promise = std::promise<TaskReturnType>;
auto i = _index++;
auto i = _index.fetch_add(1,std::memory_order_relaxed);
auto promise = std::make_shared<Promise>();
auto future = promise->get_future();
auto work = [=]() {
auto work = [=]()
{
auto rv = f_();
promise->set_value(rv);
};

8
libfuse/lib/fuse_msgbuf.cpp

@ -115,7 +115,7 @@ _msgbuf_alloc(msgbuf_setup_func_t setup_func_)
if(msgbuf == NULL)
return NULL;
g_MSGBUF_ALLOC_COUNT++;
g_MSGBUF_ALLOC_COUNT.fetch_add(1,std::memory_order_relaxed);
}
else
{
@ -160,7 +160,7 @@ msgbuf_free(fuse_msgbuf_t *msgbuf_)
if(msgbuf_->size != (g_BUFSIZE - g_PAGESIZE))
{
msgbuf_destroy(msgbuf_);
g_MSGBUF_ALLOC_COUNT--;
g_MSGBUF_ALLOC_COUNT.fetch_sub(1,std::memory_order_relaxed);
return;
}
@ -206,7 +206,7 @@ msgbuf_gc_10percent()
for(auto msgbuf : togc)
{
msgbuf_destroy(msgbuf);
g_MSGBUF_ALLOC_COUNT--;
g_MSGBUF_ALLOC_COUNT.fetch_sub(1,std::memory_order_relaxed);
}
}
@ -223,6 +223,6 @@ msgbuf_gc()
for(auto msgbuf: oldstack)
{
msgbuf_destroy(msgbuf);
g_MSGBUF_ALLOC_COUNT--;
g_MSGBUF_ALLOC_COUNT.fetch_sub(1,std::memory_order_relaxed);
}
}

125
libfuse/lib/thread_pool.hpp

@ -1,125 +0,0 @@
#pragma once
#include "unbounded_queue.hpp"
#include "bounded_queue.hpp"
#include <tuple>
#include <atomic>
#include <vector>
#include <thread>
#include <memory>
#include <future>
#include <utility>
#include <functional>
#include <type_traits>
class ThreadPool
{
public:
explicit
ThreadPool(const std::size_t thread_count_ = std::thread::hardware_concurrency())
: _queues(thread_count_),
_count(thread_count_)
{
auto worker = [this](std::size_t i)
{
while(true)
{
Proc f;
for(std::size_t n = 0; n < (_count * K); ++n)
{
if(_queues[(i + n) % _count].try_pop(f))
break;
}
if(!f && !_queues[i].pop(f))
break;
f();
}
};
_threads.reserve(thread_count_);
for(std::size_t i = 0; i < thread_count_; ++i)
_threads.emplace_back(worker, i);
}
~ThreadPool()
{
for(auto& queue : _queues)
queue.unblock();
for(auto& thread : _threads)
thread.join();
}
template<typename F>
void
enqueue_work(F&& f_)
{
auto i = _index++;
for(std::size_t n = 0; n < (_count * K); ++n)
{
if(_queues[(i + n) % _count].try_push(f_))
return;
}
_queues[i % _count].push(std::move(f_));
}
template<typename F>
[[nodiscard]]
std::future<typename std::result_of<F()>::type>
enqueue_task(F&& f_)
{
using TaskReturnType = typename std::result_of<F()>::type;
using Promise = std::promise<TaskReturnType>;
auto i = _index++;
auto promise = std::make_shared<Promise>();
auto future = promise->get_future();
auto work = [=]() {
auto rv = f_();
promise->set_value(rv);
};
for(std::size_t n = 0; n < (_count * K); ++n)
{
if(_queues[(i + n) % _count].try_push(work))
return future;
}
_queues[i % _count].push(std::move(work));
return future;
}
public:
std::vector<pthread_t>
threads()
{
std::vector<pthread_t> rv;
for(auto &thread : _threads)
rv.push_back(thread.native_handle());
return rv;
}
private:
using Proc = std::function<void(void)>;
using Queue = UnboundedQueue<Proc>;
using Queues = std::vector<Queue>;
Queues _queues;
private:
std::vector<std::thread> _threads;
private:
const std::size_t _count;
std::atomic_uint _index;
static const unsigned int K = 2;
};

7
src/unbounded_thread_pool.hpp

@ -72,7 +72,7 @@ public:
void
enqueue_work(F&& f_)
{
auto i = _index++;
auto i = _index.fetch_add(1,std::memory_order_relaxed);
for(std::size_t n = 0; n < (_count * K); ++n)
{
@ -91,10 +91,11 @@ public:
using TaskReturnType = typename std::result_of<F()>::type;
using Promise = std::promise<TaskReturnType>;
auto i = _index++;
auto i = _index.fetch_add(1,std::memory_order_relaxed);
auto promise = std::make_shared<Promise>();
auto future = promise->get_future();
auto work = [=]() {
auto work = [=]()
{
auto rv = f_();
promise->set_value(rv);
};

Loading…
Cancel
Save