From 5ab0fbcaeeed12f29c4104d01e48636058b6db27 Mon Sep 17 00:00:00 2001 From: Antonio SJ Musumeci Date: Sun, 23 Apr 2023 23:33:48 -0400 Subject: [PATCH] Add manual GC triggering + configurable process queue depth Yes, these are unrelated changes but somehow ended up being prototyped together and I'm too lazy to separate them. --- README.md | 44 ++- libfuse/Makefile | 12 +- libfuse/include/fuse.h | 5 + libfuse/include/fuse_lowlevel.h | 1 + libfuse/include/fuse_msgbuf.h | 3 +- libfuse/lib/bounded_queue.hpp | 31 +-- libfuse/lib/bounded_thread_pool.hpp | 143 ++++++++++ libfuse/lib/fuse.c | 401 ++++++++++++++++------------ libfuse/lib/fuse_loop_mt.cpp | 267 +++++++++--------- libfuse/lib/fuse_msgbuf.cpp | 106 ++++++-- libfuse/lib/fuse_msgbuf.hpp | 9 +- libfuse/lib/fuse_mt.c | 1 + libfuse/lib/fuse_signals.c | 16 +- libfuse/lib/lock.h | 13 + libfuse/lib/make_unique.hpp | 41 +++ libfuse/lib/node.c | 60 +++++ libfuse/lib/node.h | 30 +++ libfuse/lib/scope_guard.hpp | 369 +++++++++++++++++++++++++ libfuse/lib/syslog.c | 110 ++++++++ libfuse/lib/syslog.h | 30 +++ libfuse/lib/thread_pool.hpp | 1 + man/mergerfs.1 | 49 +++- src/config.cpp | 3 + src/config.hpp | 1 + src/fuse_ioctl.cpp | 14 +- src/mergerfs.cpp | 26 ++ src/option_parser.cpp | 1 + 27 files changed, 1442 insertions(+), 345 deletions(-) create mode 100644 libfuse/lib/bounded_thread_pool.hpp create mode 100644 libfuse/lib/lock.h create mode 100644 libfuse/lib/make_unique.hpp create mode 100644 libfuse/lib/node.c create mode 100644 libfuse/lib/node.h create mode 100644 libfuse/lib/scope_guard.hpp create mode 100644 libfuse/lib/syslog.c create mode 100644 libfuse/lib/syslog.h diff --git a/README.md b/README.md index 98263926..de37f70b 100644 --- a/README.md +++ b/README.md @@ -241,6 +241,11 @@ These options are the same regardless of whether you use them with the `read-thread-count` refers to the number of threads reading FUSE messages which are dispatched to process threads. -1 means disabled otherwise acts like `read-thread-count`. (default: -1) +* **process-thread-queue-depth=UINT**: Sets the number of requests any + single process thread can have queued up at one time. Meaning the + total memory usage of the queues is queue depth multiplied by the + number of process threads plus read thread count. 0 sets the depth + to the same as the process thread count. (default: 0) * **pin-threads=STR**: Selects a strategy to pin threads to CPUs (default: unset) * **scheduling-priority=INT**: Set mergerfs' scheduling @@ -1123,7 +1128,9 @@ issue: `umount -l `. Or you can let mergerfs do it by setting the option `lazy-umount-mountpoint=true`. -# RUNTIME CONFIG +# RUNTIME INTERFACES + +## RUNTIME CONFIG #### .mergerfs pseudo file #### @@ -1206,6 +1213,41 @@ following: * **user.mergerfs.allpaths**: a NUL ('\0') separated list of full paths to all files found +## SIGNALS + +* USR1: This will cause mergerfs to send invalidation notifications to + the kernel for all files. This will cause all unused files to be + released from memory. +* USR2: Trigger a general cleanup of currently unused memory. A more + thorough version of what happens every ~15 minutes. + + +## IOCTLS + +Found in `fuse_ioctl.cpp`: + +```C++ +typedef char IOCTL_BUF[4096]; +#define IOCTL_APP_TYPE 0xDF +#define IOCTL_FILE_INFO _IOWR(IOCTL_APP_TYPE,0,IOCTL_BUF) +#define IOCTL_GC _IO(IOCTL_APP_TYPE,1) +#define IOCTL_GC1 _IO(IOCTL_APP_TYPE,2) +#define IOCTL_INVALIDATE_ALL_NODES _IO(IOCTL_APP_TYPE,3) +``` + +* IOCTL\_FILE\_INFO: Same as the "file / directory xattrs" mentioned + above. Use a buffer size of 4096 bytes. Pass in a string of + "basepath", "relpath", "fullpath", or "allpaths". Receive details in + same buffer. +* IOCTL\_GC: Triggers a thorough garbage collection of excess + memory. Same as SIGUSR2. +* IOCTL\_GC1: Triggers a simple garbage collection of excess + memory. Same as what happens every 15 minutes normally. +* IOCTL\_INVALIDATE\_ALL\_NODES: Same as SIGUSR1. Send invalidation + notifications to the kernel for all files causing unused files to be + released from memory. + + # TOOLING * https://github.com/trapexit/mergerfs-tools diff --git a/libfuse/Makefile b/libfuse/Makefile index 8888d552..e68a5c7e 100644 --- a/libfuse/Makefile +++ b/libfuse/Makefile @@ -15,6 +15,12 @@ else OPT_FLAGS := -O2 endif +ifeq ($(LTO),1) +LTO_FLAGS := -flto +else +LTO_FLAGS := +endif + DESTDIR = PREFIX = /usr/local EXEC_PREFIX = $(PREFIX) @@ -39,12 +45,14 @@ SRC_C = \ lib/fuse_dirents.c \ lib/fuse_lowlevel.c \ lib/fuse_mt.c \ + lib/node.c \ lib/fuse_node.c \ lib/fuse_opt.c \ lib/fuse_session.c \ lib/fuse_signals.c \ lib/helper.c \ - lib/mount.c + lib/mount.c \ + lib/syslog.c SRC_CPP = \ lib/format.cpp \ lib/os.cpp \ @@ -59,12 +67,14 @@ CFLAGS ?= \ $(OPT_FLAGS) CFLAGS := \ ${CFLAGS} \ + $(LTO_FLAGS) \ -std=gnu99 \ -Wall \ -pipe \ -MMD CXXFLAGS := \ ${CXXFLAGS} \ + $(LTO_FLAGS) \ -std=c++11 \ -Wall \ -pipe \ diff --git a/libfuse/include/fuse.h b/libfuse/include/fuse.h index 567b0eec..c86c4536 100644 --- a/libfuse/include/fuse.h +++ b/libfuse/include/fuse.h @@ -629,6 +629,7 @@ void fuse_exit(struct fuse *f); int fuse_config_read_thread_count(const struct fuse *f); int fuse_config_process_thread_count(const struct fuse *f); +int fuse_config_process_thread_queue_depth(const struct fuse *f); const char* fuse_config_pin_threads(const struct fuse *f); /** @@ -764,6 +765,10 @@ void fuse_set_getcontext_func(struct fuse_context *(*func)(void)); /** Get session from fuse object */ struct fuse_session *fuse_get_session(struct fuse *f); +void fuse_gc1(); +void fuse_gc(); +void fuse_invalidate_all_nodes(); + EXTERN_C_END #endif /* _FUSE_H_ */ diff --git a/libfuse/include/fuse_lowlevel.h b/libfuse/include/fuse_lowlevel.h index 25670657..aedc1fcd 100644 --- a/libfuse/include/fuse_lowlevel.h +++ b/libfuse/include/fuse_lowlevel.h @@ -1460,6 +1460,7 @@ void fuse_session_process(struct fuse_session *se, int fuse_session_loop_mt(struct fuse_session *se, const int read_thread_count, const int process_thread_count, + const int process_thread_queue_depth, const char *pin_threads_type); /* ----------------------------------------------------------- * diff --git a/libfuse/include/fuse_msgbuf.h b/libfuse/include/fuse_msgbuf.h index 2a56b651..ee501de3 100644 --- a/libfuse/include/fuse_msgbuf.h +++ b/libfuse/include/fuse_msgbuf.h @@ -5,7 +5,6 @@ typedef struct fuse_msgbuf_t fuse_msgbuf_t; struct fuse_msgbuf_t { - char *mem; uint32_t size; - uint32_t used; + char *mem; }; diff --git a/libfuse/lib/bounded_queue.hpp b/libfuse/lib/bounded_queue.hpp index 3e58da77..6761647c 100644 --- a/libfuse/lib/bounded_queue.hpp +++ b/libfuse/lib/bounded_queue.hpp @@ -13,25 +13,26 @@ public: explicit BoundedQueue(std::size_t max_size_, bool block_ = true) - : _block(block), - _max_size(max_size_) + : _block(block_), + _max_size(max_size_ ? max_size_ : 1) { - if(_max_size == 0) - _max_size = 1; } + BoundedQueue(const BoundedQueue&) = delete; + BoundedQueue(BoundedQueue&&) = default; + bool push(const T& item_) { { - std::unique_lock guard(_queue_lock); + std::unique_lock guard(_queue_lock); _condition_push.wait(guard, [&]() { return _queue.size() < _max_size || !_block; }); if(_queue.size() == _max_size) return false; - _queue.push(item); + _queue.push(item_); } _condition_pop.notify_one(); @@ -43,7 +44,7 @@ public: push(T&& item_) { { - std::unique_lock guard(_queue_lock); + std::unique_lock guard(_queue_lock); _condition_push.wait(guard, [&]() { return _queue.size() < _max_size || !_block; }); @@ -61,7 +62,7 @@ public: emplace(Args&&... args_) { { - std::unique_lock guard(_queue_lock); + std::lock_guard guard(_queue_lock); _condition_push.wait(guard, [&]() { return _queue.size() < _max_size || !_block; }); @@ -80,7 +81,7 @@ public: pop(T& item_) { { - std::unique_lock guard(_queue_lock); + std::unique_lock guard(_queue_lock); _condition_pop.wait(guard, [&]() { return !_queue.empty() || !_block; }); if(_queue.empty()) @@ -99,7 +100,7 @@ public: std::size_t size() const { - std::lock_guard guard(_queue_lock); + std::lock_guard guard(_queue_lock); return _queue.size(); } @@ -113,7 +114,7 @@ public: bool empty() const { - std::lock_guard guard(_queue_lock); + std::lock_guard guard(_queue_lock); return _queue.empty(); } @@ -121,7 +122,7 @@ public: bool full() const { - std::lock_guard lock(_queue_lock); + std::lock_guard lock(_queue_lock); return (_queue.size() == capacity()); } @@ -129,7 +130,7 @@ public: void block() { - std::lock_guard guard(_queue_lock); + std::lock_guard guard(_queue_lock); _block = true; } @@ -137,7 +138,7 @@ public: unblock() { { - std::lock_guard guard(_queue_lock); + std::lock_guard guard(_queue_lock); _block = false; } @@ -148,7 +149,7 @@ public: bool blocking() const { - std::lock_guard guard(_queue_lock); + std::lock_guard guard(_queue_lock); return _block; } diff --git a/libfuse/lib/bounded_thread_pool.hpp b/libfuse/lib/bounded_thread_pool.hpp new file mode 100644 index 00000000..5a31bfcc --- /dev/null +++ b/libfuse/lib/bounded_thread_pool.hpp @@ -0,0 +1,143 @@ +#pragma once + +#include "bounded_queue.hpp" +#include "make_unique.hpp" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +class BoundedThreadPool +{ +private: + using Proc = std::function; + using Queue = BoundedQueue; + using Queues = std::vector>; + +public: + explicit + BoundedThreadPool(const std::size_t thread_count_ = std::thread::hardware_concurrency(), + const std::size_t queue_depth_ = 1) + : _queues(), + _count(thread_count_) + { + for(std::size_t i = 0; i < thread_count_; i++) + _queues.emplace_back(std::make_unique(queue_depth_)); + + auto worker = [this](std::size_t i) + { + while(true) + { + Proc f; + + for(std::size_t n = 0; n < (_count * K); ++n) + { + if(_queues[(i + n) % _count]->pop(f)) + break; + } + + if(!f && !_queues[i]->pop(f)) + break; + + f(); + } + }; + + sigset_t oldset; + sigset_t newset; + + sigfillset(&newset); + pthread_sigmask(SIG_BLOCK,&newset,&oldset); + + _threads.reserve(thread_count_); + for(std::size_t i = 0; i < thread_count_; ++i) + _threads.emplace_back(worker, i); + + pthread_sigmask(SIG_SETMASK,&oldset,NULL); + } + + ~BoundedThreadPool() + { + for(auto &queue : _queues) + queue->unblock(); + for(auto &thread : _threads) + pthread_cancel(thread.native_handle()); + for(auto &thread : _threads) + thread.join(); + } + + template + void + enqueue_work(F&& f_) + { + auto i = _index++; + + for(std::size_t n = 0; n < (_count * K); ++n) + { + if(_queues[(i + n) % _count]->push(f_)) + return; + } + + _queues[i % _count]->push(std::move(f_)); + } + + template + [[nodiscard]] + std::future::type> + enqueue_task(F&& f_) + { + using TaskReturnType = typename std::result_of::type; + using Promise = std::promise; + + auto i = _index++; + auto promise = std::make_shared(); + auto future = promise->get_future(); + auto work = [=]() { + auto rv = f_(); + promise->set_value(rv); + }; + + for(std::size_t n = 0; n < (_count * K); ++n) + { + if(_queues[(i + n) % _count]->push(work)) + return future; + } + + _queues[i % _count]->push(std::move(work)); + + return future; + } + +public: + std::vector + threads() + { + std::vector rv; + + for(auto &thread : _threads) + rv.push_back(thread.native_handle()); + + return rv; + } + +private: + Queues _queues; + +private: + std::vector _threads; + +private: + const std::size_t _count; + std::atomic_uint _index; + + static const unsigned int K = 2; +}; diff --git a/libfuse/lib/fuse.c b/libfuse/lib/fuse.c index 73c9aa09..b11f24b1 100644 --- a/libfuse/lib/fuse.c +++ b/libfuse/lib/fuse.c @@ -13,8 +13,8 @@ #include "fuse_node.h" #include "khash.h" #include "kvec.h" -#include "lfmp.h" +#include "node.h" #include "config.h" #include "fuse_dirents.h" #include "fuse_i.h" @@ -74,6 +74,7 @@ struct fuse_config int help; int read_thread_count; int process_thread_count; + int process_thread_queue_depth; char *pin_threads; }; @@ -89,21 +90,21 @@ struct lock_queue_element uint64_t nodeid1; const char *name1; char **path1; - struct node **wnode1; + node_t **wnode1; uint64_t nodeid2; const char *name2; char **path2; - struct node **wnode2; + node_t **wnode2; int err; bool done : 1; }; struct node_table { - struct node **array; - size_t use; - size_t size; - size_t split; + node_t **array; + size_t use; + size_t size; + size_t split; }; #define container_of(ptr,type,member) ({ \ @@ -122,8 +123,8 @@ struct list_head typedef struct remembered_node_t remembered_node_t; struct remembered_node_t { - struct node *node; - time_t time; + node_t *node; + time_t time; }; typedef struct nodeid_gen_t nodeid_gen_t; @@ -146,7 +147,6 @@ struct fuse struct lock_queue_element *lockq; pthread_t maintenance_thread; - lfmp_t node_fmp; kvec_t(remembered_node_t) remembered_nodes; }; @@ -160,28 +160,6 @@ struct lock struct lock *next; }; -struct node -{ - struct node *name_next; - struct node *id_next; - - uint64_t nodeid; - char *name; - struct node *parent; - - uint64_t nlookup; - uint32_t refctr; - uint32_t open_count; - uint64_t hidden_fh; - - int32_t treelock; - struct lock *locks; - - uint32_t stat_crc32b; - uint8_t is_stat_cache_valid:1; -}; - - #define TREELOCK_WRITE -1 #define TREELOCK_WAIT_OFFSET INT_MIN @@ -283,21 +261,6 @@ list_del(struct list_head *entry) prev->next = next; } -static -struct node* -alloc_node(struct fuse *f) -{ - return lfmp_calloc(&f->node_fmp); -} - -static -void -free_node_mem(struct fuse *f, - struct node *node) -{ - return lfmp_free(&f->node_fmp,node); -} - static size_t id_hash(struct fuse *f, @@ -313,12 +276,12 @@ id_hash(struct fuse *f, } static -struct node* +node_t* get_node_nocheck(struct fuse *f, uint64_t nodeid) { size_t hash = id_hash(f,nodeid); - struct node *node; + node_t *node; for(node = f->id_table.array[hash]; node != NULL; node = node->id_next) if(node->nodeid == nodeid) @@ -328,11 +291,11 @@ get_node_nocheck(struct fuse *f, } static -struct node* +node_t* get_node(struct fuse *f, const uint64_t nodeid) { - struct node *node = get_node_nocheck(f,nodeid); + node_t *node = get_node_nocheck(f,nodeid); if(!node) { @@ -347,7 +310,7 @@ get_node(struct fuse *f, static void remove_remembered_node(struct fuse *f_, - struct node *node_) + node_t *node_) { for(size_t i = 0; i < kv_size(f_->remembered_nodes); i++) { @@ -402,14 +365,14 @@ current_time() static void free_node(struct fuse *f_, - struct node *node_) + node_t *node_) { filename_free(f_,node_->name); if(node_->hidden_fh) f_->fs->op.free_hide(node_->hidden_fh); - free_node_mem(f_,node_); + node_free(node_); } static @@ -422,7 +385,7 @@ node_table_reduce(struct node_table *t) if(newsize < NODE_TABLE_MIN_SIZE) return; - newarray = realloc(t->array,sizeof(struct node *)* newsize); + newarray = realloc(t->array,sizeof(node_t*) * newsize); if(newarray != NULL) t->array = newarray; @@ -442,13 +405,13 @@ remerge_id(struct fuse *f) for(iter = 8; t->split > 0 && iter; iter--) { - struct node **upper; + node_t **upper; t->split--; upper = &t->array[t->split + t->size / 2]; if(*upper) { - struct node **nodep; + node_t **nodep; for(nodep = &t->array[t->split]; *nodep; nodep = &(*nodep)->id_next); @@ -463,9 +426,9 @@ remerge_id(struct fuse *f) static void unhash_id(struct fuse *f, - struct node *node) + node_t *node) { - struct node **nodep = &f->id_table.array[id_hash(f,node->nodeid)]; + node_t **nodep = &f->id_table.array[id_hash(f,node->nodeid)]; for(; *nodep != NULL; nodep = &(*nodep)->id_next) if(*nodep == node) @@ -486,12 +449,12 @@ node_table_resize(struct node_table *t) size_t newsize = t->size * 2; void *newarray; - newarray = realloc(t->array,sizeof(struct node *)* newsize); + newarray = realloc(t->array,sizeof(node_t*) * newsize); if(newarray == NULL) return -1; t->array = newarray; - memset(t->array + t->size,0,t->size * sizeof(struct node *)); + memset(t->array + t->size,0,t->size * sizeof(node_t*)); t->size = newsize; t->split = 0; @@ -503,8 +466,8 @@ void rehash_id(struct fuse *f) { struct node_table *t = &f->id_table; - struct node **nodep; - struct node **next; + node_t **nodep; + node_t **next; size_t hash; if(t->split == t->size / 2) @@ -514,7 +477,7 @@ rehash_id(struct fuse *f) t->split++; for(nodep = &t->array[hash]; *nodep != NULL; nodep = next) { - struct node *node = *nodep; + node_t *node = *nodep; size_t newhash = id_hash(f,node->nodeid); if(newhash != hash) @@ -537,7 +500,7 @@ rehash_id(struct fuse *f) static void hash_id(struct fuse *f, - struct node *node) + node_t *node) { size_t hash; @@ -573,7 +536,7 @@ name_hash(struct fuse *f, static void unref_node(struct fuse *f, - struct node *node); + node_t *node); static void @@ -587,13 +550,13 @@ remerge_name(struct fuse *f) for(iter = 8; t->split > 0 && iter; iter--) { - struct node **upper; + node_t **upper; t->split--; upper = &t->array[t->split + t->size / 2]; if(*upper) { - struct node **nodep; + node_t **nodep; for(nodep = &t->array[t->split]; *nodep; nodep = &(*nodep)->name_next); @@ -607,12 +570,12 @@ remerge_name(struct fuse *f) static void unhash_name(struct fuse *f, - struct node *node) + node_t *node) { if(node->name) { size_t hash = name_hash(f,node->parent->nodeid,node->name); - struct node **nodep = &f->name_table.array[hash]; + node_t **nodep = &f->name_table.array[hash]; for(; *nodep != NULL; nodep = &(*nodep)->name_next) if(*nodep == node) @@ -643,8 +606,8 @@ void rehash_name(struct fuse *f) { struct node_table *t = &f->name_table; - struct node **nodep; - struct node **next; + node_t **nodep; + node_t **next; size_t hash; if(t->split == t->size / 2) @@ -654,7 +617,7 @@ rehash_name(struct fuse *f) t->split++; for(nodep = &t->array[hash]; *nodep != NULL; nodep = next) { - struct node *node = *nodep; + node_t *node = *nodep; size_t newhash = name_hash(f,node->parent->nodeid,node->name); if(newhash != hash) @@ -677,12 +640,12 @@ rehash_name(struct fuse *f) static int hash_name(struct fuse *f, - struct node *node, + node_t *node, uint64_t parentid, const char *name) { size_t hash = name_hash(f,parentid,name); - struct node *parent = get_node(f,parentid); + node_t *parent = get_node(f,parentid); node->name = filename_strdup(f,name); if(node->name == NULL) return -1; @@ -710,20 +673,20 @@ remember_nodes(struct fuse *f_) static void delete_node(struct fuse *f, - struct node *node) + node_t *node) { assert(node->treelock == 0); unhash_name(f,node); if(remember_nodes(f)) remove_remembered_node(f,node); unhash_id(f,node); - free_node(f,node); + node_free(node); } static void unref_node(struct fuse *f, - struct node *node) + node_t *node) { assert(node->refctr > 0); node->refctr--; @@ -745,13 +708,13 @@ rand64(void) } static -struct node* +node_t* lookup_node(struct fuse *f, uint64_t parent, const char *name) { size_t hash; - struct node *node; + node_t *node; hash = name_hash(f,parent,name); for(node = f->name_table.array[hash]; node != NULL; node = node->name_next) @@ -763,7 +726,7 @@ lookup_node(struct fuse *f, static void -inc_nlookup(struct node *node) +inc_nlookup(node_t *node) { if(!node->nlookup) node->refctr++; @@ -771,12 +734,12 @@ inc_nlookup(struct node *node) } static -struct node* +node_t* find_node(struct fuse *f, uint64_t parent, const char *name) { - struct node *node; + node_t *node; pthread_mutex_lock(&f->lock); if(!name) @@ -786,7 +749,7 @@ find_node(struct fuse *f, if(node == NULL) { - node = alloc_node(f); + node = node_alloc(); if(node == NULL) goto out_err; @@ -856,10 +819,10 @@ static void unlock_path(struct fuse *f, uint64_t nodeid, - struct node *wnode, - struct node *end) + node_t *wnode, + node_t *end) { - struct node *node; + node_t *node; if(wnode) { @@ -884,14 +847,14 @@ try_get_path(struct fuse *f, uint64_t nodeid, const char *name, char **path, - struct node **wnodep, + node_t **wnodep, bool need_lock) { unsigned bufsize = 256; char *buf; char *s; - struct node *node; - struct node *wnode = NULL; + node_t *node; + node_t *wnode = NULL; int err; *path = NULL; @@ -980,8 +943,8 @@ try_get_path2(struct fuse *f, const char *name2, char **path1, char **path2, - struct node **wnode1, - struct node **wnode2) + node_t **wnode1, + node_t **wnode2) { int err; @@ -991,7 +954,7 @@ try_get_path2(struct fuse *f, err = try_get_path(f,nodeid2,name2,path2,wnode2,true); if(err) { - struct node *wn1 = wnode1 ? *wnode1 : NULL; + node_t *wn1 = wnode1 ? *wnode1 : NULL; unlock_path(f,nodeid1,wn1,NULL); free(*path1); @@ -1109,7 +1072,7 @@ get_path_common(struct fuse *f, uint64_t nodeid, const char *name, char **path, - struct node **wnode) + node_t **wnode) { int err; @@ -1156,7 +1119,7 @@ get_path_wrlock(struct fuse *f, uint64_t nodeid, const char *name, char **path, - struct node **wnode) + node_t **wnode) { return get_path_common(f,nodeid,name,path,wnode); } @@ -1170,8 +1133,8 @@ get_path2(struct fuse *f, const char *name2, char **path1, char **path2, - struct node **wnode1, - struct node **wnode2) + node_t **wnode1, + node_t **wnode2) { int err; @@ -1202,7 +1165,7 @@ static void free_path_wrlock(struct fuse *f, uint64_t nodeid, - struct node *wnode, + node_t *wnode, char *path) { pthread_mutex_lock(&f->lock); @@ -1228,8 +1191,8 @@ void free_path2(struct fuse *f, uint64_t nodeid1, uint64_t nodeid2, - struct node *wnode1, - struct node *wnode2, + node_t *wnode1, + node_t *wnode2, char *path1, char *path2) { @@ -1248,7 +1211,7 @@ forget_node(struct fuse *f, const uint64_t nodeid, const uint64_t nlookup) { - struct node *node; + node_t *node; if(nodeid == FUSE_ROOT_ID) return; @@ -1300,7 +1263,7 @@ forget_node(struct fuse *f, static void unlink_node(struct fuse *f, - struct node *node) + node_t *node) { if(remember_nodes(f)) { @@ -1316,7 +1279,7 @@ remove_node(struct fuse *f, uint64_t dir, const char *name) { - struct node *node; + node_t *node; pthread_mutex_lock(&f->lock); node = lookup_node(f,dir,name); @@ -1333,8 +1296,8 @@ rename_node(struct fuse *f, uint64_t newdir, const char *newname) { - struct node *node; - struct node *newnode; + node_t *node; + node_t *newnode; int err = 0; pthread_mutex_lock(&f->lock); @@ -1381,14 +1344,14 @@ req_fuse(fuse_req_t req) static int -node_open(const struct node *node_) +node_open(const node_t *node_) { return ((node_ != NULL) && (node_->open_count > 0)); } static void -update_stat(struct node *node_, +update_stat(node_t *node_, const struct stat *stnew_) { uint32_t crc32b; @@ -1408,7 +1371,7 @@ set_path_info(struct fuse *f, const char *name, struct fuse_entry_param *e) { - struct node *node; + node_t *node; node = find_node(f,nodeid,name); if(node == NULL) @@ -1591,7 +1554,7 @@ fuse_lib_lookup(fuse_req_t req, char *path; const char *name; struct fuse *f; - struct node *dot = NULL; + node_t *dot = NULL; struct fuse_entry_param e = {0}; name = fuse_hdr_arg(hdr_); @@ -1681,9 +1644,11 @@ fuse_lib_forget_multi(fuse_req_t req, entry = PARAM(arg); for(uint32_t i = 0; i < arg->count; i++) - forget_node(f, + { + forget_node(f, entry[i].nodeid, entry[i].nlookup); + } fuse_reply_none(req); } @@ -1698,7 +1663,7 @@ fuse_lib_getattr(fuse_req_t req, char *path; struct fuse *f; struct stat buf; - struct node *node; + node_t *node; fuse_timeouts_t timeout; fuse_file_info_t ffi = {0}; const struct fuse_getattr_in *arg; @@ -1759,7 +1724,7 @@ fuse_lib_setattr(fuse_req_t req, struct stat stbuf = {0}; char *path; int err; - struct node *node; + node_t *node; fuse_timeouts_t timeout; fuse_file_info_t *fi; fuse_file_info_t ffi = {0}; @@ -2019,7 +1984,7 @@ fuse_lib_unlink(fuse_req_t req, char *path; struct fuse *f; const char *name; - struct node *wnode; + node_t *wnode; name = PARAM(hdr_); @@ -2052,7 +2017,7 @@ fuse_lib_rmdir(fuse_req_t req, char *path; struct fuse *f; const char *name; - struct node *wnode; + node_t *wnode; name = PARAM(hdr_); @@ -2110,8 +2075,8 @@ fuse_lib_rename(fuse_req_t req, char *newpath; const char *oldname; const char *newname; - struct node *wnode1; - struct node *wnode2; + node_t *wnode1; + node_t *wnode2; struct fuse_rename_in *arg; arg = fuse_hdr_arg(hdr_); @@ -2179,7 +2144,7 @@ fuse_do_release(struct fuse *f, fuse_file_info_t *fi) { uint64_t fh; - struct node *node; + node_t *node; fh = 0; @@ -2277,7 +2242,7 @@ open_auto_cache(struct fuse *f, const char *path, fuse_file_info_t *fi) { - struct node *node; + node_t *node; fuse_timeouts_t timeout; pthread_mutex_lock(&f->lock); @@ -2917,11 +2882,11 @@ fuse_lib_copy_file_range(fuse_req_t req_, } static -struct lock* -locks_conflict(struct node *node, - const struct lock *lock) +lock_t* +locks_conflict(node_t *node, + const lock_t *lock) { - struct lock *l; + lock_t *l; for(l = node->locks; l; l = l->next) if(l->owner != lock->owner && @@ -2934,17 +2899,17 @@ locks_conflict(struct node *node, static void -delete_lock(struct lock **lockp) +delete_lock(lock_t **lockp) { - struct lock *l = *lockp; + lock_t *l = *lockp; *lockp = l->next; free(l); } static void -insert_lock(struct lock **pos, - struct lock *lock) +insert_lock(lock_t **pos, + lock_t *lock) { lock->next = *pos; *pos = lock; @@ -2952,17 +2917,17 @@ insert_lock(struct lock **pos, static int -locks_insert(struct node *node, - struct lock *lock) +locks_insert(node_t *node, + lock_t *lock) { - struct lock **lp; - struct lock *newl1 = NULL; - struct lock *newl2 = NULL; + lock_t **lp; + lock_t *newl1 = NULL; + lock_t *newl2 = NULL; if(lock->type != F_UNLCK || lock->start != 0 || lock->end != OFFSET_MAX) { - newl1 = malloc(sizeof(struct lock)); - newl2 = malloc(sizeof(struct lock)); + newl1 = malloc(sizeof(lock_t)); + newl2 = malloc(sizeof(lock_t)); if(!newl1 || !newl2) { @@ -2974,7 +2939,7 @@ locks_insert(struct node *node, for(lp = &node->locks; *lp;) { - struct lock *l = *lp; + lock_t *l = *lp; if(l->owner != lock->owner) goto skip; @@ -3038,9 +3003,9 @@ locks_insert(struct node *node, static void flock_to_lock(struct flock *flock, - struct lock *lock) + lock_t *lock) { - memset(lock,0,sizeof(struct lock)); + memset(lock,0,sizeof(lock_t)); lock->type = flock->l_type; lock->start = flock->l_start; lock->end = flock->l_len ? flock->l_start + flock->l_len - 1 : OFFSET_MAX; @@ -3049,7 +3014,7 @@ flock_to_lock(struct flock *flock, static void -lock_to_flock(struct lock *lock, +lock_to_flock(lock_t *lock, struct flock *flock) { flock->l_type = lock->type; @@ -3066,7 +3031,7 @@ fuse_flush_common(struct fuse *f, fuse_file_info_t *fi) { struct flock lock; - struct lock l; + lock_t l; int err; int errlock; @@ -3194,9 +3159,9 @@ fuse_lib_getlk(fuse_req_t req, { int err; struct fuse *f; - struct lock lk; + lock_t lk; struct flock flk; - struct lock *conflict; + lock_t *conflict; fuse_file_info_t ffi = {0}; const struct fuse_lk_in *arg; @@ -3239,7 +3204,7 @@ fuse_lib_setlk(fuse_req_t req, if(!err) { struct fuse *f = req_fuse(req); - struct lock l; + lock_t l; flock_to_lock(lock,&l); l.owner = fi->lock_owner; pthread_mutex_lock(&f->lock); @@ -3639,6 +3604,7 @@ static const struct fuse_opt fuse_lib_opts[] = FUSE_LIB_OPT("threads=%d", read_thread_count,0), FUSE_LIB_OPT("read-thread-count=%d", read_thread_count,0), FUSE_LIB_OPT("process-thread-count=%d", process_thread_count,-1), + FUSE_LIB_OPT("process-thread-queue-depth=%d", process_thread_queue_depth,-1), FUSE_LIB_OPT("pin-threads=%s", pin_threads, 0), FUSE_OPT_END }; @@ -3712,7 +3678,7 @@ int node_table_init(struct node_table *t) { t->size = NODE_TABLE_MIN_SIZE; - t->array = (struct node **)calloc(1,sizeof(struct node *) * t->size); + t->array = (node_t **)calloc(1,sizeof(node_t *) * t->size); if(t->array == NULL) { fprintf(stderr,"fuse: memory allocation failed\n"); @@ -3724,16 +3690,47 @@ node_table_init(struct node_table *t) return 0; } +static +struct fuse* +fuse_get_fuse_obj() +{ + static struct fuse f = {0}; + + return &f; +} + static void metrics_log_nodes_info(struct fuse *f_, FILE *file_) { char buf[1024]; + char time_str[64]; + struct tm tm; + struct timeval tv; + uint64_t sizeof_node; + float node_usage_ratio; + uint64_t node_slab_count; + uint64_t node_avail_objs; + uint64_t node_total_alloc_mem; + + gettimeofday(&tv,NULL); + localtime_r(&tv.tv_sec,&tm); + strftime(time_str,sizeof(time_str),"%Y-%m-%dT%H:%M:%S.000%z",&tm); + + sizeof_node = sizeof(node_t); + + lfmp_t *lfmp; + lfmp = node_lfmp(); + lfmp_lock(lfmp); + node_slab_count = fmp_slab_count(&lfmp->fmp); + node_usage_ratio = fmp_slab_usage_ratio(&lfmp->fmp); + node_avail_objs = fmp_avail_objs(&lfmp->fmp); + node_total_alloc_mem = fmp_total_allocated_memory(&lfmp->fmp); + lfmp_unlock(lfmp); - lfmp_lock(&f_->node_fmp); snprintf(buf,sizeof(buf), - "time: %"PRIu64"\n" + "time: %s\n" "sizeof(node): %"PRIu64"\n" "node id_table size: %"PRIu64"\n" "node id_table usage: %"PRIu64"\n" @@ -3745,22 +3742,29 @@ metrics_log_nodes_info(struct fuse *f_, "node memory pool usage ratio: %f\n" "node memory pool avail objs: %"PRIu64"\n" "node memory pool total allocated memory: %"PRIu64"\n" + "msgbuf bufsize: %"PRIu64"\n" + "msgbuf allocation count: %"PRIu64"\n" + "msgbuf available count: %"PRIu64"\n" + "msgbuf total allocated memory: %"PRIu64"\n" "\n" , - (uint64_t)time(NULL), - (uint64_t)sizeof(struct node), + time_str, + sizeof_node, (uint64_t)f_->id_table.size, (uint64_t)f_->id_table.use, - (uint64_t)(f_->id_table.size * sizeof(struct node*)), + (uint64_t)(f_->id_table.size * sizeof(node_t*)), (uint64_t)f_->name_table.size, (uint64_t)f_->name_table.use, - (uint64_t)(f_->name_table.size * sizeof(struct node*)), - (uint64_t)fmp_slab_count(&f_->node_fmp.fmp), - fmp_slab_usage_ratio(&f_->node_fmp.fmp), - (uint64_t)fmp_avail_objs(&f_->node_fmp.fmp), - (uint64_t)fmp_total_allocated_memory(&f_->node_fmp.fmp) + (uint64_t)(f_->name_table.size * sizeof(node_t*)), + node_slab_count, + node_usage_ratio, + node_avail_objs, + node_total_alloc_mem, + msgbuf_get_bufsize(), + msgbuf_alloc_count(), + msgbuf_avail_count(), + msgbuf_alloc_count() * msgbuf_get_bufsize() ); - lfmp_unlock(&f_->node_fmp); fputs(buf,file_); } @@ -3769,12 +3773,20 @@ static void metrics_log_nodes_info_to_tmp_dir(struct fuse *f_) { + int rv; FILE *file; char filepath[256]; + struct stat st; + char const *mode = "a"; + off_t const max_size = (1024 * 1024); sprintf(filepath,"/tmp/mergerfs.%d.info",getpid()); - file = fopen(filepath,"w"); + rv = lstat(filepath,&st); + if((rv == 0) && (st.st_size > max_size)) + mode = "w"; + + file = fopen(filepath,mode); if(file == NULL) return; @@ -3783,7 +3795,6 @@ metrics_log_nodes_info_to_tmp_dir(struct fuse *f_) fclose(file); } - static void fuse_malloc_trim(void) @@ -3793,16 +3804,60 @@ fuse_malloc_trim(void) #endif } +void +fuse_invalidate_all_nodes() +{ + struct fuse *f = fuse_get_fuse_obj(); + + syslog_info("invalidating file entries"); + + pthread_mutex_lock(&f->lock); + for(int i = 0; i < f->id_table.size; i++) + { + node_t *node; + + for(node = f->id_table.array[i]; node != NULL; node = node->id_next) + { + if(node->nodeid == FUSE_ROOT_ID) + continue; + if(node->parent->nodeid != FUSE_ROOT_ID) + continue; + + fuse_lowlevel_notify_inval_entry(f->se->ch, + node->parent->nodeid, + node->name, + strlen(node->name)); + } + } + pthread_mutex_unlock(&f->lock); +} + +void +fuse_gc() +{ + syslog_info("running thorough garbage collection"); + node_gc(); + msgbuf_gc(); + fuse_malloc_trim(); +} + +void +fuse_gc1() +{ + syslog_info("running basic garbage collection"); + node_gc1(); + msgbuf_gc_10percent(); + fuse_malloc_trim(); +} + static void* fuse_maintenance_loop(void *fuse_) { - int gc; int loops; int sleep_time; struct fuse *f = (struct fuse*)fuse_; - gc = 0; loops = 0; sleep_time = 60; while(1) @@ -3811,14 +3866,7 @@ fuse_maintenance_loop(void *fuse_) fuse_prune_remembered_nodes(f); if((loops % 15) == 0) - { - fuse_malloc_trim(); - gc = 1; - } - - // Trigger a followup gc if this gc succeeds - if(!f->conf.nogc && gc) - gc = lfmp_gc(&f->node_fmp); + fuse_gc1(); if(g_LOG_METRICS) metrics_log_nodes_info_to_tmp_dir(f); @@ -3852,14 +3900,14 @@ fuse_new_common(struct fuse_chan *ch, size_t op_size) { struct fuse *f; - struct node *root; + node_t *root; struct fuse_fs *fs; struct fuse_lowlevel_ops llop = fuse_path_ops; if(fuse_create_context_key() == -1) goto out; - f = (struct fuse *)calloc(1,sizeof(struct fuse)); + f = fuse_get_fuse_obj(); if(f == NULL) { fprintf(stderr,"fuse: failed to allocate fuse object\n"); @@ -3902,10 +3950,9 @@ fuse_new_common(struct fuse_chan *ch, fuse_mutex_init(&f->lock); - lfmp_init(&f->node_fmp,sizeof(struct node),256); kv_init(f->remembered_nodes); - root = alloc_node(f); + root = node_alloc(); if(root == NULL) { fprintf(stderr,"fuse: memory allocation failed\n"); @@ -3933,7 +3980,7 @@ fuse_new_common(struct fuse_chan *ch, fs->op.destroy = NULL; free(f->fs); out_free: - free(f); + // free(f); out_delete_context_key: fuse_delete_context_key(); out: @@ -3963,7 +4010,7 @@ fuse_destroy(struct fuse *f) for(i = 0; i < f->id_table.size; i++) { - struct node *node; + node_t *node; for(node = f->id_table.array[i]; node != NULL; node = node->id_next) { @@ -3978,8 +4025,8 @@ fuse_destroy(struct fuse *f) for(i = 0; i < f->id_table.size; i++) { - struct node *node; - struct node *next; + node_t *node; + node_t *next; for(node = f->id_table.array[i]; node != NULL; node = next) { @@ -3993,9 +4040,7 @@ fuse_destroy(struct fuse *f) free(f->name_table.array); pthread_mutex_destroy(&f->lock); fuse_session_destroy(f->se); - lfmp_destroy(&f->node_fmp); kv_destroy(f->remembered_nodes); - free(f); fuse_delete_context_key(); } @@ -4011,6 +4056,12 @@ fuse_config_process_thread_count(const struct fuse *f_) return f_->conf.process_thread_count; } +int +fuse_config_process_thread_queue_depth(const struct fuse *f_) +{ + return f_->conf.process_thread_queue_depth; +} + const char* fuse_config_pin_threads(const struct fuse *f_) diff --git a/libfuse/lib/fuse_loop_mt.cpp b/libfuse/lib/fuse_loop_mt.cpp index dc726ff4..867c6d31 100644 --- a/libfuse/lib/fuse_loop_mt.cpp +++ b/libfuse/lib/fuse_loop_mt.cpp @@ -2,9 +2,11 @@ #define _GNU_SOURCE #endif -#include "thread_pool.hpp" +#include "bounded_thread_pool.hpp" #include "cpu.hpp" #include "fmt/core.h" +#include "scope_guard.hpp" +#include "syslog.h" #include "fuse_i.h" #include "fuse_kernel.h" @@ -27,33 +29,6 @@ #include #include - -struct fuse_worker_data_t -{ - struct fuse_session *se; - sem_t finished; - std::function msgbuf_processor; - std::shared_ptr tp; -}; - -class WorkerCleanup -{ -public: - WorkerCleanup(fuse_worker_data_t *wd_) - : _wd(wd_) - { - } - - ~WorkerCleanup() - { - fuse_session_exit(_wd->se); - sem_post(&_wd->finished); - } - -private: - fuse_worker_data_t *_wd; -}; - static bool retriable_receive_error(const int err_) @@ -77,54 +52,113 @@ fatal_receive_error(const int err_) } static -void* +void handle_receive_error(const int rv_, fuse_msgbuf_t *msgbuf_) { msgbuf_free(msgbuf_); - fprintf(stderr, - "mergerfs: error reading from /dev/fuse - %s (%d)\n", - strerror(-rv_), - -rv_); - - return NULL; + fmt::print(stderr, + "mergerfs: error reading from /dev/fuse - {} ({})\n", + strerror(-rv_), + -rv_); } -static -void* -fuse_do_work(void *data) +struct AsyncWorker { - fuse_worker_data_t *wd = (fuse_worker_data_t*)data; - fuse_session *se = wd->se; - auto &process_msgbuf = wd->msgbuf_processor; - WorkerCleanup workercleanup(wd); + fuse_session *_se; + sem_t *_finished; + std::shared_ptr _process_tp; + + AsyncWorker(fuse_session *se_, + sem_t *finished_, + std::shared_ptr process_tp_) + : _se(se_), + _finished(finished_), + _process_tp(process_tp_) + { + } - while(!fuse_session_exited(se)) - { - int rv; - fuse_msgbuf_t *msgbuf; - - msgbuf = msgbuf_alloc(); - - do - { - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL); - rv = se->receive_buf(se,msgbuf); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL); - if(rv == 0) - return NULL; - if(retriable_receive_error(rv)) - continue; - if(fatal_receive_error(rv)) - return handle_receive_error(rv,msgbuf); - } while(false); - - process_msgbuf(wd,msgbuf); - } + inline + void + operator()() const + { + DEFER{ fuse_session_exit(_se); }; + DEFER{ sem_post(_finished); }; + + while(!fuse_session_exited(_se)) + { + int rv; + fuse_msgbuf_t *msgbuf; + + msgbuf = msgbuf_alloc(); + + do + { + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL); + rv = _se->receive_buf(_se,msgbuf); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL); + if(rv == 0) + return; + if(retriable_receive_error(rv)) + continue; + if(fatal_receive_error(rv)) + return handle_receive_error(rv,msgbuf); + } while(false); + + _process_tp->enqueue_work([=] { + _se->process_buf(_se,msgbuf); + msgbuf_free(msgbuf); + }); + } + } +}; - return NULL; -} +struct SyncWorker +{ + fuse_session *_se; + sem_t *_finished; + + SyncWorker(fuse_session *se_, + sem_t *finished_) + + : _se(se_), + _finished(finished_) + { + } + + inline + void + operator()() const + { + DEFER{ fuse_session_exit(_se); }; + DEFER{ sem_post(_finished); }; + + while(!fuse_session_exited(_se)) + { + int rv; + fuse_msgbuf_t *msgbuf; + + msgbuf = msgbuf_alloc(); + + do + { + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL); + rv = _se->receive_buf(_se,msgbuf); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL); + if(rv == 0) + return; + if(retriable_receive_error(rv)) + continue; + if(fatal_receive_error(rv)) + return handle_receive_error(rv,msgbuf); + } while(false); + + _se->process_buf(_se,msgbuf); + msgbuf_free(msgbuf); + } + } +}; int fuse_start_thread(pthread_t *thread_id, @@ -174,7 +208,8 @@ calculate_thread_count(const int raw_thread_count_) static void calculate_thread_counts(int *read_thread_count_, - int *process_thread_count_) + int *process_thread_count_, + int *process_thread_queue_depth_) { if((*read_thread_count_ == -1) && (*process_thread_count_ == -1)) { @@ -190,27 +225,9 @@ calculate_thread_counts(int *read_thread_count_, if(*process_thread_count_ != -1) *process_thread_count_ = ::calculate_thread_count(*process_thread_count_); } -} - -static -void -process_msgbuf_sync(fuse_worker_data_t *wd_, - fuse_msgbuf_t *msgbuf_) -{ - wd_->se->process_buf(wd_->se,msgbuf_); - msgbuf_free(msgbuf_); -} - -static -void -process_msgbuf_async(fuse_worker_data_t *wd_, - fuse_msgbuf_t *msgbuf_) -{ - const auto func = [=] { - process_msgbuf_sync(wd_,msgbuf_); - }; - wd_->tp->enqueue_work(func); + if(*process_thread_queue_depth_ <= 0) + *process_thread_queue_depth_ = *process_thread_count_; } static @@ -425,63 +442,71 @@ pin_threads(const std::vector read_threads_, return ::pin_threads_R1PPSP(read_threads_,process_threads_); } +static +void +wait(fuse_session *se_, + sem_t *finished_sem_) +{ + while(!fuse_session_exited(se_)) + sem_wait(finished_sem_); +} + int fuse_session_loop_mt(struct fuse_session *se_, const int raw_read_thread_count_, const int raw_process_thread_count_, + const int raw_process_thread_queue_depth_, const char *pin_threads_type_) { - int err; + sem_t finished; int read_thread_count; int process_thread_count; - fuse_worker_data_t wd = {0}; + int process_thread_queue_depth; std::vector read_threads; std::vector process_threads; + std::unique_ptr read_tp; + std::shared_ptr process_tp; - read_thread_count = raw_read_thread_count_; - process_thread_count = raw_process_thread_count_; - ::calculate_thread_counts(&read_thread_count,&process_thread_count); + sem_init(&finished,0,0); + + read_thread_count = raw_read_thread_count_; + process_thread_count = raw_process_thread_count_; + process_thread_queue_depth = raw_process_thread_queue_depth_; + ::calculate_thread_counts(&read_thread_count, + &process_thread_count, + &process_thread_queue_depth); if(process_thread_count > 0) + process_tp = std::make_shared(process_thread_count,process_thread_queue_depth); + + read_tp = std::make_unique(read_thread_count); + if(process_tp) { - wd.tp = std::make_shared(process_thread_count); - wd.msgbuf_processor = process_msgbuf_async; - process_threads = wd.tp->threads(); + for(auto i = 0; i < read_thread_count; i++) + read_tp->enqueue_work(AsyncWorker(se_,&finished,process_tp)); } else { - wd.msgbuf_processor = process_msgbuf_sync; + for(auto i = 0; i < read_thread_count; i++) + read_tp->enqueue_work(SyncWorker(se_,&finished)); } - wd.se = se_; - sem_init(&wd.finished,0,0); + if(read_tp) + read_threads = read_tp->threads(); + if(process_tp) + process_threads = process_tp->threads(); - err = 0; - for(int i = 0; i < read_thread_count; i++) - { - pthread_t thread_id; - err = fuse_start_thread(&thread_id,fuse_do_work,&wd); - assert(err == 0); - read_threads.push_back(thread_id); - } - - if(pin_threads_type_ != NULL) + if(pin_threads_type_ != nullptr) ::pin_threads(read_threads,process_threads,pin_threads_type_); - if(!err) - { - /* sem_wait() is interruptible */ - while(!fuse_session_exited(se_)) - sem_wait(&wd.finished); + syslog_info("read-thread-count=%d; process-thread-count=%d; process-thread-queue-depth=%d", + read_thread_count, + process_thread_count, + process_thread_queue_depth); - for(const auto &thread_id : read_threads) - pthread_cancel(thread_id); + ::wait(se_,&finished); - for(const auto &thread_id : read_threads) - pthread_join(thread_id,NULL); - } - - sem_destroy(&wd.finished); + sem_destroy(&finished); - return err; + return 0; } diff --git a/libfuse/lib/fuse_msgbuf.cpp b/libfuse/lib/fuse_msgbuf.cpp index fd167612..573e5273 100644 --- a/libfuse/lib/fuse_msgbuf.cpp +++ b/libfuse/lib/fuse_msgbuf.cpp @@ -20,17 +20,20 @@ #include +#include #include #include #include -#include +#include static std::uint32_t g_PAGESIZE = 0; static std::uint32_t g_BUFSIZE = 0; +static std::atomic_uint64_t g_MSGBUF_ALLOC_COUNT; + static std::mutex g_MUTEX; -static std::stack g_MSGBUF_STACK; +static std::vector g_MSGBUF_STACK; static __attribute__((constructor)) @@ -38,6 +41,7 @@ void msgbuf_constructor() { g_PAGESIZE = sysconf(_SC_PAGESIZE); + // +2 because to do O_DIRECT we need to offset the buffer to align g_BUFSIZE = (g_PAGESIZE * (FUSE_MAX_MAX_PAGES + 2)); } @@ -46,10 +50,10 @@ __attribute__((destructor)) void msgbuf_destroy() { - // TODO: cleanup? + } -uint32_t +uint64_t msgbuf_get_bufsize() { return g_BUFSIZE; @@ -58,7 +62,7 @@ msgbuf_get_bufsize() void msgbuf_set_bufsize(const uint32_t size_in_pages_) { - g_BUFSIZE = (size_in_pages_ * g_PAGESIZE); + g_BUFSIZE = ((size_in_pages_ + 1) * g_PAGESIZE); } static @@ -85,40 +89,104 @@ msgbuf_alloc() { g_MUTEX.unlock(); - msgbuf = (fuse_msgbuf_t*)malloc(sizeof(fuse_msgbuf_t)); + msgbuf = (fuse_msgbuf_t*)page_aligned_malloc(g_BUFSIZE); if(msgbuf == NULL) return NULL; - msgbuf->mem = (char*)page_aligned_malloc(g_BUFSIZE); - if(msgbuf->mem == NULL) - { - free(msgbuf); - return NULL; - } + msgbuf->mem = (((char*)msgbuf) + g_PAGESIZE); - msgbuf->size = g_BUFSIZE; + msgbuf->size = g_BUFSIZE - g_PAGESIZE; + g_MSGBUF_ALLOC_COUNT++; } else { - msgbuf = g_MSGBUF_STACK.top(); - g_MSGBUF_STACK.pop(); + msgbuf = g_MSGBUF_STACK.back(); + g_MSGBUF_STACK.pop_back(); g_MUTEX.unlock(); } return msgbuf; } +static +void +msgbuf_destroy(fuse_msgbuf_t *msgbuf_) +{ + // free(msgbuf_->mem); + free(msgbuf_); +} + void msgbuf_free(fuse_msgbuf_t *msgbuf_) { std::lock_guard lck(g_MUTEX); - if(msgbuf_->size != g_BUFSIZE) + if(msgbuf_->size != (g_BUFSIZE - g_PAGESIZE)) { - free(msgbuf_->mem); - free(msgbuf_); + msgbuf_destroy(msgbuf_); + g_MSGBUF_ALLOC_COUNT--; return; } - g_MSGBUF_STACK.push(msgbuf_); + g_MSGBUF_STACK.emplace_back(msgbuf_); + +} + +uint64_t +msgbuf_alloc_count() +{ + return g_MSGBUF_ALLOC_COUNT; +} + +uint64_t +msgbuf_avail_count() +{ + std::lock_guard lck(g_MUTEX); + + return g_MSGBUF_STACK.size(); +} + +void +msgbuf_gc_10percent() +{ + std::vector togc; + + { + std::size_t size; + std::size_t ten_percent; + + std::lock_guard lck(g_MUTEX); + + size = g_MSGBUF_STACK.size(); + ten_percent = (size / 10); + + for(std::size_t i = 0; i < ten_percent; i++) + { + togc.push_back(g_MSGBUF_STACK.back()); + g_MSGBUF_STACK.pop_back(); + } + } + + for(auto msgbuf : togc) + { + msgbuf_destroy(msgbuf); + g_MSGBUF_ALLOC_COUNT--; + } +} + +void +msgbuf_gc() +{ + std::vector oldstack; + + { + std::lock_guard lck(g_MUTEX); + oldstack.swap(g_MSGBUF_STACK); + } + + for(auto msgbuf: oldstack) + { + msgbuf_destroy(msgbuf); + g_MSGBUF_ALLOC_COUNT--; + } } diff --git a/libfuse/lib/fuse_msgbuf.hpp b/libfuse/lib/fuse_msgbuf.hpp index 8ff0f77f..1fe67f59 100644 --- a/libfuse/lib/fuse_msgbuf.hpp +++ b/libfuse/lib/fuse_msgbuf.hpp @@ -24,10 +24,15 @@ EXTERN_C_BEGIN void msgbuf_set_bufsize(const uint32_t size); -uint32_t msgbuf_get_bufsize(); +uint64_t msgbuf_get_bufsize(); fuse_msgbuf_t* msgbuf_alloc(); -fuse_msgbuf_t* msgbuf_alloc_memonly(); void msgbuf_free(fuse_msgbuf_t *msgbuf); +void msgbuf_gc(); +void msgbuf_gc_10percent(); + +uint64_t msgbuf_alloc_count(); +uint64_t msgbuf_avail_count(); + EXTERN_C_END diff --git a/libfuse/lib/fuse_mt.c b/libfuse/lib/fuse_mt.c index 03812a29..e53e8f90 100644 --- a/libfuse/lib/fuse_mt.c +++ b/libfuse/lib/fuse_mt.c @@ -29,6 +29,7 @@ fuse_loop_mt(struct fuse *f) res = fuse_session_loop_mt(fuse_get_session(f), fuse_config_read_thread_count(f), fuse_config_process_thread_count(f), + fuse_config_process_thread_queue_depth(f), fuse_config_pin_threads(f)); fuse_stop_maintenance_thread(f); diff --git a/libfuse/lib/fuse_signals.c b/libfuse/lib/fuse_signals.c index ad550d8f..ba9c70be 100644 --- a/libfuse/lib/fuse_signals.c +++ b/libfuse/lib/fuse_signals.c @@ -14,9 +14,11 @@ static struct fuse_session *fuse_instance; -static void exit_handler(int sig) +static +void +exit_handler(int sig) { - (void) sig; + (void)sig; if (fuse_instance) fuse_session_exit(fuse_instance); } @@ -46,10 +48,10 @@ static int set_one_signal_handler(int sig, void (*handler)(int), int remove) int fuse_set_signal_handlers(struct fuse_session *se) { - if (set_one_signal_handler(SIGHUP, exit_handler, 0) == -1 || - set_one_signal_handler(SIGINT, exit_handler, 0) == -1 || - set_one_signal_handler(SIGTERM, exit_handler, 0) == -1 || - set_one_signal_handler(SIGPIPE, SIG_IGN, 0) == -1) + if((set_one_signal_handler(SIGINT, exit_handler, 0) == -1) || + (set_one_signal_handler(SIGTERM, exit_handler, 0) == -1) || + (set_one_signal_handler(SIGQUIT, exit_handler, 0) == -1) || + (set_one_signal_handler(SIGPIPE, SIG_IGN, 0) == -1)) return -1; fuse_instance = se; @@ -64,8 +66,8 @@ void fuse_remove_signal_handlers(struct fuse_session *se) else fuse_instance = NULL; - set_one_signal_handler(SIGHUP, exit_handler, 1); set_one_signal_handler(SIGINT, exit_handler, 1); set_one_signal_handler(SIGTERM, exit_handler, 1); + set_one_signal_handler(SIGQUIT, exit_handler, 1); set_one_signal_handler(SIGPIPE, SIG_IGN, 1); } diff --git a/libfuse/lib/lock.h b/libfuse/lib/lock.h new file mode 100644 index 00000000..9fa58be5 --- /dev/null +++ b/libfuse/lib/lock.h @@ -0,0 +1,13 @@ +#include +#include + +typedef struct lock_s lock_t; +struct lock_s +{ + int type; + off_t start; + off_t end; + pid_t pid; + uint64_t owner; + lock_t *next; +}; diff --git a/libfuse/lib/make_unique.hpp b/libfuse/lib/make_unique.hpp new file mode 100644 index 00000000..98260bca --- /dev/null +++ b/libfuse/lib/make_unique.hpp @@ -0,0 +1,41 @@ +#include +#include +#include +#include + +namespace std +{ + template struct _Unique_if + { + typedef unique_ptr _Single_object; + }; + + template struct _Unique_if + { + typedef unique_ptr _Unknown_bound; + }; + + template struct _Unique_if + { + typedef void _Known_bound; + }; + + template + typename _Unique_if::_Single_object + make_unique(Args&&... args) + { + return unique_ptr(new T(std::forward(args)...)); + } + + template + typename _Unique_if::_Unknown_bound + make_unique(size_t n) + { + typedef typename remove_extent::type U; + return unique_ptr(new U[n]()); + } + + template + typename _Unique_if::_Known_bound + make_unique(Args&&...) = delete; +} diff --git a/libfuse/lib/node.c b/libfuse/lib/node.c new file mode 100644 index 00000000..3436a7c5 --- /dev/null +++ b/libfuse/lib/node.c @@ -0,0 +1,60 @@ +#include "node.h" + +#include "lfmp.h" + +static lfmp_t g_NODE_FMP; + +static +__attribute__((constructor)) +void +node_constructor() +{ + lfmp_init(&g_NODE_FMP,sizeof(node_t),256); +} + +static +__attribute__((destructor)) +void +node_destructor() +{ + lfmp_destroy(&g_NODE_FMP); +} + +node_t * +node_alloc() +{ + return lfmp_calloc(&g_NODE_FMP); +} + +void +node_free(node_t *node_) +{ + lfmp_free(&g_NODE_FMP,node_); +} + +int +node_gc1() +{ + return lfmp_gc(&g_NODE_FMP); +} + +void +node_gc() +{ + int rv; + int fails; + + fails = 0; + do + { + rv = node_gc1(); + if(rv == 0) + fails++; + } while(rv || (fails < 3)); +} + +lfmp_t* +node_lfmp() +{ + return &g_NODE_FMP; +} diff --git a/libfuse/lib/node.h b/libfuse/lib/node.h new file mode 100644 index 00000000..c5636de0 --- /dev/null +++ b/libfuse/lib/node.h @@ -0,0 +1,30 @@ +#include "lock.h" +#include "lfmp.h" + +typedef struct node_s node_t; +struct node_s +{ + node_t *name_next; + node_t *id_next; + + uint64_t nodeid; + char *name; + node_t *parent; + + uint64_t nlookup; + uint32_t refctr; + uint32_t open_count; + uint64_t hidden_fh; + + int32_t treelock; + lock_t *locks; + + uint32_t stat_crc32b; + uint8_t is_stat_cache_valid:1; +}; + +node_t *node_alloc(); +void node_free(node_t*); +int node_gc1(); +void node_gc(); +lfmp_t *node_lfmp(); diff --git a/libfuse/lib/scope_guard.hpp b/libfuse/lib/scope_guard.hpp new file mode 100644 index 00000000..36ee88f9 --- /dev/null +++ b/libfuse/lib/scope_guard.hpp @@ -0,0 +1,369 @@ +// _____ _____ _ _____ +// / ____| / ____| | | / ____|_ _ +// | (___ ___ ___ _ __ ___ | | __ _ _ __ _ _ __ __| | | | _| |_ _| |_ +// \___ \ / __/ _ \| '_ \ / _ \ | | |_ | | | |/ _` | '__/ _` | | | |_ _|_ _| +// ____) | (_| (_) | |_) | __/ | |__| | |_| | (_| | | | (_| | | |____|_| |_| +// |_____/ \___\___/| .__/ \___| \_____|\__,_|\__,_|_| \__,_| \_____| +// | | https://github.com/Neargye/scope_guard +// |_| version 0.9.1 +// +// Licensed under the MIT License . +// SPDX-License-Identifier: MIT +// Copyright (c) 2018 - 2021 Daniil Goncharov . +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +#ifndef NEARGYE_SCOPE_GUARD_HPP +#define NEARGYE_SCOPE_GUARD_HPP + +#define SCOPE_GUARD_VERSION_MAJOR 0 +#define SCOPE_GUARD_VERSION_MINOR 9 +#define SCOPE_GUARD_VERSION_PATCH 1 + +#include +#if (defined(_MSC_VER) && _MSC_VER >= 1900) || ((defined(__clang__) || defined(__GNUC__)) && __cplusplus >= 201700L) +#include +#endif + +// scope_guard throwable settings: +// SCOPE_GUARD_NO_THROW_CONSTRUCTIBLE requires nothrow constructible action. +// SCOPE_GUARD_MAY_THROW_ACTION action may throw exceptions. +// SCOPE_GUARD_NO_THROW_ACTION requires noexcept action. +// SCOPE_GUARD_SUPPRESS_THROW_ACTION exceptions during action will be suppressed. +// SCOPE_GUARD_CATCH_HANDLER exceptions handler. If SCOPE_GUARD_SUPPRESS_THROW_ACTIONS is not defined, it will do nothing. + +#if !defined(SCOPE_GUARD_MAY_THROW_ACTION) && !defined(SCOPE_GUARD_NO_THROW_ACTION) && !defined(SCOPE_GUARD_SUPPRESS_THROW_ACTION) +# define SCOPE_GUARD_MAY_THROW_ACTION +#elif (defined(SCOPE_GUARD_MAY_THROW_ACTION) + defined(SCOPE_GUARD_NO_THROW_ACTION) + defined(SCOPE_GUARD_SUPPRESS_THROW_ACTION)) > 1 +# error Only one of SCOPE_GUARD_MAY_THROW_ACTION and SCOPE_GUARD_NO_THROW_ACTION and SCOPE_GUARD_SUPPRESS_THROW_ACTION may be defined. +#endif + +#if !defined(SCOPE_GUARD_CATCH_HANDLER) +# define SCOPE_GUARD_CATCH_HANDLER /* Suppress exception.*/ +#endif + +namespace scope_guard { + +namespace detail { + +#if defined(SCOPE_GUARD_SUPPRESS_THROW_ACTION) && (defined(__cpp_exceptions) || defined(__EXCEPTIONS) || (_HAS_EXCEPTIONS)) +# define NEARGYE_NOEXCEPT(...) noexcept +# define NEARGYE_TRY try { +# define NEARGYE_CATCH } catch (...) { SCOPE_GUARD_CATCH_HANDLER } +#else +# define NEARGYE_NOEXCEPT(...) noexcept(__VA_ARGS__) +# define NEARGYE_TRY +# define NEARGYE_CATCH +#endif + +#define NEARGYE_MOV(...) static_cast::type&&>(__VA_ARGS__) +#define NEARGYE_FWD(...) static_cast(__VA_ARGS__) + +// NEARGYE_NODISCARD encourages the compiler to issue a warning if the return value is discarded. +#if !defined(NEARGYE_NODISCARD) +# if defined(__clang__) +# if (__clang_major__ * 10 + __clang_minor__) >= 39 && __cplusplus >= 201703L +# define NEARGYE_NODISCARD [[nodiscard]] +# else +# define NEARGYE_NODISCARD __attribute__((__warn_unused_result__)) +# endif +# elif defined(__GNUC__) +# if __GNUC__ >= 7 && __cplusplus >= 201703L +# define NEARGYE_NODISCARD [[nodiscard]] +# else +# define NEARGYE_NODISCARD __attribute__((__warn_unused_result__)) +# endif +# elif defined(_MSC_VER) +# if _MSC_VER >= 1911 && defined(_MSVC_LANG) && _MSVC_LANG >= 201703L +# define NEARGYE_NODISCARD [[nodiscard]] +# elif defined(_Check_return_) +# define NEARGYE_NODISCARD _Check_return_ +# else +# define NEARGYE_NODISCARD +# endif +# else +# define NEARGYE_NODISCARD +# endif +#endif + +#if defined(_MSC_VER) && _MSC_VER < 1900 +inline int uncaught_exceptions() noexcept { + return *(reinterpret_cast(static_cast(static_cast(_getptd())) + (sizeof(void*) == 8 ? 0x100 : 0x90))); +} +#elif (defined(__clang__) || defined(__GNUC__)) && __cplusplus < 201700L +struct __cxa_eh_globals; +extern "C" __cxa_eh_globals* __cxa_get_globals() noexcept; +inline int uncaught_exceptions() noexcept { + return static_cast(*(reinterpret_cast(static_cast(static_cast(__cxa_get_globals())) + sizeof(void*)))); +} +#else +inline int uncaught_exceptions() noexcept { + return std::uncaught_exceptions(); +} +#endif + +class on_exit_policy { + bool execute_; + + public: + explicit on_exit_policy(bool execute) noexcept : execute_{execute} {} + + void dismiss() noexcept { + execute_ = false; + } + + bool should_execute() const noexcept { + return execute_; + } +}; + +class on_fail_policy { + int ec_; + + public: + explicit on_fail_policy(bool execute) noexcept : ec_{execute ? uncaught_exceptions() : -1} {} + + void dismiss() noexcept { + ec_ = -1; + } + + bool should_execute() const noexcept { + return ec_ != -1 && ec_ < uncaught_exceptions(); + } +}; + +class on_success_policy { + int ec_; + + public: + explicit on_success_policy(bool execute) noexcept : ec_{execute ? uncaught_exceptions() : -1} {} + + void dismiss() noexcept { + ec_ = -1; + } + + bool should_execute() const noexcept { + return ec_ != -1 && ec_ >= uncaught_exceptions(); + } +}; + +template +struct is_noarg_returns_void_action + : std::false_type {}; + +template +struct is_noarg_returns_void_action())())> + : std::true_type {}; + +template ::value> +struct is_nothrow_invocable_action + : std::false_type {}; + +template +struct is_nothrow_invocable_action + : std::integral_constant())())> {}; + +template +class scope_guard { + using A = typename std::decay::type; + + static_assert(is_noarg_returns_void_action::value, + "scope_guard requires no-argument action, that returns void."); + static_assert(std::is_same::value || std::is_same::value || std::is_same::value, + "scope_guard requires on_exit_policy, on_fail_policy or on_success_policy."); +#if defined(SCOPE_GUARD_NO_THROW_ACTION) + static_assert(is_nothrow_invocable_action::value, + "scope_guard requires noexcept invocable action."); +#endif +#if defined(SCOPE_GUARD_NO_THROW_CONSTRUCTIBLE) + static_assert(std::is_nothrow_move_constructible::value, + "scope_guard requires nothrow constructible action."); +#endif + + P policy_; + A action_; + + void* operator new(std::size_t) = delete; + void operator delete(void*) = delete; + + public: + scope_guard() = delete; + scope_guard(const scope_guard&) = delete; + scope_guard& operator=(const scope_guard&) = delete; + scope_guard& operator=(scope_guard&&) = delete; + + scope_guard(scope_guard&& other) noexcept(std::is_nothrow_move_constructible::value) + : policy_{false}, + action_{NEARGYE_MOV(other.action_)} { + policy_ = NEARGYE_MOV(other.policy_); + other.policy_.dismiss(); + } + + scope_guard(const A& action) = delete; + scope_guard(A& action) = delete; + + explicit scope_guard(A&& action) noexcept(std::is_nothrow_move_constructible::value) + : policy_{true}, + action_{NEARGYE_MOV(action)} {} + + void dismiss() noexcept { + policy_.dismiss(); + } + + ~scope_guard() NEARGYE_NOEXCEPT(is_nothrow_invocable_action::value) { + if (policy_.should_execute()) { + NEARGYE_TRY + action_(); + NEARGYE_CATCH + } + } +}; + +template +using scope_exit = scope_guard; + +template ::value, int>::type = 0> +NEARGYE_NODISCARD scope_exit make_scope_exit(F&& action) noexcept(noexcept(scope_exit{NEARGYE_FWD(action)})) { + return scope_exit{NEARGYE_FWD(action)}; +} + +template +using scope_fail = scope_guard; + +template ::value, int>::type = 0> +NEARGYE_NODISCARD scope_fail make_scope_fail(F&& action) noexcept(noexcept(scope_fail{NEARGYE_FWD(action)})) { + return scope_fail{NEARGYE_FWD(action)}; +} + +template +using scope_success = scope_guard; + +template ::value, int>::type = 0> +NEARGYE_NODISCARD scope_success make_scope_success(F&& action) noexcept(noexcept(scope_success{NEARGYE_FWD(action)})) { + return scope_success{NEARGYE_FWD(action)}; +} + +struct scope_exit_tag {}; + +template ::value, int>::type = 0> +scope_exit operator<<(scope_exit_tag, F&& action) noexcept(noexcept(scope_exit{NEARGYE_FWD(action)})) { + return scope_exit{NEARGYE_FWD(action)}; +} + +struct scope_fail_tag {}; + +template ::value, int>::type = 0> +scope_fail operator<<(scope_fail_tag, F&& action) noexcept(noexcept(scope_fail{NEARGYE_FWD(action)})) { + return scope_fail{NEARGYE_FWD(action)}; +} + +struct scope_success_tag {}; + +template ::value, int>::type = 0> +scope_success operator<<(scope_success_tag, F&& action) noexcept(noexcept(scope_success{NEARGYE_FWD(action)})) { + return scope_success{NEARGYE_FWD(action)}; +} + +#undef NEARGYE_MOV +#undef NEARGYE_FWD +#undef NEARGYE_NOEXCEPT +#undef NEARGYE_TRY +#undef NEARGYE_CATCH +#undef NEARGYE_NODISCARD + +} // namespace scope_guard::detail + +using detail::make_scope_exit; +using detail::make_scope_fail; +using detail::make_scope_success; + +} // namespace scope_guard + +// NEARGYE_MAYBE_UNUSED suppresses compiler warnings on unused entities, if any. +#if !defined(NEARGYE_MAYBE_UNUSED) +# if defined(__clang__) +# if (__clang_major__ * 10 + __clang_minor__) >= 39 && __cplusplus >= 201703L +# define NEARGYE_MAYBE_UNUSED [[maybe_unused]] +# else +# define NEARGYE_MAYBE_UNUSED __attribute__((__unused__)) +# endif +# elif defined(__GNUC__) +# if __GNUC__ >= 7 && __cplusplus >= 201703L +# define NEARGYE_MAYBE_UNUSED [[maybe_unused]] +# else +# define NEARGYE_MAYBE_UNUSED __attribute__((__unused__)) +# endif +# elif defined(_MSC_VER) +# if _MSC_VER >= 1911 && defined(_MSVC_LANG) && _MSVC_LANG >= 201703L +# define NEARGYE_MAYBE_UNUSED [[maybe_unused]] +# else +# define NEARGYE_MAYBE_UNUSED __pragma(warning(suppress : 4100 4101 4189)) +# endif +# else +# define NEARGYE_MAYBE_UNUSED +# endif +#endif + +#if !defined(NEARGYE_STR_CONCAT) +# define NEARGYE_STR_CONCAT_(s1, s2) s1##s2 +# define NEARGYE_STR_CONCAT(s1, s2) NEARGYE_STR_CONCAT_(s1, s2) +#endif + +#if !defined(NEARGYE_COUNTER) +# if defined(__COUNTER__) +# define NEARGYE_COUNTER __COUNTER__ +# elif defined(__LINE__) +# define NEARGYE_COUNTER __LINE__ +# endif +#endif + +#if defined(SCOPE_GUARD_NO_THROW_ACTION) +# define NEARGYE_MAKE_SCOPE_GUARD_ACTION [&]() noexcept -> void +#else +# define NEARGYE_MAKE_SCOPE_GUARD_ACTION [&]() -> void +#endif + +#define NEARGYE_MAKE_SCOPE_EXIT ::scope_guard::detail::scope_exit_tag{} << NEARGYE_MAKE_SCOPE_GUARD_ACTION +#define NEARGYE_MAKE_SCOPE_FAIL ::scope_guard::detail::scope_fail_tag{} << NEARGYE_MAKE_SCOPE_GUARD_ACTION +#define NEARGYE_MAKE_SCOPE_SUCCESS ::scope_guard::detail::scope_success_tag{} << NEARGYE_MAKE_SCOPE_GUARD_ACTION + +#define NEARGYE_SCOPE_GUARD_WITH_(g, i) for (int i = 1; i--; g) +#define NEARGYE_SCOPE_GUARD_WITH(g) NEARGYE_SCOPE_GUARD_WITH_(g, NEARGYE_STR_CONCAT(NEARGYE_INTERNAL_OBJECT_, NEARGYE_COUNTER)) + +// SCOPE_EXIT executing action on scope exit. +#define MAKE_SCOPE_EXIT(name) auto name = NEARGYE_MAKE_SCOPE_EXIT +#define SCOPE_EXIT NEARGYE_MAYBE_UNUSED const MAKE_SCOPE_EXIT(NEARGYE_STR_CONCAT(NEARGYE_SCOPE_EXIT_, NEARGYE_COUNTER)) +#define WITH_SCOPE_EXIT(guard) NEARGYE_SCOPE_GUARD_WITH(NEARGYE_MAKE_SCOPE_EXIT{ guard }) + +// SCOPE_FAIL executing action on scope exit when an exception has been thrown before scope exit. +#define MAKE_SCOPE_FAIL(name) auto name = NEARGYE_MAKE_SCOPE_FAIL +#define SCOPE_FAIL NEARGYE_MAYBE_UNUSED const MAKE_SCOPE_FAIL(NEARGYE_STR_CONCAT(NEARGYE_SCOPE_FAIL_, NEARGYE_COUNTER)) +#define WITH_SCOPE_FAIL(guard) NEARGYE_SCOPE_GUARD_WITH(NEARGYE_MAKE_SCOPE_FAIL{ guard }) + +// SCOPE_SUCCESS executing action on scope exit when no exceptions have been thrown before scope exit. +#define MAKE_SCOPE_SUCCESS(name) auto name = NEARGYE_MAKE_SCOPE_SUCCESS +#define SCOPE_SUCCESS NEARGYE_MAYBE_UNUSED const MAKE_SCOPE_SUCCESS(NEARGYE_STR_CONCAT(NEARGYE_SCOPE_SUCCESS_, NEARGYE_COUNTER)) +#define WITH_SCOPE_SUCCESS(guard) NEARGYE_SCOPE_GUARD_WITH(NEARGYE_MAKE_SCOPE_SUCCESS{ guard }) + +// DEFER executing action on scope exit. +#define MAKE_DEFER(name) MAKE_SCOPE_EXIT(name) +#define DEFER SCOPE_EXIT +#define WITH_DEFER(guard) WITH_SCOPE_EXIT(guard) + +#endif // NEARGYE_SCOPE_GUARD_HPP diff --git a/libfuse/lib/syslog.c b/libfuse/lib/syslog.c new file mode 100644 index 00000000..8702fbba --- /dev/null +++ b/libfuse/lib/syslog.c @@ -0,0 +1,110 @@ +/* + ISC License + + Copyright (c) 2023, Antonio SJ Musumeci + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + +#include +#include +#include + + +static bool g_SYSLOG_ENABLED = false; + +void +syslog_open() +{ + const char *ident = "mergerfs"; + const int option = (LOG_CONS|LOG_PID); + const int facility = LOG_USER; + + openlog(ident,option,facility); + g_SYSLOG_ENABLED = true; +} + +void +syslog_close() +{ + closelog(); + g_SYSLOG_ENABLED = false; +} + +static +void +syslog_vlog(const int priority_, + const char *format_, + va_list valist_) +{ + if(g_SYSLOG_ENABLED == false) + return; + + vsyslog(priority_,format_,valist_); +} + +void +syslog_log(const int priority_, + const char *format_, + ...) +{ + va_list valist; + + va_start(valist,format_); + syslog_vlog(priority_,format_,valist); + va_end(valist); +} + +void +syslog_info(const char *format_, + ...) +{ + va_list valist; + + va_start(valist,format_); + syslog_vlog(LOG_INFO,format_,valist); + va_end(valist); +} + +void +syslog_notice(const char *format_, + ...) +{ + va_list valist; + + va_start(valist,format_); + syslog_vlog(LOG_NOTICE,format_,valist); + va_end(valist); +} + +void +syslog_warning(const char *format_, + ...) +{ + va_list valist; + + va_start(valist,format_); + syslog_vlog(LOG_WARNING,format_,valist); + va_end(valist); +} + +void +syslog_error(const char *format_, + ...) +{ + va_list valist; + + va_start(valist,format_); + syslog_vlog(LOG_ERR,format_,valist); + va_end(valist); +} diff --git a/libfuse/lib/syslog.h b/libfuse/lib/syslog.h new file mode 100644 index 00000000..a617ad39 --- /dev/null +++ b/libfuse/lib/syslog.h @@ -0,0 +1,30 @@ +/* + ISC License + + Copyright (c) 2023, Antonio SJ Musumeci + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + +#pragma once + +#include + + +void syslog_open(); +void syslog_log(const int priority, const char *format, ...); +void syslog_info(const char *format, ...); +void syslog_notice(const char *format, ...); +void syslog_warning(const char *format, ...); +void syslog_error(const char *format, ...); +void syslog_close(); diff --git a/libfuse/lib/thread_pool.hpp b/libfuse/lib/thread_pool.hpp index c8b5ac32..3737a186 100644 --- a/libfuse/lib/thread_pool.hpp +++ b/libfuse/lib/thread_pool.hpp @@ -1,6 +1,7 @@ #pragma once #include "unbounded_queue.hpp" +#include "bounded_queue.hpp" #include #include diff --git a/man/mergerfs.1 b/man/mergerfs.1 index afb6601b..ffed3fd2 100644 --- a/man/mergerfs.1 +++ b/man/mergerfs.1 @@ -311,6 +311,13 @@ reading FUSE messages which are dispatched to process threads. -1 means disabled otherwise acts like \f[C]read-thread-count\f[R]. (default: -1) .IP \[bu] 2 +\f[B]process-thread-queue-depth=INT\f[R]: Sets the number of requests +any single process thread can have queued up at one time. +Meaning the total memory usage of the queues is queue depth multiplied +by the number of process threads plus read thread count. +-1 sets the depth to the same as the process thread count. +(default: -1) +.IP \[bu] 2 \f[B]pin-threads=STR\f[R]: Selects a strategy to pin threads to CPUs (default: unset) .IP \[bu] 2 @@ -1480,7 +1487,8 @@ Before mounting over top the mount point with the new instance of mergerfs issue: \f[C]umount -l \f[R]. Or you can let mergerfs do it by setting the option \f[C]lazy-umount-mountpoint=true\f[R]. -.SH RUNTIME CONFIG +.SH RUNTIME INTERFACES +.SS RUNTIME CONFIG .SS .mergerfs pseudo file .IP .nf @@ -1596,6 +1604,45 @@ given the getattr policy .IP \[bu] 2 \f[B]user.mergerfs.allpaths\f[R]: a NUL (`\[rs]0') separated list of full paths to all files found +.SS SIGNALS +.IP \[bu] 2 +USR1: This will cause mergerfs to send invalidation notifications to the +kernel for all files. +This will cause all unused files to be released from memory. +.IP \[bu] 2 +USR2: Trigger a general cleanup of currently unused memory. +A more thorough version of what happens every \[ti]15 minutes. +.SS IOCTLS +.PP +Found in \f[C]fuse_ioctl.cpp\f[R]: +.IP +.nf +\f[C] +typedef char IOCTL_BUF[4096]; +#define IOCTL_APP_TYPE 0xDF +#define IOCTL_FILE_INFO _IOWR(IOCTL_APP_TYPE,0,IOCTL_BUF) +#define IOCTL_GC _IO(IOCTL_APP_TYPE,1) +#define IOCTL_GC1 _IO(IOCTL_APP_TYPE,2) +#define IOCTL_INVALIDATE_ALL_NODES _IO(IOCTL_APP_TYPE,3) +\f[R] +.fi +.IP \[bu] 2 +IOCTL_FILE_INFO: Same as the \[lq]file / directory xattrs\[rq] mentioned +above. +Use a buffer size of 4096 bytes. +Pass in a string of \[lq]basepath\[rq], \[lq]relpath\[rq], +\[lq]fullpath\[rq], or \[lq]allpaths\[rq]. +Receive details in same buffer. +.IP \[bu] 2 +IOCTL_GC: Triggers a thorough garbage collection of excess memory. +Same as SIGUSR2. +.IP \[bu] 2 +IOCTL_GC1: Triggers a simple garbage collection of excess memory. +Same as what happens every 15 minutes normally. +.IP \[bu] 2 +IOCTL_INVALIDATE_ALL_NODES: Same as SIGUSR1. +Send invalidation notifications to the kernel for all files causing +unused files to be released from memory. .SH TOOLING .IP \[bu] 2 https://github.com/trapexit/mergerfs-tools diff --git a/src/config.cpp b/src/config.cpp index e2420c8a..16a5999d 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -65,6 +65,7 @@ namespace l IFERT("pid"); IFERT("pin-threads"); IFERT("process-thread-count"); + IFERT("process-thread-queue-depth"); IFERT("read-thread-count"); IFERT("readdirplus"); IFERT("scheduling-priority"); @@ -123,6 +124,7 @@ Config::Config() symlinkify_timeout(3600), fuse_read_thread_count(0), fuse_process_thread_count(-1), + fuse_process_thread_queue_depth(0), fuse_pin_threads("false"), version(MERGERFS_VERSION), writeback_cache(false), @@ -200,6 +202,7 @@ Config::Config() _map["threads"] = &fuse_read_thread_count; _map["read-thread-count"] = &fuse_read_thread_count; _map["process-thread-count"] = &fuse_process_thread_count; + _map["process-thread-queue-depth"] = &fuse_process_thread_queue_depth; _map["version"] = &version; _map["xattr"] = &xattr; } diff --git a/src/config.hpp b/src/config.hpp index 75f6cbe1..387489b7 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -147,6 +147,7 @@ public: ConfigUINT64 symlinkify_timeout; ConfigINT fuse_read_thread_count; ConfigINT fuse_process_thread_count; + ConfigINT fuse_process_thread_queue_depth; ConfigSTR fuse_pin_threads; ConfigSTR version; ConfigBOOL writeback_cache; diff --git a/src/fuse_ioctl.cpp b/src/fuse_ioctl.cpp index 0a8f434f..dca3ea89 100644 --- a/src/fuse_ioctl.cpp +++ b/src/fuse_ioctl.cpp @@ -42,7 +42,10 @@ using std::vector; typedef char IOCTL_BUF[4096]; #define IOCTL_APP_TYPE 0xDF -#define IOCTL_FILE_INFO _IOWR(IOCTL_APP_TYPE,0,IOCTL_BUF) +#define IOCTL_FILE_INFO _IOWR(IOCTL_APP_TYPE,0,IOCTL_BUF) +#define IOCTL_GC _IO(IOCTL_APP_TYPE,1) +#define IOCTL_GC1 _IO(IOCTL_APP_TYPE,2) +#define IOCTL_INVALIDATE_ALL_NODES _IO(IOCTL_APP_TYPE,3) // From linux/btrfs.h #define BTRFS_IOCTL_MAGIC 0x94 @@ -334,6 +337,15 @@ namespace l { case IOCTL_FILE_INFO: return l::file_info(ffi_,data_); + case IOCTL_GC: + fuse_gc(); + return 0; + case IOCTL_GC1: + fuse_gc1(); + return 0; + case IOCTL_INVALIDATE_ALL_NODES: + fuse_invalidate_all_nodes(); + return 0; } return -ENOTTY; diff --git a/src/mergerfs.cpp b/src/mergerfs.cpp index 39d7cde9..704e003f 100644 --- a/src/mergerfs.cpp +++ b/src/mergerfs.cpp @@ -76,6 +76,7 @@ #include "fuse.h" +#include #include #include @@ -195,6 +196,30 @@ namespace l } } + static + void + usr1_signal_handler(int signal_) + { + syslog_info("Received SIGUSR1 - invalidating all nodes"); + fuse_invalidate_all_nodes(); + } + + static + void + usr2_signal_handler(int signal_) + { + syslog_info("Received SIGUSR2 - triggering thorough gc"); + fuse_gc(); + } + + static + void + setup_signal_handlers() + { + std::signal(SIGUSR1,l::usr1_signal_handler); + std::signal(SIGUSR2,l::usr2_signal_handler); + } + int main(const int argc_, char **argv_) @@ -223,6 +248,7 @@ namespace l l::wait_for_mount(cfg); l::setup_resources(cfg->scheduling_priority); + l::setup_signal_handlers(); l::get_fuse_operations(ops,cfg->nullrw); if(cfg->lazy_umount_mountpoint) diff --git a/src/option_parser.cpp b/src/option_parser.cpp index fdd817a4..701f9094 100644 --- a/src/option_parser.cpp +++ b/src/option_parser.cpp @@ -82,6 +82,7 @@ set_fuse_threads(Config::Write &cfg_, { set_kv_option("read-thread-count",cfg_->fuse_read_thread_count.to_string(),args_); set_kv_option("process-thread-count",cfg_->fuse_process_thread_count.to_string(),args_); + set_kv_option("process-thread-queue-depth",cfg_->fuse_process_thread_queue_depth.to_string(),args_); set_kv_option("pin-threads",cfg_->fuse_pin_threads.to_string(),args_); }