Browse Source

Add manual GC triggering + configurable process queue depth

Yes, these are unrelated changes but somehow ended up being
prototyped together and I'm too lazy to separate them.
pull/1204/head
Antonio SJ Musumeci 2 years ago
parent
commit
5ab0fbcaee
  1. 44
      README.md
  2. 12
      libfuse/Makefile
  3. 5
      libfuse/include/fuse.h
  4. 1
      libfuse/include/fuse_lowlevel.h
  5. 3
      libfuse/include/fuse_msgbuf.h
  6. 31
      libfuse/lib/bounded_queue.hpp
  7. 143
      libfuse/lib/bounded_thread_pool.hpp
  8. 391
      libfuse/lib/fuse.c
  9. 219
      libfuse/lib/fuse_loop_mt.cpp
  10. 106
      libfuse/lib/fuse_msgbuf.cpp
  11. 9
      libfuse/lib/fuse_msgbuf.hpp
  12. 1
      libfuse/lib/fuse_mt.c
  13. 14
      libfuse/lib/fuse_signals.c
  14. 13
      libfuse/lib/lock.h
  15. 41
      libfuse/lib/make_unique.hpp
  16. 60
      libfuse/lib/node.c
  17. 30
      libfuse/lib/node.h
  18. 369
      libfuse/lib/scope_guard.hpp
  19. 110
      libfuse/lib/syslog.c
  20. 30
      libfuse/lib/syslog.h
  21. 1
      libfuse/lib/thread_pool.hpp
  22. 49
      man/mergerfs.1
  23. 3
      src/config.cpp
  24. 1
      src/config.hpp
  25. 12
      src/fuse_ioctl.cpp
  26. 26
      src/mergerfs.cpp
  27. 1
      src/option_parser.cpp

44
README.md

@ -241,6 +241,11 @@ These options are the same regardless of whether you use them with the
`read-thread-count` refers to the number of threads reading FUSE
messages which are dispatched to process threads. -1 means disabled
otherwise acts like `read-thread-count`. (default: -1)
* **process-thread-queue-depth=UINT**: Sets the number of requests any
single process thread can have queued up at one time. Meaning the
total memory usage of the queues is queue depth multiplied by the
number of process threads plus read thread count. 0 sets the depth
to the same as the process thread count. (default: 0)
* **pin-threads=STR**: Selects a strategy to pin threads to CPUs
(default: unset)
* **scheduling-priority=INT**: Set mergerfs' scheduling
@ -1123,7 +1128,9 @@ issue: `umount -l <mergerfs_mountpoint>`. Or you can let mergerfs do
it by setting the option `lazy-umount-mountpoint=true`.
# RUNTIME CONFIG
# RUNTIME INTERFACES
## RUNTIME CONFIG
#### .mergerfs pseudo file ####
@ -1206,6 +1213,41 @@ following:
* **user.mergerfs.allpaths**: a NUL ('\0') separated list of full paths to all files found
## SIGNALS
* USR1: This will cause mergerfs to send invalidation notifications to
the kernel for all files. This will cause all unused files to be
released from memory.
* USR2: Trigger a general cleanup of currently unused memory. A more
thorough version of what happens every ~15 minutes.
## IOCTLS
Found in `fuse_ioctl.cpp`:
```C++
typedef char IOCTL_BUF[4096];
#define IOCTL_APP_TYPE 0xDF
#define IOCTL_FILE_INFO _IOWR(IOCTL_APP_TYPE,0,IOCTL_BUF)
#define IOCTL_GC _IO(IOCTL_APP_TYPE,1)
#define IOCTL_GC1 _IO(IOCTL_APP_TYPE,2)
#define IOCTL_INVALIDATE_ALL_NODES _IO(IOCTL_APP_TYPE,3)
```
* IOCTL\_FILE\_INFO: Same as the "file / directory xattrs" mentioned
above. Use a buffer size of 4096 bytes. Pass in a string of
"basepath", "relpath", "fullpath", or "allpaths". Receive details in
same buffer.
* IOCTL\_GC: Triggers a thorough garbage collection of excess
memory. Same as SIGUSR2.
* IOCTL\_GC1: Triggers a simple garbage collection of excess
memory. Same as what happens every 15 minutes normally.
* IOCTL\_INVALIDATE\_ALL\_NODES: Same as SIGUSR1. Send invalidation
notifications to the kernel for all files causing unused files to be
released from memory.
# TOOLING
* https://github.com/trapexit/mergerfs-tools

12
libfuse/Makefile

@ -15,6 +15,12 @@ else
OPT_FLAGS := -O2
endif
ifeq ($(LTO),1)
LTO_FLAGS := -flto
else
LTO_FLAGS :=
endif
DESTDIR =
PREFIX = /usr/local
EXEC_PREFIX = $(PREFIX)
@ -39,12 +45,14 @@ SRC_C = \
lib/fuse_dirents.c \
lib/fuse_lowlevel.c \
lib/fuse_mt.c \
lib/node.c \
lib/fuse_node.c \
lib/fuse_opt.c \
lib/fuse_session.c \
lib/fuse_signals.c \
lib/helper.c \
lib/mount.c
lib/mount.c \
lib/syslog.c
SRC_CPP = \
lib/format.cpp \
lib/os.cpp \
@ -59,12 +67,14 @@ CFLAGS ?= \
$(OPT_FLAGS)
CFLAGS := \
${CFLAGS} \
$(LTO_FLAGS) \
-std=gnu99 \
-Wall \
-pipe \
-MMD
CXXFLAGS := \
${CXXFLAGS} \
$(LTO_FLAGS) \
-std=c++11 \
-Wall \
-pipe \

5
libfuse/include/fuse.h

@ -629,6 +629,7 @@ void fuse_exit(struct fuse *f);
int fuse_config_read_thread_count(const struct fuse *f);
int fuse_config_process_thread_count(const struct fuse *f);
int fuse_config_process_thread_queue_depth(const struct fuse *f);
const char* fuse_config_pin_threads(const struct fuse *f);
/**
@ -764,6 +765,10 @@ void fuse_set_getcontext_func(struct fuse_context *(*func)(void));
/** Get session from fuse object */
struct fuse_session *fuse_get_session(struct fuse *f);
void fuse_gc1();
void fuse_gc();
void fuse_invalidate_all_nodes();
EXTERN_C_END
#endif /* _FUSE_H_ */

1
libfuse/include/fuse_lowlevel.h

