Browse Source

Add async message processing

pull/1118/head
Antonio SJ Musumeci 2 years ago
parent
commit
24423b8d2a
  1. 199
      README.md
  2. 27
      libfuse/Makefile
  3. 9
      libfuse/include/fuse.h
  4. 14
      libfuse/include/fuse_lowlevel.h
  5. 12
      libfuse/include/fuse_msgbuf.h
  6. 165
      libfuse/lib/bounded_queue.hpp
  7. 72
      libfuse/lib/fuse.c
  8. 19
      libfuse/lib/fuse_i.h
  9. 15
      libfuse/lib/fuse_ll.hpp
  10. 231
      libfuse/lib/fuse_loop_mt.c
  11. 266
      libfuse/lib/fuse_loop_mt.cpp
  12. 373
      libfuse/lib/fuse_lowlevel.c
  13. 123
      libfuse/lib/fuse_msgbuf.cpp
  14. 29
      libfuse/lib/fuse_msgbuf.hpp
  15. 3
      libfuse/lib/fuse_mt.c
  16. 102
      libfuse/lib/fuse_session.c
  17. 112
      libfuse/lib/thread_pool.hpp
  18. 161
      libfuse/lib/unbounded_queue.hpp
  19. 7
      src/config.cpp
  20. 3
      src/config.hpp
  21. 13
      src/option_parser.cpp
  22. 14
      tools/create-branches

199
README.md

