Browse Source

checkpoint

foo
Antonio SJ Musumeci 2 years ago
parent
commit
61e31eecf6
  1. 9
      libfuse/Makefile
  2. 3
      libfuse/include/fuse_lowlevel.h
  3. 8
      libfuse/lib/fuse_i.h
  4. 22
      libfuse/lib/fuse_loop_mt.cpp
  5. 9
      libfuse/lib/fuse_lowlevel.c
  6. 14
      libfuse/lib/fuse_session.c
  7. 161
      libfuse/lib/pool.hpp
  8. 256
      libfuse/lib/queue.hpp

9
libfuse/Makefile

@ -56,6 +56,12 @@ CFLAGS := \
-Wall \ -Wall \
-pipe \ -pipe \
-MMD -MMD
CXXFLAGS := \
${CXXFLAGS} \
-std=c++17 \
-Wall \
-pipe \
-MMD
FUSERMOUNT_DIR = $(BINDIR) FUSERMOUNT_DIR = $(BINDIR)
FUSE_FLAGS = \ FUSE_FLAGS = \
-Iinclude \ -Iinclude \
@ -100,6 +106,9 @@ mount.mergerfs: build/mount.mergerfs
build/%.o: lib/%.c build/%.o: lib/%.c
$(CC) $(CFLAGS) $(FUSE_FLAGS) -c $< -o $@ $(CC) $(CFLAGS) $(FUSE_FLAGS) -c $< -o $@
build/%.o: lib/%.cpp
$(CXX) $(CXXFLAGS) $(FUSE_FLAGS) -c $< -o $@
clean: clean:
rm -rf build rm -rf build

3
libfuse/include/fuse_lowlevel.h

@ -1478,7 +1478,8 @@ void *fuse_session_data(struct fuse_session *se);
int fuse_session_receive(struct fuse_session *se, int fuse_session_receive(struct fuse_session *se,
struct fuse_buf *buf); struct fuse_buf *buf);
void fuse_session_process(struct fuse_session *se, void fuse_session_process(struct fuse_session *se,
const struct fuse_buf *buf);
const void *buf,
const size_t bufsize);
int fuse_session_loop_mt(struct fuse_session *se, const int threads); int fuse_session_loop_mt(struct fuse_session *se, const int threads);

8
libfuse/lib/fuse_i.h

@ -11,6 +11,8 @@
#include "fuse.h" #include "fuse.h"
#include "fuse_lowlevel.h" #include "fuse_lowlevel.h"
#include "extern_c.h"
struct fuse_chan; struct fuse_chan;
struct fuse_ll; struct fuse_ll;
@ -21,7 +23,7 @@ struct fuse_session
struct fuse_chan *ch); struct fuse_chan *ch);
void (*process_buf)(void *data, void (*process_buf)(void *data,
const struct fuse_buf *buf,
const void *buf,
struct fuse_chan *ch); struct fuse_chan *ch);
void (*destroy)(void *data); void (*destroy)(void *data);
@ -83,6 +85,8 @@ struct fuse_cmd
struct fuse_chan *ch; struct fuse_chan *ch;
}; };
EXTERN_C_BEGIN
struct fuse *fuse_new_common(struct fuse_chan *ch, struct fuse_args *args, struct fuse *fuse_new_common(struct fuse_chan *ch, struct fuse_args *args,
const struct fuse_operations *op, const struct fuse_operations *op,
size_t op_size); size_t op_size);
@ -110,3 +114,5 @@ struct fuse *fuse_setup_common(int argc, char *argv[],
int *fd); int *fd);
int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg); int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg);
EXTERN_C_END

22
libfuse/lib/fuse_loop_mt.c → libfuse/lib/fuse_loop_mt.cpp

