From 61e31eecf687dc23f496027eba45ee8beefb05b4 Mon Sep 17 00:00:00 2001 From: Antonio SJ Musumeci Date: Tue, 29 Nov 2022 17:24:09 -0600 Subject: [PATCH] checkpoint --- libfuse/Makefile | 9 + libfuse/include/fuse_lowlevel.h | 3 +- libfuse/lib/fuse_i.h | 8 +- .../lib/{fuse_loop_mt.c => fuse_loop_mt.cpp} | 22 +- libfuse/lib/fuse_lowlevel.c | 9 +- libfuse/lib/fuse_session.c | 14 - libfuse/lib/pool.hpp | 161 +++++++++++ libfuse/lib/queue.hpp | 256 ++++++++++++++++++ 8 files changed, 455 insertions(+), 27 deletions(-) rename libfuse/lib/{fuse_loop_mt.c => fuse_loop_mt.cpp} (90%) create mode 100644 libfuse/lib/pool.hpp create mode 100644 libfuse/lib/queue.hpp diff --git a/libfuse/Makefile b/libfuse/Makefile index 2a387f00..e38d2b73 100644 --- a/libfuse/Makefile +++ b/libfuse/Makefile @@ -56,6 +56,12 @@ CFLAGS := \ -Wall \ -pipe \ -MMD +CXXFLAGS := \ + ${CXXFLAGS} \ + -std=c++17 \ + -Wall \ + -pipe \ + -MMD FUSERMOUNT_DIR = $(BINDIR) FUSE_FLAGS = \ -Iinclude \ @@ -100,6 +106,9 @@ mount.mergerfs: build/mount.mergerfs build/%.o: lib/%.c $(CC) $(CFLAGS) $(FUSE_FLAGS) -c $< -o $@ +build/%.o: lib/%.cpp + $(CXX) $(CXXFLAGS) $(FUSE_FLAGS) -c $< -o $@ + clean: rm -rf build diff --git a/libfuse/include/fuse_lowlevel.h b/libfuse/include/fuse_lowlevel.h index e432f759..a6c7eda1 100644 --- a/libfuse/include/fuse_lowlevel.h +++ b/libfuse/include/fuse_lowlevel.h @@ -1478,7 +1478,8 @@ void *fuse_session_data(struct fuse_session *se); int fuse_session_receive(struct fuse_session *se, struct fuse_buf *buf); void fuse_session_process(struct fuse_session *se, - const struct fuse_buf *buf); + const void *buf, + const size_t bufsize); int fuse_session_loop_mt(struct fuse_session *se, const int threads); diff --git a/libfuse/lib/fuse_i.h b/libfuse/lib/fuse_i.h index 958e3e35..7ac5d837 100644 --- a/libfuse/lib/fuse_i.h +++ b/libfuse/lib/fuse_i.h @@ -11,6 +11,8 @@ #include "fuse.h" #include "fuse_lowlevel.h" +#include "extern_c.h" + struct fuse_chan; struct fuse_ll; @@ -21,7 +23,7 @@ struct fuse_session struct fuse_chan *ch); void (*process_buf)(void *data, - const struct fuse_buf *buf, + const void *buf, struct fuse_chan *ch); void (*destroy)(void *data); @@ -83,6 +85,8 @@ struct fuse_cmd struct fuse_chan *ch; }; +EXTERN_C_BEGIN + struct fuse *fuse_new_common(struct fuse_chan *ch, struct fuse_args *args, const struct fuse_operations *op, size_t op_size); @@ -110,3 +114,5 @@ struct fuse *fuse_setup_common(int argc, char *argv[], int *fd); int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg); + +EXTERN_C_END diff --git a/libfuse/lib/fuse_loop_mt.c b/libfuse/lib/fuse_loop_mt.cpp similarity index 90% rename from libfuse/lib/fuse_loop_mt.c rename to libfuse/lib/fuse_loop_mt.cpp index 687d5bbc..3825226e 100644 --- a/libfuse/lib/fuse_loop_mt.c +++ b/libfuse/lib/fuse_loop_mt.cpp @@ -6,6 +6,8 @@ See the file COPYING.LIB. */ +#include "pool.hpp" + #include "fuse_i.h" #include "fuse_kernel.h" #include "fuse_lowlevel.h" @@ -65,6 +67,8 @@ static void list_del_worker(struct fuse_worker *w) static int fuse_loop_start_thread(struct fuse_mt *mt); +thread_pool tp; + static void* fuse_do_work(void *data) @@ -75,13 +79,13 @@ fuse_do_work(void *data) while(!fuse_session_exited(mt->se)) { int res; - struct fuse_buf fbuf; + struct fuse_buf fbuf = {0}; - fbuf = (struct fuse_buf){ .mem = w->buf, - .size = w->bufsize }; + fbuf.mem = (char*)malloc(w->bufsize); + fbuf.size = w->bufsize; pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - res = fuse_session_receive(mt->se,&fbuf); + res = mt->se->receive_buf(mt->se,&fbuf,mt->se->ch); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); if(res == -EINTR) continue; @@ -98,7 +102,11 @@ fuse_do_work(void *data) if(mt->exit) return NULL; - fuse_session_process(mt->se,&fbuf); + tp.enqueue_work([=] { + mt->se->process_buf(mt->se->data,fbuf.mem,mt->se->ch); + + free(fbuf.mem); + }); } sem_post(&mt->finish); @@ -138,14 +146,14 @@ int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg) static int fuse_loop_start_thread(struct fuse_mt *mt) { int res; - struct fuse_worker *w = malloc(sizeof(struct fuse_worker)); + struct fuse_worker *w = (struct fuse_worker*)malloc(sizeof(struct fuse_worker)); if(!w) { fprintf(stderr, "fuse: failed to allocate worker structure\n"); return -1; } memset(w, 0, sizeof(struct fuse_worker)); w->bufsize = fuse_chan_bufsize(mt->se->ch); - w->buf = calloc(w->bufsize,1); + w->buf = (char*)calloc(w->bufsize,1); w->mt = mt; if(!w->buf) { fprintf(stderr, "fuse: failed to allocate read buffer\n"); diff --git a/libfuse/lib/fuse_lowlevel.c b/libfuse/lib/fuse_lowlevel.c index eeb10465..fc90d052 100644 --- a/libfuse/lib/fuse_lowlevel.c +++ b/libfuse/lib/fuse_lowlevel.c @@ -1945,16 +1945,17 @@ static struct { static void -fuse_ll_process_buf(void *data, - const struct fuse_buf *buf, - struct fuse_chan *ch) +fuse_ll_process_buf(void *data, + const void *fuse_msg_buf_, + struct fuse_chan *ch) { struct fuse_ll *f = (struct fuse_ll*)data; struct fuse_in_header *in; struct fuse_req *req; int err; - in = buf->mem; + // in = buf->mem; + in = fuse_msg_buf_; req = fuse_ll_alloc_req(f); if(req == NULL) diff --git a/libfuse/lib/fuse_session.c b/libfuse/lib/fuse_session.c index cb3587de..bc56c488 100644 --- a/libfuse/lib/fuse_session.c +++ b/libfuse/lib/fuse_session.c @@ -95,20 +95,6 @@ void *fuse_session_data(struct fuse_session *se) return se->data; } -int -fuse_session_receive(struct fuse_session *se_, - struct fuse_buf *buf_) -{ - return se_->receive_buf(se_,buf_,se_->ch); -} - -void -fuse_session_process(struct fuse_session *se_, - const struct fuse_buf *buf_) -{ - se_->process_buf(se_->data,buf_,se_->ch); -} - struct fuse_chan * fuse_chan_new(int fd, size_t bufsize) diff --git a/libfuse/lib/pool.hpp b/libfuse/lib/pool.hpp new file mode 100644 index 00000000..5247426a --- /dev/null +++ b/libfuse/lib/pool.hpp @@ -0,0 +1,161 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "queue.hpp" + + + +class simple_thread_pool +{ +public: + explicit simple_thread_pool(std::size_t thread_count = std::thread::hardware_concurrency()) + { + if(!thread_count) + throw std::invalid_argument("bad thread count! must be non-zero!"); + + auto worker = [this]() + { + while(true) + { + proc_t f; + if(!m_queue.pop(f)) + break; + f(); + } + }; + + m_threads.reserve(thread_count); + while(thread_count--) + m_threads.emplace_back(worker); + } + + ~simple_thread_pool() + { + m_queue.unblock(); + for(auto& thread : m_threads) + thread.join(); + } + + template + void enqueue_work(F&& f, Args&&... args) + { + m_queue.push([p = std::forward(f), t = std::make_tuple(std::forward(args)...)]() { std::apply(p, t); }); + } + + template + [[nodiscard]] auto enqueue_task(F&& f, Args&&... args) -> std::future> + { + using task_return_type = std::invoke_result_t; + using task_type = std::packaged_task; + + auto task = std::make_shared(std::bind(std::forward(f), std::forward(args)...)); + auto result = task->get_future(); + + m_queue.push([=]() { (*task)(); }); + + return result; + } + +private: + using proc_t = std::function; + using queue_t = unbounded_queue; + queue_t m_queue; + + using threads_t = std::vector; + threads_t m_threads; +}; + + + +class thread_pool +{ +public: + explicit thread_pool(std::size_t thread_count = std::thread::hardware_concurrency()) + : m_queues(thread_count), m_count(thread_count) + { + if(!thread_count) + throw std::invalid_argument("bad thread count! must be non-zero!"); + + auto worker = [this](auto i) + { + while(true) + { + proc_t f; + for(std::size_t n = 0; n < m_count * K; ++n) + if(m_queues[(i + n) % m_count].try_pop(f)) + break; + if(!f && !m_queues[i].pop(f)) + break; + f(); + } + }; + + m_threads.reserve(thread_count); + for(std::size_t i = 0; i < thread_count; ++i) + m_threads.emplace_back(worker, i); + } + + ~thread_pool() + { + for(auto& queue : m_queues) + queue.unblock(); + for(auto& thread : m_threads) + thread.join(); + } + + template + void enqueue_work(F&& f, Args&&... args) + { + auto work = [p = std::forward(f), t = std::make_tuple(std::forward(args)...)]() { std::apply(p, t); }; + auto i = m_index++; + + for(std::size_t n = 0; n < m_count * K; ++n) + if(m_queues[(i + n) % m_count].try_push(work)) + return; + + m_queues[i % m_count].push(std::move(work)); + } + + template + [[nodiscard]] auto enqueue_task(F&& f, Args&&... args) -> std::future> + { + using task_return_type = std::invoke_result_t; + using task_type = std::packaged_task; + + auto task = std::make_shared(std::bind(std::forward(f), std::forward(args)...)); + auto work = [=]() { (*task)(); }; + auto result = task->get_future(); + auto i = m_index++; + + for(auto n = 0; n < m_count * K; ++n) + if(m_queues[(i + n) % m_count].try_push(work)) + return result; + + m_queues[i % m_count].push(std::move(work)); + + return result; + } + +private: + using proc_t = std::function; + using queue_t = unbounded_queue; + using queues_t = std::vector; + queues_t m_queues; + + using threads_t = std::vector; + threads_t m_threads; + + const std::size_t m_count; + std::atomic_uint m_index = 0; + + inline static const unsigned int K = 2; +}; diff --git a/libfuse/lib/queue.hpp b/libfuse/lib/queue.hpp new file mode 100644 index 00000000..1ffc5287 --- /dev/null +++ b/libfuse/lib/queue.hpp @@ -0,0 +1,256 @@ +#pragma once + +#include +#include +#include +#include +#include + + + +template +class unbounded_queue +{ +public: + explicit unbounded_queue(bool block = true) + : m_block{ block } {} + + void push(const T& item) + { + { + std::scoped_lock guard(m_queue_lock); + m_queue.push(item); + } + m_condition.notify_one(); + } + + void push(T&& item) + { + { + std::scoped_lock guard(m_queue_lock); + m_queue.push(std::move(item)); + } + m_condition.notify_one(); + } + + template + void emplace(Args&&... args) + { + { + std::scoped_lock guard(m_queue_lock); + m_queue.emplace(std::forward(args)...); + } + m_condition.notify_one(); + } + + bool try_push(const T& item) + { + { + std::unique_lock lock(m_queue_lock, std::try_to_lock); + if(!lock) + return false; + m_queue.push(item); + } + m_condition.notify_one(); + return true; + } + + bool try_push(T&& item) + { + { + std::unique_lock lock(m_queue_lock, std::try_to_lock); + if(!lock) + return false; + m_queue.push(std::move(item)); + } + m_condition.notify_one(); + return true; + } + + bool pop(T& item) + { + std::unique_lock guard(m_queue_lock); + m_condition.wait(guard, [&]() { return !m_queue.empty() || !m_block; }); + if(m_queue.empty()) + return false; + item = std::move(m_queue.front()); + m_queue.pop(); + return true; + } + + bool try_pop(T& item) + { + std::unique_lock lock(m_queue_lock, std::try_to_lock); + if(!lock || m_queue.empty()) + return false; + item = std::move(m_queue.front()); + m_queue.pop(); + return true; + } + + std::size_t size() const + { + std::scoped_lock guard(m_queue_lock); + return m_queue.size(); + } + + bool empty() const + { + std::scoped_lock guard(m_queue_lock); + return m_queue.empty(); + } + + void block() + { + std::scoped_lock guard(m_queue_lock); + m_block = true; + } + + void unblock() + { + { + std::scoped_lock guard(m_queue_lock); + m_block = false; + } + m_condition.notify_all(); + } + + bool blocking() const + { + std::scoped_lock guard(m_queue_lock); + return m_block; + } + +private: + using queue_t = std::queue; + queue_t m_queue; + + bool m_block; + + mutable std::mutex m_queue_lock; + std::condition_variable m_condition; +}; + + + +template +class bounded_queue +{ +public: + explicit bounded_queue(std::size_t max_size, bool block = true) + : m_block{ block }, m_max_size{ max_size } + { + if(!m_max_size) + throw std::invalid_argument("bad queue max-size! must be non-zero!"); + } + + bool push(const T& item) + { + { + std::unique_lock guard(m_queue_lock); + m_condition_push.wait(guard, [&]() { return m_queue.size() < m_max_size || !m_block; }); + if(m_queue.size() == m_max_size) + return false; + m_queue.push(item); + } + m_condition_pop.notify_one(); + return true; + } + + bool push(T&& item) + { + { + std::unique_lock guard(m_queue_lock); + m_condition_push.wait(guard, [&]() { return m_queue.size() < m_max_size || !m_block; }); + if(m_queue.size() == m_max_size) + return false; + m_queue.push(std::move(item)); + } + m_condition_pop.notify_one(); + return true; + } + + template + bool emplace(Args&&... args) + { + { + std::unique_lock guard(m_queue_lock); + m_condition_push.wait(guard, [&]() { return m_queue.size() < m_max_size || !m_block; }); + if(m_queue.size() == m_max_size) + return false; + m_queue.emplace(std::forward(args)...); + } + m_condition_pop.notify_one(); + return true; + } + + bool pop(T& item) + { + { + std::unique_lock guard(m_queue_lock); + m_condition_pop.wait(guard, [&]() { return !m_queue.empty() || !m_block; }); + if(m_queue.empty()) + return false; + item = std::move(m_queue.front()); + m_queue.pop(); + } + m_condition_push.notify_one(); + return true; + } + + std::size_t size() const + { + std::scoped_lock guard(m_queue_lock); + return m_queue.size(); + } + + std::size_t capacity() const + { + return m_max_size; + } + + bool empty() const + { + std::scoped_lock guard(m_queue_lock); + return m_queue.empty(); + } + + bool full() const + { + std::scoped_lock lock(m_queue_lock); + return m_queue.size() == capacity(); + } + + void block() + { + std::scoped_lock guard(m_queue_lock); + m_block = true; + } + + void unblock() + { + { + std::scoped_lock guard(m_queue_lock); + m_block = false; + } + m_condition_push.notify_all(); + m_condition_pop.notify_all(); + } + + bool blocking() const + { + std::scoped_lock guard(m_queue_lock); + return m_block; + } + +private: + using queue_t = std::queue; + queue_t m_queue; + + bool m_block; + const std::size_t m_max_size; + + mutable std::mutex m_queue_lock; + std::condition_variable m_condition_push; + std::condition_variable m_condition_pop; +};