@ -1460,6 +1460,7 @@ void fuse_session_process(struct fuse_session *se,
int fuse_session_loop_mt(struct fuse_session *se,
const int read_thread_count,
const int process_thread_count,
const int process_thread_queue_depth,
const char *pin_threads_type);
/* ----------------------------------------------------------- *

3
libfuse/include/fuse_msgbuf.h

@ -5,7 +5,6 @@
typedef struct fuse_msgbuf_t fuse_msgbuf_t;
struct fuse_msgbuf_t
{
char *mem;
uint32_t size;
uint32_t used;
char *mem;
};

31
libfuse/lib/bounded_queue.hpp

@ -13,25 +13,26 @@ public:
explicit
BoundedQueue(std::size_t max_size_,
bool block_ = true)
: _block(block),
_max_size(max_size_)
: _block(block_),
_max_size(max_size_ ? max_size_ : 1)
{
if(_max_size == 0)
_max_size = 1;
}
BoundedQueue(const BoundedQueue&) = delete;
BoundedQueue(BoundedQueue&&) = default;
bool
push(const T& item_)
{
{
std::unique_lock guard(_queue_lock);
std::unique_lock<std::mutex> guard(_queue_lock);
_condition_push.wait(guard, [&]() { return _queue.size() < _max_size || !_block; });
if(_queue.size() == _max_size)
return false;
_queue.push(item);
_queue.push(item_);
}
_condition_pop.notify_one();
@ -43,7 +44,7 @@ public:
push(T&& item_)
{
{
std::unique_lock guard(_queue_lock);
std::unique_lock<std::mutex> guard(_queue_lock);
_condition_push.wait(guard, [&]() { return _queue.size() < _max_size || !_block; });
@ -61,7 +62,7 @@ public:
emplace(Args&&... args_)
{
{
std::unique_lock guard(_queue_lock);
std::lock_guard<std::mutex> guard(_queue_lock);
_condition_push.wait(guard, [&]() { return _queue.size() < _max_size || !_block; });
@ -80,7 +81,7 @@ public:
pop(T& item_)
{
{
std::unique_lock guard(_queue_lock);
std::unique_lock<std::mutex> guard(_queue_lock);
_condition_pop.wait(guard, [&]() { return !_queue.empty() || !_block; });
if(_queue.empty())
@ -99,7 +100,7 @@ public:
std::size_t
size() const
{
std::lock_guard guard(_queue_lock);
std::lock_guard<std::mutex> guard(_queue_lock);
return _queue.size();
}
@ -113,7 +114,7 @@ public:
bool
empty() const
{
std::lock_guard guard(_queue_lock);
std::lock_guard<std::mutex> guard(_queue_lock);
return _queue.empty();
}
@ -121,7 +122,7 @@ public:
bool
full() const
{
std::lock_guard lock(_queue_lock);
std::lock_guard<std::mutex> lock(_queue_lock);
return (_queue.size() == capacity());
}
@ -129,7 +130,7 @@ public:
void
block()
{
std::lock_guard guard(_queue_lock);
std::lock_guard<std::mutex> guard(_queue_lock);
_block = true;
}
@ -137,7 +138,7 @@ public:
unblock()
{
{
std::lock_guard guard(_queue_lock);
std::lock_guard<std::mutex> guard(_queue_lock);
_block = false;
}
@ -148,7 +149,7 @@ public:
bool
blocking() const
{
std::lock_guard guard(_queue_lock);
std::lock_guard<std::mutex> guard(_queue_lock);
return _block;
}

143
libfuse/lib/bounded_thread_pool.hpp

@ -0,0 +1,143 @@
#pragma once
#include "bounded_queue.hpp"
#include "make_unique.hpp"
#include <signal.h>
#include <tuple>
#include <atomic>
#include <vector>
#include <thread>
#include <memory>
#include <future>
#include <utility>
#include <functional>
#include <type_traits>
class BoundedThreadPool
{
private:
using Proc = std::function<void(void)>;
using Queue = BoundedQueue<Proc>;
using Queues = std::vector<std::unique_ptr<Queue>>;
public:
explicit
BoundedThreadPool(const std::size_t thread_count_ = std::thread::hardware_concurrency(),
const std::size_t queue_depth_ = 1)
: _queues(),
_count(thread_count_)
{
for(std::size_t i = 0; i < thread_count_; i++)
_queues.emplace_back(std::make_unique<Queue>(queue_depth_));
auto worker = [this](std::size_t i)
{
while(true)
{
Proc f;
for(std::size_t n = 0; n < (_count * K); ++n)
{
if(_queues[(i + n) % _count]->pop(f))
break;
}
if(!f && !_queues[i]->pop(f))
break;
f();
}
};
sigset_t oldset;
sigset_t newset;
sigfillset(&newset);
pthread_sigmask(SIG_BLOCK,&newset,&oldset);
_threads.reserve(thread_count_);
for(std::size_t i = 0; i < thread_count_; ++i)
_threads.emplace_back(worker, i);
pthread_sigmask(SIG_SETMASK,&oldset,NULL);
}
~BoundedThreadPool()
{
for(auto &queue : _queues)
queue->unblock();
for(auto &thread : _threads)
pthread_cancel(thread.native_handle());
for(auto &thread : _threads)
thread.join();
}
template<typename F>
void
enqueue_work(F&& f_)
{
auto i = _index++;
for(std::size_t n = 0; n < (_count * K); ++n)
{
if(_queues[(i + n) % _count]->push(f_))
return;
}
_queues[i % _count]->push(std::move(f_));
}
template<typename F>
[[nodiscard]]
std::future<typename std::result_of<F()>::type>
enqueue_task(F&& f_)
{
using TaskReturnType = typename std::result_of<F()>::type;
using Promise = std::promise<TaskReturnType>;
auto i = _index++;
auto promise = std::make_shared<Promise>();
auto future = promise->get_future();
auto work = [=]() {
auto rv = f_();
promise->set_value(rv);
};
for(std::size_t n = 0; n < (_count * K); ++n)
{
if(_queues[(i + n) % _count]->push(work))
return future;
}
_queues[i % _count]->push(std::move(work));
return future;
}
public:
std::vector<pthread_t>
threads()
{
std::vector<pthread_t> rv;
for(auto &thread : _threads)
rv.push_back(thread.native_handle());
return rv;
}
private:
Queues _queues;
private:
std::vector<std::thread> _threads;
private:
const std::size_t _count;
std::atomic_uint _index;
static const unsigned int K = 2;
};

391
libfuse/lib/fuse.c

@ -13,8 +13,8 @@
#include "fuse_node.h"
#include "khash.h"
#include "kvec.h"
#include "lfmp.h"
#include "node.h"
#include "config.h"
#include "fuse_dirents.h"
#include "fuse_i.h"
@ -74,6 +74,7 @@ struct fuse_config
int help;
int read_thread_count;
int process_thread_count;
int process_thread_queue_depth;
char *pin_threads;
};
@ -89,18 +90,18 @@ struct lock_queue_element
uint64_t nodeid1;
const char *name1;
char **path1;
struct node **wnode1;
node_t **wnode1;
uint64_t nodeid2;
const char *name2;
char **path2;
struct node **wnode2;
node_t **wnode2;
int err;
bool done : 1;
};
struct node_table
{
struct node **array;
node_t **array;
size_t use;
size_t size;
size_t split;
@ -122,7 +123,7 @@ struct list_head
typedef struct remembered_node_t remembered_node_t;
struct remembered_node_t
{
struct node *node;
node_t *node;
time_t time;
};
@ -146,7 +147,6 @@ struct fuse
struct lock_queue_element *lockq;
pthread_t maintenance_thread;
lfmp_t node_fmp;
kvec_t(remembered_node_t) remembered_nodes;
};
@ -160,28 +160,6 @@ struct lock
struct lock *next;
};
struct node
{
struct node *name_next;
struct node *id_next;
uint64_t nodeid;
char *name;
struct node *parent;
uint64_t nlookup;
uint32_t refctr;
uint32_t open_count;
uint64_t hidden_fh;
int32_t treelock;
struct lock *locks;
uint32_t stat_crc32b;
uint8_t is_stat_cache_valid:1;
};
#define TREELOCK_WRITE -1
#define TREELOCK_WAIT_OFFSET INT_MIN
@ -283,21 +261,6 @@ list_del(struct list_head *entry)
prev->next = next;
}
static
struct node*
alloc_node(struct fuse *f)
{
return lfmp_calloc(&f->node_fmp);
}
static
void
free_node_mem(struct fuse *f,
struct node *node)
{
return lfmp_free(&f->node_fmp,node);
}
static
size_t
id_hash(struct fuse *f,
@ -313,12 +276,12 @@ id_hash(struct fuse *f,
}
static
struct node*
node_t*
get_node_nocheck(struct fuse *f,
uint64_t nodeid)
{
size_t hash = id_hash(f,nodeid);
struct node *node;
node_t *node;
for(node = f->id_table.array[hash]; node != NULL; node = node->id_next)
if(node->nodeid == nodeid)
@ -328,11 +291,11 @@ get_node_nocheck(struct fuse *f,
}
static
struct node*
node_t*
get_node(struct fuse *f,
const uint64_t nodeid)
{
struct node *node = get_node_nocheck(f,nodeid);
node_t *node = get_node_nocheck(f,nodeid);
if(!node)
{
@ -347,7 +310,7 @@ get_node(struct fuse *f,
static
void
remove_remembered_node(struct fuse *f_,
struct node *node_)
node_t *node_)
{
for(size_t i = 0; i < kv_size(f_->remembered_nodes); i++)
{
@ -402,14 +365,14 @@ current_time()
static
void
free_node(struct fuse *f_,
struct node *node_)
node_t *node_)
{
filename_free(f_,node_->name);
if(node_->hidden_fh)
f_->fs->op.free_hide(node_->hidden_fh);
free_node_mem(f_,node_);
node_free(node_);
}
static
@ -422,7 +385,7 @@ node_table_reduce(struct node_table *t)
if(newsize < NODE_TABLE_MIN_SIZE)
return;
newarray = realloc(t->array,sizeof(struct node *)* newsize);
newarray = realloc(t->array,sizeof(node_t*) * newsize);
if(newarray != NULL)
t->array = newarray;
@ -442,13 +405,13 @@ remerge_id(struct fuse *f)
for(iter = 8; t->split > 0 && iter; iter--)
{
struct node **upper;
node_t **upper;
t->split--;
upper = &t->array[t->split + t->size / 2];
if(*upper)
{
struct node **nodep;
node_t **nodep;
for(nodep = &t->array[t->split]; *nodep;
nodep = &(*nodep)->id_next);
@ -463,9 +426,9 @@ remerge_id(struct fuse *f)
static
void
unhash_id(struct fuse *f,
struct node *node)
node_t *node)
{
struct node **nodep = &f->id_table.array[id_hash(f,node->nodeid)];
node_t **nodep = &f->id_table.array[id_hash(f,node->nodeid)];
for(; *nodep != NULL; nodep = &(*nodep)->id_next)
if(*nodep == node)
@ -486,12 +449,12 @@ node_table_resize(struct node_table *t)
size_t newsize = t->size * 2;
void *newarray;
newarray = realloc(t->array,sizeof(struct node *)* newsize);
newarray = realloc(t->array,sizeof(node_t*) * newsize);
if(newarray == NULL)
return -1;
t->array = newarray;
memset(t->array + t->size,0,t->size * sizeof(struct node *));
memset(t->array + t->size,0,t->size * sizeof(node_t*));
t->size = newsize;
t->split = 0;
@ -503,8 +466,8 @@ void
rehash_id(struct fuse *f)
{
struct node_table *t = &f->id_table;
struct node **nodep;
struct node **next;
node_t **nodep;
node_t **next;
size_t hash;
if(t->split == t->size / 2)
@ -514,7 +477,7 @@ rehash_id(struct fuse *f)
t->split++;
for(nodep = &t->array[hash]; *nodep != NULL; nodep = next)
{
struct node *node = *nodep;
node_t *node = *nodep;
size_t newhash = id_hash(f,node->nodeid);
if(newhash != hash)
@ -537,7 +500,7 @@ rehash_id(struct fuse *f)
static
void
hash_id(struct fuse *f,
struct node *node)
node_t *node)
{
size_t hash;
@ -573,7 +536,7 @@ name_hash(struct fuse *f,
static
void
unref_node(struct fuse *f,
struct node *node);
node_t *node);
static
void
@ -587,13 +550,13 @@ remerge_name(struct fuse *f)
for(iter = 8; t->split > 0 && iter; iter--)
{
struct node **upper;
node_t **upper;
t->split--;
upper = &t->array[t->split + t->size / 2];
if(*upper)
{
struct node **nodep;
node_t **nodep;
for(nodep = &t->array[t->split]; *nodep; nodep = &(*nodep)->name_next);
@ -607,12 +570,12 @@ remerge_name(struct fuse *f)
static
void
unhash_name(struct fuse *f,
struct node *node)
node_t *node)
{
if(node->name)
{
size_t hash = name_hash(f,node->parent->nodeid,node->name);
struct node **nodep = &f->name_table.array[hash];
node_t **nodep = &f->name_table.array[hash];
for(; *nodep != NULL; nodep = &(*nodep)->name_next)
if(*nodep == node)
@ -643,8 +606,8 @@ void
rehash_name(struct fuse *f)
{
struct node_table *t = &f->name_table;
struct node **nodep;
struct node **next;
node_t **nodep;
node_t **next;
size_t hash;
if(t->split == t->size / 2)
@ -654,7 +617,7 @@ rehash_name(struct fuse *f)
t->split++;
for(nodep = &t->array[hash]; *nodep != NULL; nodep = next)
{
struct node *node = *nodep;
node_t *node = *nodep;
size_t newhash = name_hash(f,node->parent->nodeid,node->name);
if(newhash != hash)
@ -677,12 +640,12 @@ rehash_name(struct fuse *f)
static
int
hash_name(struct fuse *f,
struct node *node,
node_t *node,
uint64_t parentid,
const char *name)
{
size_t hash = name_hash(f,parentid,name);
struct node *parent = get_node(f,parentid);
node_t *parent = get_node(f,parentid);
node->name = filename_strdup(f,name);
if(node->name == NULL)
return -1;
@ -710,20 +673,20 @@ remember_nodes(struct fuse *f_)
static
void
delete_node(struct fuse *f,
struct node *node)
node_t *node)
{
assert(node->treelock == 0);
unhash_name(f,node);
if(remember_nodes(f))
remove_remembered_node(f,node);
unhash_id(f,node);
free_node(f,node);
node_free(node);
}
static
void
unref_node(struct fuse *f,
struct node *node)
node_t *node)
{
assert(node->refctr > 0);
node->refctr--;
@ -745,13 +708,13 @@ rand64(void)
}
static
struct node*
node_t*
lookup_node(struct fuse *f,
uint64_t parent,
const char *name)
{
size_t hash;
struct node *node;
node_t *node;
hash = name_hash(f,parent,name);
for(node = f->name_table.array[hash]; node != NULL; node = node->name_next)
@ -763,7 +726,7 @@ lookup_node(struct fuse *f,
static
void
inc_nlookup(struct node *node)
inc_nlookup(node_t *node)
{
if(!node->nlookup)
node->refctr++;
@ -771,12 +734,12 @@ inc_nlookup(struct node *node)
}
static
struct node*
node_t*
find_node(struct fuse *f,
uint64_t parent,
const char *name)
{
struct node *node;
node_t *node;
pthread_mutex_lock(&f->lock);
if(!name)
@ -786,7 +749,7 @@ find_node(struct fuse *f,
if(node == NULL)
{
node = alloc_node(f);
node = node_alloc();
if(node == NULL)
goto out_err;
@ -856,10 +819,10 @@ static
void
unlock_path(struct fuse *f,
uint64_t nodeid,
struct node *wnode,
struct node *end)
node_t *wnode,
node_t *end)
{
struct node *node;
node_t *node;
if(wnode)
{
@ -884,14 +847,14 @@ try_get_path(struct fuse *f,
uint64_t nodeid,
const char *name,
char **path,
struct node **wnodep,
node_t **wnodep,
bool need_lock)
{
unsigned bufsize = 256;
char *buf;
char *s;
struct node *node;
struct node *wnode = NULL;
node_t *node;
node_t *wnode = NULL;
int err;
*path = NULL;
@ -980,8 +943,8 @@ try_get_path2(struct fuse *f,
const char *name2,
char **path1,
char **path2,
struct node **wnode1,
struct node **wnode2)
node_t **wnode1,
node_t **wnode2)
{
int err;
@ -991,7 +954,7 @@ try_get_path2(struct fuse *f,
err = try_get_path(f,nodeid2,name2,path2,wnode2,true);
if(err)
{
struct node *wn1 = wnode1 ? *wnode1 : NULL;
node_t *wn1 = wnode1 ? *wnode1 : NULL;
unlock_path(f,nodeid1,wn1,NULL);
free(*path1);
@ -1109,7 +1072,7 @@ get_path_common(struct fuse *f,
uint64_t nodeid,
const char *name,
char **path,
struct node **wnode)
node_t **wnode)
{
int err;
@ -1156,7 +1119,7 @@ get_path_wrlock(struct fuse *f,
uint64_t nodeid,
const char *name,
char **path,
struct node **wnode)
node_t **wnode)
{
return get_path_common(f,nodeid,name,path,wnode);
}
@ -1170,8 +1133,8 @@ get_path2(struct fuse *f,
const char *name2,
char **path1,
char **path2,
struct node **wnode1,
struct node **wnode2)
node_t **wnode1,
node_t **wnode2)
{
int err;
@ -1202,7 +1165,7 @@ static
void
free_path_wrlock(struct fuse *f,
uint64_t nodeid,
struct node *wnode,
node_t *wnode,
char *path)
{
pthread_mutex_lock(&f->lock);
@ -1228,8 +1191,8 @@ void
free_path2(struct fuse *f,
uint64_t nodeid1,
uint64_t nodeid2,
struct node *wnode1,
struct node *wnode2,
node_t *wnode1,
node_t *wnode2,
char *path1,
char *path2)
{
@ -1248,7 +1211,7 @@ forget_node(struct fuse *f,
const uint64_t nodeid,
const uint64_t nlookup)
{
struct node *node;
node_t *node;
if(nodeid == FUSE_ROOT_ID)
return;
@ -1300,7 +1263,7 @@ forget_node(struct fuse *f,
static
void
unlink_node(struct fuse *f,
struct node *node)
node_t *node)
{
if(remember_nodes(f))
{
@ -1316,7 +1279,7 @@ remove_node(struct fuse *f,
uint64_t dir,
const char *name)
{
struct node *node;
node_t *node;
pthread_mutex_lock(&f->lock);
node = lookup_node(f,dir,name);
@ -1333,8 +1296,8 @@ rename_node(struct fuse *f,
uint64_t newdir,
const char *newname)
{
struct node *node;
struct node *newnode;
node_t *node;
node_t *newnode;
int err = 0;
pthread_mutex_lock(&f->lock);
@ -1381,14 +1344,14 @@ req_fuse(fuse_req_t req)
static
int
node_open(const struct node *node_)
node_open(const node_t *node_)
{
return ((node_ != NULL) && (node_->open_count > 0));
}
static
void
update_stat(struct node *node_,
update_stat(node_t *node_,
const struct stat *stnew_)
{
uint32_t crc32b;
@ -1408,7 +1371,7 @@ set_path_info(struct fuse *f,
const char *name,
struct fuse_entry_param *e)
{
struct node *node;
node_t *node;
node = find_node(f,nodeid,name);
if(node == NULL)
@ -1591,7 +1554,7 @@ fuse_lib_lookup(fuse_req_t req,
char *path;
const char *name;
struct fuse *f;
struct node *dot = NULL;
node_t *dot = NULL;
struct fuse_entry_param e = {0};
name = fuse_hdr_arg(hdr_);
@ -1681,9 +1644,11 @@ fuse_lib_forget_multi(fuse_req_t req,
entry = PARAM(arg);
for(uint32_t i = 0; i < arg->count; i++)
{
forget_node(f,
entry[i].nodeid,
entry[i].nlookup);
}
fuse_reply_none(req);
}
@ -1698,7 +1663,7 @@ fuse_lib_getattr(fuse_req_t req,
char *path;
struct fuse *f;
struct stat buf;
struct node *node;
node_t *node;
fuse_timeouts_t timeout;
fuse_file_info_t ffi = {0};
const struct fuse_getattr_in *arg;
@ -1759,7 +1724,7 @@ fuse_lib_setattr(fuse_req_t req,
struct stat stbuf = {0};
char *path;
int err;
struct node *node;
node_t *node;
fuse_timeouts_t timeout;
fuse_file_info_t *fi;
fuse_file_info_t ffi = {0};
@ -2019,7 +1984,7 @@ fuse_lib_unlink(fuse_req_t req,
char *path;
struct fuse *f;
const char *name;
struct node *wnode;
node_t *wnode;
name = PARAM(hdr_);
@ -2052,7 +2017,7 @@ fuse_lib_rmdir(fuse_req_t req,
char *path;
struct fuse *f;
const char *name;
struct node *wnode;
node_t *wnode;
name = PARAM(hdr_);
@ -2110,8 +2075,8 @@ fuse_lib_rename(fuse_req_t req,
char *newpath;
const char *oldname;
const char *newname;
struct node *wnode1;
struct node *wnode2;
node_t *wnode1;
node_t *wnode2;
struct fuse_rename_in *arg;
arg = fuse_hdr_arg(hdr_);
@ -2179,7 +2144,7 @@ fuse_do_release(struct fuse *f,
fuse_file_info_t *fi)
{
uint64_t fh;
struct node *node;
node_t *node;
fh = 0;
@ -2277,7 +2242,7 @@ open_auto_cache(struct fuse *f,
const char *path,
fuse_file_info_t *fi)
{
struct node *node;
node_t *node;
fuse_timeouts_t timeout;
pthread_mutex_lock(&f->lock);
@ -2917,11 +2882,11 @@ fuse_lib_copy_file_range(fuse_req_t req_,
}
static
struct lock*
locks_conflict(struct node *node,
const struct lock *lock)
lock_t*
locks_conflict(node_t *node,
const lock_t *lock)
{
struct lock *l;
lock_t *l;
for(l = node->locks; l; l = l->next)
if(l->owner != lock->owner &&
@ -2934,17 +2899,17 @@ locks_conflict(struct node *node,
static
void
delete_lock(struct lock **lockp)
delete_lock(lock_t **lockp)
{
struct lock *l = *lockp;
lock_t *l = *lockp;
*lockp = l->next;
free(l);
}
static
void
insert_lock(struct lock **pos,
struct lock *lock)
insert_lock(lock_t **pos,
lock_t *lock)
{
lock->next = *pos;
*pos = lock;
@ -2952,17 +2917,17 @@ insert_lock(struct lock **pos,
static
int
locks_insert(struct node *node,
struct lock *lock)
locks_insert(node_t *node,
lock_t *lock)
{
struct lock **lp;
struct lock *newl1 = NULL;
struct lock *newl2 = NULL;
lock_t **lp;
lock_t *newl1 = NULL;
lock_t *newl2 = NULL;
if(lock->type != F_UNLCK || lock->start != 0 || lock->end != OFFSET_MAX)
{
newl1 = malloc(sizeof(struct lock));
newl2 = malloc(sizeof(struct lock));
newl1 = malloc(sizeof(lock_t));
newl2 = malloc(sizeof(lock_t));
if(!newl1 || !newl2)
{
@ -2974,7 +2939,7 @@ locks_insert(struct node *node,
for(lp = &node->locks; *lp;)
{
struct lock *l = *lp;
lock_t *l = *lp;
if(l->owner != lock->owner)
goto skip;
@ -3038,9 +3003,9 @@ locks_insert(struct node *node,
static
void
flock_to_lock(struct flock *flock,
struct lock *lock)
lock_t *lock)
{
memset(lock,0,sizeof(struct lock));
memset(lock,0,sizeof(lock_t));
lock->type = flock->l_type;
lock->start = flock->l_start;
lock->end = flock->l_len ? flock->l_start + flock->l_len - 1 : OFFSET_MAX;
@ -3049,7 +3014,7 @@ flock_to_lock(struct flock *flock,
static
void
lock_to_flock(struct lock *lock,
lock_to_flock(lock_t *lock,
struct flock *flock)
{
flock->l_type = lock->type;
@ -3066,7 +3031,7 @@ fuse_flush_common(struct fuse *f,
fuse_file_info_t *fi)
{
struct flock lock;
struct lock l;
lock_t l;
int err;
int errlock;
@ -3194,9 +3159,9 @@ fuse_lib_getlk(fuse_req_t req,
{
int err;
struct fuse *f;
struct lock lk;
lock_t lk;
struct flock flk;
struct lock *conflict;
lock_t *conflict;
fuse_file_info_t ffi = {0};
const struct fuse_lk_in *arg;
@ -3239,7 +3204,7 @@ fuse_lib_setlk(fuse_req_t req,
if(!err)
{
struct fuse *f = req_fuse(req);
struct lock l;
lock_t l;
flock_to_lock(lock,&l);
l.owner = fi->lock_owner;
pthread_mutex_lock(&f->lock);
@ -3639,6 +3604,7 @@ static const struct fuse_opt fuse_lib_opts[] =
FUSE_LIB_OPT("threads=%d", read_thread_count,0),
FUSE_LIB_OPT("read-thread-count=%d", read_thread_count,0),
FUSE_LIB_OPT("process-thread-count=%d", process_thread_count,-1),
FUSE_LIB_OPT("process-thread-queue-depth=%d", process_thread_queue_depth,-1),
FUSE_LIB_OPT("pin-threads=%s", pin_threads, 0),
FUSE_OPT_END
};
@ -3712,7 +3678,7 @@ int
node_table_init(struct node_table *t)
{
t->size = NODE_TABLE_MIN_SIZE;
t->array = (struct node **)calloc(1,sizeof(struct node *) * t->size);
t->array = (node_t **)calloc(1,sizeof(node_t *) * t->size);
if(t->array == NULL)
{
fprintf(stderr,"fuse: memory allocation failed\n");
@ -3724,16 +3690,47 @@ node_table_init(struct node_table *t)
return 0;
}
static
struct fuse*
fuse_get_fuse_obj()
{
static struct fuse f = {0};
return &f;
}
static
void
metrics_log_nodes_info(struct fuse *f_,
FILE *file_)
{
char buf[1024];
char time_str[64];
struct tm tm;
struct timeval tv;
uint64_t sizeof_node;
float node_usage_ratio;
uint64_t node_slab_count;
uint64_t node_avail_objs;
uint64_t node_total_alloc_mem;
gettimeofday(&tv,NULL);
localtime_r(&tv.tv_sec,&tm);
strftime(time_str,sizeof(time_str),"%Y-%m-%dT%H:%M:%S.000%z",&tm);
sizeof_node = sizeof(node_t);
lfmp_t *lfmp;
lfmp = node_lfmp();
lfmp_lock(lfmp);
node_slab_count = fmp_slab_count(&lfmp->fmp);
node_usage_ratio = fmp_slab_usage_ratio(&lfmp->fmp);
node_avail_objs = fmp_avail_objs(&lfmp->fmp);
node_total_alloc_mem = fmp_total_allocated_memory(&lfmp->fmp);
lfmp_unlock(lfmp);
lfmp_lock(&f_->node_fmp);
snprintf(buf,sizeof(buf),
"time: %"PRIu64"\n"
"time: %s\n"
"sizeof(node): %"PRIu64"\n"
"node id_table size: %"PRIu64"\n"
"node id_table usage: %"PRIu64"\n"
@ -3745,22 +3742,29 @@ metrics_log_nodes_info(struct fuse *f_,
"node memory pool usage ratio: %f\n"
"node memory pool avail objs: %"PRIu64"\n"
"node memory pool total allocated memory: %"PRIu64"\n"
"msgbuf bufsize: %"PRIu64"\n"
"msgbuf allocation count: %"PRIu64"\n"
"msgbuf available count: %"PRIu64"\n"
"msgbuf total allocated memory: %"PRIu64"\n"
"\n"
,
(uint64_t)time(NULL),
(uint64_t)sizeof(struct node),
time_str,
sizeof_node,
(uint64_t)f_->id_table.size,
(uint64_t)f_->id_table.use,
(uint64_t)(f_->id_table.size * sizeof(struct node*)),
(uint64_t)(f_->id_table.size * sizeof(node_t*)),
(uint64_t)f_->name_table.size,
(uint64_t)f_->name_table.use,
(uint64_t)(f_->name_table.size * sizeof(struct node*)),
(uint64_t)fmp_slab_count(&f_->node_fmp.fmp),
fmp_slab_usage_ratio(&f_->node_fmp.fmp),
(uint64_t)fmp_avail_objs(&f_->node_fmp.fmp),
(uint64_t)fmp_total_allocated_memory(&f_->node_fmp.fmp)
(uint64_t)(f_->name_table.size * sizeof(node_t*)),
node_slab_count,
node_usage_ratio,
node_avail_objs,
node_total_alloc_mem,
msgbuf_get_bufsize(),
msgbuf_alloc_count(),
msgbuf_avail_count(),
msgbuf_alloc_count() * msgbuf_get_bufsize()
);
lfmp_unlock(&f_->node_fmp);
fputs(buf,file_);
}
@ -3769,12 +3773,20 @@ static
void
metrics_log_nodes_info_to_tmp_dir(struct fuse *f_)
{
int rv;
FILE *file;
char filepath[256];
struct stat st;
char const *mode = "a";
off_t const max_size = (1024 * 1024);
sprintf(filepath,"/tmp/mergerfs.%d.info",getpid());
file = fopen(filepath,"w");
rv = lstat(filepath,&st);
if((rv == 0) && (st.st_size > max_size))
mode = "w";
file = fopen(filepath,mode);
if(file == NULL)
return;
@ -3783,7 +3795,6 @@ metrics_log_nodes_info_to_tmp_dir(struct fuse *f_)
fclose(file);
}
static
void
fuse_malloc_trim(void)
@ -3793,16 +3804,60 @@ fuse_malloc_trim(void)
#endif
}
void
fuse_invalidate_all_nodes()
{
struct fuse *f = fuse_get_fuse_obj();
syslog_info("invalidating file entries");
pthread_mutex_lock(&f->lock);
for(int i = 0; i < f->id_table.size; i++)
{
node_t *node;
for(node = f->id_table.array[i]; node != NULL; node = node->id_next)
{
if(node->nodeid == FUSE_ROOT_ID)
continue;
if(node->parent->nodeid != FUSE_ROOT_ID)
continue;
fuse_lowlevel_notify_inval_entry(f->se->ch,
node->parent->nodeid,
node->name,
strlen(node->name));
}
}
pthread_mutex_unlock(&f->lock);
}
void
fuse_gc()
{
syslog_info("running thorough garbage collection");
node_gc();
msgbuf_gc();
fuse_malloc_trim();
}
void
fuse_gc1()
{
syslog_info("running basic garbage collection");
node_gc1();
msgbuf_gc_10percent();
fuse_malloc_trim();
}
static
void*
fuse_maintenance_loop(void *fuse_)
{
int gc;
int loops;
int sleep_time;
struct fuse *f = (struct fuse*)fuse_;
gc = 0;
loops = 0;
sleep_time = 60;
while(1)
@ -3811,14 +3866,7 @@ fuse_maintenance_loop(void *fuse_)
fuse_prune_remembered_nodes(f);
if((loops % 15) == 0)
{
fuse_malloc_trim();
gc = 1;
}
// Trigger a followup gc if this gc succeeds
if(!f->conf.nogc && gc)
gc = lfmp_gc(&f->node_fmp);
fuse_gc1();
if(g_LOG_METRICS)
metrics_log_nodes_info_to_tmp_dir(f);
@ -3852,14 +3900,14 @@ fuse_new_common(struct fuse_chan *ch,
size_t op_size)
{
struct fuse *f;
struct node *root;
node_t *root;
struct fuse_fs *fs;
struct fuse_lowlevel_ops llop = fuse_path_ops;
if(fuse_create_context_key() == -1)
goto out;
f = (struct fuse *)calloc(1,sizeof(struct fuse));
f = fuse_get_fuse_obj();
if(f == NULL)
{
fprintf(stderr,"fuse: failed to allocate fuse object\n");
@ -3902,10 +3950,9 @@ fuse_new_common(struct fuse_chan *ch,
fuse_mutex_init(&f->lock);
lfmp_init(&f->node_fmp,sizeof(struct node),256);
kv_init(f->remembered_nodes);
root = alloc_node(f);
root = node_alloc();
if(root == NULL)
{
fprintf(stderr,"fuse: memory allocation failed\n");
@ -3933,7 +3980,7 @@ fuse_new_common(struct fuse_chan *ch,
fs->op.destroy = NULL;
free(f->fs);
out_free:
free(f);
// free(f);
out_delete_context_key:
fuse_delete_context_key();
out:
@ -3963,7 +4010,7 @@ fuse_destroy(struct fuse *f)
for(i = 0; i < f->id_table.size; i++)
{
struct node *node;
node_t *node;
for(node = f->id_table.array[i]; node != NULL; node = node->id_next)
{
@ -3978,8 +4025,8 @@ fuse_destroy(struct fuse *f)
for(i = 0; i < f->id_table.size; i++)
{
struct node *node;
struct node *next;
node_t *node;
node_t *next;
for(node = f->id_table.array[i]; node != NULL; node = next)
{
@ -3993,9 +4040,7 @@ fuse_destroy(struct fuse *f)
free(f->name_table.array);
pthread_mutex_destroy(&f->lock);
fuse_session_destroy(f->se);
lfmp_destroy(&f->node_fmp);
kv_destroy(f->remembered_nodes);
free(f);
fuse_delete_context_key();
}
@ -4011,6 +4056,12 @@ fuse_config_process_thread_count(const struct fuse *f_)
return f_->conf.process_thread_count;
}
int
fuse_config_process_thread_queue_depth(const struct fuse *f_)
{
return f_->conf.process_thread_queue_depth;
}
const
char*
fuse_config_pin_threads(const struct fuse *f_)

219
libfuse/lib/fuse_loop_mt.cpp

@ -2,9 +2,11 @@
#define _GNU_SOURCE
#endif
#include "thread_pool.hpp"
#include "bounded_thread_pool.hpp"
#include "cpu.hpp"
#include "fmt/core.h"
#include "scope_guard.hpp"
#include "syslog.h"
#include "fuse_i.h"
#include "fuse_kernel.h"
@ -27,33 +29,6 @@
#include <cassert>
#include <vector>
struct fuse_worker_data_t
{
struct fuse_session *se;
sem_t finished;
std::function<void(fuse_worker_data_t*,fuse_msgbuf_t*)> msgbuf_processor;
std::shared_ptr<ThreadPool> tp;
};
class WorkerCleanup
{
public:
WorkerCleanup(fuse_worker_data_t *wd_)
: _wd(wd_)
{
}
~WorkerCleanup()
{
fuse_session_exit(_wd->se);
sem_post(&_wd->finished);
}
private:
fuse_worker_data_t *_wd;
};
static
bool
retriable_receive_error(const int err_)
@ -77,30 +52,41 @@ fatal_receive_error(const int err_)
}
static
void*
void
handle_receive_error(const int rv_,
fuse_msgbuf_t *msgbuf_)
{
msgbuf_free(msgbuf_);
fprintf(stderr,
"mergerfs: error reading from /dev/fuse - %s (%d)\n",
fmt::print(stderr,
"mergerfs: error reading from /dev/fuse - {} ({})\n",
strerror(-rv_),
-rv_);
}
return NULL;
struct AsyncWorker
{
fuse_session *_se;
sem_t *_finished;
std::shared_ptr<BoundedThreadPool> _process_tp;
AsyncWorker(fuse_session *se_,
sem_t *finished_,
std::shared_ptr<BoundedThreadPool> process_tp_)
: _se(se_),
_finished(finished_),
_process_tp(process_tp_)
{
}
static
void*
fuse_do_work(void *data)
inline
void
operator()() const
{
fuse_worker_data_t *wd = (fuse_worker_data_t*)data;
fuse_session *se = wd->se;
auto &process_msgbuf = wd->msgbuf_processor;
WorkerCleanup workercleanup(wd);
DEFER{ fuse_session_exit(_se); };
DEFER{ sem_post(_finished); };
while(!fuse_session_exited(se))
while(!fuse_session_exited(_se))
{
int rv;
fuse_msgbuf_t *msgbuf;
@ -110,22 +96,70 @@ fuse_do_work(void *data)
do
{
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
rv = se->receive_buf(se,msgbuf);
rv = _se->receive_buf(_se,msgbuf);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
if(rv == 0)
return NULL;
return;
if(retriable_receive_error(rv))
continue;
if(fatal_receive_error(rv))
return handle_receive_error(rv,msgbuf);
} while(false);
process_msgbuf(wd,msgbuf);
_process_tp->enqueue_work([=] {
_se->process_buf(_se,msgbuf);
msgbuf_free(msgbuf);
});
}
}
};
struct SyncWorker
{
fuse_session *_se;
sem_t *_finished;
SyncWorker(fuse_session *se_,
sem_t *finished_)
return NULL;
: _se(se_),
_finished(finished_)
{
}
inline
void
operator()() const
{
DEFER{ fuse_session_exit(_se); };
DEFER{ sem_post(_finished); };
while(!fuse_session_exited(_se))
{
int rv;
fuse_msgbuf_t *msgbuf;
msgbuf = msgbuf_alloc();
do
{
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
rv = _se->receive_buf(_se,msgbuf);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
if(rv == 0)
return;
if(retriable_receive_error(rv))
continue;
if(fatal_receive_error(rv))
return handle_receive_error(rv,msgbuf);
} while(false);
_se->process_buf(_se,msgbuf);
msgbuf_free(msgbuf);
}
}
};
int
fuse_start_thread(pthread_t *thread_id,
void *(*func)(void *),
@ -174,7 +208,8 @@ calculate_thread_count(const int raw_thread_count_)
static
void
calculate_thread_counts(int *read_thread_count_,
int *process_thread_count_)
int *process_thread_count_,
int *process_thread_queue_depth_)
{
if((*read_thread_count_ == -1) && (*process_thread_count_ == -1))
{
@ -190,27 +225,9 @@ calculate_thread_counts(int *read_thread_count_,
if(*process_thread_count_ != -1)
*process_thread_count_ = ::calculate_thread_count(*process_thread_count_);
}
}
static
void
process_msgbuf_sync(fuse_worker_data_t *wd_,
fuse_msgbuf_t *msgbuf_)
{
wd_->se->process_buf(wd_->se,msgbuf_);
msgbuf_free(msgbuf_);
}
static
void
process_msgbuf_async(fuse_worker_data_t *wd_,
fuse_msgbuf_t *msgbuf_)
{
const auto func = [=] {
process_msgbuf_sync(wd_,msgbuf_);
};
wd_->tp->enqueue_work(func);
if(*process_thread_queue_depth_ <= 0)
*process_thread_queue_depth_ = *process_thread_count_;
}
static
@ -425,63 +442,71 @@ pin_threads(const std::vector<pthread_t> read_threads_,
return ::pin_threads_R1PPSP(read_threads_,process_threads_);
}
static
void
wait(fuse_session *se_,
sem_t *finished_sem_)
{
while(!fuse_session_exited(se_))
sem_wait(finished_sem_);
}
int
fuse_session_loop_mt(struct fuse_session *se_,
const int raw_read_thread_count_,
const int raw_process_thread_count_,
const int raw_process_thread_queue_depth_,
const char *pin_threads_type_)
{
int err;
sem_t finished;
int read_thread_count;
int process_thread_count;
fuse_worker_data_t wd = {0};
int process_thread_queue_depth;
std::vector<pthread_t> read_threads;
std::vector<pthread_t> process_threads;
std::unique_ptr<BoundedThreadPool> read_tp;
std::shared_ptr<BoundedThreadPool> process_tp;
sem_init(&finished,0,0);
read_thread_count = raw_read_thread_count_;
process_thread_count = raw_process_thread_count_;
::calculate_thread_counts(&read_thread_count,&process_thread_count);
process_thread_queue_depth = raw_process_thread_queue_depth_;
::calculate_thread_counts(&read_thread_count,
&process_thread_count,
&process_thread_queue_depth);
if(process_thread_count > 0)
process_tp = std::make_shared<BoundedThreadPool>(process_thread_count,process_thread_queue_depth);
read_tp = std::make_unique<BoundedThreadPool>(read_thread_count);
if(process_tp)
{
wd.tp = std::make_shared<ThreadPool>(process_thread_count);
wd.msgbuf_processor = process_msgbuf_async;
process_threads = wd.tp->threads();
for(auto i = 0; i < read_thread_count; i++)
read_tp->enqueue_work(AsyncWorker(se_,&finished,process_tp));
}
else
{
wd.msgbuf_processor = process_msgbuf_sync;
for(auto i = 0; i < read_thread_count; i++)
read_tp->enqueue_work(SyncWorker(se_,&finished));
}
wd.se = se_;
sem_init(&wd.finished,0,0);
err = 0;
for(int i = 0; i < read_thread_count; i++)
{
pthread_t thread_id;
err = fuse_start_thread(&thread_id,fuse_do_work,&wd);
assert(err == 0);
read_threads.push_back(thread_id);
}
if(read_tp)
read_threads = read_tp->threads();
if(process_tp)
process_threads = process_tp->threads();
if(pin_threads_type_ != NULL)
if(pin_threads_type_ != nullptr)
::pin_threads(read_threads,process_threads,pin_threads_type_);
if(!err)
{
/* sem_wait() is interruptible */
while(!fuse_session_exited(se_))
sem_wait(&wd.finished);
syslog_info("read-thread-count=%d; process-thread-count=%d; process-thread-queue-depth=%d",
read_thread_count,
process_thread_count,
process_thread_queue_depth);
for(const auto &thread_id : read_threads)
pthread_cancel(thread_id);
::wait(se_,&finished);
for(const auto &thread_id : read_threads)
pthread_join(thread_id,NULL);
}
sem_destroy(&finished);
sem_destroy(&wd.finished);
return err;
return 0;
}