@ -1,6 +1,6 @@
% mergerfs(1) mergerfs user manual % mergerfs(1) mergerfs user manual
% Antonio SJ Musumeci <trapexit@spawn.link> % Antonio SJ Musumeci <trapexit@spawn.link>
% 2022-05-23
% 2023-01-16
# NAME # NAME
@ -14,7 +14,9 @@ mergerfs -o&lt;options&gt; &lt;branches&gt; &lt;mountpoint&gt;
# DESCRIPTION # DESCRIPTION
**mergerfs** is a union filesystem geared towards simplifying storage and management of files across numerous commodity storage devices. It is similar to **mhddfs**, **unionfs**, and **aufs**.
**mergerfs** is a union filesystem geared towards simplifying storage
and management of files across numerous commodity storage devices. It
is similar to **mhddfs**, **unionfs**, and **aufs**.
# FEATURES # FEATURES
@ -36,7 +38,10 @@ mergerfs -o&lt;options&gt; &lt;branches&gt; &lt;mountpoint&gt;
# HOW IT WORKS # HOW IT WORKS
mergerfs logically merges multiple paths together. Think a union of sets. The file/s or directory/s acted on or presented through mergerfs are based on the policy chosen for that particular action. Read more about policies below.
mergerfs logically merges multiple paths together. Think a union of
sets. The file/s or directory/s acted on or presented through mergerfs
are based on the policy chosen for that particular action. Read more
about policies below.
``` ```
A + B = C A + B = C
@ -59,7 +64,12 @@ A + B = C
+-- file6 +-- file6
``` ```
mergerfs does **NOT** support the copy-on-write (CoW) or whiteout behaviors found in **aufs** and **overlayfs**. You can **not** mount a read-only filesystem and write to it. However, mergerfs will ignore read-only drives when creating new files so you can mix read-write and read-only drives. It also does **NOT** split data across drives. It is not RAID0 / striping. It is simply a union of other filesystems.
mergerfs does **NOT** support the copy-on-write (CoW) or whiteout
behaviors found in **aufs** and **overlayfs**. You can **not** mount a
read-only filesystem and write to it. However, mergerfs will ignore
read-only drives when creating new files so you can mix read-write and
read-only drives. It also does **NOT** split data across drives. It is
not RAID0 / striping. It is simply a union of other filesystems.
# TERMINOLOGY # TERMINOLOGY
@ -75,7 +85,8 @@ mergerfs does **NOT** support the copy-on-write (CoW) or whiteout behaviors foun
# BASIC SETUP # BASIC SETUP
If you don't already know that you have a special use case then just start with one of the following option sets.
If you don't already know that you have a special use case then just
start with one of the following option sets.
#### You need `mmap` (used by rtorrent and many sqlite3 base software) #### You need `mmap` (used by rtorrent and many sqlite3 base software)
@ -95,50 +106,142 @@ These options are the same regardless of whether you use them with the `mergerfs
### mount options ### mount options
* **config**: Path to a config file. Same arguments as below in key=val / ini style format.
* **config**: Path to a config file. Same arguments as below in
key=val / ini style format.
* **branches**: Colon delimited list of branches. * **branches**: Colon delimited list of branches.
* **allow_other**: A libfuse option which allows users besides the one which ran mergerfs to see the filesystem. This is required for most use-cases.
* **minfreespace=SIZE**: The minimum space value used for creation policies. Can be overridden by branch specific option. Understands 'K', 'M', and 'G' to represent kilobyte, megabyte, and gigabyte respectively. (default: 4G)
* **moveonenospc=BOOL|POLICY**: When enabled if a **write** fails with **ENOSPC** (no space left on device) or **EDQUOT** (disk quota exceeded) the policy selected will run to find a new location for the file. An attempt to move the file to that branch will occur (keeping all metadata possible) and if successful the original is unlinked and the write retried. (default: false, true = mfs)
* **use_ino**: Causes mergerfs to supply file/directory inodes rather than libfuse. While not a default it is recommended it be enabled so that linked files share the same inode value.
* **inodecalc=passthrough|path-hash|devino-hash|hybrid-hash**: Selects the inode calculation algorithm. (default: hybrid-hash)
* **dropcacheonclose=BOOL**: When a file is requested to be closed call `posix_fadvise` on it first to instruct the kernel that we no longer need the data and it can drop its cache. Recommended when **cache.files=partial|full|auto-full** to limit double caching. (default: false)
* **symlinkify=BOOL**: When enabled and a file is not writable and its mtime or ctime is older than **symlinkify_timeout** files will be reported as symlinks to the original files. Please read more below before using. (default: false)
* **symlinkify_timeout=UINT**: Time to wait, in seconds, to activate the **symlinkify** behavior. (default: 3600)
* **nullrw=BOOL**: Turns reads and writes into no-ops. The request will succeed but do nothing. Useful for benchmarking mergerfs. (default: false)
* **ignorepponrename=BOOL**: Ignore path preserving on rename. Typically rename and link act differently depending on the policy of `create` (read below). Enabling this will cause rename and link to always use the non-path preserving behavior. This means files, when renamed or linked, will stay on the same drive. (default: false)
* **security_capability=BOOL**: If false return ENOATTR when xattr security.capability is queried. (default: true)
* **xattr=passthrough|noattr|nosys**: Runtime control of xattrs. Default is to passthrough xattr requests. 'noattr' will short circuit as if nothing exists. 'nosys' will respond with ENOSYS as if xattrs are not supported or disabled. (default: passthrough)
* **link_cow=BOOL**: When enabled if a regular file is opened which has a link count > 1 it will copy the file to a temporary file and rename over the original. Breaking the link and providing a basic copy-on-write function similar to cow-shell. (default: false)
* **statfs=base|full**: Controls how statfs works. 'base' means it will always use all branches in statfs calculations. 'full' is in effect path preserving and only includes drives where the path exists. (default: base)
* **statfs_ignore=none|ro|nc**: 'ro' will cause statfs calculations to ignore available space for branches mounted or tagged as 'read-only' or 'no create'. 'nc' will ignore available space for branches tagged as 'no create'. (default: none)
* **nfsopenhack=off|git|all**: A workaround for exporting mergerfs over NFS where there are issues with creating files for write while setting the mode to read-only. (default: off)
* **follow-symlinks=never|directory|regular|all**: Turns symlinks into what they point to. (default: never)
* **link-exdev=passthrough|rel-symlink|abs-base-symlink|abs-pool-symlink**: When a link fails with EXDEV optionally create a symlink to the file instead.
* **rename-exdev=passthrough|rel-symlink|abs-symlink**: When a rename fails with EXDEV optionally move the file to a special directory and symlink to it.
* **posix_acl=BOOL**: Enable POSIX ACL support (if supported by kernel and underlying filesystem). (default: false)
* **async_read=BOOL**: Perform reads asynchronously. If disabled or unavailable the kernel will ensure there is at most one pending read request per file handle and will attempt to order requests by offset. (default: true)
* **fuse_msg_size=UINT**: Set the max number of pages per FUSE message. Only available on Linux >= 4.20 and ignored otherwise. (min: 1; max: 256; default: 256)
* **threads=INT**: Number of threads to use in multithreaded mode. When set to zero it will attempt to discover and use the number of logical cores. If the lookup fails it will fall back to using 4. If the thread count is set negative it will look up the number of cores then divide by the absolute value. ie. threads=-2 on an 8 core machine will result in 8 / 2 = 4 threads. There will always be at least 1 thread. NOTE: higher number of threads increases parallelism but usually decreases throughput. (default: 0)
* **fsname=STR**: Sets the name of the filesystem as seen in **mount**, **df**, etc. Defaults to a list of the source paths concatenated together with the longest common prefix removed.
* **func.FUNC=POLICY**: Sets the specific FUSE function's policy. See below for the list of value types. Example: **func.getattr=newest**
* **category.action=POLICY**: Sets policy of all FUSE functions in the action category. (default: epall)
* **category.create=POLICY**: Sets policy of all FUSE functions in the create category. (default: epmfs)
* **category.search=POLICY**: Sets policy of all FUSE functions in the search category. (default: ff)
* **cache.open=UINT**: 'open' policy cache timeout in seconds. (default: 0)
* **cache.statfs=UINT**: 'statfs' cache timeout in seconds. (default: 0)
* **cache.attr=UINT**: File attribute cache timeout in seconds. (default: 1)
* **cache.entry=UINT**: File name lookup cache timeout in seconds. (default: 1)
* **cache.negative_entry=UINT**: Negative file name lookup cache timeout in seconds. (default: 0)
* **cache.files=libfuse|off|partial|full|auto-full**: File page caching mode (default: libfuse)
* **cache.writeback=BOOL**: Enable kernel writeback caching (default: false)
* **cache.symlinks=BOOL**: Cache symlinks (if supported by kernel) (default: false)
* **cache.readdir=BOOL**: Cache readdir (if supported by kernel) (default: false)
* **direct_io**: deprecated - Bypass page cache. Use `cache.files=off` instead. (default: false)
* **kernel_cache**: deprecated - Do not invalidate data cache on file open. Use `cache.files=full` instead. (default: false)
* **auto_cache**: deprecated - Invalidate data cache if file mtime or size change. Use `cache.files=auto-full` instead. (default: false)
* **async_read**: deprecated - Perform reads asynchronously. Use `async_read=true` instead.
* **sync_read**: deprecated - Perform reads synchronously. Use `async_read=false` instead.
* **allow_other**: A libfuse option which allows users besides the one
which ran mergerfs to see the filesystem. This is required for most
use-cases.
* **minfreespace=SIZE**: The minimum space value used for creation
policies. Can be overridden by branch specific option. Understands
'K', 'M', and 'G' to represent kilobyte, megabyte, and gigabyte
respectively. (default: 4G)
* **moveonenospc=BOOL|POLICY**: When enabled if a **write** fails with
**ENOSPC** (no space left on device) or **EDQUOT** (disk quota
exceeded) the policy selected will run to find a new location for
the file. An attempt to move the file to that branch will occur
(keeping all metadata possible) and if successful the original is
unlinked and the write retried. (default: false, true = mfs)
* **use_ino**: Causes mergerfs to supply file/directory inodes rather
than libfuse. While not a default it is recommended it be enabled so
that linked files share the same inode value.
* **inodecalc=passthrough|path-hash|devino-hash|hybrid-hash**: Selects
the inode calculation algorithm. (default: hybrid-hash)
* **dropcacheonclose=BOOL**: When a file is requested to be closed
call `posix_fadvise` on it first to instruct the kernel that we no
longer need the data and it can drop its cache. Recommended when
**cache.files=partial|full|auto-full** to limit double
caching. (default: false)
* **symlinkify=BOOL**: When enabled and a file is not writable and its
mtime or ctime is older than **symlinkify_timeout** files will be
reported as symlinks to the original files. Please read more below
before using. (default: false)
* **symlinkify_timeout=UINT**: Time to wait, in seconds, to activate
the **symlinkify** behavior. (default: 3600)
* **nullrw=BOOL**: Turns reads and writes into no-ops. The request
will succeed but do nothing. Useful for benchmarking
mergerfs. (default: false)
* **ignorepponrename=BOOL**: Ignore path preserving on
rename. Typically rename and link act differently depending on the
policy of `create` (read below). Enabling this will cause rename and
link to always use the non-path preserving behavior. This means
files, when renamed or linked, will stay on the same
drive. (default: false)
* **security_capability=BOOL**: If false return ENOATTR when xattr
security.capability is queried. (default: true)
* **xattr=passthrough|noattr|nosys**: Runtime control of
xattrs. Default is to passthrough xattr requests. 'noattr' will
short circuit as if nothing exists. 'nosys' will respond with ENOSYS
as if xattrs are not supported or disabled. (default: passthrough)
* **link_cow=BOOL**: When enabled if a regular file is opened which
has a link count > 1 it will copy the file to a temporary file and
rename over the original. Breaking the link and providing a basic
copy-on-write function similar to cow-shell. (default: false)
* **statfs=base|full**: Controls how statfs works. 'base' means it
will always use all branches in statfs calculations. 'full' is in
effect path preserving and only includes drives where the path
exists. (default: base)
* **statfs_ignore=none|ro|nc**: 'ro' will cause statfs calculations to
ignore available space for branches mounted or tagged as 'read-only'
or 'no create'. 'nc' will ignore available space for branches tagged
as 'no create'. (default: none)
* **nfsopenhack=off|git|all**: A workaround for exporting mergerfs
over NFS where there are issues with creating files for write while
setting the mode to read-only. (default: off)
* **follow-symlinks=never|directory|regular|all**: Turns symlinks into
what they point to. (default: never)
* **link-exdev=passthrough|rel-symlink|abs-base-symlink|abs-pool-symlink**:
When a link fails with EXDEV optionally create a symlink to the file
instead.
* **rename-exdev=passthrough|rel-symlink|abs-symlink**: When a rename
fails with EXDEV optionally move the file to a special directory and
symlink to it.
* **posix_acl=BOOL**: Enable POSIX ACL support (if supported by kernel
and underlying filesystem). (default: false)
* **async_read=BOOL**: Perform reads asynchronously. If disabled or
unavailable the kernel will ensure there is at most one pending read
request per file handle and will attempt to order requests by
offset. (default: true)
* **fuse_msg_size=UINT**: Set the max number of pages per FUSE
message. Only available on Linux >= 4.20 and ignored
otherwise. (min: 1; max: 256; default: 256)
* **threads=INT**: Number of threads to use. When used alone
(`process-thread-count=-1`) it sets the number of threads reading
and processing FUSE messages. When used together it sets the number
of threads reading from FUSE. When set to zero it will attempt to
discover and use the number of logical cores. If the thread count is
set negative it will look up the number of cores then divide by the
absolute value. ie. threads=-2 on an 8 core machine will result in 8
/ 2 = 4 threads. There will always be at least 1 thread. If set to
-1 in combination with `process-thread-count` then it will try to
pick reasonable values based on CPU thread count. NOTE: higher
number of threads increases parallelism but usually decreases
throughput. (default: 0)
* **read-thread-count=INT**: Alias for `threads`.
* **process-thread-count=INT**: Enables separate thread pool to
asynchronously process FUSE requests. In this mode
`read-thread-count` refers to the number of threads reading FUSE
messages which are dispatched to process threads. -1 means disabled
otherwise acts like `read-thread-count`. (default: -1)
* **fsname=STR**: Sets the name of the filesystem as seen in
**mount**, **df**, etc. Defaults to a list of the source paths
concatenated together with the longest common prefix removed.
* **func.FUNC=POLICY**: Sets the specific FUSE function's policy. See
below for the list of value types. Example: **func.getattr=newest**
* **category.action=POLICY**: Sets policy of all FUSE functions in the
action category. (default: epall)
* **category.create=POLICY**: Sets policy of all FUSE functions in the
create category. (default: epmfs)
* **category.search=POLICY**: Sets policy of all FUSE functions in the
search category. (default: ff)
* **cache.open=UINT**: 'open' policy cache timeout in
seconds. (default: 0)
* **cache.statfs=UINT**: 'statfs' cache timeout in seconds. (default:
0)
* **cache.attr=UINT**: File attribute cache timeout in
seconds. (default: 1)
* **cache.entry=UINT**: File name lookup cache timeout in
seconds. (default: 1)
* **cache.negative_entry=UINT**: Negative file name lookup cache
timeout in seconds. (default: 0)
* **cache.files=libfuse|off|partial|full|auto-full**: File page
caching mode (default: libfuse)
* **cache.writeback=BOOL**: Enable kernel writeback caching (default:
false)
* **cache.symlinks=BOOL**: Cache symlinks (if supported by kernel)
(default: false)
* **cache.readdir=BOOL**: Cache readdir (if supported by kernel)
(default: false)
* **direct_io**: deprecated - Bypass page cache. Use `cache.files=off`
instead. (default: false)
* **kernel_cache**: deprecated - Do not invalidate data cache on file
open. Use `cache.files=full` instead. (default: false)
* **auto_cache**: deprecated - Invalidate data cache if file mtime or
size change. Use `cache.files=auto-full` instead. (default: false)
* **async_read**: deprecated - Perform reads asynchronously. Use
`async_read=true` instead.
* **sync_read**: deprecated - Perform reads synchronously. Use
`async_read=false` instead.
**NOTE:** Options are evaluated in the order listed so if the options are **func.rmdir=rand,category.action=ff** the **action** category setting will override the **rmdir** setting. **NOTE:** Options are evaluated in the order listed so if the options are **func.rmdir=rand,category.action=ff** the **action** category setting will override the **rmdir** setting.

27
libfuse/Makefile

@ -31,13 +31,12 @@ INSTALLMAN1DIR = $(DESTDIR)$(MAN1DIR)
AR ?= ar AR ?= ar
SRC = \
SRC_C = \
lib/buffer.c \ lib/buffer.c \
lib/crc32b.c \ lib/crc32b.c \
lib/debug.c \ lib/debug.c \
lib/fuse.c \ lib/fuse.c \
lib/fuse_dirents.c \ lib/fuse_dirents.c \
lib/fuse_loop_mt.c \
lib/fuse_lowlevel.c \ lib/fuse_lowlevel.c \
lib/fuse_mt.c \ lib/fuse_mt.c \
lib/fuse_node.c \ lib/fuse_node.c \
@ -46,8 +45,13 @@ SRC = \
lib/fuse_signals.c \ lib/fuse_signals.c \
lib/helper.c \ lib/helper.c \
lib/mount.c lib/mount.c
OBJS = $(SRC:lib/%.c=build/%.o)
DEPS = $(SRC:lib/%.c=build/%.d)
SRC_CPP = \
lib/fuse_loop_mt.cpp \
lib/fuse_msgbuf.cpp
OBJS_C = $(SRC_C:lib/%.c=build/%.o)
OBJS_CPP = $(SRC_CPP:lib/%.cpp=build/%.o)
DEPS_C = $(SRC_C:lib/%.c=build/%.d)
DEPS_CPP = $(SRC_CPP:lib/%.cpp=build/%.d)
CFLAGS ?= \ CFLAGS ?= \
$(OPT_FLAGS) $(OPT_FLAGS)
CFLAGS := \ CFLAGS := \
@ -56,6 +60,12 @@ CFLAGS := \
-Wall \ -Wall \
-pipe \ -pipe \
-MMD -MMD
CXXFLAGS := \
${CXXFLAGS} \
-std=c++11 \
-Wall \
-pipe \
-MMD
FUSERMOUNT_DIR = $(BINDIR) FUSERMOUNT_DIR = $(BINDIR)
FUSE_FLAGS = \ FUSE_FLAGS = \
-Iinclude \ -Iinclude \
@ -80,10 +90,10 @@ build/stamp:
touch $@ touch $@
objects: build/config.h objects: build/config.h
$(MAKE) $(OBJS)
$(MAKE) $(OBJS_C) $(OBJS_CPP)
build/libfuse.a: objects build/libfuse.a: objects
${AR} rcs build/libfuse.a $(OBJS)
${AR} rcs build/libfuse.a $(OBJS_C) $(OBJS_CPP)
utils: mergerfs-fusermount mount.mergerfs utils: mergerfs-fusermount mount.mergerfs
@ -100,6 +110,9 @@ mount.mergerfs: build/mount.mergerfs
build/%.o: lib/%.c build/%.o: lib/%.c
$(CC) $(CFLAGS) $(FUSE_FLAGS) -c $< -o $@ $(CC) $(CFLAGS) $(FUSE_FLAGS) -c $< -o $@
build/%.o: lib/%.cpp
$(CXX) $(CXXFLAGS) $(FUSE_FLAGS) -c $< -o $@
clean: clean:
rm -rf build rm -rf build
@ -119,4 +132,4 @@ install: $(INSTALLUTILS)
.PHONY: objects strip utils install install-utils .PHONY: objects strip utils install install-utils
-include $(DEPS)
-include $(DEPS_C) $(DEPS_CPP)

9
libfuse/include/fuse.h

@ -641,7 +641,8 @@ void fuse_destroy(struct fuse *f);
*/ */
void fuse_exit(struct fuse *f); void fuse_exit(struct fuse *f);
int fuse_config_num_threads(const struct fuse *fuse_);
int fuse_config_read_thread_count(const struct fuse *f);
int fuse_config_process_thread_count(const struct fuse *f);
/** /**
* FUSE event loop with multiple threads * FUSE event loop with multiple threads
@ -762,12 +763,6 @@ struct fuse *fuse_setup(int argc, char *argv[],
/** This is the part of fuse_main() after the event loop */ /** This is the part of fuse_main() after the event loop */
void fuse_teardown(struct fuse *fuse, char *mountpoint); void fuse_teardown(struct fuse *fuse, char *mountpoint);
/** Read a single command. If none are read, return NULL */
struct fuse_cmd *fuse_read_cmd(struct fuse *f);
/** Process a single command */
void fuse_process_cmd(struct fuse *f, struct fuse_cmd *cmd);
/** Multi threaded event loop, which calls the custom command /** Multi threaded event loop, which calls the custom command
processor function */ processor function */
int fuse_loop_mt_proc(struct fuse *f, fuse_processor_t proc, void *data); int fuse_loop_mt_proc(struct fuse *f, fuse_processor_t proc, void *data);

14
libfuse/include/fuse_lowlevel.h

@ -19,6 +19,7 @@
* 25 * 25
*/ */
#include "fuse_msgbuf.h"
#ifndef FUSE_USE_VERSION #ifndef FUSE_USE_VERSION
#define FUSE_USE_VERSION 24 #define FUSE_USE_VERSION 24
#endif #endif
@ -1478,9 +1479,12 @@ void *fuse_session_data(struct fuse_session *se);
int fuse_session_receive(struct fuse_session *se, int fuse_session_receive(struct fuse_session *se,
struct fuse_buf *buf); struct fuse_buf *buf);
void fuse_session_process(struct fuse_session *se, void fuse_session_process(struct fuse_session *se,
const struct fuse_buf *buf);
const void *buf,
const size_t bufsize);
int fuse_session_loop_mt(struct fuse_session *se, const int threads);
int fuse_session_loop_mt(struct fuse_session *se,
const int read_thread_count,
const int process_thread_count);
/* ----------------------------------------------------------- * /* ----------------------------------------------------------- *
* Channel interface * * Channel interface *
@ -1520,12 +1524,6 @@ void *fuse_chan_data(struct fuse_chan *ch);
*/ */
struct fuse_session *fuse_chan_session(struct fuse_chan *ch); struct fuse_session *fuse_chan_session(struct fuse_chan *ch);
int fuse_chan_recv(struct fuse_chan *ch,
char *buf,
size_t size);
int fuse_chan_send(struct fuse_chan *ch,
const struct iovec iov[],
size_t count);
void fuse_chan_destroy(struct fuse_chan *ch); void fuse_chan_destroy(struct fuse_chan *ch);
EXTERN_C_END EXTERN_C_END

12
libfuse/include/fuse_msgbuf.h

@ -0,0 +1,12 @@
#pragma once
#include <stdint.h>
typedef struct fuse_msgbuf_t fuse_msgbuf_t;
struct fuse_msgbuf_t
{
char *mem;
uint32_t size;
uint32_t used;
int pipefd[2];
};

165
libfuse/lib/bounded_queue.hpp

@ -0,0 +1,165 @@
#pragma once
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>
template<typename T>
class BoundedQueue
{
public:
explicit
BoundedQueue(std::size_t max_size_,
bool block_ = true)
: _block(block),
_max_size(max_size_)
{
if(_max_size == 0)
_max_size = 1;
}
bool
push(const T& item_)
{
{
std::unique_lock guard(_queue_lock);
_condition_push.wait(guard, [&]() { return _queue.size() < _max_size || !_block; });
if(_queue.size() == _max_size)
return false;
_queue.push(item);
}
_condition_pop.notify_one();
return true;
}
bool
push(T&& item_)
{
{
std::unique_lock guard(_queue_lock);
_condition_push.wait(guard, [&]() { return _queue.size() < _max_size || !_block; });
if(_queue.size() == _max_size)
return false;
_queue.push(std::move(item_));
}
_condition_pop.notify_one();
return true;
}
template<typename... Args>
bool
emplace(Args&&... args_)
{
{
std::unique_lock guard(_queue_lock);
_condition_push.wait(guard, [&]() { return _queue.size() < _max_size || !_block; });
if(_queue.size() == _max_size)
return false;
_queue.emplace(std::forward<Args>(args_)...);
}
_condition_pop.notify_one();
return true;
}
bool
pop(T& item_)
{
{
std::unique_lock guard(_queue_lock);
_condition_pop.wait(guard, [&]() { return !_queue.empty() || !_block; });
if(_queue.empty())
return false;
item_ = std::move(_queue.front());
_queue.pop();
}
_condition_push.notify_one();
return true;
}
std::size_t
size() const
{
std::lock_guard guard(_queue_lock);
return _queue.size();
}
std::size_t
capacity() const
{
return _max_size;
}
bool
empty() const
{
std::lock_guard guard(_queue_lock);
return _queue.empty();
}
bool
full() const
{
std::lock_guard lock(_queue_lock);
return (_queue.size() == capacity());
}
void
block()
{
std::lock_guard guard(_queue_lock);
_block = true;
}
void
unblock()
{
{
std::lock_guard guard(_queue_lock);
_block = false;
}
_condition_push.notify_all();
_condition_pop.notify_all();
}
bool
blocking() const
{
std::lock_guard guard(_queue_lock);
return _block;
}
private:
mutable std::mutex _queue_lock;
private:
bool _block;
std::queue<T> _queue;
const std::size_t _max_size;
std::condition_variable _condition_push;
std::condition_variable _condition_pop;
};

72
libfuse/lib/fuse.c

@ -72,7 +72,8 @@ struct fuse_config
int set_uid; int set_uid;
int set_gid; int set_gid;
int help; int help;
int threads;
int read_thread_count;
int process_thread_count;
}; };
struct fuse_fs struct fuse_fs
@ -3657,14 +3658,6 @@ fuse_notify_poll(fuse_pollhandle_t *ph)
return fuse_lowlevel_notify_poll(ph); return fuse_lowlevel_notify_poll(ph);
} }
static
void
free_cmd(struct fuse_cmd *cmd)
{
free(cmd->buf);
free(cmd);
}
int int
fuse_exited(struct fuse *f) fuse_exited(struct fuse *f)
{ {
@ -3677,53 +3670,6 @@ fuse_get_session(struct fuse *f)
return f->se; return f->se;
} }
static
struct fuse_cmd*
fuse_alloc_cmd(size_t bufsize)
{
struct fuse_cmd *cmd = (struct fuse_cmd *)malloc(sizeof(*cmd));
if(cmd == NULL)
{
fprintf(stderr,"fuse: failed to allocate cmd\n");
return NULL;
}
cmd->buf = (char *)malloc(bufsize);
if(cmd->buf == NULL)
{
fprintf(stderr,"fuse: failed to allocate read buffer\n");
free(cmd);
return NULL;
}
return cmd;
}
struct fuse_cmd*
fuse_read_cmd(struct fuse *f)
{
struct fuse_chan *ch = f->se->ch;
size_t bufsize = fuse_chan_bufsize(ch);
struct fuse_cmd *cmd = fuse_alloc_cmd(bufsize);
if(cmd != NULL)
{
int res = fuse_chan_recv(ch,cmd->buf,bufsize);
if(res <= 0)
{
free_cmd(cmd);
if(res < 0 && res != -EINTR && res != -EAGAIN)
fuse_exit(f);
return NULL;
}
cmd->buflen = res;
cmd->ch = ch;
}
return cmd;
}
void void
fuse_exit(struct fuse *f) fuse_exit(struct fuse *f)
{ {
@ -3759,7 +3705,9 @@ static const struct fuse_opt fuse_lib_opts[] =
FUSE_LIB_OPT("gid=%d", gid,0), FUSE_LIB_OPT("gid=%d", gid,0),
FUSE_LIB_OPT("noforget", remember,-1), FUSE_LIB_OPT("noforget", remember,-1),
FUSE_LIB_OPT("remember=%u", remember,0), FUSE_LIB_OPT("remember=%u", remember,0),
FUSE_LIB_OPT("threads=%d", threads,0),
FUSE_LIB_OPT("threads=%d", read_thread_count,0),
FUSE_LIB_OPT("read-thread-count=%d", read_thread_count,0),
FUSE_LIB_OPT("process-thread-count=%d", process_thread_count,-1),
FUSE_LIB_OPT("use_ino", use_ino,1), FUSE_LIB_OPT("use_ino", use_ino,1),
FUSE_OPT_END FUSE_OPT_END
}; };
@ -4118,9 +4066,15 @@ fuse_destroy(struct fuse *f)
} }
int int
fuse_config_num_threads(const struct fuse *fuse_)
fuse_config_read_thread_count(const struct fuse *f_)
{
return f_->conf.read_thread_count;
}
int
fuse_config_process_thread_count(const struct fuse *f_)
{ {
return fuse_->conf.threads;
return f_->conf.process_thread_count;
} }
void void

19
libfuse/lib/fuse_i.h

@ -10,6 +10,9 @@
#include "fuse.h" #include "fuse.h"
#include "fuse_lowlevel.h" #include "fuse_lowlevel.h"
#include "fuse_msgbuf.h"
#include "extern_c.h"
struct fuse_chan; struct fuse_chan;
struct fuse_ll; struct fuse_ll;
@ -17,16 +20,14 @@ struct fuse_ll;
struct fuse_session struct fuse_session
{ {
int (*receive_buf)(struct fuse_session *se, int (*receive_buf)(struct fuse_session *se,
struct fuse_buf *buf,
struct fuse_chan *ch);
fuse_msgbuf_t *msgbuf);
void (*process_buf)(void *data,
const struct fuse_buf *buf,
struct fuse_chan *ch);
void (*process_buf)(struct fuse_session *se,
const fuse_msgbuf_t *msgbuf);
void (*destroy)(void *data); void (*destroy)(void *data);
void *data;
struct fuse_ll *f;
volatile int exited; volatile int exited;
struct fuse_chan *ch; struct fuse_chan *ch;
}; };
@ -64,11 +65,11 @@ struct fuse_ll
int no_splice_move; int no_splice_move;
int no_splice_read; int no_splice_read;
struct fuse_lowlevel_ops op; struct fuse_lowlevel_ops op;
int got_init;
void *userdata; void *userdata;
uid_t owner; uid_t owner;
struct fuse_conn_info conn; struct fuse_conn_info conn;
pthread_mutex_t lock; pthread_mutex_t lock;
int got_init;
int got_destroy; int got_destroy;
pthread_key_t pipe_key; pthread_key_t pipe_key;
int broken_splice_nonblock; int broken_splice_nonblock;
@ -83,6 +84,8 @@ struct fuse_cmd
struct fuse_chan *ch; struct fuse_chan *ch;
}; };
EXTERN_C_BEGIN
struct fuse *fuse_new_common(struct fuse_chan *ch, struct fuse_args *args, struct fuse *fuse_new_common(struct fuse_chan *ch, struct fuse_args *args,
const struct fuse_operations *op, const struct fuse_operations *op,
size_t op_size); size_t op_size);
@ -110,3 +113,5 @@ struct fuse *fuse_setup_common(int argc, char *argv[],
int *fd); int *fd);
int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg); int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg);
EXTERN_C_END

15
libfuse/lib/fuse_ll.hpp

@ -0,0 +1,15 @@
#pragma once
#include "fuse_msgbuf.h"
int fuse_receive_buf(struct fuse_session *se,
fuse_msgbuf_t *msgbuf);
void fuse_process_buf(void *data,
const fuse_msgbuf_t *msgbuf,
struct fuse_chan *ch);
int fuse_receive_buf_splice(struct fuse_chan *ch,
fuse_msgbuf_t *msgbuf);
void fuse_process_buf_splice(struct fuse_chan *ch,
const fuse_msgbuf_t *msgbuf,
void *data);

231
libfuse/lib/fuse_loop_mt.c

@ -1,231 +0,0 @@
/*
FUSE: Filesystem in Userspace
Copyright (C) 2001-2007 Miklos Szeredi <miklos@szeredi.hu>
This program can be distributed under the terms of the GNU LGPLv2.
See the file COPYING.LIB.
*/
#include "fuse_i.h"
#include "fuse_kernel.h"
#include "fuse_lowlevel.h"
#include "fuse_misc.h"
#include <errno.h>
#include <semaphore.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>
#include <unistd.h>
/* Environment var controlling the thread stack size */
#define ENVNAME_THREAD_STACK "FUSE_THREAD_STACK"
struct fuse_worker
{
struct fuse_worker *prev;
struct fuse_worker *next;
pthread_t thread_id;
size_t bufsize;
char *buf;
struct fuse_mt *mt;
};
struct fuse_mt
{
struct fuse_session *se;
struct fuse_worker main;
sem_t finish;
int exit;
int error;
};
static
void
list_add_worker(struct fuse_worker *w,
struct fuse_worker *next)
{
struct fuse_worker *prev = next->prev;
w->next = next;
w->prev = prev;
prev->next = w;
next->prev = w;
}
static void list_del_worker(struct fuse_worker *w)
{
struct fuse_worker *prev = w->prev;
struct fuse_worker *next = w->next;
prev->next = next;
next->prev = prev;
}
static int fuse_loop_start_thread(struct fuse_mt *mt);
static
void*
fuse_do_work(void *data)
{
struct fuse_worker *w = (struct fuse_worker *) data;
struct fuse_mt *mt = w->mt;
while(!fuse_session_exited(mt->se))
{
int res;
struct fuse_buf fbuf;
fbuf = (struct fuse_buf){ .mem = w->buf,
.size = w->bufsize };
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
res = fuse_session_receive(mt->se,&fbuf);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
if(res == -EINTR)
continue;
if(res <= 0)
{
if(res < 0)
{
mt->se->exited = 1;
mt->error = -1;
}
break;
}
if(mt->exit)
return NULL;
fuse_session_process(mt->se,&fbuf);
}
sem_post(&mt->finish);
return NULL;
}
int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg)
{
sigset_t oldset;
sigset_t newset;
int res;
pthread_attr_t attr;
char *stack_size;
/* Override default stack size */
pthread_attr_init(&attr);
stack_size = getenv(ENVNAME_THREAD_STACK);
if(stack_size && pthread_attr_setstacksize(&attr, atoi(stack_size)))
fprintf(stderr, "fuse: invalid stack size: %s\n", stack_size);
/* Disallow signal reception in worker threads */
sigfillset(&newset);
pthread_sigmask(SIG_BLOCK,&newset,&oldset);
res = pthread_create(thread_id, &attr, func, arg);
pthread_sigmask(SIG_SETMASK,&oldset,NULL);
pthread_attr_destroy(&attr);
if(res != 0) {
fprintf(stderr, "fuse: error creating thread: %s\n",
strerror(res));
return -1;
}
return 0;
}
static int fuse_loop_start_thread(struct fuse_mt *mt)
{
int res;
struct fuse_worker *w = malloc(sizeof(struct fuse_worker));
if(!w) {
fprintf(stderr, "fuse: failed to allocate worker structure\n");
return -1;
}
memset(w, 0, sizeof(struct fuse_worker));
w->bufsize = fuse_chan_bufsize(mt->se->ch);
w->buf = calloc(w->bufsize,1);
w->mt = mt;
if(!w->buf) {
fprintf(stderr, "fuse: failed to allocate read buffer\n");
free(w);
return -1;
}
res = fuse_start_thread(&w->thread_id, fuse_do_work, w);
if(res == -1) {
free(w->buf);
free(w);
return -1;
}
list_add_worker(w, &mt->main);
return 0;
}
static void fuse_join_worker(struct fuse_worker *w)
{
pthread_join(w->thread_id, NULL);
list_del_worker(w);
free(w->buf);
free(w);
}
static int number_of_threads(void)
{
#ifdef _SC_NPROCESSORS_ONLN
return sysconf(_SC_NPROCESSORS_ONLN);
#endif
return 4;
}
int
fuse_session_loop_mt(struct fuse_session *se_,
const int threads_)
{
int i;
int err;
int threads;
struct fuse_mt mt;
struct fuse_worker *w;
memset(&mt,0,sizeof(struct fuse_mt));
mt.se = se_;
mt.error = 0;
mt.main.thread_id = pthread_self();
mt.main.prev = mt.main.next = &mt.main;
sem_init(&mt.finish,0,0);
threads = ((threads_ > 0) ? threads_ : number_of_threads());
if(threads_ < 0)
threads /= -threads_;
if(threads == 0)
threads = 1;
err = 0;
for(i = 0; (i < threads) && !err; i++)
err = fuse_loop_start_thread(&mt);
if(!err)
{
/* sem_wait() is interruptible */
while(!fuse_session_exited(se_))
sem_wait(&mt.finish);
for(w = mt.main.next; w != &mt.main; w = w->next)
pthread_cancel(w->thread_id);
mt.exit = 1;
while(mt.main.next != &mt.main)
fuse_join_worker(mt.main.next);
err = mt.error;
}
sem_destroy(&mt.finish);
fuse_session_reset(se_);
return err;
}

266
libfuse/lib/fuse_loop_mt.cpp

@ -0,0 +1,266 @@
#include "thread_pool.hpp"
#include "fuse_i.h"
#include "fuse_kernel.h"
#include "fuse_lowlevel.h"
#include "fuse_misc.h"
#include "fuse_msgbuf.hpp"
#include "fuse_ll.hpp"
#include <errno.h>
#include <semaphore.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>
#include <cassert>
#include <vector>
struct fuse_worker_data_t
{
struct fuse_session *se;
sem_t finished;
std::function<void(fuse_worker_data_t*,fuse_msgbuf_t*)> msgbuf_processor;
std::function<fuse_msgbuf_t*(void)> msgbuf_allocator;
std::shared_ptr<ThreadPool> tp;
};
class WorkerCleanup
{
public:
WorkerCleanup(fuse_worker_data_t *wd_)
: _wd(wd_)
{
}
~WorkerCleanup()
{
fuse_session_exit(_wd->se);
sem_post(&_wd->finished);
}
private:
fuse_worker_data_t *_wd;
};
static
bool
retriable_receive_error(const int err_)
{
switch(err_)
{
case -EINTR:
case -EAGAIN:
case -ENOENT:
return true;
default:
return false;
}
}
static
bool
fatal_receive_error(const int err_)
{
return (err_ < 0);
}
static
void*
handle_receive_error(const int rv_,
fuse_msgbuf_t *msgbuf_)
{
msgbuf_free(msgbuf_);
fprintf(stderr,
"mergerfs: error reading from /dev/fuse - %s (%d)\n",
strerror(-rv_),
-rv_);
return NULL;
}
static
void*
fuse_do_work(void *data)
{
fuse_worker_data_t *wd = (fuse_worker_data_t*)data;
fuse_session *se = wd->se;
auto &process_msgbuf = wd->msgbuf_processor;
auto &msgbuf_allocator = wd->msgbuf_allocator;
WorkerCleanup workercleanup(wd);
while(!fuse_session_exited(se))
{
int rv;
fuse_msgbuf_t *msgbuf;
msgbuf = msgbuf_allocator();
do
{
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
rv = se->receive_buf(se,msgbuf);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
if(rv == 0)
return NULL;
if(retriable_receive_error(rv))
continue;
if(fatal_receive_error(rv))
return handle_receive_error(rv,msgbuf);
} while(false);
process_msgbuf(wd,msgbuf);
}
return NULL;
}
int
fuse_start_thread(pthread_t *thread_id,
void *(*func)(void *),
void *arg)
{
int res;
sigset_t oldset;
sigset_t newset;
sigfillset(&newset);
pthread_sigmask(SIG_BLOCK,&newset,&oldset);
res = pthread_create(thread_id,NULL,func,arg);
pthread_sigmask(SIG_SETMASK,&oldset,NULL);
if(res != 0)
{
fprintf(stderr,
"fuse: error creating thread: %s\n",
strerror(res));
return -1;
}
return 0;
}
static
int
calculate_thread_count(const int raw_thread_count_)
{
int thread_count;
thread_count = 4;
if(raw_thread_count_ == 0)
thread_count = std::thread::hardware_concurrency();
else if(raw_thread_count_ < 0)
thread_count = (std::thread::hardware_concurrency() / -raw_thread_count_);
else if(raw_thread_count_ > 0)
thread_count = raw_thread_count_;
if(thread_count <= 0)
thread_count = 1;
return thread_count;
}
static
void
calculate_thread_counts(int *read_thread_count_,
int *process_thread_count_)
{
if((*read_thread_count_ == -1) && (*process_thread_count_ == -1))
{
int nproc;
nproc = std::thread::hardware_concurrency();
*read_thread_count_ = 2;
*process_thread_count_ = std::max(2,(nproc - 2));
}
else
{
*read_thread_count_ = ::calculate_thread_count(*read_thread_count_);
if(*process_thread_count_ != -1)
*process_thread_count_ = ::calculate_thread_count(*process_thread_count_);
}
}
static
void
process_msgbuf_sync(fuse_worker_data_t *wd_,
fuse_msgbuf_t *msgbuf_)
{
wd_->se->process_buf(wd_->se,msgbuf_);
msgbuf_free(msgbuf_);
}
static
void
process_msgbuf_async(fuse_worker_data_t *wd_,
fuse_msgbuf_t *msgbuf_)
{
const auto func = [=] {
process_msgbuf_sync(wd_,msgbuf_);
};
wd_->tp->enqueue_work(func);
}
int
fuse_session_loop_mt(struct fuse_session *se_,
const int raw_read_thread_count_,
const int raw_process_thread_count_)
{
int err;
int read_thread_count;
int process_thread_count;
fuse_worker_data_t wd = {0};
std::vector<pthread_t> threads;
read_thread_count = raw_read_thread_count_;
process_thread_count = raw_process_thread_count_;
::calculate_thread_counts(&read_thread_count,&process_thread_count);
if(process_thread_count > 0)
{
wd.tp = std::make_shared<ThreadPool>(process_thread_count);
wd.msgbuf_processor = process_msgbuf_async;
}
else
{
wd.msgbuf_processor = process_msgbuf_sync;
}
wd.msgbuf_allocator = ((se_->f->splice_read) ? msgbuf_alloc : msgbuf_alloc_memonly);
wd.se = se_;
sem_init(&wd.finished,0,0);
err = 0;
for(int i = 0; i < read_thread_count; i++)
{
pthread_t thread_id;
err = fuse_start_thread(&thread_id,fuse_do_work,&wd);
assert(err == 0);
threads.push_back(thread_id);
}
if(!err)
{
/* sem_wait() is interruptible */
while(!fuse_session_exited(se_))
sem_wait(&wd.finished);
for(const auto &thread_id : threads)
pthread_cancel(thread_id);
for(const auto &thread_id : threads)
pthread_join(thread_id,NULL);
}
sem_destroy(&wd.finished);
return err;
}

373
libfuse/lib/fuse_lowlevel.c

@ -17,6 +17,7 @@
#include "fuse_opt.h" #include "fuse_opt.h"
#include "fuse_misc.h" #include "fuse_misc.h"
#include "fuse_pollhandle.h" #include "fuse_pollhandle.h"
#include "fuse_msgbuf.h"
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
@ -133,11 +134,16 @@ fuse_send_msg(struct fuse_ll *f,
struct iovec *iov, struct iovec *iov,
int count) int count)
{ {
int rv;
struct fuse_out_header *out = iov[0].iov_base; struct fuse_out_header *out = iov[0].iov_base;
out->len = iov_length(iov, count); out->len = iov_length(iov, count);
return fuse_chan_send(ch, iov, count);
rv = writev(fuse_chan_fd(ch),iov,count);
if(rv == -1)
return -errno;
return 0;
} }
int int
@ -231,8 +237,6 @@ fuse_reply_err(fuse_req_t req_,
void void
fuse_reply_none(fuse_req_t req) fuse_reply_none(fuse_req_t req)
{ {
if (req->ch)
fuse_chan_send(req->ch, NULL, 0);
destroy_req(req); destroy_req(req);
} }
@ -1947,70 +1951,6 @@ static struct {
#define FUSE_MAXOP (sizeof(fuse_ll_ops) / sizeof(fuse_ll_ops[0])) #define FUSE_MAXOP (sizeof(fuse_ll_ops) / sizeof(fuse_ll_ops[0]))
static
void
fuse_ll_process_buf(void *data,
const struct fuse_buf *buf,
struct fuse_chan *ch)
{
struct fuse_ll *f = (struct fuse_ll*)data;
struct fuse_in_header *in;
struct fuse_req *req;
int err;
in = buf->mem;
req = fuse_ll_alloc_req(f);
if(req == NULL)
{
struct fuse_out_header out = {
.unique = in->unique,
.error = -ENOMEM,
};
struct iovec iov = {
.iov_base = &out,
.iov_len = sizeof(struct fuse_out_header),
};
fuse_send_msg(f, ch, &iov, 1);
return;
}
req->unique = in->unique;
req->ctx.uid = in->uid;
req->ctx.gid = in->gid;
req->ctx.pid = in->pid;
req->ch = ch;
err = EIO;
if(!f->got_init)
{
enum fuse_opcode expected;
expected = FUSE_INIT;
if (in->opcode != expected)
goto reply_err;
}
else if(in->opcode == FUSE_INIT)
{
goto reply_err;
}
err = ENOSYS;
if(in->opcode >= FUSE_MAXOP)
goto reply_err;
if(fuse_ll_ops[in->opcode].func == NULL)
goto reply_err;
fuse_ll_ops[in->opcode].func(req, in);
return;
reply_err:
fuse_reply_err(req, err);
return;
}
enum { enum {
KEY_HELP, KEY_HELP,
KEY_VERSION, KEY_VERSION,
@ -2130,96 +2070,270 @@ fuse_ll_pipe_destructor(void *data)
fuse_ll_pipe_free(llp); fuse_ll_pipe_free(llp);
} }
#ifdef HAVE_SPLICE
static static
int
fuse_ll_receive_buf(struct fuse_session *se,
struct fuse_buf *buf)
void
fuse_send_errno(struct fuse_ll *f_,
struct fuse_chan *ch_,
const int errno_,
const uint64_t unique_id_)
{ {
struct fuse_ll *f = fuse_session_data(se);
size_t bufsize = buf->size;
struct fuse_ll_pipe *llp;
int err;
int res;
struct fuse_out_header out = {0};
struct iovec iov = {0};
if(f->conn.proto_minor < 14 || !(f->conn.want & FUSE_CAP_SPLICE_READ))
goto fallback;
out.unique = unique_id_;
out.error = -errno_;
iov.iov_base = &out;
iov.iov_len = sizeof(struct fuse_out_header);
llp = fuse_ll_get_pipe(f);
if (llp == NULL)
goto fallback;
fuse_send_msg(f_,ch_,&iov,1);
}
if(llp->size < bufsize)
static
void
fuse_send_enomem(struct fuse_ll *f_,
struct fuse_chan *ch_,
const uint64_t unique_id_)
{ {
if(llp->can_grow)
fuse_send_errno(f_,ch_,ENOMEM,unique_id_);
}
static
int
fuse_ll_buf_receive_read(struct fuse_session *se_,
fuse_msgbuf_t *msgbuf_)
{ {
res = fcntl(llp->pipe[0], F_SETPIPE_SZ, bufsize);
if(res == -1)
int rv;
rv = read(fuse_chan_fd(se_->ch),msgbuf_->mem,msgbuf_->size);
if(rv == -1)
return -errno;
if(rv < sizeof(struct fuse_in_header))
{ {
llp->can_grow = 0;
goto fallback;
fprintf(stderr, "short splice from fuse device\n");
return -EIO;
} }
llp->size = res;
return rv;
} }
if(llp->size < bufsize)
goto fallback;
static
void
fuse_ll_buf_process_read(struct fuse_session *se_,
const fuse_msgbuf_t *msgbuf_)
{
int err;
struct fuse_req *req;
struct fuse_in_header *in;
in = (struct fuse_in_header*)msgbuf_->mem;
req = fuse_ll_alloc_req(se_->f);
if(req == NULL)
return fuse_send_enomem(se_->f,se_->ch,in->unique);
req->unique = in->unique;
req->ctx.uid = in->uid;
req->ctx.gid = in->gid;
req->ctx.pid = in->pid;
req->ch = se_->ch;
err = ENOSYS;
if(in->opcode >= FUSE_MAXOP)
goto reply_err;
if(fuse_ll_ops[in->opcode].func == NULL)
goto reply_err;
fuse_ll_ops[in->opcode].func(req, in);
return;
reply_err:
fuse_reply_err(req, err);
return;
} }
res = splice(fuse_chan_fd(se->ch), NULL, llp->pipe[1], NULL, bufsize, SPLICE_F_MOVE);
err = errno;
static
void
fuse_ll_buf_process_read_init(struct fuse_session *se_,
const fuse_msgbuf_t *msgbuf_)
{
int err;
struct fuse_req *req;
struct fuse_in_header *in;
if(fuse_session_exited(se))
return 0;
in = (struct fuse_in_header*)msgbuf_->mem;
if(res == -1)
req = fuse_ll_alloc_req(se_->f);
if(req == NULL)
return fuse_send_enomem(se_->f,se_->ch,in->unique);
req->unique = in->unique;
req->ctx.uid = in->uid;
req->ctx.gid = in->gid;
req->ctx.pid = in->pid;
req->ch = se_->ch;
err = EIO;
if(in->opcode != FUSE_INIT)
goto reply_err;
if(fuse_ll_ops[in->opcode].func == NULL)
goto reply_err;
se_->process_buf = fuse_ll_buf_process_read;
fuse_ll_ops[in->opcode].func(req, in);
return;
reply_err:
fuse_reply_err(req, err);
return;
}
#if defined(HAVE_SPLICE) && defined(HAVE_VMSPLICE)
static
int
fuse_ll_buf_receive_splice(struct fuse_session *se_,
fuse_msgbuf_t *msgbuf_)
{ {
if(err == ENODEV)
int rv;
size_t bufsize = msgbuf_->size;
rv = splice(fuse_chan_fd(se_->ch),NULL,msgbuf_->pipefd[1],NULL,bufsize,SPLICE_F_MOVE);
if(rv == -1)
return -errno;
if(rv < sizeof(struct fuse_in_header))
{ {
fuse_session_exit(se);
return 0;
fprintf(stderr,"short splice from fuse device\n");
return -EIO;
} }
if(err != EINTR && err != EAGAIN)
perror("fuse: splice from device");
return rv;
}
static
void
fuse_ll_buf_process_splice(struct fuse_session *se_,
const fuse_msgbuf_t *msgbuf_)
{
int rv;
struct fuse_req *req;
struct fuse_in_header *in;
struct iovec iov = { msgbuf_->mem, msgbuf_->size };
return -err;
retry:
rv = vmsplice(msgbuf_->pipefd[0], &iov, 1, 0);
if(rv == -1)
{
rv = errno;
if(rv == EAGAIN)
goto retry;
// TODO: Need to propagate back errors to caller
return;
} }
if(res < sizeof(struct fuse_in_header))
in = (struct fuse_in_header*)msgbuf_->mem;
req = fuse_ll_alloc_req(se_->f);
if(req == NULL)
return fuse_send_enomem(se_->f,se_->ch,in->unique);
req->unique = in->unique;
req->ctx.uid = in->uid;
req->ctx.gid = in->gid;
req->ctx.pid = in->pid;
req->ch = se_->ch;
rv = ENOSYS;
if(in->opcode >= FUSE_MAXOP)
goto reply_err;
if(fuse_ll_ops[in->opcode].func == NULL)
goto reply_err;
fuse_ll_ops[in->opcode].func(req, in);
return;
reply_err:
fuse_reply_err(req, rv);
return;
}
static
void
fuse_ll_buf_process_splice_init(struct fuse_session *se_,
const fuse_msgbuf_t *msgbuf_)
{ {
fprintf(stderr, "short splice from fuse device\n");
return -EIO;
int rv;
struct fuse_req *req;
struct fuse_in_header *in;
struct iovec iov = { msgbuf_->mem, msgbuf_->size };
retry:
rv = vmsplice(msgbuf_->pipefd[0], &iov, 1, 0);
if(rv == -1)
{
rv = errno;
if(rv == EAGAIN)
goto retry;
// TODO: Need to propagate back errors to caller
return;
} }
struct iovec iov = { buf->mem, res };
res = vmsplice(llp->pipe[0], &iov, 1, 0);
in = (struct fuse_in_header*)msgbuf_->mem;
return res;
req = fuse_ll_alloc_req(se_->f);
if(req == NULL)
return fuse_send_enomem(se_->f,se_->ch,in->unique);
fallback:
res = fuse_chan_recv(se->ch, buf->mem, buf->size);
if(res <= 0)
return res;
req->unique = in->unique;
req->ctx.uid = in->uid;
req->ctx.gid = in->gid;
req->ctx.pid = in->pid;
req->ch = se_->ch;
buf->size = res;
rv = EIO;
if(in->opcode != FUSE_INIT)
goto reply_err;
if(fuse_ll_ops[in->opcode].func == NULL)
goto reply_err;
return res;
se_->process_buf = fuse_ll_buf_process_splice;
fuse_ll_ops[in->opcode].func(req, in);
return;
reply_err:
fuse_reply_err(req, rv);
return;
} }
#else #else
static static
int int
fuse_ll_receive_buf(struct fuse_session *se,
struct fuse_buf *buf)
fuse_ll_buf_receive_splice(struct fuse_session *se_,
fuse_msgbuf_t *msgbuf_)
{ {
int res;
res = fuse_chan_recv(se->ch, buf->mem, buf->size);
if(res <= 0)
return res;
return fuse_ll_buf_receive_read(se_,msgbuf_);
}
buf->size = res;
static
void
fuse_ll_buf_process_splice(struct fuse_session *se_,
const fuse_msgbuf_t *msgbuf_)
{
return fuse_ll_buf_process_read(se_,msgbuf_);
}
return res;
static
void
fuse_ll_buf_process_splice_init(struct fuse_session *se_,
const fuse_msgbuf_t *msgbuf_)
{
return fuse_ll_buf_process_read_init(se_,msgbuf_);
} }
#endif #endif
@ -2272,10 +2386,21 @@ fuse_lowlevel_new_common(struct fuse_args *args,
f->owner = getuid(); f->owner = getuid();
f->userdata = userdata; f->userdata = userdata;
if(f->splice_read)
{
se = fuse_session_new(f, se = fuse_session_new(f,
fuse_ll_receive_buf,
fuse_ll_process_buf,
fuse_ll_buf_receive_splice,
fuse_ll_buf_process_splice_init,
fuse_ll_destroy); fuse_ll_destroy);
}
else
{
se = fuse_session_new(f,
fuse_ll_buf_receive_read,
fuse_ll_buf_process_read_init,
fuse_ll_destroy);
}
if(!se) if(!se)
goto out_key_destroy; goto out_key_destroy;

123
libfuse/lib/fuse_msgbuf.cpp

@ -0,0 +1,123 @@
/*
ISC License
Copyright (c) 2022, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include "fuse_msgbuf.h"
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>
#include <mutex>
#include <stack>
static std::size_t g_BUFSIZE = (1024 * 1024 * 2);
static std::mutex g_MUTEX;
static std::stack<fuse_msgbuf_t*> g_MSGBUF_STACK;
static
__attribute__((destructor))
void
msgbuf_destroy()
{
// TODO: cleanup?
}
std::size_t
msgbuf_bufsize()
{
return g_BUFSIZE;
}
void
msgbuf_bufsize(const std::size_t size_)
{
g_BUFSIZE = size_;
}
fuse_msgbuf_t*
msgbuf_alloc()
{
int rv;
fuse_msgbuf_t *msgbuf;
g_MUTEX.lock();
if(g_MSGBUF_STACK.empty())
{
g_MUTEX.unlock();
msgbuf = (fuse_msgbuf_t*)malloc(sizeof(fuse_msgbuf_t));
if(msgbuf == NULL)
return NULL;
rv = pipe(msgbuf->pipefd);
assert(rv == 0);
rv = fcntl(msgbuf->pipefd[0],F_SETPIPE_SZ,g_BUFSIZE);
assert(rv > 0);
msgbuf->mem = (char*)malloc(rv);
msgbuf->size = rv;
msgbuf->used = 0;
}
else
{
msgbuf = g_MSGBUF_STACK.top();
g_MSGBUF_STACK.pop();
g_MUTEX.unlock();
}
return msgbuf;
}
fuse_msgbuf_t*
msgbuf_alloc_memonly()
{
fuse_msgbuf_t *msgbuf;
g_MUTEX.lock();
if(g_MSGBUF_STACK.empty())
{
g_MUTEX.unlock();
msgbuf = (fuse_msgbuf_t*)malloc(sizeof(fuse_msgbuf_t));
if(msgbuf == NULL)
return NULL;
msgbuf->pipefd[0] = -1;
msgbuf->pipefd[1] = -1;
msgbuf->mem = (char*)malloc(g_BUFSIZE);
msgbuf->size = g_BUFSIZE;
msgbuf->used = 0;
}
else
{
msgbuf = g_MSGBUF_STACK.top();
g_MSGBUF_STACK.pop();
g_MUTEX.unlock();
}
return msgbuf;
}
void
msgbuf_free(fuse_msgbuf_t *msgbuf_)
{
std::lock_guard<std::mutex> lck(g_MUTEX);
g_MSGBUF_STACK.push(msgbuf_);
}

29
libfuse/lib/fuse_msgbuf.hpp

@ -0,0 +1,29 @@
/*
ISC License
Copyright (c) 2022, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include "fuse_msgbuf.h"
void msgbuf_bufsize(const uint32_t size);
std::size_t msgbuf_bufsize();
fuse_msgbuf_t* msgbuf_alloc();
fuse_msgbuf_t* msgbuf_alloc_memonly();
void msgbuf_free(fuse_msgbuf_t *msgbuf);

3
libfuse/lib/fuse_mt.c

@ -27,7 +27,8 @@ fuse_loop_mt(struct fuse *f)
return -1; return -1;
res = fuse_session_loop_mt(fuse_get_session(f), res = fuse_session_loop_mt(fuse_get_session(f),
fuse_config_num_threads(f));
fuse_config_read_thread_count(f),
fuse_config_process_thread_count(f));
fuse_stop_maintenance_thread(f); fuse_stop_maintenance_thread(f);

102
libfuse/lib/fuse_session.c

@ -26,7 +26,8 @@ struct fuse_chan
size_t bufsize; size_t bufsize;
}; };
struct fuse_session *fuse_session_new(void *data,
struct fuse_session *
fuse_session_new(void *data,
void *receive_buf, void *receive_buf,
void *process_buf, void *process_buf,
void *destroy) void *destroy)
@ -38,7 +39,7 @@ struct fuse_session *fuse_session_new(void *data,
} }
memset(se, 0, sizeof(*se)); memset(se, 0, sizeof(*se));
se->data = data;
se->f = data;
se->receive_buf = receive_buf; se->receive_buf = receive_buf;
se->process_buf = process_buf; se->process_buf = process_buf;
se->destroy = destroy; se->destroy = destroy;
@ -67,7 +68,7 @@ void fuse_session_remove_chan(struct fuse_chan *ch)
void void
fuse_session_destroy(struct fuse_session *se) fuse_session_destroy(struct fuse_session *se)
{ {
se->destroy(se->data);
se->destroy(se->f);
if(se->ch != NULL) if(se->ch != NULL)
fuse_chan_destroy(se->ch); fuse_chan_destroy(se->ch);
free(se); free(se);
@ -90,23 +91,10 @@ fuse_session_exit(struct fuse_session *se_)
se_->exited = 1; se_->exited = 1;
} }
void *fuse_session_data(struct fuse_session *se)
void*
fuse_session_data(struct fuse_session *se)
{ {
return se->data;
}
int
fuse_session_receive(struct fuse_session *se_,
struct fuse_buf *buf_)
{
return se_->receive_buf(se_,buf_,se_->ch);
}
void
fuse_session_process(struct fuse_session *se_,
const struct fuse_buf *buf_)
{
se_->process_buf(se_->data,buf_,se_->ch);
return se->f;
} }
struct fuse_chan * struct fuse_chan *
@ -152,82 +140,6 @@ struct fuse_session *fuse_chan_session(struct fuse_chan *ch)
return ch->se; return ch->se;
} }
int
fuse_chan_recv(struct fuse_chan *ch,
char *buf,
size_t size)
{
int err;
ssize_t res;
struct fuse_session *se = fuse_chan_session(ch);
assert(se != NULL);
restart:
res = read(fuse_chan_fd(ch), buf, size);
err = errno;
if(fuse_session_exited(se))
return 0;
if(res == -1)
{
/* ENOENT means the operation was interrupted, it's safe
to restart */
if (err == ENOENT)
goto restart;
if(err == ENODEV)
{
se->exited = 1;
return 0;
}
/* Errors occurring during normal operation: EINTR (read
interrupted), EAGAIN (nonblocking I/O), ENODEV (filesystem
umounted) */
if(err != EINTR && err != EAGAIN)
perror("fuse: reading device");
return -err;
}
if((size_t) res < sizeof(struct fuse_in_header))
{
fprintf(stderr, "short read on fuse device\n");
return -EIO;
}
return res;
}
int
fuse_chan_send(struct fuse_chan *ch,
const struct iovec iov[],
size_t count)
{
if(!iov)
return 0;
int err;
ssize_t res;
res = writev(fuse_chan_fd(ch), iov, count);
err = errno;
if(res == -1)
{
struct fuse_session *se = fuse_chan_session(ch);
assert(se != NULL);
/* ENOENT means the operation was interrupted */
if(!fuse_session_exited(se) && err != ENOENT)
perror("fuse: writing device");
return -err;
}
return 0;
}
void void
fuse_chan_destroy(struct fuse_chan *ch) fuse_chan_destroy(struct fuse_chan *ch)
{ {

112
libfuse/lib/thread_pool.hpp

@ -0,0 +1,112 @@
#pragma once
#include "unbounded_queue.hpp"
#include <tuple>
#include <atomic>
#include <vector>
#include <thread>
#include <memory>
#include <future>
#include <utility>
#include <functional>
#include <type_traits>
class ThreadPool
{
public:
explicit
ThreadPool(const std::size_t thread_count_ = std::thread::hardware_concurrency())
: _queues(thread_count_),
_count(thread_count_)
{
auto worker = [this](std::size_t i)
{
while(true)
{
Proc f;
for(std::size_t n = 0; n < (_count * K); ++n)
{
if(_queues[(i + n) % _count].try_pop(f))
break;
}
if(!f && !_queues[i].pop(f))
break;
f();
}
};
_threads.reserve(thread_count_);
for(std::size_t i = 0; i < thread_count_; ++i)
_threads.emplace_back(worker, i);
}
~ThreadPool()
{
for(auto& queue : _queues)
queue.unblock();
for(auto& thread : _threads)
thread.join();
}
template<typename F>
void
enqueue_work(F&& f_)
{
auto i = _index++;
for(std::size_t n = 0; n < (_count * K); ++n)
{
if(_queues[(i + n) % _count].try_push(f_))
return;
}
_queues[i % _count].push(std::move(f_));
}
template<typename F>
[[nodiscard]]
std::future<typename std::result_of<F()>::type>
enqueue_task(F&& f_)
{
using TaskReturnType = typename std::result_of<F()>::type;
using Promise = std::promise<TaskReturnType>;
auto i = _index++;
auto promise = std::make_shared<Promise>();
auto future = promise->get_future();
auto work = [=]() {
auto rv = f_();
promise->set_value(rv);
};
for(std::size_t n = 0; n < (_count * K); ++n)
{
if(_queues[(i + n) % _count].try_push(work))
return future;
}
_queues[i % _count].push(std::move(work));
return future;
}
private:
using Proc = std::function<void(void)>;
using Queue = UnboundedQueue<Proc>;
using Queues = std::vector<Queue>;
Queues _queues;
private:
std::vector<std::thread> _threads;
private:
const std::size_t _count;
std::atomic_uint _index;
static const unsigned int K = 2;
};

161
libfuse/lib/unbounded_queue.hpp

@ -0,0 +1,161 @@
#pragma once
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>
template<typename T>
class UnboundedQueue
{
public:
explicit
UnboundedQueue(bool block_ = true)
: _block(block_)
{
}
void
push(const T& item_)
{
{
std::lock_guard<std::mutex> guard(_queue_lock);
_queue.push(item_);
}
_condition.notify_one();
}
void
push(T&& item_)
{
{
std::lock_guard<std::mutex> guard(_queue_lock);
_queue.push(std::move(item_));
}
_condition.notify_one();
}
template<typename... Args>
void
emplace(Args&&... args_)
{
{
std::lock_guard<std::mutex> guard(_queue_lock);
_queue.emplace(std::forward<Args>(args_)...);
}
_condition.notify_one();
}
bool
try_push(const T& item_)
{
{
std::unique_lock<std::mutex> lock(_queue_lock, std::try_to_lock);
if(!lock)
return false;
_queue.push(item_);
}
_condition.notify_one();
return true;
}
bool
try_push(T&& item_)
{
{
std::unique_lock<std::mutex> lock(_queue_lock, std::try_to_lock);
if(!lock)
return false;
_queue.push(std::move(item_));
}
_condition.notify_one();
return true;
}
//TODO: push multiple T at once
bool
pop(T& item_)
{
std::unique_lock<std::mutex> guard(_queue_lock);
_condition.wait(guard, [&]() { return !_queue.empty() || !_block; });
if(_queue.empty())
return false;
item_ = std::move(_queue.front());
_queue.pop();
return true;
}
bool
try_pop(T& item_)
{
std::unique_lock<std::mutex> lock(_queue_lock, std::try_to_lock);
if(!lock || _queue.empty())
return false;
item_ = std::move(_queue.front());
_queue.pop();
return true;
}
std::size_t
size() const
{
std::lock_guard<std::mutex> guard(_queue_lock);
return _queue.size();
}
bool
empty() const
{
std::lock_guard<std::mutex> guard(_queue_lock);
return _queue.empty();
}
void
block()
{
std::lock_guard<std::mutex> guard(_queue_lock);
_block = true;
}
void
unblock()
{
{
std::lock_guard<std::mutex> guard(_queue_lock);
_block = false;
}
_condition.notify_all();
}
bool
blocking() const
{
std::lock_guard<std::mutex> guard(_queue_lock);
return _block;
}
private:
mutable std::mutex _queue_lock;
private:
bool _block;
std::queue<T> _queue;
std::condition_variable _condition;
};

7
src/config.cpp

@ -107,7 +107,8 @@ Config::Config()
statfs_ignore(StatFSIgnore::ENUM::NONE), statfs_ignore(StatFSIgnore::ENUM::NONE),
symlinkify(false), symlinkify(false),
symlinkify_timeout(3600), symlinkify_timeout(3600),
threads(0),
fuse_read_thread_count(-1),
fuse_process_thread_count(-1),
version(MERGERFS_VERSION), version(MERGERFS_VERSION),
writeback_cache(false), writeback_cache(false),
xattr(XAttr::ENUM::PASSTHROUGH) xattr(XAttr::ENUM::PASSTHROUGH)
@ -173,7 +174,9 @@ Config::Config()
_map["statfs_ignore"] = &statfs_ignore; _map["statfs_ignore"] = &statfs_ignore;
_map["symlinkify"] = &symlinkify; _map["symlinkify"] = &symlinkify;
_map["symlinkify_timeout"] = &symlinkify_timeout; _map["symlinkify_timeout"] = &symlinkify_timeout;
_map["threads"] = &threads;
_map["threads"] = &fuse_read_thread_count;
_map["read-thread-count"] = &fuse_read_thread_count;
_map["process-thread-count"] = &fuse_process_thread_count;
_map["version"] = &version; _map["version"] = &version;
_map["xattr"] = &xattr; _map["xattr"] = &xattr;
} }

3
src/config.hpp

@ -138,7 +138,8 @@ public:
StatFSIgnore statfs_ignore; StatFSIgnore statfs_ignore;
ConfigBOOL symlinkify; ConfigBOOL symlinkify;
ConfigUINT64 symlinkify_timeout; ConfigUINT64 symlinkify_timeout;
ConfigINT threads;
ConfigINT fuse_read_thread_count;
ConfigINT fuse_process_thread_count;
ConfigSTR version; ConfigSTR version;
ConfigBOOL writeback_cache; ConfigBOOL writeback_cache;
XAttr xattr; XAttr xattr;

13
src/option_parser.cpp

@ -95,16 +95,11 @@ set_kv_option(const std::string &key_,
static static
void void
set_threads(Config::Write &cfg_,
set_fuse_threads(Config::Write &cfg_,
fuse_args *args_) fuse_args *args_)
{ {
int threads;
threads = l::calculate_thread_count(cfg_->threads);
cfg_->threads = threads;
set_kv_option("threads",cfg_->threads.to_string(),args_);
set_kv_option("read-thread-count",cfg_->fuse_read_thread_count.to_string(),args_);
set_kv_option("process-thread-count",cfg_->fuse_process_thread_count.to_string(),args_);
} }
static static
@ -413,6 +408,6 @@ namespace options
set_default_options(args_); set_default_options(args_);
set_fsname(cfg,args_); set_fsname(cfg,args_);
set_subtype(args_); set_subtype(args_);
set_threads(cfg,args_);
set_fuse_threads(cfg,args_);
} }
} }

14
tools/create-branches

@ -0,0 +1,14 @@
#!/bin/sh
set -x
BASEPATH="/tmp"
for x in $(seq -w 2)
do
FILEPATH="${BASEPATH}/mergerfs-${x}.img"
MOUNTPOINT="${BASEPATH}/mergerfs-${x}"
truncate -s 1G "${FILEPATH}"
mkdir -p "${MOUNTPOINT}"
mkfs.ext4 -m0 -L "mergerfs${x}" "${FILEPATH}"
sudo mount "${FILEPATH}" "${MOUNTPOINT}"
done
Loading…
Cancel
Save