From 24423b8d2a0256f5a6d509240bcaccab8d38941e Mon Sep 17 00:00:00 2001 From: Antonio SJ Musumeci Date: Tue, 29 Nov 2022 17:24:09 -0600 Subject: [PATCH] Add async message processing --- README.md | 199 +++++++--- libfuse/Makefile | 27 +- libfuse/include/fuse.h | 9 +- libfuse/include/fuse_lowlevel.h | 14 +- libfuse/include/fuse_msgbuf.h | 12 + libfuse/lib/bounded_queue.hpp | 165 ++++++++ libfuse/lib/fuse.c | 80 +--- libfuse/lib/fuse_i.h | 19 +- libfuse/lib/fuse_ll.hpp | 15 + libfuse/lib/fuse_loop_mt.c | 231 ----------- libfuse/lib/fuse_loop_mt.cpp | 266 +++++++++++++ libfuse/lib/fuse_lowlevel.c | 669 +++++++++++++++++++------------- libfuse/lib/fuse_msgbuf.cpp | 123 ++++++ libfuse/lib/fuse_msgbuf.hpp | 29 ++ libfuse/lib/fuse_mt.c | 3 +- libfuse/lib/fuse_session.c | 108 +----- libfuse/lib/thread_pool.hpp | 112 ++++++ libfuse/lib/unbounded_queue.hpp | 161 ++++++++ src/config.cpp | 7 +- src/config.hpp | 3 +- src/option_parser.cpp | 15 +- tools/create-branches | 14 + 22 files changed, 1526 insertions(+), 755 deletions(-) create mode 100644 libfuse/include/fuse_msgbuf.h create mode 100644 libfuse/lib/bounded_queue.hpp create mode 100644 libfuse/lib/fuse_ll.hpp delete mode 100644 libfuse/lib/fuse_loop_mt.c create mode 100644 libfuse/lib/fuse_loop_mt.cpp create mode 100644 libfuse/lib/fuse_msgbuf.cpp create mode 100644 libfuse/lib/fuse_msgbuf.hpp create mode 100644 libfuse/lib/thread_pool.hpp create mode 100644 libfuse/lib/unbounded_queue.hpp create mode 100755 tools/create-branches diff --git a/README.md b/README.md index b196d125..1917eeec 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ % mergerfs(1) mergerfs user manual % Antonio SJ Musumeci -% 2022-05-23 +% 2023-01-16 # NAME @@ -14,7 +14,9 @@ mergerfs -o<options> <branches> <mountpoint> # DESCRIPTION -**mergerfs** is a union filesystem geared towards simplifying storage and management of files across numerous commodity storage devices. It is similar to **mhddfs**, **unionfs**, and **aufs**. +**mergerfs** is a union filesystem geared towards simplifying storage +and management of files across numerous commodity storage devices. It +is similar to **mhddfs**, **unionfs**, and **aufs**. # FEATURES @@ -36,7 +38,10 @@ mergerfs -o<options> <branches> <mountpoint> # HOW IT WORKS -mergerfs logically merges multiple paths together. Think a union of sets. The file/s or directory/s acted on or presented through mergerfs are based on the policy chosen for that particular action. Read more about policies below. +mergerfs logically merges multiple paths together. Think a union of +sets. The file/s or directory/s acted on or presented through mergerfs +are based on the policy chosen for that particular action. Read more +about policies below. ``` A + B = C @@ -59,7 +64,12 @@ A + B = C +-- file6 ``` -mergerfs does **NOT** support the copy-on-write (CoW) or whiteout behaviors found in **aufs** and **overlayfs**. You can **not** mount a read-only filesystem and write to it. However, mergerfs will ignore read-only drives when creating new files so you can mix read-write and read-only drives. It also does **NOT** split data across drives. It is not RAID0 / striping. It is simply a union of other filesystems. +mergerfs does **NOT** support the copy-on-write (CoW) or whiteout +behaviors found in **aufs** and **overlayfs**. You can **not** mount a +read-only filesystem and write to it. However, mergerfs will ignore +read-only drives when creating new files so you can mix read-write and +read-only drives. It also does **NOT** split data across drives. It is +not RAID0 / striping. It is simply a union of other filesystems. # TERMINOLOGY @@ -75,7 +85,8 @@ mergerfs does **NOT** support the copy-on-write (CoW) or whiteout behaviors foun # BASIC SETUP -If you don't already know that you have a special use case then just start with one of the following option sets. +If you don't already know that you have a special use case then just +start with one of the following option sets. #### You need `mmap` (used by rtorrent and many sqlite3 base software) @@ -95,50 +106,142 @@ These options are the same regardless of whether you use them with the `mergerfs ### mount options -* **config**: Path to a config file. Same arguments as below in key=val / ini style format. +* **config**: Path to a config file. Same arguments as below in + key=val / ini style format. * **branches**: Colon delimited list of branches. -* **allow_other**: A libfuse option which allows users besides the one which ran mergerfs to see the filesystem. This is required for most use-cases. -* **minfreespace=SIZE**: The minimum space value used for creation policies. Can be overridden by branch specific option. Understands 'K', 'M', and 'G' to represent kilobyte, megabyte, and gigabyte respectively. (default: 4G) -* **moveonenospc=BOOL|POLICY**: When enabled if a **write** fails with **ENOSPC** (no space left on device) or **EDQUOT** (disk quota exceeded) the policy selected will run to find a new location for the file. An attempt to move the file to that branch will occur (keeping all metadata possible) and if successful the original is unlinked and the write retried. (default: false, true = mfs) -* **use_ino**: Causes mergerfs to supply file/directory inodes rather than libfuse. While not a default it is recommended it be enabled so that linked files share the same inode value. -* **inodecalc=passthrough|path-hash|devino-hash|hybrid-hash**: Selects the inode calculation algorithm. (default: hybrid-hash) -* **dropcacheonclose=BOOL**: When a file is requested to be closed call `posix_fadvise` on it first to instruct the kernel that we no longer need the data and it can drop its cache. Recommended when **cache.files=partial|full|auto-full** to limit double caching. (default: false) -* **symlinkify=BOOL**: When enabled and a file is not writable and its mtime or ctime is older than **symlinkify_timeout** files will be reported as symlinks to the original files. Please read more below before using. (default: false) -* **symlinkify_timeout=UINT**: Time to wait, in seconds, to activate the **symlinkify** behavior. (default: 3600) -* **nullrw=BOOL**: Turns reads and writes into no-ops. The request will succeed but do nothing. Useful for benchmarking mergerfs. (default: false) -* **ignorepponrename=BOOL**: Ignore path preserving on rename. Typically rename and link act differently depending on the policy of `create` (read below). Enabling this will cause rename and link to always use the non-path preserving behavior. This means files, when renamed or linked, will stay on the same drive. (default: false) -* **security_capability=BOOL**: If false return ENOATTR when xattr security.capability is queried. (default: true) -* **xattr=passthrough|noattr|nosys**: Runtime control of xattrs. Default is to passthrough xattr requests. 'noattr' will short circuit as if nothing exists. 'nosys' will respond with ENOSYS as if xattrs are not supported or disabled. (default: passthrough) -* **link_cow=BOOL**: When enabled if a regular file is opened which has a link count > 1 it will copy the file to a temporary file and rename over the original. Breaking the link and providing a basic copy-on-write function similar to cow-shell. (default: false) -* **statfs=base|full**: Controls how statfs works. 'base' means it will always use all branches in statfs calculations. 'full' is in effect path preserving and only includes drives where the path exists. (default: base) -* **statfs_ignore=none|ro|nc**: 'ro' will cause statfs calculations to ignore available space for branches mounted or tagged as 'read-only' or 'no create'. 'nc' will ignore available space for branches tagged as 'no create'. (default: none) -* **nfsopenhack=off|git|all**: A workaround for exporting mergerfs over NFS where there are issues with creating files for write while setting the mode to read-only. (default: off) -* **follow-symlinks=never|directory|regular|all**: Turns symlinks into what they point to. (default: never) -* **link-exdev=passthrough|rel-symlink|abs-base-symlink|abs-pool-symlink**: When a link fails with EXDEV optionally create a symlink to the file instead. -* **rename-exdev=passthrough|rel-symlink|abs-symlink**: When a rename fails with EXDEV optionally move the file to a special directory and symlink to it. -* **posix_acl=BOOL**: Enable POSIX ACL support (if supported by kernel and underlying filesystem). (default: false) -* **async_read=BOOL**: Perform reads asynchronously. If disabled or unavailable the kernel will ensure there is at most one pending read request per file handle and will attempt to order requests by offset. (default: true) -* **fuse_msg_size=UINT**: Set the max number of pages per FUSE message. Only available on Linux >= 4.20 and ignored otherwise. (min: 1; max: 256; default: 256) -* **threads=INT**: Number of threads to use in multithreaded mode. When set to zero it will attempt to discover and use the number of logical cores. If the lookup fails it will fall back to using 4. If the thread count is set negative it will look up the number of cores then divide by the absolute value. ie. threads=-2 on an 8 core machine will result in 8 / 2 = 4 threads. There will always be at least 1 thread. NOTE: higher number of threads increases parallelism but usually decreases throughput. (default: 0) -* **fsname=STR**: Sets the name of the filesystem as seen in **mount**, **df**, etc. Defaults to a list of the source paths concatenated together with the longest common prefix removed. -* **func.FUNC=POLICY**: Sets the specific FUSE function's policy. See below for the list of value types. Example: **func.getattr=newest** -* **category.action=POLICY**: Sets policy of all FUSE functions in the action category. (default: epall) -* **category.create=POLICY**: Sets policy of all FUSE functions in the create category. (default: epmfs) -* **category.search=POLICY**: Sets policy of all FUSE functions in the search category. (default: ff) -* **cache.open=UINT**: 'open' policy cache timeout in seconds. (default: 0) -* **cache.statfs=UINT**: 'statfs' cache timeout in seconds. (default: 0) -* **cache.attr=UINT**: File attribute cache timeout in seconds. (default: 1) -* **cache.entry=UINT**: File name lookup cache timeout in seconds. (default: 1) -* **cache.negative_entry=UINT**: Negative file name lookup cache timeout in seconds. (default: 0) -* **cache.files=libfuse|off|partial|full|auto-full**: File page caching mode (default: libfuse) -* **cache.writeback=BOOL**: Enable kernel writeback caching (default: false) -* **cache.symlinks=BOOL**: Cache symlinks (if supported by kernel) (default: false) -* **cache.readdir=BOOL**: Cache readdir (if supported by kernel) (default: false) -* **direct_io**: deprecated - Bypass page cache. Use `cache.files=off` instead. (default: false) -* **kernel_cache**: deprecated - Do not invalidate data cache on file open. Use `cache.files=full` instead. (default: false) -* **auto_cache**: deprecated - Invalidate data cache if file mtime or size change. Use `cache.files=auto-full` instead. (default: false) -* **async_read**: deprecated - Perform reads asynchronously. Use `async_read=true` instead. -* **sync_read**: deprecated - Perform reads synchronously. Use `async_read=false` instead. +* **allow_other**: A libfuse option which allows users besides the one + which ran mergerfs to see the filesystem. This is required for most + use-cases. +* **minfreespace=SIZE**: The minimum space value used for creation + policies. Can be overridden by branch specific option. Understands + 'K', 'M', and 'G' to represent kilobyte, megabyte, and gigabyte + respectively. (default: 4G) +* **moveonenospc=BOOL|POLICY**: When enabled if a **write** fails with + **ENOSPC** (no space left on device) or **EDQUOT** (disk quota + exceeded) the policy selected will run to find a new location for + the file. An attempt to move the file to that branch will occur + (keeping all metadata possible) and if successful the original is + unlinked and the write retried. (default: false, true = mfs) +* **use_ino**: Causes mergerfs to supply file/directory inodes rather + than libfuse. While not a default it is recommended it be enabled so + that linked files share the same inode value. +* **inodecalc=passthrough|path-hash|devino-hash|hybrid-hash**: Selects + the inode calculation algorithm. (default: hybrid-hash) +* **dropcacheonclose=BOOL**: When a file is requested to be closed + call `posix_fadvise` on it first to instruct the kernel that we no + longer need the data and it can drop its cache. Recommended when + **cache.files=partial|full|auto-full** to limit double + caching. (default: false) +* **symlinkify=BOOL**: When enabled and a file is not writable and its + mtime or ctime is older than **symlinkify_timeout** files will be + reported as symlinks to the original files. Please read more below + before using. (default: false) +* **symlinkify_timeout=UINT**: Time to wait, in seconds, to activate + the **symlinkify** behavior. (default: 3600) +* **nullrw=BOOL**: Turns reads and writes into no-ops. The request + will succeed but do nothing. Useful for benchmarking + mergerfs. (default: false) +* **ignorepponrename=BOOL**: Ignore path preserving on + rename. Typically rename and link act differently depending on the + policy of `create` (read below). Enabling this will cause rename and + link to always use the non-path preserving behavior. This means + files, when renamed or linked, will stay on the same + drive. (default: false) +* **security_capability=BOOL**: If false return ENOATTR when xattr + security.capability is queried. (default: true) +* **xattr=passthrough|noattr|nosys**: Runtime control of + xattrs. Default is to passthrough xattr requests. 'noattr' will + short circuit as if nothing exists. 'nosys' will respond with ENOSYS + as if xattrs are not supported or disabled. (default: passthrough) +* **link_cow=BOOL**: When enabled if a regular file is opened which + has a link count > 1 it will copy the file to a temporary file and + rename over the original. Breaking the link and providing a basic + copy-on-write function similar to cow-shell. (default: false) +* **statfs=base|full**: Controls how statfs works. 'base' means it + will always use all branches in statfs calculations. 'full' is in + effect path preserving and only includes drives where the path + exists. (default: base) +* **statfs_ignore=none|ro|nc**: 'ro' will cause statfs calculations to + ignore available space for branches mounted or tagged as 'read-only' + or 'no create'. 'nc' will ignore available space for branches tagged + as 'no create'. (default: none) +* **nfsopenhack=off|git|all**: A workaround for exporting mergerfs + over NFS where there are issues with creating files for write while + setting the mode to read-only. (default: off) +* **follow-symlinks=never|directory|regular|all**: Turns symlinks into + what they point to. (default: never) +* **link-exdev=passthrough|rel-symlink|abs-base-symlink|abs-pool-symlink**: + When a link fails with EXDEV optionally create a symlink to the file + instead. +* **rename-exdev=passthrough|rel-symlink|abs-symlink**: When a rename + fails with EXDEV optionally move the file to a special directory and + symlink to it. +* **posix_acl=BOOL**: Enable POSIX ACL support (if supported by kernel + and underlying filesystem). (default: false) +* **async_read=BOOL**: Perform reads asynchronously. If disabled or + unavailable the kernel will ensure there is at most one pending read + request per file handle and will attempt to order requests by + offset. (default: true) +* **fuse_msg_size=UINT**: Set the max number of pages per FUSE + message. Only available on Linux >= 4.20 and ignored + otherwise. (min: 1; max: 256; default: 256) +* **threads=INT**: Number of threads to use. When used alone + (`process-thread-count=-1`) it sets the number of threads reading + and processing FUSE messages. When used together it sets the number + of threads reading from FUSE. When set to zero it will attempt to + discover and use the number of logical cores. If the thread count is + set negative it will look up the number of cores then divide by the + absolute value. ie. threads=-2 on an 8 core machine will result in 8 + / 2 = 4 threads. There will always be at least 1 thread. If set to + -1 in combination with `process-thread-count` then it will try to + pick reasonable values based on CPU thread count. NOTE: higher + number of threads increases parallelism but usually decreases + throughput. (default: 0) +* **read-thread-count=INT**: Alias for `threads`. +* **process-thread-count=INT**: Enables separate thread pool to + asynchronously process FUSE requests. In this mode + `read-thread-count` refers to the number of threads reading FUSE + messages which are dispatched to process threads. -1 means disabled + otherwise acts like `read-thread-count`. (default: -1) +* **fsname=STR**: Sets the name of the filesystem as seen in + **mount**, **df**, etc. Defaults to a list of the source paths + concatenated together with the longest common prefix removed. +* **func.FUNC=POLICY**: Sets the specific FUSE function's policy. See + below for the list of value types. Example: **func.getattr=newest** +* **category.action=POLICY**: Sets policy of all FUSE functions in the + action category. (default: epall) +* **category.create=POLICY**: Sets policy of all FUSE functions in the + create category. (default: epmfs) +* **category.search=POLICY**: Sets policy of all FUSE functions in the + search category. (default: ff) +* **cache.open=UINT**: 'open' policy cache timeout in + seconds. (default: 0) +* **cache.statfs=UINT**: 'statfs' cache timeout in seconds. (default: + 0) +* **cache.attr=UINT**: File attribute cache timeout in + seconds. (default: 1) +* **cache.entry=UINT**: File name lookup cache timeout in + seconds. (default: 1) +* **cache.negative_entry=UINT**: Negative file name lookup cache + timeout in seconds. (default: 0) +* **cache.files=libfuse|off|partial|full|auto-full**: File page + caching mode (default: libfuse) +* **cache.writeback=BOOL**: Enable kernel writeback caching (default: + false) +* **cache.symlinks=BOOL**: Cache symlinks (if supported by kernel) + (default: false) +* **cache.readdir=BOOL**: Cache readdir (if supported by kernel) + (default: false) +* **direct_io**: deprecated - Bypass page cache. Use `cache.files=off` + instead. (default: false) +* **kernel_cache**: deprecated - Do not invalidate data cache on file + open. Use `cache.files=full` instead. (default: false) +* **auto_cache**: deprecated - Invalidate data cache if file mtime or + size change. Use `cache.files=auto-full` instead. (default: false) +* **async_read**: deprecated - Perform reads asynchronously. Use + `async_read=true` instead. +* **sync_read**: deprecated - Perform reads synchronously. Use + `async_read=false` instead. **NOTE:** Options are evaluated in the order listed so if the options are **func.rmdir=rand,category.action=ff** the **action** category setting will override the **rmdir** setting. diff --git a/libfuse/Makefile b/libfuse/Makefile index 2a387f00..eb20b685 100644 --- a/libfuse/Makefile +++ b/libfuse/Makefile @@ -31,13 +31,12 @@ INSTALLMAN1DIR = $(DESTDIR)$(MAN1DIR) AR ?= ar -SRC = \ +SRC_C = \ lib/buffer.c \ lib/crc32b.c \ lib/debug.c \ lib/fuse.c \ lib/fuse_dirents.c \ - lib/fuse_loop_mt.c \ lib/fuse_lowlevel.c \ lib/fuse_mt.c \ lib/fuse_node.c \ @@ -46,8 +45,13 @@ SRC = \ lib/fuse_signals.c \ lib/helper.c \ lib/mount.c -OBJS = $(SRC:lib/%.c=build/%.o) -DEPS = $(SRC:lib/%.c=build/%.d) +SRC_CPP = \ + lib/fuse_loop_mt.cpp \ + lib/fuse_msgbuf.cpp +OBJS_C = $(SRC_C:lib/%.c=build/%.o) +OBJS_CPP = $(SRC_CPP:lib/%.cpp=build/%.o) +DEPS_C = $(SRC_C:lib/%.c=build/%.d) +DEPS_CPP = $(SRC_CPP:lib/%.cpp=build/%.d) CFLAGS ?= \ $(OPT_FLAGS) CFLAGS := \ @@ -56,6 +60,12 @@ CFLAGS := \ -Wall \ -pipe \ -MMD +CXXFLAGS := \ + ${CXXFLAGS} \ + -std=c++11 \ + -Wall \ + -pipe \ + -MMD FUSERMOUNT_DIR = $(BINDIR) FUSE_FLAGS = \ -Iinclude \ @@ -80,10 +90,10 @@ build/stamp: touch $@ objects: build/config.h - $(MAKE) $(OBJS) + $(MAKE) $(OBJS_C) $(OBJS_CPP) build/libfuse.a: objects - ${AR} rcs build/libfuse.a $(OBJS) + ${AR} rcs build/libfuse.a $(OBJS_C) $(OBJS_CPP) utils: mergerfs-fusermount mount.mergerfs @@ -100,6 +110,9 @@ mount.mergerfs: build/mount.mergerfs build/%.o: lib/%.c $(CC) $(CFLAGS) $(FUSE_FLAGS) -c $< -o $@ +build/%.o: lib/%.cpp + $(CXX) $(CXXFLAGS) $(FUSE_FLAGS) -c $< -o $@ + clean: rm -rf build @@ -119,4 +132,4 @@ install: $(INSTALLUTILS) .PHONY: objects strip utils install install-utils --include $(DEPS) +-include $(DEPS_C) $(DEPS_CPP) diff --git a/libfuse/include/fuse.h b/libfuse/include/fuse.h index 29fdfe3b..32e5ca88 100644 --- a/libfuse/include/fuse.h +++ b/libfuse/include/fuse.h @@ -641,7 +641,8 @@ void fuse_destroy(struct fuse *f); */ void fuse_exit(struct fuse *f); -int fuse_config_num_threads(const struct fuse *fuse_); +int fuse_config_read_thread_count(const struct fuse *f); +int fuse_config_process_thread_count(const struct fuse *f); /** * FUSE event loop with multiple threads @@ -762,12 +763,6 @@ struct fuse *fuse_setup(int argc, char *argv[], /** This is the part of fuse_main() after the event loop */ void fuse_teardown(struct fuse *fuse, char *mountpoint); -/** Read a single command. If none are read, return NULL */ -struct fuse_cmd *fuse_read_cmd(struct fuse *f); - -/** Process a single command */ -void fuse_process_cmd(struct fuse *f, struct fuse_cmd *cmd); - /** Multi threaded event loop, which calls the custom command processor function */ int fuse_loop_mt_proc(struct fuse *f, fuse_processor_t proc, void *data); diff --git a/libfuse/include/fuse_lowlevel.h b/libfuse/include/fuse_lowlevel.h index e432f759..8785968b 100644 --- a/libfuse/include/fuse_lowlevel.h +++ b/libfuse/include/fuse_lowlevel.h @@ -19,6 +19,7 @@ * 25 */ +#include "fuse_msgbuf.h" #ifndef FUSE_USE_VERSION #define FUSE_USE_VERSION 24 #endif @@ -1478,9 +1479,12 @@ void *fuse_session_data(struct fuse_session *se); int fuse_session_receive(struct fuse_session *se, struct fuse_buf *buf); void fuse_session_process(struct fuse_session *se, - const struct fuse_buf *buf); + const void *buf, + const size_t bufsize); -int fuse_session_loop_mt(struct fuse_session *se, const int threads); +int fuse_session_loop_mt(struct fuse_session *se, + const int read_thread_count, + const int process_thread_count); /* ----------------------------------------------------------- * * Channel interface * @@ -1520,12 +1524,6 @@ void *fuse_chan_data(struct fuse_chan *ch); */ struct fuse_session *fuse_chan_session(struct fuse_chan *ch); -int fuse_chan_recv(struct fuse_chan *ch, - char *buf, - size_t size); -int fuse_chan_send(struct fuse_chan *ch, - const struct iovec iov[], - size_t count); void fuse_chan_destroy(struct fuse_chan *ch); EXTERN_C_END diff --git a/libfuse/include/fuse_msgbuf.h b/libfuse/include/fuse_msgbuf.h new file mode 100644 index 00000000..bd1f5a9a --- /dev/null +++ b/libfuse/include/fuse_msgbuf.h @@ -0,0 +1,12 @@ +#pragma once + +#include + +typedef struct fuse_msgbuf_t fuse_msgbuf_t; +struct fuse_msgbuf_t +{ + char *mem; + uint32_t size; + uint32_t used; + int pipefd[2]; +}; diff --git a/libfuse/lib/bounded_queue.hpp b/libfuse/lib/bounded_queue.hpp new file mode 100644 index 00000000..3e58da77 --- /dev/null +++ b/libfuse/lib/bounded_queue.hpp @@ -0,0 +1,165 @@ +#pragma once + +#include +#include +#include +#include + + +template +class BoundedQueue +{ +public: + explicit + BoundedQueue(std::size_t max_size_, + bool block_ = true) + : _block(block), + _max_size(max_size_) + { + if(_max_size == 0) + _max_size = 1; + } + + bool + push(const T& item_) + { + { + std::unique_lock guard(_queue_lock); + + _condition_push.wait(guard, [&]() { return _queue.size() < _max_size || !_block; }); + + if(_queue.size() == _max_size) + return false; + + _queue.push(item); + } + + _condition_pop.notify_one(); + + return true; + } + + bool + push(T&& item_) + { + { + std::unique_lock guard(_queue_lock); + + _condition_push.wait(guard, [&]() { return _queue.size() < _max_size || !_block; }); + + if(_queue.size() == _max_size) + return false; + + _queue.push(std::move(item_)); + } + _condition_pop.notify_one(); + return true; + } + + template + bool + emplace(Args&&... args_) + { + { + std::unique_lock guard(_queue_lock); + + _condition_push.wait(guard, [&]() { return _queue.size() < _max_size || !_block; }); + + if(_queue.size() == _max_size) + return false; + + _queue.emplace(std::forward(args_)...); + } + + _condition_pop.notify_one(); + + return true; + } + + bool + pop(T& item_) + { + { + std::unique_lock guard(_queue_lock); + + _condition_pop.wait(guard, [&]() { return !_queue.empty() || !_block; }); + if(_queue.empty()) + return false; + + item_ = std::move(_queue.front()); + + _queue.pop(); + } + + _condition_push.notify_one(); + + return true; + } + + std::size_t + size() const + { + std::lock_guard guard(_queue_lock); + + return _queue.size(); + } + + std::size_t + capacity() const + { + return _max_size; + } + + bool + empty() const + { + std::lock_guard guard(_queue_lock); + + return _queue.empty(); + } + + bool + full() const + { + std::lock_guard lock(_queue_lock); + + return (_queue.size() == capacity()); + } + + void + block() + { + std::lock_guard guard(_queue_lock); + _block = true; + } + + void + unblock() + { + { + std::lock_guard guard(_queue_lock); + _block = false; + } + + _condition_push.notify_all(); + _condition_pop.notify_all(); + } + + bool + blocking() const + { + std::lock_guard guard(_queue_lock); + + return _block; + } + +private: + mutable std::mutex _queue_lock; + +private: + bool _block; + std::queue _queue; + const std::size_t _max_size; + std::condition_variable _condition_push; + std::condition_variable _condition_pop; +}; diff --git a/libfuse/lib/fuse.c b/libfuse/lib/fuse.c index 1c910443..e250c1fb 100644 --- a/libfuse/lib/fuse.c +++ b/libfuse/lib/fuse.c @@ -72,7 +72,8 @@ struct fuse_config int set_uid; int set_gid; int help; - int threads; + int read_thread_count; + int process_thread_count; }; struct fuse_fs @@ -3657,14 +3658,6 @@ fuse_notify_poll(fuse_pollhandle_t *ph) return fuse_lowlevel_notify_poll(ph); } -static -void -free_cmd(struct fuse_cmd *cmd) -{ - free(cmd->buf); - free(cmd); -} - int fuse_exited(struct fuse *f) { @@ -3677,53 +3670,6 @@ fuse_get_session(struct fuse *f) return f->se; } -static -struct fuse_cmd* -fuse_alloc_cmd(size_t bufsize) -{ - struct fuse_cmd *cmd = (struct fuse_cmd *)malloc(sizeof(*cmd)); - - if(cmd == NULL) - { - fprintf(stderr,"fuse: failed to allocate cmd\n"); - return NULL; - } - - cmd->buf = (char *)malloc(bufsize); - if(cmd->buf == NULL) - { - fprintf(stderr,"fuse: failed to allocate read buffer\n"); - free(cmd); - return NULL; - } - - return cmd; -} - -struct fuse_cmd* -fuse_read_cmd(struct fuse *f) -{ - struct fuse_chan *ch = f->se->ch; - size_t bufsize = fuse_chan_bufsize(ch); - struct fuse_cmd *cmd = fuse_alloc_cmd(bufsize); - - if(cmd != NULL) - { - int res = fuse_chan_recv(ch,cmd->buf,bufsize); - if(res <= 0) - { - free_cmd(cmd); - if(res < 0 && res != -EINTR && res != -EAGAIN) - fuse_exit(f); - return NULL; - } - cmd->buflen = res; - cmd->ch = ch; - } - - return cmd; -} - void fuse_exit(struct fuse *f) { @@ -3753,13 +3699,15 @@ static const struct fuse_opt fuse_lib_opts[] = FUSE_LIB_OPT("nogc", nogc,1), FUSE_LIB_OPT("umask=", set_mode,1), FUSE_LIB_OPT("umask=%o", umask,0), - FUSE_LIB_OPT("uid=", set_uid,1), + FUSE_LIB_OPT("uid=", set_uid,1), FUSE_LIB_OPT("uid=%d", uid,0), - FUSE_LIB_OPT("gid=", set_gid,1), + FUSE_LIB_OPT("gid=", set_gid,1), FUSE_LIB_OPT("gid=%d", gid,0), - FUSE_LIB_OPT("noforget", remember,-1), - FUSE_LIB_OPT("remember=%u", remember,0), - FUSE_LIB_OPT("threads=%d", threads,0), + FUSE_LIB_OPT("noforget", remember,-1), + FUSE_LIB_OPT("remember=%u", remember,0), + FUSE_LIB_OPT("threads=%d", read_thread_count,0), + FUSE_LIB_OPT("read-thread-count=%d", read_thread_count,0), + FUSE_LIB_OPT("process-thread-count=%d", process_thread_count,-1), FUSE_LIB_OPT("use_ino", use_ino,1), FUSE_OPT_END }; @@ -4118,9 +4066,15 @@ fuse_destroy(struct fuse *f) } int -fuse_config_num_threads(const struct fuse *fuse_) +fuse_config_read_thread_count(const struct fuse *f_) +{ + return f_->conf.read_thread_count; +} + +int +fuse_config_process_thread_count(const struct fuse *f_) { - return fuse_->conf.threads; + return f_->conf.process_thread_count; } void diff --git a/libfuse/lib/fuse_i.h b/libfuse/lib/fuse_i.h index 958e3e35..af3d899f 100644 --- a/libfuse/lib/fuse_i.h +++ b/libfuse/lib/fuse_i.h @@ -10,6 +10,9 @@ #include "fuse.h" #include "fuse_lowlevel.h" +#include "fuse_msgbuf.h" + +#include "extern_c.h" struct fuse_chan; struct fuse_ll; @@ -17,16 +20,14 @@ struct fuse_ll; struct fuse_session { int (*receive_buf)(struct fuse_session *se, - struct fuse_buf *buf, - struct fuse_chan *ch); + fuse_msgbuf_t *msgbuf); - void (*process_buf)(void *data, - const struct fuse_buf *buf, - struct fuse_chan *ch); + void (*process_buf)(struct fuse_session *se, + const fuse_msgbuf_t *msgbuf); void (*destroy)(void *data); - void *data; + struct fuse_ll *f; volatile int exited; struct fuse_chan *ch; }; @@ -64,11 +65,11 @@ struct fuse_ll int no_splice_move; int no_splice_read; struct fuse_lowlevel_ops op; - int got_init; void *userdata; uid_t owner; struct fuse_conn_info conn; pthread_mutex_t lock; + int got_init; int got_destroy; pthread_key_t pipe_key; int broken_splice_nonblock; @@ -83,6 +84,8 @@ struct fuse_cmd struct fuse_chan *ch; }; +EXTERN_C_BEGIN + struct fuse *fuse_new_common(struct fuse_chan *ch, struct fuse_args *args, const struct fuse_operations *op, size_t op_size); @@ -110,3 +113,5 @@ struct fuse *fuse_setup_common(int argc, char *argv[], int *fd); int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg); + +EXTERN_C_END diff --git a/libfuse/lib/fuse_ll.hpp b/libfuse/lib/fuse_ll.hpp new file mode 100644 index 00000000..965243c0 --- /dev/null +++ b/libfuse/lib/fuse_ll.hpp @@ -0,0 +1,15 @@ +#pragma once + +#include "fuse_msgbuf.h" + +int fuse_receive_buf(struct fuse_session *se, + fuse_msgbuf_t *msgbuf); +void fuse_process_buf(void *data, + const fuse_msgbuf_t *msgbuf, + struct fuse_chan *ch); + +int fuse_receive_buf_splice(struct fuse_chan *ch, + fuse_msgbuf_t *msgbuf); +void fuse_process_buf_splice(struct fuse_chan *ch, + const fuse_msgbuf_t *msgbuf, + void *data); diff --git a/libfuse/lib/fuse_loop_mt.c b/libfuse/lib/fuse_loop_mt.c deleted file mode 100644 index 687d5bbc..00000000 --- a/libfuse/lib/fuse_loop_mt.c +++ /dev/null @@ -1,231 +0,0 @@ -/* - FUSE: Filesystem in Userspace - Copyright (C) 2001-2007 Miklos Szeredi - - This program can be distributed under the terms of the GNU LGPLv2. - See the file COPYING.LIB. -*/ - -#include "fuse_i.h" -#include "fuse_kernel.h" -#include "fuse_lowlevel.h" -#include "fuse_misc.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -/* Environment var controlling the thread stack size */ -#define ENVNAME_THREAD_STACK "FUSE_THREAD_STACK" - -struct fuse_worker -{ - struct fuse_worker *prev; - struct fuse_worker *next; - pthread_t thread_id; - size_t bufsize; - char *buf; - struct fuse_mt *mt; -}; - -struct fuse_mt -{ - struct fuse_session *se; - struct fuse_worker main; - sem_t finish; - int exit; - int error; -}; - -static -void -list_add_worker(struct fuse_worker *w, - struct fuse_worker *next) -{ - struct fuse_worker *prev = next->prev; - w->next = next; - w->prev = prev; - prev->next = w; - next->prev = w; -} - -static void list_del_worker(struct fuse_worker *w) -{ - struct fuse_worker *prev = w->prev; - struct fuse_worker *next = w->next; - prev->next = next; - next->prev = prev; -} - -static int fuse_loop_start_thread(struct fuse_mt *mt); - -static -void* -fuse_do_work(void *data) -{ - struct fuse_worker *w = (struct fuse_worker *) data; - struct fuse_mt *mt = w->mt; - - while(!fuse_session_exited(mt->se)) - { - int res; - struct fuse_buf fbuf; - - fbuf = (struct fuse_buf){ .mem = w->buf, - .size = w->bufsize }; - - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - res = fuse_session_receive(mt->se,&fbuf); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); - if(res == -EINTR) - continue; - if(res <= 0) - { - if(res < 0) - { - mt->se->exited = 1; - mt->error = -1; - } - break; - } - - if(mt->exit) - return NULL; - - fuse_session_process(mt->se,&fbuf); - } - - sem_post(&mt->finish); - - return NULL; -} - -int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg) -{ - sigset_t oldset; - sigset_t newset; - int res; - pthread_attr_t attr; - char *stack_size; - - /* Override default stack size */ - pthread_attr_init(&attr); - stack_size = getenv(ENVNAME_THREAD_STACK); - if(stack_size && pthread_attr_setstacksize(&attr, atoi(stack_size))) - fprintf(stderr, "fuse: invalid stack size: %s\n", stack_size); - - /* Disallow signal reception in worker threads */ - sigfillset(&newset); - pthread_sigmask(SIG_BLOCK,&newset,&oldset); - res = pthread_create(thread_id, &attr, func, arg); - pthread_sigmask(SIG_SETMASK,&oldset,NULL); - pthread_attr_destroy(&attr); - if(res != 0) { - fprintf(stderr, "fuse: error creating thread: %s\n", - strerror(res)); - return -1; - } - - return 0; -} - -static int fuse_loop_start_thread(struct fuse_mt *mt) -{ - int res; - struct fuse_worker *w = malloc(sizeof(struct fuse_worker)); - if(!w) { - fprintf(stderr, "fuse: failed to allocate worker structure\n"); - return -1; - } - memset(w, 0, sizeof(struct fuse_worker)); - w->bufsize = fuse_chan_bufsize(mt->se->ch); - w->buf = calloc(w->bufsize,1); - w->mt = mt; - if(!w->buf) { - fprintf(stderr, "fuse: failed to allocate read buffer\n"); - free(w); - return -1; - } - - res = fuse_start_thread(&w->thread_id, fuse_do_work, w); - if(res == -1) { - free(w->buf); - free(w); - return -1; - } - list_add_worker(w, &mt->main); - - return 0; -} - -static void fuse_join_worker(struct fuse_worker *w) -{ - pthread_join(w->thread_id, NULL); - list_del_worker(w); - free(w->buf); - free(w); -} - -static int number_of_threads(void) -{ -#ifdef _SC_NPROCESSORS_ONLN - return sysconf(_SC_NPROCESSORS_ONLN); -#endif - - return 4; -} - -int -fuse_session_loop_mt(struct fuse_session *se_, - const int threads_) -{ - int i; - int err; - int threads; - struct fuse_mt mt; - struct fuse_worker *w; - - memset(&mt,0,sizeof(struct fuse_mt)); - mt.se = se_; - mt.error = 0; - mt.main.thread_id = pthread_self(); - mt.main.prev = mt.main.next = &mt.main; - sem_init(&mt.finish,0,0); - - threads = ((threads_ > 0) ? threads_ : number_of_threads()); - if(threads_ < 0) - threads /= -threads_; - if(threads == 0) - threads = 1; - - err = 0; - for(i = 0; (i < threads) && !err; i++) - err = fuse_loop_start_thread(&mt); - - if(!err) - { - /* sem_wait() is interruptible */ - while(!fuse_session_exited(se_)) - sem_wait(&mt.finish); - - for(w = mt.main.next; w != &mt.main; w = w->next) - pthread_cancel(w->thread_id); - mt.exit = 1; - - while(mt.main.next != &mt.main) - fuse_join_worker(mt.main.next); - - err = mt.error; - } - - sem_destroy(&mt.finish); - fuse_session_reset(se_); - - return err; -} diff --git a/libfuse/lib/fuse_loop_mt.cpp b/libfuse/lib/fuse_loop_mt.cpp new file mode 100644 index 00000000..c1c9b931 --- /dev/null +++ b/libfuse/lib/fuse_loop_mt.cpp @@ -0,0 +1,266 @@ +#include "thread_pool.hpp" + +#include "fuse_i.h" +#include "fuse_kernel.h" +#include "fuse_lowlevel.h" +#include "fuse_misc.h" + +#include "fuse_msgbuf.hpp" +#include "fuse_ll.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + + +struct fuse_worker_data_t +{ + struct fuse_session *se; + sem_t finished; + std::function msgbuf_processor; + std::function msgbuf_allocator; + std::shared_ptr tp; +}; + +class WorkerCleanup +{ +public: + WorkerCleanup(fuse_worker_data_t *wd_) + : _wd(wd_) + { + } + + ~WorkerCleanup() + { + fuse_session_exit(_wd->se); + sem_post(&_wd->finished); + } + +private: + fuse_worker_data_t *_wd; +}; + +static +bool +retriable_receive_error(const int err_) +{ + switch(err_) + { + case -EINTR: + case -EAGAIN: + case -ENOENT: + return true; + default: + return false; + } +} + +static +bool +fatal_receive_error(const int err_) +{ + return (err_ < 0); +} + +static +void* +handle_receive_error(const int rv_, + fuse_msgbuf_t *msgbuf_) +{ + msgbuf_free(msgbuf_); + + fprintf(stderr, + "mergerfs: error reading from /dev/fuse - %s (%d)\n", + strerror(-rv_), + -rv_); + + return NULL; +} + +static +void* +fuse_do_work(void *data) +{ + fuse_worker_data_t *wd = (fuse_worker_data_t*)data; + fuse_session *se = wd->se; + auto &process_msgbuf = wd->msgbuf_processor; + auto &msgbuf_allocator = wd->msgbuf_allocator; + WorkerCleanup workercleanup(wd); + + while(!fuse_session_exited(se)) + { + int rv; + fuse_msgbuf_t *msgbuf; + + msgbuf = msgbuf_allocator(); + + do + { + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL); + rv = se->receive_buf(se,msgbuf); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL); + if(rv == 0) + return NULL; + if(retriable_receive_error(rv)) + continue; + if(fatal_receive_error(rv)) + return handle_receive_error(rv,msgbuf); + } while(false); + + process_msgbuf(wd,msgbuf); + } + + return NULL; +} + +int +fuse_start_thread(pthread_t *thread_id, + void *(*func)(void *), + void *arg) +{ + int res; + sigset_t oldset; + sigset_t newset; + + sigfillset(&newset); + pthread_sigmask(SIG_BLOCK,&newset,&oldset); + res = pthread_create(thread_id,NULL,func,arg); + pthread_sigmask(SIG_SETMASK,&oldset,NULL); + + if(res != 0) + { + fprintf(stderr, + "fuse: error creating thread: %s\n", + strerror(res)); + return -1; + } + + return 0; +} + +static +int +calculate_thread_count(const int raw_thread_count_) +{ + int thread_count; + + thread_count = 4; + if(raw_thread_count_ == 0) + thread_count = std::thread::hardware_concurrency(); + else if(raw_thread_count_ < 0) + thread_count = (std::thread::hardware_concurrency() / -raw_thread_count_); + else if(raw_thread_count_ > 0) + thread_count = raw_thread_count_; + + if(thread_count <= 0) + thread_count = 1; + + return thread_count; +} + +static +void +calculate_thread_counts(int *read_thread_count_, + int *process_thread_count_) +{ + if((*read_thread_count_ == -1) && (*process_thread_count_ == -1)) + { + int nproc; + + nproc = std::thread::hardware_concurrency(); + *read_thread_count_ = 2; + *process_thread_count_ = std::max(2,(nproc - 2)); + } + else + { + *read_thread_count_ = ::calculate_thread_count(*read_thread_count_); + if(*process_thread_count_ != -1) + *process_thread_count_ = ::calculate_thread_count(*process_thread_count_); + } +} + +static +void +process_msgbuf_sync(fuse_worker_data_t *wd_, + fuse_msgbuf_t *msgbuf_) +{ + wd_->se->process_buf(wd_->se,msgbuf_); + msgbuf_free(msgbuf_); +} + +static +void +process_msgbuf_async(fuse_worker_data_t *wd_, + fuse_msgbuf_t *msgbuf_) +{ + const auto func = [=] { + process_msgbuf_sync(wd_,msgbuf_); + }; + + wd_->tp->enqueue_work(func); +} + +int +fuse_session_loop_mt(struct fuse_session *se_, + const int raw_read_thread_count_, + const int raw_process_thread_count_) +{ + int err; + int read_thread_count; + int process_thread_count; + fuse_worker_data_t wd = {0}; + std::vector threads; + + read_thread_count = raw_read_thread_count_; + process_thread_count = raw_process_thread_count_; + ::calculate_thread_counts(&read_thread_count,&process_thread_count); + + if(process_thread_count > 0) + { + wd.tp = std::make_shared(process_thread_count); + wd.msgbuf_processor = process_msgbuf_async; + } + else + { + wd.msgbuf_processor = process_msgbuf_sync; + } + + wd.msgbuf_allocator = ((se_->f->splice_read) ? msgbuf_alloc : msgbuf_alloc_memonly); + + wd.se = se_; + sem_init(&wd.finished,0,0); + + err = 0; + for(int i = 0; i < read_thread_count; i++) + { + pthread_t thread_id; + err = fuse_start_thread(&thread_id,fuse_do_work,&wd); + assert(err == 0); + threads.push_back(thread_id); + } + + if(!err) + { + /* sem_wait() is interruptible */ + while(!fuse_session_exited(se_)) + sem_wait(&wd.finished); + + for(const auto &thread_id : threads) + pthread_cancel(thread_id); + + for(const auto &thread_id : threads) + pthread_join(thread_id,NULL); + } + + sem_destroy(&wd.finished); + + return err; +} diff --git a/libfuse/lib/fuse_lowlevel.c b/libfuse/lib/fuse_lowlevel.c index 98f622a5..04b0f905 100644 --- a/libfuse/lib/fuse_lowlevel.c +++ b/libfuse/lib/fuse_lowlevel.c @@ -17,6 +17,7 @@ #include "fuse_opt.h" #include "fuse_misc.h" #include "fuse_pollhandle.h" +#include "fuse_msgbuf.h" #include #include @@ -40,7 +41,7 @@ #define OFFSET_MAX 0x7fffffffffffffffLL #define container_of(ptr, type, member) ({ \ - const typeof( ((type*)0)->member ) *__mptr = (ptr); \ + const typeof( ((type*)0)->member ) *__mptr = (ptr); \ (type *)( (char*)__mptr - offsetof(type,member) );}) static size_t pagesize; @@ -113,7 +114,7 @@ fuse_ll_alloc_req(struct fuse_ll *f) struct fuse_req *req; req = (struct fuse_req*)lfmp_calloc(&g_FMP_fuse_req); - if (req == NULL) + if(req == NULL) { fprintf(stderr, "fuse: failed to allocate request\n"); } @@ -133,11 +134,16 @@ fuse_send_msg(struct fuse_ll *f, struct iovec *iov, int count) { + int rv; struct fuse_out_header *out = iov[0].iov_base; out->len = iov_length(iov, count); - return fuse_chan_send(ch, iov, count); + rv = writev(fuse_chan_fd(ch),iov,count); + if(rv == -1) + return -errno; + + return 0; } int @@ -148,7 +154,7 @@ fuse_send_reply_iov_nofree(fuse_req_t req, { struct fuse_out_header out; - if (error <= -1000 || error > 0) + if(error <= -1000 || error > 0) { fprintf(stderr, "fuse: bad error value: %i\n",error); error = -ERANGE; @@ -187,7 +193,7 @@ send_reply(fuse_req_t req, { struct iovec iov[2]; int count = 1; - if (argsize) + if(argsize) { iov[1].iov_base = (void *) arg; iov[1].iov_len = argsize; @@ -231,8 +237,6 @@ fuse_reply_err(fuse_req_t req_, void fuse_reply_none(fuse_req_t req) { - if (req->ch) - fuse_chan_send(req->ch, NULL, 0); destroy_req(req); } @@ -256,13 +260,13 @@ fill_open(struct fuse_open_out *arg, const fuse_file_info_t *f) { arg->fh = f->fh; - if (f->direct_io) + if(f->direct_io) arg->open_flags |= FOPEN_DIRECT_IO; - if (f->keep_cache) + if(f->keep_cache) arg->open_flags |= FOPEN_KEEP_CACHE; - if (f->nonseekable) + if(f->nonseekable) arg->open_flags |= FOPEN_NONSEEKABLE; - if (f->cache_readdir) + if(f->cache_readdir) arg->open_flags |= FOPEN_CACHE_DIR; } @@ -276,7 +280,7 @@ fuse_reply_entry(fuse_req_t req, /* before ABI 7.4 e->ino == 0 was invalid, only ENOENT meant negative entry */ - if (!e->ino && req->f->conn.proto_minor < 4) + if(!e->ino && req->f->conn.proto_minor < 4) return fuse_reply_err(req, ENOENT); fill_entry(&arg, e); @@ -374,7 +378,7 @@ fuse_send_data_iov_fallback(struct fuse_ll *f, struct fuse_bufvec mem_buf = FUSE_BUFVEC_INIT(len); /* Optimize common case */ - if (buf->count == 1 && buf->idx == 0 && buf->off == 0 && + if(buf->count == 1 && buf->idx == 0 && buf->off == 0 && !(buf->buf[0].flags & FUSE_BUF_IS_FD)) { /* FIXME: also avoid memory copy if there are multiple buffers @@ -388,12 +392,12 @@ fuse_send_data_iov_fallback(struct fuse_ll *f, } res = posix_memalign(&mbuf, pagesize, len); - if (res != 0) + if(res != 0) return res; mem_buf.buf[0].mem = mbuf; res = fuse_buf_copy(&mem_buf, buf, 0); - if (res < 0) + if(res < 0) { free(mbuf); return -res; @@ -431,22 +435,22 @@ struct fuse_ll_pipe* fuse_ll_get_pipe(struct fuse_ll *f) { struct fuse_ll_pipe *llp = pthread_getspecific(f->pipe_key); - if (llp == NULL) + if(llp == NULL) { int res; llp = malloc(sizeof(struct fuse_ll_pipe)); - if (llp == NULL) + if(llp == NULL) return NULL; res = pipe(llp->pipe); - if (res == -1) + if(res == -1) { free(llp); return NULL; } - if (fcntl(llp->pipe[0], F_SETFL, O_NONBLOCK) == -1 || + if(fcntl(llp->pipe[0], F_SETFL, O_NONBLOCK) == -1 || fcntl(llp->pipe[1], F_SETFL, O_NONBLOCK) == -1) { close(llp->pipe[0]); @@ -474,7 +478,7 @@ fuse_ll_clear_pipe(struct fuse_ll *f) { struct fuse_ll_pipe *llp = pthread_getspecific(f->pipe_key); - if (llp) + if(llp) { pthread_setspecific(f->pipe_key, NULL); fuse_ll_pipe_free(llp); @@ -491,13 +495,13 @@ read_back(int fd, int res; res = read(fd, buf, len); - if (res == -1) + if(res == -1) { fprintf(stderr, "fuse: internal error: failed to read back from pipe: %s\n", strerror(errno)); return -EIO; } - if (res != len) + if(res != len) { fprintf(stderr, "fuse: internal error: short read back from pipe: %i from %zi\n", res, len); return -EIO; @@ -526,31 +530,31 @@ fuse_send_data_iov(struct fuse_ll *f, size_t headerlen; struct fuse_bufvec pipe_buf = FUSE_BUFVEC_INIT(len); - if (f->broken_splice_nonblock) + if(f->broken_splice_nonblock) goto fallback; - if (flags & FUSE_BUF_NO_SPLICE) + if(flags & FUSE_BUF_NO_SPLICE) goto fallback; total_fd_size = 0; for (idx = buf->idx; idx < buf->count; idx++) { - if (buf->buf[idx].flags & FUSE_BUF_IS_FD) + if(buf->buf[idx].flags & FUSE_BUF_IS_FD) { total_fd_size = buf->buf[idx].size; - if (idx == buf->idx) + if(idx == buf->idx) total_fd_size -= buf->off; } } - if (total_fd_size < 2 * pagesize) + if(total_fd_size < 2 * pagesize) goto fallback; - if (f->conn.proto_minor < 14 || !(f->conn.want & FUSE_CAP_SPLICE_WRITE)) + if(f->conn.proto_minor < 14 || !(f->conn.want & FUSE_CAP_SPLICE_WRITE)) goto fallback; llp = fuse_ll_get_pipe(f); - if (llp == NULL) + if(llp == NULL) goto fallback; headerlen = iov_length(iov, iov_count); @@ -563,12 +567,12 @@ fuse_send_data_iov(struct fuse_ll *f, */ pipesize = pagesize * (iov_count + buf->count + 1) + out->len; - if (llp->size < pipesize) + if(llp->size < pipesize) { - if (llp->can_grow) + if(llp->can_grow) { res = fcntl(llp->pipe[0], F_SETPIPE_SZ, pipesize); - if (res == -1) + if(res == -1) { llp->can_grow = 0; goto fallback; @@ -576,15 +580,15 @@ fuse_send_data_iov(struct fuse_ll *f, llp->size = res; } - if (llp->size < pipesize) + if(llp->size < pipesize) goto fallback; } res = vmsplice(llp->pipe[1], iov, iov_count, SPLICE_F_NONBLOCK); - if (res == -1) + if(res == -1) goto fallback; - if (res != headerlen) + if(res != headerlen) { res = -EIO; fprintf(stderr, "fuse: short vmsplice to pipe: %u/%zu\n", res, @@ -597,9 +601,9 @@ fuse_send_data_iov(struct fuse_ll *f, res = fuse_buf_copy(&pipe_buf, buf, FUSE_BUF_FORCE_SPLICE | FUSE_BUF_SPLICE_NONBLOCK); - if (res < 0) + if(res < 0) { - if (res == -EAGAIN || res == -EINVAL) + if(res == -EAGAIN || res == -EINVAL) { /* * Should only get EAGAIN on kernels with @@ -611,7 +615,7 @@ fuse_send_data_iov(struct fuse_ll *f, * EINVAL might mean that splice can't handle * this combination of input and output. */ - if (res == -EAGAIN) + if(res == -EAGAIN) f->broken_splice_nonblock = 1; pthread_setspecific(f->pipe_key, NULL); @@ -622,7 +626,7 @@ fuse_send_data_iov(struct fuse_ll *f, goto clear_pipe; } - if (res != 0 && res < len) + if(res != 0 && res < len) { struct fuse_bufvec mem_buf = FUSE_BUFVEC_INIT(len); void *mbuf; @@ -637,13 +641,13 @@ fuse_send_data_iov(struct fuse_ll *f, */ res = posix_memalign(&mbuf, pagesize, len); - if (res != 0) + if(res != 0) goto clear_pipe; mem_buf.buf[0].mem = mbuf; mem_buf.off = now_len; res = fuse_buf_copy(&mem_buf, buf, 0); - if (res > 0) + if(res > 0) { char *tmpbuf; size_t extra_len = res; @@ -653,7 +657,7 @@ fuse_send_data_iov(struct fuse_ll *f, * back to regular write. */ tmpbuf = malloc(headerlen); - if (tmpbuf == NULL) + if(tmpbuf == NULL) { free(mbuf); res = ENOMEM; @@ -661,13 +665,13 @@ fuse_send_data_iov(struct fuse_ll *f, } res = read_back(llp->pipe[0], tmpbuf, headerlen); free(tmpbuf); - if (res != 0) + if(res != 0) { free(mbuf); goto clear_pipe; } res = read_back(llp->pipe[0], mbuf, now_len); - if (res != 0) + if(res != 0) { free(mbuf); goto clear_pipe; @@ -687,19 +691,19 @@ fuse_send_data_iov(struct fuse_ll *f, out->len = headerlen + len; splice_flags = 0; - if ((flags & FUSE_BUF_SPLICE_MOVE) && + if((flags & FUSE_BUF_SPLICE_MOVE) && (f->conn.want & FUSE_CAP_SPLICE_MOVE)) splice_flags |= SPLICE_F_MOVE; res = splice(llp->pipe[0], NULL, fuse_chan_fd(ch), NULL, out->len, splice_flags); - if (res == -1) + if(res == -1) { res = -errno; perror("fuse: splice from pipe"); goto clear_pipe; } - if (res != out->len) + if(res != out->len) { res = -EIO; fprintf(stderr, "fuse: short splice from pipe: %u/%u\n", @@ -749,7 +753,7 @@ fuse_reply_data(fuse_req_t req, out.error = 0; res = fuse_send_data_iov(req->f, req->ch, iov, 1, bufv, flags); - if (res <= 0) + if(res <= 0) { destroy_req(req); return res; @@ -791,10 +795,10 @@ fuse_reply_lock(fuse_req_t req, struct fuse_lk_out arg = {0}; arg.lk.type = lock->l_type; - if (lock->l_type != F_UNLCK) + if(lock->l_type != F_UNLCK) { arg.lk.start = lock->l_start; - if (lock->l_len == 0) + if(lock->l_len == 0) arg.lk.end = OFFSET_MAX; else arg.lk.end = lock->l_start + lock->l_len - 1; @@ -824,7 +828,7 @@ fuse_ioctl_iovec_copy(const struct iovec *iov, size_t i; fiov = malloc(sizeof(fiov[0]) * count); - if (!fiov) + if(!fiov) return NULL; for (i = 0; i < count; i++) @@ -857,16 +861,16 @@ fuse_reply_ioctl_retry(fuse_req_t req, iov[count].iov_len = sizeof(arg); count++; - if (req->f->conn.proto_minor < 16) + if(req->f->conn.proto_minor < 16) { - if (in_count) + if(in_count) { iov[count].iov_base = (void *)in_iov; iov[count].iov_len = sizeof(in_iov[0]) * in_count; count++; } - if (out_count) + if(out_count) { iov[count].iov_base = (void *)out_iov; iov[count].iov_len = sizeof(out_iov[0]) * out_count; @@ -882,20 +886,20 @@ fuse_reply_ioctl_retry(fuse_req_t req, goto out; } - if (in_count) + if(in_count) { in_fiov = fuse_ioctl_iovec_copy(in_iov, in_count); - if (!in_fiov) + if(!in_fiov) goto enomem; iov[count].iov_base = (void *)in_fiov; iov[count].iov_len = sizeof(in_fiov[0]) * in_count; count++; } - if (out_count) + if(out_count) { out_fiov = fuse_ioctl_iovec_copy(out_iov, out_count); - if (!out_fiov) + if(!out_fiov) goto enomem; iov[count].iov_base = (void *)out_fiov; @@ -957,7 +961,7 @@ fuse_reply_ioctl_iov(fuse_req_t req, int res; padded_iov = malloc((count + 2) * sizeof(struct iovec)); - if (padded_iov == NULL) + if(padded_iov == NULL) return fuse_reply_err(req, ENOMEM); arg.result = result; @@ -1240,7 +1244,7 @@ convert_fuse_file_lock(struct fuse_file_lock *fl, flock->l_type = fl->type; flock->l_whence = SEEK_SET; flock->l_start = fl->start; - if (fl->end == OFFSET_MAX) + if(fl->end == OFFSET_MAX) flock->l_len = 0; else flock->l_len = fl->end - fl->start + 1; @@ -1269,7 +1273,7 @@ do_setlk_common(fuse_req_t req, fi.fh = arg->fh; fi.lock_owner = arg->owner; - if (arg->lk_flags & FUSE_LK_FLOCK) + if(arg->lk_flags & FUSE_LK_FLOCK) { int op = 0; @@ -1286,7 +1290,7 @@ do_setlk_common(fuse_req_t req, break; } - if (!sleep) + if(!sleep) op |= LOCK_NB; req->f->op.flock(req,nodeid,&fi,op); @@ -1383,7 +1387,7 @@ do_init(fuse_req_t req, outarg.minor = FUSE_KERNEL_MINOR_VERSION; outarg.max_pages = FUSE_DEFAULT_MAX_PAGES_PER_REQ; - if (arg->major < 7) + if(arg->major < 7) { fprintf(stderr, "fuse: unsupported protocol version: %u.%u\n", arg->major, arg->minor); @@ -1391,48 +1395,48 @@ do_init(fuse_req_t req, return; } - if (arg->major > 7) + if(arg->major > 7) { /* Wait for a second INIT request with a 7.X version */ send_reply_ok(req, &outarg, sizeof(outarg)); return; } - if (arg->minor >= 6) + if(arg->minor >= 6) { - if (arg->max_readahead < f->conn.max_readahead) + if(arg->max_readahead < f->conn.max_readahead) f->conn.max_readahead = arg->max_readahead; - if (arg->flags & FUSE_ASYNC_READ) + if(arg->flags & FUSE_ASYNC_READ) f->conn.capable |= FUSE_CAP_ASYNC_READ; - if (arg->flags & FUSE_POSIX_LOCKS) + if(arg->flags & FUSE_POSIX_LOCKS) f->conn.capable |= FUSE_CAP_POSIX_LOCKS; - if (arg->flags & FUSE_ATOMIC_O_TRUNC) + if(arg->flags & FUSE_ATOMIC_O_TRUNC) f->conn.capable |= FUSE_CAP_ATOMIC_O_TRUNC; - if (arg->flags & FUSE_EXPORT_SUPPORT) + if(arg->flags & FUSE_EXPORT_SUPPORT) f->conn.capable |= FUSE_CAP_EXPORT_SUPPORT; - if (arg->flags & FUSE_BIG_WRITES) + if(arg->flags & FUSE_BIG_WRITES) f->conn.capable |= FUSE_CAP_BIG_WRITES; - if (arg->flags & FUSE_DONT_MASK) + if(arg->flags & FUSE_DONT_MASK) f->conn.capable |= FUSE_CAP_DONT_MASK; - if (arg->flags & FUSE_FLOCK_LOCKS) + if(arg->flags & FUSE_FLOCK_LOCKS) f->conn.capable |= FUSE_CAP_FLOCK_LOCKS; - if (arg->flags & FUSE_POSIX_ACL) + if(arg->flags & FUSE_POSIX_ACL) f->conn.capable |= FUSE_CAP_POSIX_ACL; - if (arg->flags & FUSE_CACHE_SYMLINKS) + if(arg->flags & FUSE_CACHE_SYMLINKS) f->conn.capable |= FUSE_CAP_CACHE_SYMLINKS; - if (arg->flags & FUSE_ASYNC_DIO) + if(arg->flags & FUSE_ASYNC_DIO) f->conn.capable |= FUSE_CAP_ASYNC_DIO; - if (arg->flags & FUSE_PARALLEL_DIROPS) + if(arg->flags & FUSE_PARALLEL_DIROPS) f->conn.capable |= FUSE_CAP_PARALLEL_DIROPS; - if (arg->flags & FUSE_MAX_PAGES) + if(arg->flags & FUSE_MAX_PAGES) f->conn.capable |= FUSE_CAP_MAX_PAGES; - if (arg->flags & FUSE_WRITEBACK_CACHE) + if(arg->flags & FUSE_WRITEBACK_CACHE) f->conn.capable |= FUSE_CAP_WRITEBACK_CACHE; - if (arg->flags & FUSE_DO_READDIRPLUS) + if(arg->flags & FUSE_DO_READDIRPLUS) f->conn.capable |= FUSE_CAP_READDIR_PLUS; - if (arg->flags & FUSE_READDIRPLUS_AUTO) + if(arg->flags & FUSE_READDIRPLUS_AUTO) f->conn.capable |= FUSE_CAP_READDIR_PLUS_AUTO; - if (arg->flags & FUSE_SETXATTR_EXT) + if(arg->flags & FUSE_SETXATTR_EXT) f->conn.capable |= FUSE_CAP_SETXATTR_EXT; } else @@ -1441,31 +1445,31 @@ do_init(fuse_req_t req, f->conn.max_readahead = 0; } - if (req->f->conn.proto_minor >= 14) + if(req->f->conn.proto_minor >= 14) { #ifdef HAVE_SPLICE #ifdef HAVE_VMSPLICE f->conn.capable |= FUSE_CAP_SPLICE_WRITE | FUSE_CAP_SPLICE_MOVE; - if (f->splice_write) + if(f->splice_write) f->conn.want |= FUSE_CAP_SPLICE_WRITE; - if (f->splice_move) + if(f->splice_move) f->conn.want |= FUSE_CAP_SPLICE_MOVE; #endif f->conn.capable |= FUSE_CAP_SPLICE_READ; - if (f->splice_read) + if(f->splice_read) f->conn.want |= FUSE_CAP_SPLICE_READ; #endif } - if (req->f->conn.proto_minor >= 18) + if(req->f->conn.proto_minor >= 18) f->conn.capable |= FUSE_CAP_IOCTL_DIR; - if (f->op.getlk && f->op.setlk && !f->no_remote_posix_lock) + if(f->op.getlk && f->op.setlk && !f->no_remote_posix_lock) f->conn.want |= FUSE_CAP_POSIX_LOCKS; - if (f->op.flock && !f->no_remote_flock) + if(f->op.flock && !f->no_remote_flock) f->conn.want |= FUSE_CAP_FLOCK_LOCKS; - if (bufsize < FUSE_MIN_READ_BUFFER) + if(bufsize < FUSE_MIN_READ_BUFFER) { fprintf(stderr, "fuse: warning: buffer size too small: %zu\n", bufsize); @@ -1473,66 +1477,66 @@ do_init(fuse_req_t req, } bufsize -= 4096; - if (bufsize < f->conn.max_write) + if(bufsize < f->conn.max_write) f->conn.max_write = bufsize; f->got_init = 1; - if (f->op.init) + if(f->op.init) f->op.init(f->userdata, &f->conn); - if (f->no_splice_read) + if(f->no_splice_read) f->conn.want &= ~FUSE_CAP_SPLICE_READ; - if (f->no_splice_write) + if(f->no_splice_write) f->conn.want &= ~FUSE_CAP_SPLICE_WRITE; - if (f->no_splice_move) + if(f->no_splice_move) f->conn.want &= ~FUSE_CAP_SPLICE_MOVE; - if ((arg->flags & FUSE_MAX_PAGES) && (f->conn.want & FUSE_CAP_MAX_PAGES)) + if((arg->flags & FUSE_MAX_PAGES) && (f->conn.want & FUSE_CAP_MAX_PAGES)) { outarg.flags |= FUSE_MAX_PAGES; outarg.max_pages = f->conn.max_pages; } - if (f->conn.want & FUSE_CAP_ASYNC_READ) + if(f->conn.want & FUSE_CAP_ASYNC_READ) outarg.flags |= FUSE_ASYNC_READ; - if (f->conn.want & FUSE_CAP_POSIX_LOCKS) + if(f->conn.want & FUSE_CAP_POSIX_LOCKS) outarg.flags |= FUSE_POSIX_LOCKS; - if (f->conn.want & FUSE_CAP_ATOMIC_O_TRUNC) + if(f->conn.want & FUSE_CAP_ATOMIC_O_TRUNC) outarg.flags |= FUSE_ATOMIC_O_TRUNC; - if (f->conn.want & FUSE_CAP_EXPORT_SUPPORT) + if(f->conn.want & FUSE_CAP_EXPORT_SUPPORT) outarg.flags |= FUSE_EXPORT_SUPPORT; - if (f->conn.want & FUSE_CAP_BIG_WRITES) + if(f->conn.want & FUSE_CAP_BIG_WRITES) outarg.flags |= FUSE_BIG_WRITES; - if (f->conn.want & FUSE_CAP_DONT_MASK) + if(f->conn.want & FUSE_CAP_DONT_MASK) outarg.flags |= FUSE_DONT_MASK; - if (f->conn.want & FUSE_CAP_FLOCK_LOCKS) + if(f->conn.want & FUSE_CAP_FLOCK_LOCKS) outarg.flags |= FUSE_FLOCK_LOCKS; - if (f->conn.want & FUSE_CAP_POSIX_ACL) + if(f->conn.want & FUSE_CAP_POSIX_ACL) outarg.flags |= FUSE_POSIX_ACL; - if (f->conn.want & FUSE_CAP_CACHE_SYMLINKS) + if(f->conn.want & FUSE_CAP_CACHE_SYMLINKS) outarg.flags |= FUSE_CACHE_SYMLINKS; - if (f->conn.want & FUSE_CAP_ASYNC_DIO) + if(f->conn.want & FUSE_CAP_ASYNC_DIO) outarg.flags |= FUSE_ASYNC_DIO; - if (f->conn.want & FUSE_CAP_PARALLEL_DIROPS) + if(f->conn.want & FUSE_CAP_PARALLEL_DIROPS) outarg.flags |= FUSE_PARALLEL_DIROPS; - if (f->conn.want & FUSE_CAP_WRITEBACK_CACHE) + if(f->conn.want & FUSE_CAP_WRITEBACK_CACHE) outarg.flags |= FUSE_WRITEBACK_CACHE; - if (f->conn.want & FUSE_CAP_READDIR_PLUS) + if(f->conn.want & FUSE_CAP_READDIR_PLUS) outarg.flags |= FUSE_DO_READDIRPLUS; - if (f->conn.want & FUSE_CAP_READDIR_PLUS_AUTO) + if(f->conn.want & FUSE_CAP_READDIR_PLUS_AUTO) outarg.flags |= FUSE_READDIRPLUS_AUTO; - if (f->conn.want & FUSE_CAP_SETXATTR_EXT) + if(f->conn.want & FUSE_CAP_SETXATTR_EXT) outarg.flags |= FUSE_SETXATTR_EXT; outarg.max_readahead = f->conn.max_readahead; outarg.max_write = f->conn.max_write; - if (f->conn.proto_minor >= 13) + if(f->conn.proto_minor >= 13) { - if (f->conn.max_background >= (1 << 16)) + if(f->conn.max_background >= (1 << 16)) f->conn.max_background = (1 << 16) - 1; - if (f->conn.congestion_threshold > f->conn.max_background) + if(f->conn.congestion_threshold > f->conn.max_background) f->conn.congestion_threshold = f->conn.max_background; - if (!f->conn.congestion_threshold) + if(!f->conn.congestion_threshold) { f->conn.congestion_threshold = f->conn.max_background * 3 / 4; } @@ -1683,11 +1687,11 @@ fuse_lowlevel_notify_inval_inode(struct fuse_chan *ch, struct fuse_ll *f; struct iovec iov[2]; - if (!ch) + if(!ch) return -EINVAL; - f = (struct fuse_ll *)fuse_session_data(fuse_chan_session(ch)); - if (!f) + f = (struct fuse_ll*)fuse_session_data(fuse_chan_session(ch)); + if(!f) return -ENODEV; outarg.ino = ino; @@ -1702,7 +1706,7 @@ fuse_lowlevel_notify_inval_inode(struct fuse_chan *ch, int fuse_lowlevel_notify_inval_entry(struct fuse_chan *ch, - uint64_t parent, + uint64_t parent, const char *name, size_t namelen) { @@ -1710,11 +1714,11 @@ fuse_lowlevel_notify_inval_entry(struct fuse_chan *ch, struct fuse_ll *f; struct iovec iov[3]; - if (!ch) + if(!ch) return -EINVAL; - f = (struct fuse_ll *)fuse_session_data(fuse_chan_session(ch)); - if (!f) + f = (struct fuse_ll*)fuse_session_data(fuse_chan_session(ch)); + if(!f) return -ENODEV; outarg.parent = parent; @@ -1731,8 +1735,8 @@ fuse_lowlevel_notify_inval_entry(struct fuse_chan *ch, int fuse_lowlevel_notify_delete(struct fuse_chan *ch, - uint64_t parent, - uint64_t child, + uint64_t parent, + uint64_t child, const char *name, size_t namelen) { @@ -1740,14 +1744,14 @@ fuse_lowlevel_notify_delete(struct fuse_chan *ch, struct fuse_ll *f; struct iovec iov[3]; - if (!ch) + if(!ch) return -EINVAL; - f = (struct fuse_ll *)fuse_session_data(fuse_chan_session(ch)); - if (!f) + f = (struct fuse_ll*)fuse_session_data(fuse_chan_session(ch)); + if(!f) return -ENODEV; - if (f->conn.proto_minor < 18) + if(f->conn.proto_minor < 18) return -ENOSYS; outarg.parent = parent; @@ -1765,7 +1769,7 @@ fuse_lowlevel_notify_delete(struct fuse_chan *ch, int fuse_lowlevel_notify_store(struct fuse_chan *ch, - uint64_t ino, + uint64_t ino, off_t offset, struct fuse_bufvec *bufv, enum fuse_buf_copy_flags flags) @@ -1777,14 +1781,14 @@ fuse_lowlevel_notify_store(struct fuse_chan *ch, size_t size = fuse_buf_size(bufv); int res; - if (!ch) + if(!ch) return -EINVAL; - f = (struct fuse_ll *)fuse_session_data(fuse_chan_session(ch)); - if (!f) + f = (struct fuse_ll*)fuse_session_data(fuse_chan_session(ch)); + if(!f) return -ENODEV; - if (f->conn.proto_minor < 15) + if(f->conn.proto_minor < 15) return -ENOSYS; out.unique = 0; @@ -1801,7 +1805,7 @@ fuse_lowlevel_notify_store(struct fuse_chan *ch, iov[1].iov_len = sizeof(outarg); res = fuse_send_data_iov(f, ch, iov, 2, bufv, flags); - if (res > 0) + if(res > 0) res = -res; return res; @@ -1841,7 +1845,7 @@ fuse_lowlevel_notify_retrieve(struct fuse_chan *ch, struct fuse_retrieve_req *rreq; int err; - if (!ch) + if(!ch) return -EINVAL; f = (struct fuse_ll*)fuse_session_data(fuse_chan_session(ch)); @@ -1947,70 +1951,6 @@ static struct { #define FUSE_MAXOP (sizeof(fuse_ll_ops) / sizeof(fuse_ll_ops[0])) -static -void -fuse_ll_process_buf(void *data, - const struct fuse_buf *buf, - struct fuse_chan *ch) -{ - struct fuse_ll *f = (struct fuse_ll*)data; - struct fuse_in_header *in; - struct fuse_req *req; - int err; - - in = buf->mem; - - req = fuse_ll_alloc_req(f); - if(req == NULL) - { - struct fuse_out_header out = { - .unique = in->unique, - .error = -ENOMEM, - }; - struct iovec iov = { - .iov_base = &out, - .iov_len = sizeof(struct fuse_out_header), - }; - - fuse_send_msg(f, ch, &iov, 1); - return; - } - - req->unique = in->unique; - req->ctx.uid = in->uid; - req->ctx.gid = in->gid; - req->ctx.pid = in->pid; - req->ch = ch; - - err = EIO; - if(!f->got_init) - { - enum fuse_opcode expected; - - expected = FUSE_INIT; - if (in->opcode != expected) - goto reply_err; - } - else if(in->opcode == FUSE_INIT) - { - goto reply_err; - } - - err = ENOSYS; - if(in->opcode >= FUSE_MAXOP) - goto reply_err; - if(fuse_ll_ops[in->opcode].func == NULL) - goto reply_err; - - fuse_ll_ops[in->opcode].func(req, in); - - return; - - reply_err: - fuse_reply_err(req, err); - return; -} - enum { KEY_HELP, KEY_VERSION, @@ -2103,17 +2043,17 @@ static void fuse_ll_destroy(void *data) { - struct fuse_ll *f = (struct fuse_ll *) data; + struct fuse_ll *f = (struct fuse_ll *)data; struct fuse_ll_pipe *llp; - if (f->got_init && !f->got_destroy) + if(f->got_init && !f->got_destroy) { - if (f->op.destroy) + if(f->op.destroy) f->op.destroy(f->userdata); } llp = pthread_getspecific(f->pipe_key); - if (llp != NULL) + if(llp != NULL) fuse_ll_pipe_free(llp); pthread_key_delete(f->pipe_key); pthread_mutex_destroy(&f->lock); @@ -2130,96 +2070,270 @@ fuse_ll_pipe_destructor(void *data) fuse_ll_pipe_free(llp); } -#ifdef HAVE_SPLICE +static +void +fuse_send_errno(struct fuse_ll *f_, + struct fuse_chan *ch_, + const int errno_, + const uint64_t unique_id_) +{ + struct fuse_out_header out = {0}; + struct iovec iov = {0}; + + out.unique = unique_id_; + out.error = -errno_; + iov.iov_base = &out; + iov.iov_len = sizeof(struct fuse_out_header); + + fuse_send_msg(f_,ch_,&iov,1); +} + +static +void +fuse_send_enomem(struct fuse_ll *f_, + struct fuse_chan *ch_, + const uint64_t unique_id_) +{ + fuse_send_errno(f_,ch_,ENOMEM,unique_id_); +} + static int -fuse_ll_receive_buf(struct fuse_session *se, - struct fuse_buf *buf) +fuse_ll_buf_receive_read(struct fuse_session *se_, + fuse_msgbuf_t *msgbuf_) +{ + int rv; + + rv = read(fuse_chan_fd(se_->ch),msgbuf_->mem,msgbuf_->size); + if(rv == -1) + return -errno; + + if(rv < sizeof(struct fuse_in_header)) + { + fprintf(stderr, "short splice from fuse device\n"); + return -EIO; + } + + return rv; +} + +static +void +fuse_ll_buf_process_read(struct fuse_session *se_, + const fuse_msgbuf_t *msgbuf_) { - struct fuse_ll *f = fuse_session_data(se); - size_t bufsize = buf->size; - struct fuse_ll_pipe *llp; int err; - int res; + struct fuse_req *req; + struct fuse_in_header *in; - if(f->conn.proto_minor < 14 || !(f->conn.want & FUSE_CAP_SPLICE_READ)) - goto fallback; + in = (struct fuse_in_header*)msgbuf_->mem; - llp = fuse_ll_get_pipe(f); - if (llp == NULL) - goto fallback; + req = fuse_ll_alloc_req(se_->f); + if(req == NULL) + return fuse_send_enomem(se_->f,se_->ch,in->unique); + + req->unique = in->unique; + req->ctx.uid = in->uid; + req->ctx.gid = in->gid; + req->ctx.pid = in->pid; + req->ch = se_->ch; + + err = ENOSYS; + if(in->opcode >= FUSE_MAXOP) + goto reply_err; + if(fuse_ll_ops[in->opcode].func == NULL) + goto reply_err; + + fuse_ll_ops[in->opcode].func(req, in); + + return; + + reply_err: + fuse_reply_err(req, err); + return; +} + +static +void +fuse_ll_buf_process_read_init(struct fuse_session *se_, + const fuse_msgbuf_t *msgbuf_) +{ + int err; + struct fuse_req *req; + struct fuse_in_header *in; + + in = (struct fuse_in_header*)msgbuf_->mem; - if(llp->size < bufsize) + req = fuse_ll_alloc_req(se_->f); + if(req == NULL) + return fuse_send_enomem(se_->f,se_->ch,in->unique); + + req->unique = in->unique; + req->ctx.uid = in->uid; + req->ctx.gid = in->gid; + req->ctx.pid = in->pid; + req->ch = se_->ch; + + err = EIO; + if(in->opcode != FUSE_INIT) + goto reply_err; + if(fuse_ll_ops[in->opcode].func == NULL) + goto reply_err; + + se_->process_buf = fuse_ll_buf_process_read; + + fuse_ll_ops[in->opcode].func(req, in); + + return; + + reply_err: + fuse_reply_err(req, err); + return; +} + + +#if defined(HAVE_SPLICE) && defined(HAVE_VMSPLICE) +static +int +fuse_ll_buf_receive_splice(struct fuse_session *se_, + fuse_msgbuf_t *msgbuf_) +{ + int rv; + size_t bufsize = msgbuf_->size; + + rv = splice(fuse_chan_fd(se_->ch),NULL,msgbuf_->pipefd[1],NULL,bufsize,SPLICE_F_MOVE); + if(rv == -1) + return -errno; + + if(rv < sizeof(struct fuse_in_header)) { - if(llp->can_grow) - { - res = fcntl(llp->pipe[0], F_SETPIPE_SZ, bufsize); - if(res == -1) - { - llp->can_grow = 0; - goto fallback; - } - llp->size = res; - } - if(llp->size < bufsize) - goto fallback; + fprintf(stderr,"short splice from fuse device\n"); + return -EIO; } - res = splice(fuse_chan_fd(se->ch), NULL, llp->pipe[1], NULL, bufsize, SPLICE_F_MOVE); - err = errno; + return rv; +} - if(fuse_session_exited(se)) - return 0; +static +void +fuse_ll_buf_process_splice(struct fuse_session *se_, + const fuse_msgbuf_t *msgbuf_) +{ + int rv; + struct fuse_req *req; + struct fuse_in_header *in; + struct iovec iov = { msgbuf_->mem, msgbuf_->size }; - if(res == -1) + retry: + rv = vmsplice(msgbuf_->pipefd[0], &iov, 1, 0); + if(rv == -1) { - if(err == ENODEV) - { - fuse_session_exit(se); - return 0; - } + rv = errno; + if(rv == EAGAIN) + goto retry; + // TODO: Need to propagate back errors to caller + return; + } - if(err != EINTR && err != EAGAIN) - perror("fuse: splice from device"); + in = (struct fuse_in_header*)msgbuf_->mem; - return -err; - } + req = fuse_ll_alloc_req(se_->f); + if(req == NULL) + return fuse_send_enomem(se_->f,se_->ch,in->unique); - if(res < sizeof(struct fuse_in_header)) + req->unique = in->unique; + req->ctx.uid = in->uid; + req->ctx.gid = in->gid; + req->ctx.pid = in->pid; + req->ch = se_->ch; + + rv = ENOSYS; + if(in->opcode >= FUSE_MAXOP) + goto reply_err; + if(fuse_ll_ops[in->opcode].func == NULL) + goto reply_err; + + fuse_ll_ops[in->opcode].func(req, in); + + return; + + reply_err: + fuse_reply_err(req, rv); + return; +} + +static +void +fuse_ll_buf_process_splice_init(struct fuse_session *se_, + const fuse_msgbuf_t *msgbuf_) +{ + int rv; + struct fuse_req *req; + struct fuse_in_header *in; + struct iovec iov = { msgbuf_->mem, msgbuf_->size }; + + retry: + rv = vmsplice(msgbuf_->pipefd[0], &iov, 1, 0); + if(rv == -1) { - fprintf(stderr, "short splice from fuse device\n"); - return -EIO; + rv = errno; + if(rv == EAGAIN) + goto retry; + // TODO: Need to propagate back errors to caller + return; } - struct iovec iov = { buf->mem, res }; - res = vmsplice(llp->pipe[0], &iov, 1, 0); + in = (struct fuse_in_header*)msgbuf_->mem; - return res; + req = fuse_ll_alloc_req(se_->f); + if(req == NULL) + return fuse_send_enomem(se_->f,se_->ch,in->unique); - fallback: - res = fuse_chan_recv(se->ch, buf->mem, buf->size); - if(res <= 0) - return res; + req->unique = in->unique; + req->ctx.uid = in->uid; + req->ctx.gid = in->gid; + req->ctx.pid = in->pid; + req->ch = se_->ch; + + rv = EIO; + if(in->opcode != FUSE_INIT) + goto reply_err; + if(fuse_ll_ops[in->opcode].func == NULL) + goto reply_err; - buf->size = res; + se_->process_buf = fuse_ll_buf_process_splice; - return res; + fuse_ll_ops[in->opcode].func(req, in); + + return; + + reply_err: + fuse_reply_err(req, rv); + return; } #else static int -fuse_ll_receive_buf(struct fuse_session *se, - struct fuse_buf *buf) +fuse_ll_buf_receive_splice(struct fuse_session *se_, + fuse_msgbuf_t *msgbuf_) { - int res; - - res = fuse_chan_recv(se->ch, buf->mem, buf->size); - if(res <= 0) - return res; + return fuse_ll_buf_receive_read(se_,msgbuf_); +} - buf->size = res; +static +void +fuse_ll_buf_process_splice(struct fuse_session *se_, + const fuse_msgbuf_t *msgbuf_) +{ + return fuse_ll_buf_process_read(se_,msgbuf_); +} - return res; +static +void +fuse_ll_buf_process_splice_init(struct fuse_session *se_, + const fuse_msgbuf_t *msgbuf_) +{ + return fuse_ll_buf_process_read_init(se_,msgbuf_); } #endif @@ -2238,14 +2352,14 @@ fuse_lowlevel_new_common(struct fuse_args *args, struct fuse_ll *f; struct fuse_session *se; - if (sizeof(struct fuse_lowlevel_ops) < op_size) + if(sizeof(struct fuse_lowlevel_ops) < op_size) { fprintf(stderr, "fuse: warning: library too old, some operations may not work\n"); op_size = sizeof(struct fuse_lowlevel_ops); } f = (struct fuse_ll *) calloc(1, sizeof(struct fuse_ll)); - if (f == NULL) + if(f == NULL) { fprintf(stderr, "fuse: failed to allocate fuse object\n"); goto out; @@ -2258,25 +2372,36 @@ fuse_lowlevel_new_common(struct fuse_args *args, fuse_mutex_init(&f->lock); err = pthread_key_create(&f->pipe_key, fuse_ll_pipe_destructor); - if (err) + if(err) { fprintf(stderr, "fuse: failed to create thread specific key: %s\n", strerror(err)); goto out_free; } - if (fuse_opt_parse(args, f, fuse_ll_opts, fuse_ll_opt_proc) == -1) + if(fuse_opt_parse(args, f, fuse_ll_opts, fuse_ll_opt_proc) == -1) goto out_key_destroy; memcpy(&f->op, op, op_size); f->owner = getuid(); f->userdata = userdata; - se = fuse_session_new(f, - fuse_ll_receive_buf, - fuse_ll_process_buf, - fuse_ll_destroy); - if (!se) + if(f->splice_read) + { + se = fuse_session_new(f, + fuse_ll_buf_receive_splice, + fuse_ll_buf_process_splice_init, + fuse_ll_destroy); + } + else + { + se = fuse_session_new(f, + fuse_ll_buf_receive_read, + fuse_ll_buf_process_read_init, + fuse_ll_destroy); + } + + if(!se) goto out_key_destroy; return se; diff --git a/libfuse/lib/fuse_msgbuf.cpp b/libfuse/lib/fuse_msgbuf.cpp new file mode 100644 index 00000000..0c29bc94 --- /dev/null +++ b/libfuse/lib/fuse_msgbuf.cpp @@ -0,0 +1,123 @@ +/* + ISC License + + Copyright (c) 2022, Antonio SJ Musumeci + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + +#include "fuse_msgbuf.h" + +#include +#include +#include + +#include +#include + + +static std::size_t g_BUFSIZE = (1024 * 1024 * 2); + +static std::mutex g_MUTEX; +static std::stack g_MSGBUF_STACK; + +static +__attribute__((destructor)) +void +msgbuf_destroy() +{ + // TODO: cleanup? +} + +std::size_t +msgbuf_bufsize() +{ + return g_BUFSIZE; +} + +void +msgbuf_bufsize(const std::size_t size_) +{ + g_BUFSIZE = size_; +} + +fuse_msgbuf_t* +msgbuf_alloc() +{ + int rv; + fuse_msgbuf_t *msgbuf; + + g_MUTEX.lock(); + if(g_MSGBUF_STACK.empty()) + { + g_MUTEX.unlock(); + + msgbuf = (fuse_msgbuf_t*)malloc(sizeof(fuse_msgbuf_t)); + if(msgbuf == NULL) + return NULL; + + rv = pipe(msgbuf->pipefd); + assert(rv == 0); + rv = fcntl(msgbuf->pipefd[0],F_SETPIPE_SZ,g_BUFSIZE); + assert(rv > 0); + msgbuf->mem = (char*)malloc(rv); + msgbuf->size = rv; + msgbuf->used = 0; + } + else + { + msgbuf = g_MSGBUF_STACK.top(); + g_MSGBUF_STACK.pop(); + g_MUTEX.unlock(); + } + + return msgbuf; +} + +fuse_msgbuf_t* +msgbuf_alloc_memonly() +{ + fuse_msgbuf_t *msgbuf; + + g_MUTEX.lock(); + if(g_MSGBUF_STACK.empty()) + { + g_MUTEX.unlock(); + + msgbuf = (fuse_msgbuf_t*)malloc(sizeof(fuse_msgbuf_t)); + if(msgbuf == NULL) + return NULL; + + msgbuf->pipefd[0] = -1; + msgbuf->pipefd[1] = -1; + msgbuf->mem = (char*)malloc(g_BUFSIZE); + msgbuf->size = g_BUFSIZE; + msgbuf->used = 0; + } + else + { + msgbuf = g_MSGBUF_STACK.top(); + g_MSGBUF_STACK.pop(); + g_MUTEX.unlock(); + } + + return msgbuf; +} + +void +msgbuf_free(fuse_msgbuf_t *msgbuf_) +{ + std::lock_guard lck(g_MUTEX); + + g_MSGBUF_STACK.push(msgbuf_); +} diff --git a/libfuse/lib/fuse_msgbuf.hpp b/libfuse/lib/fuse_msgbuf.hpp new file mode 100644 index 00000000..2579131d --- /dev/null +++ b/libfuse/lib/fuse_msgbuf.hpp @@ -0,0 +1,29 @@ +/* + ISC License + + Copyright (c) 2022, Antonio SJ Musumeci + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + +#pragma once + +#include "fuse_msgbuf.h" + +void msgbuf_bufsize(const uint32_t size); +std::size_t msgbuf_bufsize(); + +fuse_msgbuf_t* msgbuf_alloc(); +fuse_msgbuf_t* msgbuf_alloc_memonly(); + +void msgbuf_free(fuse_msgbuf_t *msgbuf); diff --git a/libfuse/lib/fuse_mt.c b/libfuse/lib/fuse_mt.c index aee80b60..1752bef5 100644 --- a/libfuse/lib/fuse_mt.c +++ b/libfuse/lib/fuse_mt.c @@ -27,7 +27,8 @@ fuse_loop_mt(struct fuse *f) return -1; res = fuse_session_loop_mt(fuse_get_session(f), - fuse_config_num_threads(f)); + fuse_config_read_thread_count(f), + fuse_config_process_thread_count(f)); fuse_stop_maintenance_thread(f); diff --git a/libfuse/lib/fuse_session.c b/libfuse/lib/fuse_session.c index cb3587de..6a7cc0cf 100644 --- a/libfuse/lib/fuse_session.c +++ b/libfuse/lib/fuse_session.c @@ -26,10 +26,11 @@ struct fuse_chan size_t bufsize; }; -struct fuse_session *fuse_session_new(void *data, - void *receive_buf, - void *process_buf, - void *destroy) +struct fuse_session * +fuse_session_new(void *data, + void *receive_buf, + void *process_buf, + void *destroy) { struct fuse_session *se = (struct fuse_session *) malloc(sizeof(*se)); if (se == NULL) { @@ -38,7 +39,7 @@ struct fuse_session *fuse_session_new(void *data, } memset(se, 0, sizeof(*se)); - se->data = data; + se->f = data; se->receive_buf = receive_buf; se->process_buf = process_buf; se->destroy = destroy; @@ -67,7 +68,7 @@ void fuse_session_remove_chan(struct fuse_chan *ch) void fuse_session_destroy(struct fuse_session *se) { - se->destroy(se->data); + se->destroy(se->f); if(se->ch != NULL) fuse_chan_destroy(se->ch); free(se); @@ -90,23 +91,10 @@ fuse_session_exit(struct fuse_session *se_) se_->exited = 1; } -void *fuse_session_data(struct fuse_session *se) +void* +fuse_session_data(struct fuse_session *se) { - return se->data; -} - -int -fuse_session_receive(struct fuse_session *se_, - struct fuse_buf *buf_) -{ - return se_->receive_buf(se_,buf_,se_->ch); -} - -void -fuse_session_process(struct fuse_session *se_, - const struct fuse_buf *buf_) -{ - se_->process_buf(se_->data,buf_,se_->ch); + return se->f; } struct fuse_chan * @@ -152,82 +140,6 @@ struct fuse_session *fuse_chan_session(struct fuse_chan *ch) return ch->se; } -int -fuse_chan_recv(struct fuse_chan *ch, - char *buf, - size_t size) -{ - int err; - ssize_t res; - struct fuse_session *se = fuse_chan_session(ch); - assert(se != NULL); - - restart: - res = read(fuse_chan_fd(ch), buf, size); - err = errno; - - if(fuse_session_exited(se)) - return 0; - - if(res == -1) - { - /* ENOENT means the operation was interrupted, it's safe - to restart */ - if (err == ENOENT) - goto restart; - - if(err == ENODEV) - { - se->exited = 1; - return 0; - } - - /* Errors occurring during normal operation: EINTR (read - interrupted), EAGAIN (nonblocking I/O), ENODEV (filesystem - umounted) */ - if(err != EINTR && err != EAGAIN) - perror("fuse: reading device"); - return -err; - } - - if((size_t) res < sizeof(struct fuse_in_header)) - { - fprintf(stderr, "short read on fuse device\n"); - return -EIO; - } - - return res; -} - -int -fuse_chan_send(struct fuse_chan *ch, - const struct iovec iov[], - size_t count) -{ - if(!iov) - return 0; - - int err; - ssize_t res; - - res = writev(fuse_chan_fd(ch), iov, count); - err = errno; - - if(res == -1) - { - struct fuse_session *se = fuse_chan_session(ch); - - assert(se != NULL); - - /* ENOENT means the operation was interrupted */ - if(!fuse_session_exited(se) && err != ENOENT) - perror("fuse: writing device"); - return -err; - } - - return 0; -} - void fuse_chan_destroy(struct fuse_chan *ch) { diff --git a/libfuse/lib/thread_pool.hpp b/libfuse/lib/thread_pool.hpp new file mode 100644 index 00000000..1599bef9 --- /dev/null +++ b/libfuse/lib/thread_pool.hpp @@ -0,0 +1,112 @@ +#pragma once + +#include "unbounded_queue.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +class ThreadPool +{ +public: + explicit + ThreadPool(const std::size_t thread_count_ = std::thread::hardware_concurrency()) + : _queues(thread_count_), + _count(thread_count_) + { + auto worker = [this](std::size_t i) + { + while(true) + { + Proc f; + + for(std::size_t n = 0; n < (_count * K); ++n) + { + if(_queues[(i + n) % _count].try_pop(f)) + break; + } + + if(!f && !_queues[i].pop(f)) + break; + + f(); + } + }; + + _threads.reserve(thread_count_); + for(std::size_t i = 0; i < thread_count_; ++i) + _threads.emplace_back(worker, i); + } + + ~ThreadPool() + { + for(auto& queue : _queues) + queue.unblock(); + for(auto& thread : _threads) + thread.join(); + } + + template + void + enqueue_work(F&& f_) + { + auto i = _index++; + + for(std::size_t n = 0; n < (_count * K); ++n) + { + if(_queues[(i + n) % _count].try_push(f_)) + return; + } + + _queues[i % _count].push(std::move(f_)); + } + + template + [[nodiscard]] + std::future::type> + enqueue_task(F&& f_) + { + using TaskReturnType = typename std::result_of::type; + using Promise = std::promise; + + auto i = _index++; + auto promise = std::make_shared(); + auto future = promise->get_future(); + auto work = [=]() { + auto rv = f_(); + promise->set_value(rv); + }; + + for(std::size_t n = 0; n < (_count * K); ++n) + { + if(_queues[(i + n) % _count].try_push(work)) + return future; + } + + _queues[i % _count].push(std::move(work)); + + return future; + } + +private: + using Proc = std::function; + using Queue = UnboundedQueue; + using Queues = std::vector; + Queues _queues; + +private: + std::vector _threads; + +private: + const std::size_t _count; + std::atomic_uint _index; + + static const unsigned int K = 2; +}; diff --git a/libfuse/lib/unbounded_queue.hpp b/libfuse/lib/unbounded_queue.hpp new file mode 100644 index 00000000..1e527b78 --- /dev/null +++ b/libfuse/lib/unbounded_queue.hpp @@ -0,0 +1,161 @@ +#pragma once + +#include +#include +#include +#include + + +template +class UnboundedQueue +{ +public: + explicit + UnboundedQueue(bool block_ = true) + : _block(block_) + { + } + + void + push(const T& item_) + { + { + std::lock_guard guard(_queue_lock); + _queue.push(item_); + } + _condition.notify_one(); + } + + void + push(T&& item_) + { + { + std::lock_guard guard(_queue_lock); + _queue.push(std::move(item_)); + } + + _condition.notify_one(); + } + + template + void + emplace(Args&&... args_) + { + { + std::lock_guard guard(_queue_lock); + _queue.emplace(std::forward(args_)...); + } + + _condition.notify_one(); + } + + bool + try_push(const T& item_) + { + { + std::unique_lock lock(_queue_lock, std::try_to_lock); + if(!lock) + return false; + _queue.push(item_); + } + + _condition.notify_one(); + + return true; + } + + bool + try_push(T&& item_) + { + { + std::unique_lock lock(_queue_lock, std::try_to_lock); + if(!lock) + return false; + _queue.push(std::move(item_)); + } + + _condition.notify_one(); + + return true; + } + + //TODO: push multiple T at once + + bool + pop(T& item_) + { + std::unique_lock guard(_queue_lock); + + _condition.wait(guard, [&]() { return !_queue.empty() || !_block; }); + if(_queue.empty()) + return false; + + item_ = std::move(_queue.front()); + _queue.pop(); + + return true; + } + + bool + try_pop(T& item_) + { + std::unique_lock lock(_queue_lock, std::try_to_lock); + if(!lock || _queue.empty()) + return false; + + item_ = std::move(_queue.front()); + _queue.pop(); + + return true; + } + + std::size_t + size() const + { + std::lock_guard guard(_queue_lock); + + return _queue.size(); + } + + bool + empty() const + { + std::lock_guard guard(_queue_lock); + + return _queue.empty(); + } + + void + block() + { + std::lock_guard guard(_queue_lock); + _block = true; + } + + void + unblock() + { + { + std::lock_guard guard(_queue_lock); + _block = false; + } + + _condition.notify_all(); + } + + bool + blocking() const + { + std::lock_guard guard(_queue_lock); + + return _block; + } + +private: + mutable std::mutex _queue_lock; + +private: + bool _block; + std::queue _queue; + std::condition_variable _condition; +}; diff --git a/src/config.cpp b/src/config.cpp index fd10f71f..a9e3793e 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -107,7 +107,8 @@ Config::Config() statfs_ignore(StatFSIgnore::ENUM::NONE), symlinkify(false), symlinkify_timeout(3600), - threads(0), + fuse_read_thread_count(-1), + fuse_process_thread_count(-1), version(MERGERFS_VERSION), writeback_cache(false), xattr(XAttr::ENUM::PASSTHROUGH) @@ -173,7 +174,9 @@ Config::Config() _map["statfs_ignore"] = &statfs_ignore; _map["symlinkify"] = &symlinkify; _map["symlinkify_timeout"] = &symlinkify_timeout; - _map["threads"] = &threads; + _map["threads"] = &fuse_read_thread_count; + _map["read-thread-count"] = &fuse_read_thread_count; + _map["process-thread-count"] = &fuse_process_thread_count; _map["version"] = &version; _map["xattr"] = &xattr; } diff --git a/src/config.hpp b/src/config.hpp index fe3ce3ee..5e3a796d 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -138,7 +138,8 @@ public: StatFSIgnore statfs_ignore; ConfigBOOL symlinkify; ConfigUINT64 symlinkify_timeout; - ConfigINT threads; + ConfigINT fuse_read_thread_count; + ConfigINT fuse_process_thread_count; ConfigSTR version; ConfigBOOL writeback_cache; XAttr xattr; diff --git a/src/option_parser.cpp b/src/option_parser.cpp index 519b52c3..982267e3 100644 --- a/src/option_parser.cpp +++ b/src/option_parser.cpp @@ -95,16 +95,11 @@ set_kv_option(const std::string &key_, static void -set_threads(Config::Write &cfg_, - fuse_args *args_) +set_fuse_threads(Config::Write &cfg_, + fuse_args *args_) { - int threads; - - threads = l::calculate_thread_count(cfg_->threads); - - cfg_->threads = threads; - - set_kv_option("threads",cfg_->threads.to_string(),args_); + set_kv_option("read-thread-count",cfg_->fuse_read_thread_count.to_string(),args_); + set_kv_option("process-thread-count",cfg_->fuse_process_thread_count.to_string(),args_); } static @@ -413,6 +408,6 @@ namespace options set_default_options(args_); set_fsname(cfg,args_); set_subtype(args_); - set_threads(cfg,args_); + set_fuse_threads(cfg,args_); } } diff --git a/tools/create-branches b/tools/create-branches new file mode 100755 index 00000000..4a893e44 --- /dev/null +++ b/tools/create-branches @@ -0,0 +1,14 @@ +#!/bin/sh +set -x + +BASEPATH="/tmp" + +for x in $(seq -w 2) +do + FILEPATH="${BASEPATH}/mergerfs-${x}.img" + MOUNTPOINT="${BASEPATH}/mergerfs-${x}" + truncate -s 1G "${FILEPATH}" + mkdir -p "${MOUNTPOINT}" + mkfs.ext4 -m0 -L "mergerfs${x}" "${FILEPATH}" + sudo mount "${FILEPATH}" "${MOUNTPOINT}" +done