106
libfuse/lib/fuse_msgbuf.cpp

@ -20,17 +20,20 @@
#include <unistd.h>
#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <mutex>
#include <stack>
#include <vector>
static std::uint32_t g_PAGESIZE = 0;
static std::uint32_t g_BUFSIZE = 0;
static std::atomic_uint64_t g_MSGBUF_ALLOC_COUNT;
static std::mutex g_MUTEX;
static std::stack<fuse_msgbuf_t*> g_MSGBUF_STACK;
static std::vector<fuse_msgbuf_t*> g_MSGBUF_STACK;
static
__attribute__((constructor))
@ -38,6 +41,7 @@ void
msgbuf_constructor()
{
g_PAGESIZE = sysconf(_SC_PAGESIZE);
// +2 because to do O_DIRECT we need to offset the buffer to align
g_BUFSIZE = (g_PAGESIZE * (FUSE_MAX_MAX_PAGES + 2));
}
@ -46,10 +50,10 @@ __attribute__((destructor))
void
msgbuf_destroy()
{
// TODO: cleanup?
}
uint32_t
uint64_t
msgbuf_get_bufsize()
{
return g_BUFSIZE;
@ -58,7 +62,7 @@ msgbuf_get_bufsize()
void
msgbuf_set_bufsize(const uint32_t size_in_pages_)
{
g_BUFSIZE = (size_in_pages_ * g_PAGESIZE);
g_BUFSIZE = ((size_in_pages_ + 1) * g_PAGESIZE);
}
static
@ -85,40 +89,104 @@ msgbuf_alloc()
{
g_MUTEX.unlock();
msgbuf = (fuse_msgbuf_t*)malloc(sizeof(fuse_msgbuf_t));
msgbuf = (fuse_msgbuf_t*)page_aligned_malloc(g_BUFSIZE);
if(msgbuf == NULL)
return NULL;
msgbuf->mem = (char*)page_aligned_malloc(g_BUFSIZE);
if(msgbuf->mem == NULL)
{
free(msgbuf);
return NULL;
}
msgbuf->mem = (((char*)msgbuf) + g_PAGESIZE);
msgbuf->size = g_BUFSIZE;
msgbuf->size = g_BUFSIZE - g_PAGESIZE;
g_MSGBUF_ALLOC_COUNT++;
}
else
{
msgbuf = g_MSGBUF_STACK.top();
g_MSGBUF_STACK.pop();
msgbuf = g_MSGBUF_STACK.back();
g_MSGBUF_STACK.pop_back();
g_MUTEX.unlock();
}
return msgbuf;
}
static
void
msgbuf_destroy(fuse_msgbuf_t *msgbuf_)
{
// free(msgbuf_->mem);
free(msgbuf_);
}
void
msgbuf_free(fuse_msgbuf_t *msgbuf_)
{
std::lock_guard<std::mutex> lck(g_MUTEX);
if(msgbuf_->size != g_BUFSIZE)
if(msgbuf_->size != (g_BUFSIZE - g_PAGESIZE))
{
free(msgbuf_->mem);
free(msgbuf_);
msgbuf_destroy(msgbuf_);
g_MSGBUF_ALLOC_COUNT--;
return;
}
g_MSGBUF_STACK.push(msgbuf_);
g_MSGBUF_STACK.emplace_back(msgbuf_);
}
uint64_t
msgbuf_alloc_count()
{
return g_MSGBUF_ALLOC_COUNT;
}
uint64_t
msgbuf_avail_count()
{
std::lock_guard<std::mutex> lck(g_MUTEX);
return g_MSGBUF_STACK.size();
}
void
msgbuf_gc_10percent()
{
std::vector<fuse_msgbuf_t*> togc;
{
std::size_t size;
std::size_t ten_percent;
std::lock_guard<std::mutex> lck(g_MUTEX);
size = g_MSGBUF_STACK.size();
ten_percent = (size / 10);
for(std::size_t i = 0; i < ten_percent; i++)
{
togc.push_back(g_MSGBUF_STACK.back());
g_MSGBUF_STACK.pop_back();
}
}
for(auto msgbuf : togc)
{
msgbuf_destroy(msgbuf);
g_MSGBUF_ALLOC_COUNT--;
}
}
void
msgbuf_gc()
{
std::vector<fuse_msgbuf_t*> oldstack;
{
std::lock_guard<std::mutex> lck(g_MUTEX);
oldstack.swap(g_MSGBUF_STACK);
}
for(auto msgbuf: oldstack)
{
msgbuf_destroy(msgbuf);
g_MSGBUF_ALLOC_COUNT--;
}
}