@ -6,6 +6,8 @@
See the file COPYING.LIB. See the file COPYING.LIB.
*/ */
#include "pool.hpp"
#include "fuse_i.h" #include "fuse_i.h"
#include "fuse_kernel.h" #include "fuse_kernel.h"
#include "fuse_lowlevel.h" #include "fuse_lowlevel.h"
@ -65,6 +67,8 @@ static void list_del_worker(struct fuse_worker *w)
static int fuse_loop_start_thread(struct fuse_mt *mt); static int fuse_loop_start_thread(struct fuse_mt *mt);
thread_pool tp;
static static
void* void*
fuse_do_work(void *data) fuse_do_work(void *data)
@ -75,13 +79,13 @@ fuse_do_work(void *data)
while(!fuse_session_exited(mt->se)) while(!fuse_session_exited(mt->se))
{ {
int res; int res;
struct fuse_buf fbuf;
struct fuse_buf fbuf = {0};
fbuf = (struct fuse_buf){ .mem = w->buf,
.size = w->bufsize };
fbuf.mem = (char*)malloc(w->bufsize);
fbuf.size = w->bufsize;
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
res = fuse_session_receive(mt->se,&fbuf);
res = mt->se->receive_buf(mt->se,&fbuf,mt->se->ch);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
if(res == -EINTR) if(res == -EINTR)
continue; continue;
@ -98,7 +102,11 @@ fuse_do_work(void *data)
if(mt->exit) if(mt->exit)
return NULL; return NULL;
fuse_session_process(mt->se,&fbuf);
tp.enqueue_work([=] {
mt->se->process_buf(mt->se->data,fbuf.mem,mt->se->ch);
free(fbuf.mem);
});
} }
sem_post(&mt->finish); sem_post(&mt->finish);
@ -138,14 +146,14 @@ int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg)
static int fuse_loop_start_thread(struct fuse_mt *mt) static int fuse_loop_start_thread(struct fuse_mt *mt)
{ {
int res; int res;
struct fuse_worker *w = malloc(sizeof(struct fuse_worker));
struct fuse_worker *w = (struct fuse_worker*)malloc(sizeof(struct fuse_worker));
if(!w) { if(!w) {
fprintf(stderr, "fuse: failed to allocate worker structure\n"); fprintf(stderr, "fuse: failed to allocate worker structure\n");
return -1; return -1;
} }
memset(w, 0, sizeof(struct fuse_worker)); memset(w, 0, sizeof(struct fuse_worker));
w->bufsize = fuse_chan_bufsize(mt->se->ch); w->bufsize = fuse_chan_bufsize(mt->se->ch);
w->buf = calloc(w->bufsize,1);
w->buf = (char*)calloc(w->bufsize,1);
w->mt = mt; w->mt = mt;
if(!w->buf) { if(!w->buf) {
fprintf(stderr, "fuse: failed to allocate read buffer\n"); fprintf(stderr, "fuse: failed to allocate read buffer\n");

9
libfuse/lib/fuse_lowlevel.c

@ -1945,16 +1945,17 @@ static struct {
static static
void void
fuse_ll_process_buf(void *data,
const struct fuse_buf *buf,
struct fuse_chan *ch)
fuse_ll_process_buf(void *data,
const void *fuse_msg_buf_,
struct fuse_chan *ch)
{ {
struct fuse_ll *f = (struct fuse_ll*)data; struct fuse_ll *f = (struct fuse_ll*)data;
struct fuse_in_header *in; struct fuse_in_header *in;
struct fuse_req *req; struct fuse_req *req;
int err; int err;
in = buf->mem;
// in = buf->mem;
in = fuse_msg_buf_;
req = fuse_ll_alloc_req(f); req = fuse_ll_alloc_req(f);
if(req == NULL) if(req == NULL)

14
libfuse/lib/fuse_session.c

@ -95,20 +95,6 @@ void *fuse_session_data(struct fuse_session *se)
return se->data; return se->data;
} }
int
fuse_session_receive(struct fuse_session *se_,
struct fuse_buf *buf_)
{
return se_->receive_buf(se_,buf_,se_->ch);
}
void
fuse_session_process(struct fuse_session *se_,
const struct fuse_buf *buf_)
{
se_->process_buf(se_->data,buf_,se_->ch);
}
struct fuse_chan * struct fuse_chan *
fuse_chan_new(int fd, fuse_chan_new(int fd,
size_t bufsize) size_t bufsize)

161
libfuse/lib/pool.hpp

@ -0,0 +1,161 @@
#pragma once
#include <tuple>
#include <atomic>
#include <vector>
#include <thread>
#include <memory>
#include <future>
#include <utility>
#include <stdexcept>
#include <functional>
#include <type_traits>
#include "queue.hpp"
class simple_thread_pool
{
public:
explicit simple_thread_pool(std::size_t thread_count = std::thread::hardware_concurrency())
{
if(!thread_count)
throw std::invalid_argument("bad thread count! must be non-zero!");
auto worker = [this]()
{
while(true)
{
proc_t f;
if(!m_queue.pop(f))
break;
f();
}
};
m_threads.reserve(thread_count);
while(thread_count--)
m_threads.emplace_back(worker);
}
~simple_thread_pool()
{
m_queue.unblock();
for(auto& thread : m_threads)
thread.join();
}
template<typename F, typename... Args>
void enqueue_work(F&& f, Args&&... args)
{
m_queue.push([p = std::forward<F>(f), t = std::make_tuple(std::forward<Args>(args)...)]() { std::apply(p, t); });
}
template<typename F, typename... Args>
[[nodiscard]] auto enqueue_task(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
{
using task_return_type = std::invoke_result_t<F, Args...>;
using task_type = std::packaged_task<task_return_type()>;
auto task = std::make_shared<task_type>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
auto result = task->get_future();
m_queue.push([=]() { (*task)(); });
return result;
}
private:
using proc_t = std::function<void(void)>;
using queue_t = unbounded_queue<proc_t>;
queue_t m_queue;
using threads_t = std::vector<std::thread>;
threads_t m_threads;
};
class thread_pool
{
public:
explicit thread_pool(std::size_t thread_count = std::thread::hardware_concurrency())
: m_queues(thread_count), m_count(thread_count)
{
if(!thread_count)
throw std::invalid_argument("bad thread count! must be non-zero!");
auto worker = [this](auto i)
{
while(true)
{
proc_t f;
for(std::size_t n = 0; n < m_count * K; ++n)
if(m_queues[(i + n) % m_count].try_pop(f))
break;
if(!f && !m_queues[i].pop(f))
break;
f();
}
};
m_threads.reserve(thread_count);
for(std::size_t i = 0; i < thread_count; ++i)
m_threads.emplace_back(worker, i);
}
~thread_pool()
{
for(auto& queue : m_queues)
queue.unblock();
for(auto& thread : m_threads)
thread.join();
}
template<typename F, typename... Args>
void enqueue_work(F&& f, Args&&... args)
{
auto work = [p = std::forward<F>(f), t = std::make_tuple(std::forward<Args>(args)...)]() { std::apply(p, t); };
auto i = m_index++;
for(std::size_t n = 0; n < m_count * K; ++n)
if(m_queues[(i + n) % m_count].try_push(work))
return;
m_queues[i % m_count].push(std::move(work));
}
template<typename F, typename... Args>
[[nodiscard]] auto enqueue_task(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
{
using task_return_type = std::invoke_result_t<F, Args...>;
using task_type = std::packaged_task<task_return_type()>;
auto task = std::make_shared<task_type>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
auto work = [=]() { (*task)(); };
auto result = task->get_future();
auto i = m_index++;
for(auto n = 0; n < m_count * K; ++n)
if(m_queues[(i + n) % m_count].try_push(work))
return result;
m_queues[i % m_count].push(std::move(work));
return result;
}
private:
using proc_t = std::function<void(void)>;
using queue_t = unbounded_queue<proc_t>;
using queues_t = std::vector<queue_t>;
queues_t m_queues;
using threads_t = std::vector<std::thread>;
threads_t m_threads;
const std::size_t m_count;
std::atomic_uint m_index = 0;
inline static const unsigned int K = 2;
};

256
libfuse/lib/queue.hpp

@ -0,0 +1,256 @@
#pragma once
#include <mutex>
#include <queue>
#include <utility>
#include <stdexcept>
#include <condition_variable>
template<typename T>
class unbounded_queue
{
public:
explicit unbounded_queue(bool block = true)
: m_block{ block } {}
void push(const T& item)
{
{
std::scoped_lock guard(m_queue_lock);
m_queue.push(item);
}
m_condition.notify_one();
}
void push(T&& item)
{
{
std::scoped_lock guard(m_queue_lock);
m_queue.push(std::move(item));
}
m_condition.notify_one();
}
template<typename... Args>
void emplace(Args&&... args)
{
{
std::scoped_lock guard(m_queue_lock);
m_queue.emplace(std::forward<Args>(args)...);
}
m_condition.notify_one();
}
bool try_push(const T& item)
{
{
std::unique_lock lock(m_queue_lock, std::try_to_lock);
if(!lock)
return false;
m_queue.push(item);
}
m_condition.notify_one();
return true;
}
bool try_push(T&& item)
{
{
std::unique_lock lock(m_queue_lock, std::try_to_lock);
if(!lock)
return false;
m_queue.push(std::move(item));
}
m_condition.notify_one();
return true;
}
bool pop(T& item)
{
std::unique_lock guard(m_queue_lock);
m_condition.wait(guard, [&]() { return !m_queue.empty() || !m_block; });
if(m_queue.empty())
return false;
item = std::move(m_queue.front());
m_queue.pop();
return true;
}
bool try_pop(T& item)
{
std::unique_lock lock(m_queue_lock, std::try_to_lock);
if(!lock || m_queue.empty())
return false;
item = std::move(m_queue.front());
m_queue.pop();
return true;
}
std::size_t size() const
{
std::scoped_lock guard(m_queue_lock);
return m_queue.size();
}
bool empty() const
{
std::scoped_lock guard(m_queue_lock);
return m_queue.empty();
}
void block()
{
std::scoped_lock guard(m_queue_lock);
m_block = true;
}
void unblock()
{
{
std::scoped_lock guard(m_queue_lock);
m_block = false;
}
m_condition.notify_all();
}
bool blocking() const
{
std::scoped_lock guard(m_queue_lock);
return m_block;
}
private:
using queue_t = std::queue<T>;
queue_t m_queue;
bool m_block;
mutable std::mutex m_queue_lock;
std::condition_variable m_condition;
};
template<typename T>
class bounded_queue
{
public:
explicit bounded_queue(std::size_t max_size, bool block = true)
: m_block{ block }, m_max_size{ max_size }
{
if(!m_max_size)
throw std::invalid_argument("bad queue max-size! must be non-zero!");
}
bool push(const T& item)
{
{
std::unique_lock guard(m_queue_lock);
m_condition_push.wait(guard, [&]() { return m_queue.size() < m_max_size || !m_block; });
if(m_queue.size() == m_max_size)
return false;
m_queue.push(item);
}
m_condition_pop.notify_one();
return true;
}
bool push(T&& item)
{
{
std::unique_lock guard(m_queue_lock);
m_condition_push.wait(guard, [&]() { return m_queue.size() < m_max_size || !m_block; });
if(m_queue.size() == m_max_size)
return false;
m_queue.push(std::move(item));
}
m_condition_pop.notify_one();
return true;
}
template<typename... Args>
bool emplace(Args&&... args)
{
{
std::unique_lock guard(m_queue_lock);
m_condition_push.wait(guard, [&]() { return m_queue.size() < m_max_size || !m_block; });
if(m_queue.size() == m_max_size)
return false;
m_queue.emplace(std::forward<Args>(args)...);
}
m_condition_pop.notify_one();
return true;
}
bool pop(T& item)
{
{
std::unique_lock guard(m_queue_lock);
m_condition_pop.wait(guard, [&]() { return !m_queue.empty() || !m_block; });
if(m_queue.empty())
return false;
item = std::move(m_queue.front());
m_queue.pop();
}
m_condition_push.notify_one();
return true;
}
std::size_t size() const
{
std::scoped_lock guard(m_queue_lock);
return m_queue.size();
}
std::size_t capacity() const
{
return m_max_size;
}
bool empty() const
{
std::scoped_lock guard(m_queue_lock);
return m_queue.empty();
}
bool full() const
{
std::scoped_lock lock(m_queue_lock);
return m_queue.size() == capacity();
}
void block()
{
std::scoped_lock guard(m_queue_lock);
m_block = true;
}
void unblock()
{
{
std::scoped_lock guard(m_queue_lock);
m_block = false;
}
m_condition_push.notify_all();
m_condition_pop.notify_all();
}
bool blocking() const
{
std::scoped_lock guard(m_queue_lock);
return m_block;
}
private:
using queue_t = std::queue<T>;
queue_t m_queue;
bool m_block;
const std::size_t m_max_size;
mutable std::mutex m_queue_lock;
std::condition_variable m_condition_push;
std::condition_variable m_condition_pop;
};
Loading…
Cancel
Save