9
libfuse/lib/fuse_msgbuf.hpp

@ -24,10 +24,15 @@
EXTERN_C_BEGIN
void msgbuf_set_bufsize(const uint32_t size);
uint32_t msgbuf_get_bufsize();
uint64_t msgbuf_get_bufsize();
fuse_msgbuf_t* msgbuf_alloc();
fuse_msgbuf_t* msgbuf_alloc_memonly();
void msgbuf_free(fuse_msgbuf_t *msgbuf);
void msgbuf_gc();
void msgbuf_gc_10percent();
uint64_t msgbuf_alloc_count();
uint64_t msgbuf_avail_count();
EXTERN_C_END

1
libfuse/lib/fuse_mt.c

@ -29,6 +29,7 @@ fuse_loop_mt(struct fuse *f)
res = fuse_session_loop_mt(fuse_get_session(f),
fuse_config_read_thread_count(f),
fuse_config_process_thread_count(f),
fuse_config_process_thread_queue_depth(f),
fuse_config_pin_threads(f));
fuse_stop_maintenance_thread(f);

14
libfuse/lib/fuse_signals.c

@ -14,7 +14,9 @@
static struct fuse_session *fuse_instance;
static void exit_handler(int sig)
static
void
exit_handler(int sig)
{
(void)sig;
if (fuse_instance)
@ -46,10 +48,10 @@ static int set_one_signal_handler(int sig, void (*handler)(int), int remove)
int fuse_set_signal_handlers(struct fuse_session *se)
{
if (set_one_signal_handler(SIGHUP, exit_handler, 0) == -1 ||
set_one_signal_handler(SIGINT, exit_handler, 0) == -1 ||
set_one_signal_handler(SIGTERM, exit_handler, 0) == -1 ||
set_one_signal_handler(SIGPIPE, SIG_IGN, 0) == -1)
if((set_one_signal_handler(SIGINT, exit_handler, 0) == -1) ||
(set_one_signal_handler(SIGTERM, exit_handler, 0) == -1) ||
(set_one_signal_handler(SIGQUIT, exit_handler, 0) == -1) ||
(set_one_signal_handler(SIGPIPE, SIG_IGN, 0) == -1))
return -1;
fuse_instance = se;
@ -64,8 +66,8 @@ void fuse_remove_signal_handlers(struct fuse_session *se)
else
fuse_instance = NULL;
set_one_signal_handler(SIGHUP, exit_handler, 1);
set_one_signal_handler(SIGINT, exit_handler, 1);
set_one_signal_handler(SIGTERM, exit_handler, 1);
set_one_signal_handler(SIGQUIT, exit_handler, 1);
set_one_signal_handler(SIGPIPE, SIG_IGN, 1);
}

13
libfuse/lib/lock.h

@ -0,0 +1,13 @@
#include <stdint.h>
#include <sys/types.h>
typedef struct lock_s lock_t;
struct lock_s
{
int type;
off_t start;
off_t end;
pid_t pid;
uint64_t owner;
lock_t *next;
};

41
libfuse/lib/make_unique.hpp

@ -0,0 +1,41 @@
#include <cstddef>
#include <memory>
#include <type_traits>
#include <utility>
namespace std
{
template<class T> struct _Unique_if
{
typedef unique_ptr<T> _Single_object;
};
template<class T> struct _Unique_if<T[]>
{
typedef unique_ptr<T[]> _Unknown_bound;
};
template<class T, size_t N> struct _Unique_if<T[N]>
{
typedef void _Known_bound;
};
template<class T, class... Args>
typename _Unique_if<T>::_Single_object
make_unique(Args&&... args)
{
return unique_ptr<T>(new T(std::forward<Args>(args)...));
}
template<class T>
typename _Unique_if<T>::_Unknown_bound
make_unique(size_t n)
{
typedef typename remove_extent<T>::type U;
return unique_ptr<T>(new U[n]());
}
template<class T, class... Args>
typename _Unique_if<T>::_Known_bound
make_unique(Args&&...) = delete;
}

60
libfuse/lib/node.c

@ -0,0 +1,60 @@
#include "node.h"
#include "lfmp.h"
static lfmp_t g_NODE_FMP;
static
__attribute__((constructor))
void
node_constructor()
{
lfmp_init(&g_NODE_FMP,sizeof(node_t),256);
}
static
__attribute__((destructor))
void
node_destructor()
{
lfmp_destroy(&g_NODE_FMP);
}
node_t *
node_alloc()
{
return lfmp_calloc(&g_NODE_FMP);
}
void
node_free(node_t *node_)
{
lfmp_free(&g_NODE_FMP,node_);
}
int
node_gc1()
{
return lfmp_gc(&g_NODE_FMP);
}
void
node_gc()
{
int rv;
int fails;
fails = 0;
do
{
rv = node_gc1();
if(rv == 0)
fails++;
} while(rv || (fails < 3));
}
lfmp_t*
node_lfmp()
{
return &g_NODE_FMP;
}

30
libfuse/lib/node.h

@ -0,0 +1,30 @@
#include "lock.h"
#include "lfmp.h"
typedef struct node_s node_t;
struct node_s
{
node_t *name_next;
node_t *id_next;
uint64_t nodeid;
char *name;
node_t *parent;
uint64_t nlookup;
uint32_t refctr;
uint32_t open_count;
uint64_t hidden_fh;
int32_t treelock;
lock_t *locks;
uint32_t stat_crc32b;
uint8_t is_stat_cache_valid:1;
};
node_t *node_alloc();
void node_free(node_t*);
int node_gc1();
void node_gc();
lfmp_t *node_lfmp();

369
libfuse/lib/scope_guard.hpp

@ -0,0 +1,369 @@
// _____ _____ _ _____
// / ____| / ____| | | / ____|_ _
// | (___ ___ ___ _ __ ___ | | __ _ _ __ _ _ __ __| | | | _| |_ _| |_
// \___ \ / __/ _ \| '_ \ / _ \ | | |_ | | | |/ _` | '__/ _` | | | |_ _|_ _|
// ____) | (_| (_) | |_) | __/ | |__| | |_| | (_| | | | (_| | | |____|_| |_|
// |_____/ \___\___/| .__/ \___| \_____|\__,_|\__,_|_| \__,_| \_____|
// | | https://github.com/Neargye/scope_guard
// |_| version 0.9.1
//
// Licensed under the MIT License <http://opensource.org/licenses/MIT>.
// SPDX-License-Identifier: MIT
// Copyright (c) 2018 - 2021 Daniil Goncharov <neargye@gmail.com>.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
#ifndef NEARGYE_SCOPE_GUARD_HPP
#define NEARGYE_SCOPE_GUARD_HPP
#define SCOPE_GUARD_VERSION_MAJOR 0
#define SCOPE_GUARD_VERSION_MINOR 9
#define SCOPE_GUARD_VERSION_PATCH 1
#include <type_traits>
#if (defined(_MSC_VER) && _MSC_VER >= 1900) || ((defined(__clang__) || defined(__GNUC__)) && __cplusplus >= 201700L)
#include <exception>
#endif
// scope_guard throwable settings:
// SCOPE_GUARD_NO_THROW_CONSTRUCTIBLE requires nothrow constructible action.
// SCOPE_GUARD_MAY_THROW_ACTION action may throw exceptions.
// SCOPE_GUARD_NO_THROW_ACTION requires noexcept action.
// SCOPE_GUARD_SUPPRESS_THROW_ACTION exceptions during action will be suppressed.
// SCOPE_GUARD_CATCH_HANDLER exceptions handler. If SCOPE_GUARD_SUPPRESS_THROW_ACTIONS is not defined, it will do nothing.
#if !defined(SCOPE_GUARD_MAY_THROW_ACTION) && !defined(SCOPE_GUARD_NO_THROW_ACTION) && !defined(SCOPE_GUARD_SUPPRESS_THROW_ACTION)
# define SCOPE_GUARD_MAY_THROW_ACTION
#elif (defined(SCOPE_GUARD_MAY_THROW_ACTION) + defined(SCOPE_GUARD_NO_THROW_ACTION) + defined(SCOPE_GUARD_SUPPRESS_THROW_ACTION)) > 1
# error Only one of SCOPE_GUARD_MAY_THROW_ACTION and SCOPE_GUARD_NO_THROW_ACTION and SCOPE_GUARD_SUPPRESS_THROW_ACTION may be defined.
#endif
#if !defined(SCOPE_GUARD_CATCH_HANDLER)
# define SCOPE_GUARD_CATCH_HANDLER /* Suppress exception.*/
#endif
namespace scope_guard {
namespace detail {
#if defined(SCOPE_GUARD_SUPPRESS_THROW_ACTION) && (defined(__cpp_exceptions) || defined(__EXCEPTIONS) || (_HAS_EXCEPTIONS))
# define NEARGYE_NOEXCEPT(...) noexcept
# define NEARGYE_TRY try {
# define NEARGYE_CATCH } catch (...) { SCOPE_GUARD_CATCH_HANDLER }
#else
# define NEARGYE_NOEXCEPT(...) noexcept(__VA_ARGS__)
# define NEARGYE_TRY
# define NEARGYE_CATCH
#endif
#define NEARGYE_MOV(...) static_cast<typename std::remove_reference<decltype(__VA_ARGS__)>::type&&>(__VA_ARGS__)
#define NEARGYE_FWD(...) static_cast<decltype(__VA_ARGS__)&&>(__VA_ARGS__)
// NEARGYE_NODISCARD encourages the compiler to issue a warning if the return value is discarded.
#if !defined(NEARGYE_NODISCARD)
# if defined(__clang__)
# if (__clang_major__ * 10 + __clang_minor__) >= 39 && __cplusplus >= 201703L
# define NEARGYE_NODISCARD [[nodiscard]]
# else
# define NEARGYE_NODISCARD __attribute__((__warn_unused_result__))
# endif
# elif defined(__GNUC__)
# if __GNUC__ >= 7 && __cplusplus >= 201703L
# define NEARGYE_NODISCARD [[nodiscard]]
# else
# define NEARGYE_NODISCARD __attribute__((__warn_unused_result__))
# endif
# elif defined(_MSC_VER)
# if _MSC_VER >= 1911 && defined(_MSVC_LANG) && _MSVC_LANG >= 201703L
# define NEARGYE_NODISCARD [[nodiscard]]
# elif defined(_Check_return_)
# define NEARGYE_NODISCARD _Check_return_
# else
# define NEARGYE_NODISCARD
# endif
# else
# define NEARGYE_NODISCARD
# endif
#endif
#if defined(_MSC_VER) && _MSC_VER < 1900
inline int uncaught_exceptions() noexcept {
return *(reinterpret_cast<int*>(static_cast<char*>(static_cast<void*>(_getptd())) + (sizeof(void*) == 8 ? 0x100 : 0x90)));
}
#elif (defined(__clang__) || defined(__GNUC__)) && __cplusplus < 201700L
struct __cxa_eh_globals;
extern "C" __cxa_eh_globals* __cxa_get_globals() noexcept;
inline int uncaught_exceptions() noexcept {
return static_cast<int>(*(reinterpret_cast<unsigned int*>(static_cast<char*>(static_cast<void*>(__cxa_get_globals())) + sizeof(void*))));
}
#else
inline int uncaught_exceptions() noexcept {
return std::uncaught_exceptions();
}
#endif
class on_exit_policy {
bool execute_;
public:
explicit on_exit_policy(bool execute) noexcept : execute_{execute} {}
void dismiss() noexcept {
execute_ = false;
}
bool should_execute() const noexcept {
return execute_;
}
};
class on_fail_policy {
int ec_;
public:
explicit on_fail_policy(bool execute) noexcept : ec_{execute ? uncaught_exceptions() : -1} {}
void dismiss() noexcept {
ec_ = -1;
}
bool should_execute() const noexcept {
return ec_ != -1 && ec_ < uncaught_exceptions();
}
};
class on_success_policy {
int ec_;
public:
explicit on_success_policy(bool execute) noexcept : ec_{execute ? uncaught_exceptions() : -1} {}
void dismiss() noexcept {
ec_ = -1;
}
bool should_execute() const noexcept {
return ec_ != -1 && ec_ >= uncaught_exceptions();
}
};
template <typename T, typename = void>
struct is_noarg_returns_void_action
: std::false_type {};
template <typename T>
struct is_noarg_returns_void_action<T, decltype((std::declval<T>())())>
: std::true_type {};
template <typename T, bool = is_noarg_returns_void_action<T>::value>
struct is_nothrow_invocable_action
: std::false_type {};
template <typename T>
struct is_nothrow_invocable_action<T, true>
: std::integral_constant<bool, noexcept((std::declval<T>())())> {};
template <typename F, typename P>
class scope_guard {
using A = typename std::decay<F>::type;
static_assert(is_noarg_returns_void_action<A>::value,
"scope_guard requires no-argument action, that returns void.");
static_assert(std::is_same<P, on_exit_policy>::value || std::is_same<P, on_fail_policy>::value || std::is_same<P, on_success_policy>::value,
"scope_guard requires on_exit_policy, on_fail_policy or on_success_policy.");
#if defined(SCOPE_GUARD_NO_THROW_ACTION)
static_assert(is_nothrow_invocable_action<A>::value,
"scope_guard requires noexcept invocable action.");
#endif
#if defined(SCOPE_GUARD_NO_THROW_CONSTRUCTIBLE)
static_assert(std::is_nothrow_move_constructible<A>::value,
"scope_guard requires nothrow constructible action.");
#endif
P policy_;
A action_;
void* operator new(std::size_t) = delete;
void operator delete(void*) = delete;
public:
scope_guard() = delete;
scope_guard(const scope_guard&) = delete;
scope_guard& operator=(const scope_guard&) = delete;
scope_guard& operator=(scope_guard&&) = delete;
scope_guard(scope_guard&& other) noexcept(std::is_nothrow_move_constructible<A>::value)
: policy_{false},
action_{NEARGYE_MOV(other.action_)} {
policy_ = NEARGYE_MOV(other.policy_);
other.policy_.dismiss();
}
scope_guard(const A& action) = delete;
scope_guard(A& action) = delete;
explicit scope_guard(A&& action) noexcept(std::is_nothrow_move_constructible<A>::value)
: policy_{true},
action_{NEARGYE_MOV(action)} {}
void dismiss() noexcept {
policy_.dismiss();
}
~scope_guard() NEARGYE_NOEXCEPT(is_nothrow_invocable_action<A>::value) {
if (policy_.should_execute()) {
NEARGYE_TRY
action_();
NEARGYE_CATCH
}
}
};
template <typename F>
using scope_exit = scope_guard<F, on_exit_policy>;
template <typename F, typename std::enable_if<is_noarg_returns_void_action<F>::value, int>::type = 0>
NEARGYE_NODISCARD scope_exit<F> make_scope_exit(F&& action) noexcept(noexcept(scope_exit<F>{NEARGYE_FWD(action)})) {
return scope_exit<F>{NEARGYE_FWD(action)};
}
template <typename F>
using scope_fail = scope_guard<F, on_fail_policy>;
template <typename F, typename std::enable_if<is_noarg_returns_void_action<F>::value, int>::type = 0>
NEARGYE_NODISCARD scope_fail<F> make_scope_fail(F&& action) noexcept(noexcept(scope_fail<F>{NEARGYE_FWD(action)})) {
return scope_fail<F>{NEARGYE_FWD(action)};
}
template <typename F>
using scope_success = scope_guard<F, on_success_policy>;
template <typename F, typename std::enable_if<is_noarg_returns_void_action<F>::value, int>::type = 0>
NEARGYE_NODISCARD scope_success<F> make_scope_success(F&& action) noexcept(noexcept(scope_success<F>{NEARGYE_FWD(action)})) {
return scope_success<F>{NEARGYE_FWD(action)};
}
struct scope_exit_tag {};
template <typename F, typename std::enable_if<is_noarg_returns_void_action<F>::value, int>::type = 0>
scope_exit<F> operator<<(scope_exit_tag, F&& action) noexcept(noexcept(scope_exit<F>{NEARGYE_FWD(action)})) {
return scope_exit<F>{NEARGYE_FWD(action)};
}
struct scope_fail_tag {};
template <typename F, typename std::enable_if<is_noarg_returns_void_action<F>::value, int>::type = 0>
scope_fail<F> operator<<(scope_fail_tag, F&& action) noexcept(noexcept(scope_fail<F>{NEARGYE_FWD(action)})) {
return scope_fail<F>{NEARGYE_FWD(action)};
}
struct scope_success_tag {};
template <typename F, typename std::enable_if<is_noarg_returns_void_action<F>::value, int>::type = 0>
scope_success<F> operator<<(scope_success_tag, F&& action) noexcept(noexcept(scope_success<F>{NEARGYE_FWD(action)})) {
return scope_success<F>{NEARGYE_FWD(action)};
}
#undef NEARGYE_MOV
#undef NEARGYE_FWD
#undef NEARGYE_NOEXCEPT
#undef NEARGYE_TRY
#undef NEARGYE_CATCH
#undef NEARGYE_NODISCARD
} // namespace scope_guard::detail
using detail::make_scope_exit;
using detail::make_scope_fail;
using detail::make_scope_success;
} // namespace scope_guard
// NEARGYE_MAYBE_UNUSED suppresses compiler warnings on unused entities, if any.
#if !defined(NEARGYE_MAYBE_UNUSED)
# if defined(__clang__)
# if (__clang_major__ * 10 + __clang_minor__) >= 39 && __cplusplus >= 201703L
# define NEARGYE_MAYBE_UNUSED [[maybe_unused]]
# else
# define NEARGYE_MAYBE_UNUSED __attribute__((__unused__))
# endif
# elif defined(__GNUC__)
# if __GNUC__ >= 7 && __cplusplus >= 201703L
# define NEARGYE_MAYBE_UNUSED [[maybe_unused]]
# else
# define NEARGYE_MAYBE_UNUSED __attribute__((__unused__))
# endif
# elif defined(_MSC_VER)
# if _MSC_VER >= 1911 && defined(_MSVC_LANG) && _MSVC_LANG >= 201703L
# define NEARGYE_MAYBE_UNUSED [[maybe_unused]]
# else
# define NEARGYE_MAYBE_UNUSED __pragma(warning(suppress : 4100 4101 4189))
# endif
# else
# define NEARGYE_MAYBE_UNUSED
# endif
#endif
#if !defined(NEARGYE_STR_CONCAT)
# define NEARGYE_STR_CONCAT_(s1, s2) s1##s2
# define NEARGYE_STR_CONCAT(s1, s2) NEARGYE_STR_CONCAT_(s1, s2)
#endif
#if !defined(NEARGYE_COUNTER)
# if defined(__COUNTER__)
# define NEARGYE_COUNTER __COUNTER__
# elif defined(__LINE__)
# define NEARGYE_COUNTER __LINE__
# endif
#endif
#if defined(SCOPE_GUARD_NO_THROW_ACTION)
# define NEARGYE_MAKE_SCOPE_GUARD_ACTION [&]() noexcept -> void
#else
# define NEARGYE_MAKE_SCOPE_GUARD_ACTION [&]() -> void
#endif
#define NEARGYE_MAKE_SCOPE_EXIT ::scope_guard::detail::scope_exit_tag{} << NEARGYE_MAKE_SCOPE_GUARD_ACTION
#define NEARGYE_MAKE_SCOPE_FAIL ::scope_guard::detail::scope_fail_tag{} << NEARGYE_MAKE_SCOPE_GUARD_ACTION
#define NEARGYE_MAKE_SCOPE_SUCCESS ::scope_guard::detail::scope_success_tag{} << NEARGYE_MAKE_SCOPE_GUARD_ACTION
#define NEARGYE_SCOPE_GUARD_WITH_(g, i) for (int i = 1; i--; g)
#define NEARGYE_SCOPE_GUARD_WITH(g) NEARGYE_SCOPE_GUARD_WITH_(g, NEARGYE_STR_CONCAT(NEARGYE_INTERNAL_OBJECT_, NEARGYE_COUNTER))
// SCOPE_EXIT executing action on scope exit.
#define MAKE_SCOPE_EXIT(name) auto name = NEARGYE_MAKE_SCOPE_EXIT
#define SCOPE_EXIT NEARGYE_MAYBE_UNUSED const MAKE_SCOPE_EXIT(NEARGYE_STR_CONCAT(NEARGYE_SCOPE_EXIT_, NEARGYE_COUNTER))
#define WITH_SCOPE_EXIT(guard) NEARGYE_SCOPE_GUARD_WITH(NEARGYE_MAKE_SCOPE_EXIT{ guard })
// SCOPE_FAIL executing action on scope exit when an exception has been thrown before scope exit.
#define MAKE_SCOPE_FAIL(name) auto name = NEARGYE_MAKE_SCOPE_FAIL
#define SCOPE_FAIL NEARGYE_MAYBE_UNUSED const MAKE_SCOPE_FAIL(NEARGYE_STR_CONCAT(NEARGYE_SCOPE_FAIL_, NEARGYE_COUNTER))
#define WITH_SCOPE_FAIL(guard) NEARGYE_SCOPE_GUARD_WITH(NEARGYE_MAKE_SCOPE_FAIL{ guard })
// SCOPE_SUCCESS executing action on scope exit when no exceptions have been thrown before scope exit.
#define MAKE_SCOPE_SUCCESS(name) auto name = NEARGYE_MAKE_SCOPE_SUCCESS
#define SCOPE_SUCCESS NEARGYE_MAYBE_UNUSED const MAKE_SCOPE_SUCCESS(NEARGYE_STR_CONCAT(NEARGYE_SCOPE_SUCCESS_, NEARGYE_COUNTER))
#define WITH_SCOPE_SUCCESS(guard) NEARGYE_SCOPE_GUARD_WITH(NEARGYE_MAKE_SCOPE_SUCCESS{ guard })
// DEFER executing action on scope exit.
#define MAKE_DEFER(name) MAKE_SCOPE_EXIT(name)
#define DEFER SCOPE_EXIT
#define WITH_DEFER(guard) WITH_SCOPE_EXIT(guard)
#endif // NEARGYE_SCOPE_GUARD_HPP

110
libfuse/lib/syslog.c

@ -0,0 +1,110 @@
/*
ISC License
Copyright (c) 2023, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include <stdarg.h>
#include <syslog.h>
#include <stdbool.h>
static bool g_SYSLOG_ENABLED = false;
void
syslog_open()
{
const char *ident = "mergerfs";
const int option = (LOG_CONS|LOG_PID);
const int facility = LOG_USER;
openlog(ident,option,facility);
g_SYSLOG_ENABLED = true;
}
void
syslog_close()
{
closelog();
g_SYSLOG_ENABLED = false;
}
static
void
syslog_vlog(const int priority_,
const char *format_,
va_list valist_)
{
if(g_SYSLOG_ENABLED == false)
return;
vsyslog(priority_,format_,valist_);
}
void
syslog_log(const int priority_,
const char *format_,
...)
{
va_list valist;
va_start(valist,format_);
syslog_vlog(priority_,format_,valist);
va_end(valist);
}
void
syslog_info(const char *format_,
...)
{
va_list valist;
va_start(valist,format_);
syslog_vlog(LOG_INFO,format_,valist);
va_end(valist);
}
void
syslog_notice(const char *format_,
...)
{
va_list valist;
va_start(valist,format_);
syslog_vlog(LOG_NOTICE,format_,valist);
va_end(valist);
}
void
syslog_warning(const char *format_,
...)
{
va_list valist;
va_start(valist,format_);
syslog_vlog(LOG_WARNING,format_,valist);
va_end(valist);
}
void
syslog_error(const char *format_,
...)
{
va_list valist;
va_start(valist,format_);
syslog_vlog(LOG_ERR,format_,valist);
va_end(valist);
}

30
libfuse/lib/syslog.h

@ -0,0 +1,30 @@
/*
ISC License
Copyright (c) 2023, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include <syslog.h>
void syslog_open();
void syslog_log(const int priority, const char *format, ...);
void syslog_info(const char *format, ...);
void syslog_notice(const char *format, ...);
void syslog_warning(const char *format, ...);
void syslog_error(const char *format, ...);
void syslog_close();

1
libfuse/lib/thread_pool.hpp

@ -1,6 +1,7 @@
#pragma once
#include "unbounded_queue.hpp"
#include "bounded_queue.hpp"
#include <tuple>
#include <atomic>

49
man/mergerfs.1

@ -311,6 +311,13 @@ reading FUSE messages which are dispatched to process threads.
-1 means disabled otherwise acts like \f[C]read-thread-count\f[R].
(default: -1)
.IP \[bu] 2
\f[B]process-thread-queue-depth=INT\f[R]: Sets the number of requests
any single process thread can have queued up at one time.
Meaning the total memory usage of the queues is queue depth multiplied
by the number of process threads plus read thread count.
-1 sets the depth to the same as the process thread count.
(default: -1)
.IP \[bu] 2
\f[B]pin-threads=STR\f[R]: Selects a strategy to pin threads to CPUs
(default: unset)
.IP \[bu] 2
@ -1480,7 +1487,8 @@ Before mounting over top the mount point with the new instance of
mergerfs issue: \f[C]umount -l <mergerfs_mountpoint>\f[R].
Or you can let mergerfs do it by setting the option
\f[C]lazy-umount-mountpoint=true\f[R].
.SH RUNTIME CONFIG
.SH RUNTIME INTERFACES
.SS RUNTIME CONFIG
.SS .mergerfs pseudo file
.IP
.nf
@ -1596,6 +1604,45 @@ given the getattr policy
.IP \[bu] 2
\f[B]user.mergerfs.allpaths\f[R]: a NUL (`\[rs]0') separated list of
full paths to all files found
.SS SIGNALS
.IP \[bu] 2
USR1: This will cause mergerfs to send invalidation notifications to the
kernel for all files.
This will cause all unused files to be released from memory.
.IP \[bu] 2
USR2: Trigger a general cleanup of currently unused memory.
A more thorough version of what happens every \[ti]15 minutes.
.SS IOCTLS
.PP
Found in \f[C]fuse_ioctl.cpp\f[R]:
.IP
.nf
\f[C]
typedef char IOCTL_BUF[4096];
#define IOCTL_APP_TYPE 0xDF
#define IOCTL_FILE_INFO _IOWR(IOCTL_APP_TYPE,0,IOCTL_BUF)
#define IOCTL_GC _IO(IOCTL_APP_TYPE,1)
#define IOCTL_GC1 _IO(IOCTL_APP_TYPE,2)
#define IOCTL_INVALIDATE_ALL_NODES _IO(IOCTL_APP_TYPE,3)
\f[R]
.fi
.IP \[bu] 2
IOCTL_FILE_INFO: Same as the \[lq]file / directory xattrs\[rq] mentioned
above.
Use a buffer size of 4096 bytes.
Pass in a string of \[lq]basepath\[rq], \[lq]relpath\[rq],
\[lq]fullpath\[rq], or \[lq]allpaths\[rq].
Receive details in same buffer.
.IP \[bu] 2
IOCTL_GC: Triggers a thorough garbage collection of excess memory.
Same as SIGUSR2.
.IP \[bu] 2
IOCTL_GC1: Triggers a simple garbage collection of excess memory.
Same as what happens every 15 minutes normally.
.IP \[bu] 2
IOCTL_INVALIDATE_ALL_NODES: Same as SIGUSR1.
Send invalidation notifications to the kernel for all files causing
unused files to be released from memory.
.SH TOOLING
.IP \[bu] 2
https://github.com/trapexit/mergerfs-tools

3
src/config.cpp

@ -65,6 +65,7 @@ namespace l
IFERT("pid");
IFERT("pin-threads");
IFERT("process-thread-count");
IFERT("process-thread-queue-depth");
IFERT("read-thread-count");
IFERT("readdirplus");
IFERT("scheduling-priority");
@ -123,6 +124,7 @@ Config::Config()
symlinkify_timeout(3600),
fuse_read_thread_count(0),
fuse_process_thread_count(-1),
fuse_process_thread_queue_depth(0),
fuse_pin_threads("false"),
version(MERGERFS_VERSION),
writeback_cache(false),
@ -200,6 +202,7 @@ Config::Config()
_map["threads"] = &fuse_read_thread_count;
_map["read-thread-count"] = &fuse_read_thread_count;
_map["process-thread-count"] = &fuse_process_thread_count;
_map["process-thread-queue-depth"] = &fuse_process_thread_queue_depth;
_map["version"] = &version;
_map["xattr"] = &xattr;
}

1
src/config.hpp

@ -147,6 +147,7 @@ public:
ConfigUINT64 symlinkify_timeout;
ConfigINT fuse_read_thread_count;
ConfigINT fuse_process_thread_count;
ConfigINT fuse_process_thread_queue_depth;
ConfigSTR fuse_pin_threads;
ConfigSTR version;
ConfigBOOL writeback_cache;

12
src/fuse_ioctl.cpp

@ -43,6 +43,9 @@ using std::vector;
typedef char IOCTL_BUF[4096];
#define IOCTL_APP_TYPE 0xDF
#define IOCTL_FILE_INFO _IOWR(IOCTL_APP_TYPE,0,IOCTL_BUF)
#define IOCTL_GC _IO(IOCTL_APP_TYPE,1)
#define IOCTL_GC1 _IO(IOCTL_APP_TYPE,2)
#define IOCTL_INVALIDATE_ALL_NODES _IO(IOCTL_APP_TYPE,3)
// From linux/btrfs.h
#define BTRFS_IOCTL_MAGIC 0x94
@ -334,6 +337,15 @@ namespace l
{
case IOCTL_FILE_INFO:
return l::file_info(ffi_,data_);
case IOCTL_GC:
fuse_gc();
return 0;
case IOCTL_GC1:
fuse_gc1();
return 0;
case IOCTL_INVALIDATE_ALL_NODES:
fuse_invalidate_all_nodes();
return 0;
}
return -ENOTTY;

26
src/mergerfs.cpp

@ -76,6 +76,7 @@
#include "fuse.h"
#include <csignal>
#include <cstdlib>
#include <iostream>
@ -195,6 +196,30 @@ namespace l
}
}
static
void
usr1_signal_handler(int signal_)
{
syslog_info("Received SIGUSR1 - invalidating all nodes");
fuse_invalidate_all_nodes();
}
static
void
usr2_signal_handler(int signal_)
{
syslog_info("Received SIGUSR2 - triggering thorough gc");
fuse_gc();
}
static
void
setup_signal_handlers()
{
std::signal(SIGUSR1,l::usr1_signal_handler);
std::signal(SIGUSR2,l::usr2_signal_handler);
}
int
main(const int argc_,
char **argv_)
@ -223,6 +248,7 @@ namespace l
l::wait_for_mount(cfg);
l::setup_resources(cfg->scheduling_priority);
l::setup_signal_handlers();
l::get_fuse_operations(ops,cfg->nullrw);
if(cfg->lazy_umount_mountpoint)

1
src/option_parser.cpp

@ -82,6 +82,7 @@ set_fuse_threads(Config::Write &cfg_,
{
set_kv_option("read-thread-count",cfg_->fuse_read_thread_count.to_string(),args_);
set_kv_option("process-thread-count",cfg_->fuse_process_thread_count.to_string(),args_);
set_kv_option("process-thread-queue-depth",cfg_->fuse_process_thread_queue_depth.to_string(),args_);
set_kv_option("pin-threads",cfg_->fuse_pin_threads.to_string(),args_);
}

Loading…
Cancel
Save