From 6a14a10e6c4b4a2fe7d02b3172635e02c974ca8d Mon Sep 17 00:00:00 2001 From: Antonio SJ Musumeci Date: Sun, 18 Jun 2023 00:31:59 -0500 Subject: [PATCH] Fix read/write behavior and return value depending on direct_io Also add parallel direct write option for 6.2+ kernels. --- README.md | 4 + libfuse/include/fuse_common.h | 2 + libfuse/include/fuse_kernel.h | 143 ++++++++++++++++++++++----------- libfuse/lib/fuse_lowlevel.c | 31 ++++---- man/mergerfs.1 | 6 ++ src/config.cpp | 2 + src/config.hpp | 2 +- src/fileinfo.hpp | 12 ++- src/fs_copydata_readwrite.cpp | 4 +- src/fs_dup2.hpp | 38 +++++++++ src/fs_movefile.cpp | 2 - src/fs_preadn.hpp | 59 ++++++++++++++ src/fs_pwrite.hpp | 42 ++++++++++ src/fs_pwriten.hpp | 59 ++++++++++++++ src/fs_write.hpp | 11 --- src/fuse_create.cpp | 43 +++++----- src/fuse_open.cpp | 42 +++++----- src/fuse_read.cpp | 32 +++++--- src/fuse_write.cpp | 145 +++++++++++++++++++++++++++------- 19 files changed, 526 insertions(+), 153 deletions(-) create mode 100644 src/fs_dup2.hpp create mode 100644 src/fs_preadn.hpp create mode 100644 src/fs_pwrite.hpp create mode 100644 src/fs_pwriten.hpp diff --git a/README.md b/README.md index 79e9b8c0..98263926 100644 --- a/README.md +++ b/README.md @@ -279,6 +279,9 @@ These options are the same regardless of whether you use them with the (default: false) * **cache.readdir=BOOL**: Cache readdir (if supported by kernel) (default: false) +* **parallel-direct-writes=BOOL**: Allow the kernel to dispatch + multiple, parallel (non-extending) write requests for files opened + with `direct_io=true` (if supported by the kernel) * **direct_io**: deprecated - Bypass page cache. Use `cache.files=off` instead. (default: false) * **kernel_cache**: deprecated - Do not invalidate data cache on file @@ -1461,6 +1464,7 @@ understand what behaviors it may impact * disable `security_capability` and/or `xattr` * increase cache timeouts `cache.attr`, `cache.entry`, `cache.negative_entry` * enable (or disable) page caching (`cache.files`) +* enable `parallel-direct-writes` * enable `cache.writeback` * enable `cache.statfs` * enable `cache.symlinks` diff --git a/libfuse/include/fuse_common.h b/libfuse/include/fuse_common.h index 68b50f06..4f9a28ff 100644 --- a/libfuse/include/fuse_common.h +++ b/libfuse/include/fuse_common.h @@ -84,6 +84,8 @@ struct fuse_file_info_t uint32_t auto_cache : 1; + uint32_t parallel_direct_writes:1; + /** File handle. May be filled in by filesystem in open(). Available in all other file operations */ uint64_t fh; diff --git a/libfuse/include/fuse_kernel.h b/libfuse/include/fuse_kernel.h index 1e0495b5..b8de10eb 100644 --- a/libfuse/include/fuse_kernel.h +++ b/libfuse/include/fuse_kernel.h @@ -197,6 +197,15 @@ * * 7.37 * - add FUSE_TMPFILE + * + * 7.38 + * - add FUSE_EXPIRE_ONLY flag to fuse_notify_inval_entry + * - add FOPEN_PARALLEL_DIRECT_WRITES + * - add total_extlen to fuse_in_header + * - add FUSE_MAX_NR_SECCTX + * - add extension header + * - add FUSE_EXT_GROUPS + * - add FUSE_CREATE_SUPP_GROUP */ #ifndef _LINUX_FUSE_H @@ -232,7 +241,7 @@ #define FUSE_KERNEL_VERSION 7 /** Minor version number of this interface */ -#define FUSE_KERNEL_MINOR_VERSION 37 +#define FUSE_KERNEL_MINOR_VERSION 38 /** The node ID of the root inode */ #define FUSE_ROOT_ID 1 @@ -304,13 +313,15 @@ struct fuse_file_lock { * FOPEN_CACHE_DIR: allow caching this directory * FOPEN_STREAM: the file is stream-like (no file position at all) * FOPEN_NOFLUSH: don't flush data cache on close (unless FUSE_WRITEBACK_CACHE) + * FOPEN_PARALLEL_DIRECT_WRITES: Allow concurrent direct writes on the same inode */ -#define FOPEN_DIRECT_IO (1 << 0) -#define FOPEN_KEEP_CACHE (1 << 1) -#define FOPEN_NONSEEKABLE (1 << 2) -#define FOPEN_CACHE_DIR (1 << 3) -#define FOPEN_STREAM (1 << 4) -#define FOPEN_NOFLUSH (1 << 5) +#define FOPEN_DIRECT_IO (1 << 0) +#define FOPEN_KEEP_CACHE (1 << 1) +#define FOPEN_NONSEEKABLE (1 << 2) +#define FOPEN_CACHE_DIR (1 << 3) +#define FOPEN_STREAM (1 << 4) +#define FOPEN_NOFLUSH (1 << 5) +#define FOPEN_PARALLEL_DIRECT_WRITES (1 << 6) /** * INIT request/reply flags @@ -356,42 +367,45 @@ struct fuse_file_lock { * FUSE_SECURITY_CTX: add security context to create, mkdir, symlink, and * mknod * FUSE_HAS_INODE_DAX: use per inode DAX + * FUSE_CREATE_SUPP_GROUP: add supplementary group info to create, mkdir, + * symlink and mknod (single group that matches parent) */ -#define FUSE_ASYNC_READ (1 << 0) -#define FUSE_POSIX_LOCKS (1 << 1) -#define FUSE_FILE_OPS (1 << 2) -#define FUSE_ATOMIC_O_TRUNC (1 << 3) -#define FUSE_EXPORT_SUPPORT (1 << 4) -#define FUSE_BIG_WRITES (1 << 5) -#define FUSE_DONT_MASK (1 << 6) -#define FUSE_SPLICE_WRITE (1 << 7) -#define FUSE_SPLICE_MOVE (1 << 8) -#define FUSE_SPLICE_READ (1 << 9) -#define FUSE_FLOCK_LOCKS (1 << 10) -#define FUSE_HAS_IOCTL_DIR (1 << 11) -#define FUSE_AUTO_INVAL_DATA (1 << 12) -#define FUSE_DO_READDIRPLUS (1 << 13) -#define FUSE_READDIRPLUS_AUTO (1 << 14) -#define FUSE_ASYNC_DIO (1 << 15) -#define FUSE_WRITEBACK_CACHE (1 << 16) -#define FUSE_NO_OPEN_SUPPORT (1 << 17) -#define FUSE_PARALLEL_DIROPS (1 << 18) -#define FUSE_HANDLE_KILLPRIV (1 << 19) -#define FUSE_POSIX_ACL (1 << 20) -#define FUSE_ABORT_ERROR (1 << 21) -#define FUSE_MAX_PAGES (1 << 22) -#define FUSE_CACHE_SYMLINKS (1 << 23) -#define FUSE_NO_OPENDIR_SUPPORT (1 << 24) +#define FUSE_ASYNC_READ (1 << 0) +#define FUSE_POSIX_LOCKS (1 << 1) +#define FUSE_FILE_OPS (1 << 2) +#define FUSE_ATOMIC_O_TRUNC (1 << 3) +#define FUSE_EXPORT_SUPPORT (1 << 4) +#define FUSE_BIG_WRITES (1 << 5) +#define FUSE_DONT_MASK (1 << 6) +#define FUSE_SPLICE_WRITE (1 << 7) +#define FUSE_SPLICE_MOVE (1 << 8) +#define FUSE_SPLICE_READ (1 << 9) +#define FUSE_FLOCK_LOCKS (1 << 10) +#define FUSE_HAS_IOCTL_DIR (1 << 11) +#define FUSE_AUTO_INVAL_DATA (1 << 12) +#define FUSE_DO_READDIRPLUS (1 << 13) +#define FUSE_READDIRPLUS_AUTO (1 << 14) +#define FUSE_ASYNC_DIO (1 << 15) +#define FUSE_WRITEBACK_CACHE (1 << 16) +#define FUSE_NO_OPEN_SUPPORT (1 << 17) +#define FUSE_PARALLEL_DIROPS (1 << 18) +#define FUSE_HANDLE_KILLPRIV (1 << 19) +#define FUSE_POSIX_ACL (1 << 20) +#define FUSE_ABORT_ERROR (1 << 21) +#define FUSE_MAX_PAGES (1 << 22) +#define FUSE_CACHE_SYMLINKS (1 << 23) +#define FUSE_NO_OPENDIR_SUPPORT (1 << 24) #define FUSE_EXPLICIT_INVAL_DATA (1 << 25) -#define FUSE_MAP_ALIGNMENT (1 << 26) -#define FUSE_SUBMOUNTS (1 << 27) -#define FUSE_HANDLE_KILLPRIV_V2 (1 << 28) -#define FUSE_SETXATTR_EXT (1 << 29) -#define FUSE_INIT_EXT (1 << 30) -#define FUSE_INIT_RESERVED (1 << 31) +#define FUSE_MAP_ALIGNMENT (1 << 26) +#define FUSE_SUBMOUNTS (1 << 27) +#define FUSE_HANDLE_KILLPRIV_V2 (1 << 28) +#define FUSE_SETXATTR_EXT (1 << 29) +#define FUSE_INIT_EXT (1 << 30) +#define FUSE_INIT_RESERVED (1 << 31) /* bits 32..63 get shifted down 32 bits into the flags2 field */ -#define FUSE_SECURITY_CTX (1ULL << 32) -#define FUSE_HAS_INODE_DAX (1ULL << 33) +#define FUSE_SECURITY_CTX (1ULL << 32) +#define FUSE_HAS_INODE_DAX (1ULL << 33) +#define FUSE_CREATE_SUPP_GROUP (1ULL << 34) /** * CUSE INIT request/reply flags @@ -403,8 +417,8 @@ struct fuse_file_lock { /** * Release flags */ -#define FUSE_RELEASE_FLUSH (1 << 0) -#define FUSE_RELEASE_FLOCK_UNLOCK (1 << 1) +#define FUSE_RELEASE_FLUSH (1 << 0) +#define FUSE_RELEASE_FLOCK_UNLOCK (1 << 1) /** * Getattr flags @@ -491,6 +505,23 @@ struct fuse_file_lock { */ #define FUSE_SETXATTR_ACL_KILL_SGID (1 << 0) +/** + * notify_inval_entry flags + * FUSE_EXPIRE_ONLY + */ +#define FUSE_EXPIRE_ONLY (1 << 0) + +/** + * extension type + * FUSE_MAX_NR_SECCTX: maximum value of &fuse_secctx_header.nr_secctx + * FUSE_EXT_GROUPS: &fuse_supp_groups extension + */ +enum fuse_ext_type { + /* Types 0..31 are reserved for fuse_secctx_header */ + FUSE_MAX_NR_SECCTX = 31, + FUSE_EXT_GROUPS = 32, +}; + enum fuse_opcode { FUSE_LOOKUP = 1, FUSE_FORGET = 2, /* no reply */ @@ -874,7 +905,8 @@ struct fuse_in_header { uint32_t uid; uint32_t gid; uint32_t pid; - uint32_t padding; + uint16_t total_extlen; /* length of extensions in 8byte units */ + uint16_t padding; }; struct fuse_out_header { @@ -919,7 +951,7 @@ struct fuse_notify_inval_inode_out { struct fuse_notify_inval_entry_out { uint64_t parent; uint32_t namelen; - uint32_t padding; + uint32_t flags; }; struct fuse_notify_delete_out { @@ -1035,4 +1067,27 @@ struct fuse_secctx_header { uint32_t nr_secctx; }; +/** + * struct fuse_ext_header - extension header + * @size: total size of this extension including this header + * @type: type of extension + * + * This is made compatible with fuse_secctx_header by using type values > + * FUSE_MAX_NR_SECCTX + */ +struct fuse_ext_header { + uint32_t size; + uint32_t type; +}; + +/** + * struct fuse_supp_groups - Supplementary group extension + * @nr_groups: number of supplementary groups + * @groups: flexible array of group IDs + */ +struct fuse_supp_groups { + uint32_t nr_groups; + uint32_t groups[]; +}; + #endif /* _LINUX_FUSE_H */ diff --git a/libfuse/lib/fuse_lowlevel.c b/libfuse/lib/fuse_lowlevel.c index fa8a8535..7ed57981 100644 --- a/libfuse/lib/fuse_lowlevel.c +++ b/libfuse/lib/fuse_lowlevel.c @@ -261,18 +261,20 @@ fill_entry(struct fuse_entry_out *arg, static void -fill_open(struct fuse_open_out *arg, - const fuse_file_info_t *f) -{ - arg->fh = f->fh; - if(f->direct_io) - arg->open_flags |= FOPEN_DIRECT_IO; - if(f->keep_cache) - arg->open_flags |= FOPEN_KEEP_CACHE; - if(f->nonseekable) - arg->open_flags |= FOPEN_NONSEEKABLE; - if(f->cache_readdir) - arg->open_flags |= FOPEN_CACHE_DIR; +fill_open(struct fuse_open_out *arg_, + const fuse_file_info_t *ffi_) +{ + arg_->fh = ffi_->fh; + if(ffi_->direct_io) + arg_->open_flags |= FOPEN_DIRECT_IO; + if(ffi_->keep_cache) + arg_->open_flags |= FOPEN_KEEP_CACHE; + if(ffi_->nonseekable) + arg_->open_flags |= FOPEN_NONSEEKABLE; + if(ffi_->cache_readdir) + arg_->open_flags |= FOPEN_CACHE_DIR; + if(ffi_->parallel_direct_writes) + arg_->open_flags |= FOPEN_PARALLEL_DIRECT_WRITES; } int @@ -1414,9 +1416,10 @@ fuse_lowlevel_notify_inval_entry(struct fuse_chan *ch, if(!f) return -ENODEV; - outarg.parent = parent; + outarg.parent = parent; outarg.namelen = namelen; - outarg.padding = 0; + // TODO: Add ability to set `flags` + outarg.flags = 0; iov[1].iov_base = &outarg; iov[1].iov_len = sizeof(outarg); diff --git a/man/mergerfs.1 b/man/mergerfs.1 index 8b018766..afb6601b 100644 --- a/man/mergerfs.1 +++ b/man/mergerfs.1 @@ -377,6 +377,10 @@ to enable page caching for when \f[C]cache.files=per-process\f[R]. \f[B]cache.readdir=BOOL\f[R]: Cache readdir (if supported by kernel) (default: false) .IP \[bu] 2 +\f[B]parallel-direct-writes=BOOL\f[R]: Allow the kernel to dispatch +multiple, parallel (non-extending) write requests for files opened with +\f[C]direct_io=true\f[R] (if supported by the kernel) +.IP \[bu] 2 \f[B]direct_io\f[R]: deprecated - Bypass page cache. Use \f[C]cache.files=off\f[R] instead. (default: false) @@ -1902,6 +1906,8 @@ increase cache timeouts \f[C]cache.attr\f[R], \f[C]cache.entry\f[R], .IP \[bu] 2 enable (or disable) page caching (\f[C]cache.files\f[R]) .IP \[bu] 2 +enable \f[C]parallel-direct-writes\f[R] +.IP \[bu] 2 enable \f[C]cache.writeback\f[R] .IP \[bu] 2 enable \f[C]cache.statfs\f[R] diff --git a/src/config.cpp b/src/config.cpp index b58e7753..e2420c8a 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -107,6 +107,7 @@ Config::Config() moveonenospc(false), nfsopenhack(NFSOpenHack::ENUM::OFF), nullrw(false), + parallel_direct_writes(false), pid(::getpid()), posix_acl(false), readahead(0), @@ -182,6 +183,7 @@ Config::Config() _map["nfsopenhack"] = &nfsopenhack; _map["nullrw"] = &nullrw; _map["pid"] = &pid; + _map["parallel-direct-writes"] = ¶llel_direct_writes; _map["pin-threads"] = &fuse_pin_threads; _map["posix_acl"] = &posix_acl; _map["readahead"] = &readahead; diff --git a/src/config.hpp b/src/config.hpp index ab2270b2..75f6cbe1 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -131,6 +131,7 @@ public: MoveOnENOSPC moveonenospc; NFSOpenHack nfsopenhack; ConfigBOOL nullrw; + ConfigBOOL parallel_direct_writes; ConfigUINT64 pid; ConfigBOOL posix_acl; ConfigUINT64 readahead; @@ -151,7 +152,6 @@ public: ConfigBOOL writeback_cache; XAttr xattr; - private: bool _initialized; diff --git a/src/fileinfo.hpp b/src/fileinfo.hpp index 4d4948e4..4c8beaba 100644 --- a/src/fileinfo.hpp +++ b/src/fileinfo.hpp @@ -18,19 +18,25 @@ #include "fh.hpp" +#include #include +#include class FileInfo : public FH { public: - FileInfo(const int fd_, - const char *fusepath_) + FileInfo(int const fd_, + char const *fusepath_, + bool const direct_io_) : FH(fusepath_), - fd(fd_) + fd(fd_), + direct_io(direct_io_) { } public: int fd; + uint32_t direct_io:1; + std::mutex mutex; }; diff --git a/src/fs_copydata_readwrite.cpp b/src/fs_copydata_readwrite.cpp index 45db4bf9..abf0c449 100644 --- a/src/fs_copydata_readwrite.cpp +++ b/src/fs_copydata_readwrite.cpp @@ -65,11 +65,9 @@ namespace l size_t totalwritten; vector buf; - bufsize = (128 * 1024); + bufsize = (1024 * 1024); buf.resize(bufsize); - fs::lseek(src_fd_,0,SEEK_SET); - totalwritten = 0; while(totalwritten < count_) { diff --git a/src/fs_dup2.hpp b/src/fs_dup2.hpp new file mode 100644 index 00000000..d073ec3a --- /dev/null +++ b/src/fs_dup2.hpp @@ -0,0 +1,38 @@ +/* + ISC License + + Copyright (c) 2023, Antonio SJ Musumeci + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + +#pragma once + +#include + + +namespace fs +{ + static + inline + int + dup2(int const oldfd_, + int const newfd_) + { + int rv; + + rv = ::dup2(oldfd_,newfd_); + + return ((rv == -1) ? -errno : rv); + } +} diff --git a/src/fs_movefile.cpp b/src/fs_movefile.cpp index 51590a70..5c45ed64 100644 --- a/src/fs_movefile.cpp +++ b/src/fs_movefile.cpp @@ -156,8 +156,6 @@ namespace l fs::unlink(srcfd_filepath); - fs::close(origfd_); - return rv; } } diff --git a/src/fs_preadn.hpp b/src/fs_preadn.hpp new file mode 100644 index 00000000..1e964a3f --- /dev/null +++ b/src/fs_preadn.hpp @@ -0,0 +1,59 @@ +/* + ISC License + + Copyright (c) 2023, Antonio SJ Musumeci + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + +#pragma once + +#include "fs_pread.hpp" + + +namespace fs +{ + static + inline + ssize_t + preadn(int const fd_, + void *buf_, + size_t const count_, + off_t const offset_, + int *err_) + { + ssize_t rv; + ssize_t count = count_; + off_t offset = offset_; + char const *buf = (char const *)buf_; + + *err_ = 0; + while(count > 0) + { + rv = fs::pread(fd_,buf,count,offset); + if(rv == 0) + return (count_ - count); + if(rv < 0) + { + *err_ = rv; + return (count_ - count); + } + + buf += rv; + count -= rv; + offset += rv; + } + + return count_; + } +} diff --git a/src/fs_pwrite.hpp b/src/fs_pwrite.hpp new file mode 100644 index 00000000..9acc0f11 --- /dev/null +++ b/src/fs_pwrite.hpp @@ -0,0 +1,42 @@ +/* + ISC License + + Copyright (c) 2016, Antonio SJ Musumeci + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + +#pragma once + +#include + + +namespace fs +{ + static + inline + ssize_t + pwrite(int const fd_, + void const *buf_, + size_t const count_, + off_t const offset_) + { + ssize_t rv; + + rv = ::pwrite(fd_,buf_,count_,offset_); + if(rv == -1) + return -errno; + + return rv; + } +} diff --git a/src/fs_pwriten.hpp b/src/fs_pwriten.hpp new file mode 100644 index 00000000..0e00b8e8 --- /dev/null +++ b/src/fs_pwriten.hpp @@ -0,0 +1,59 @@ +/* + ISC License + + Copyright (c) 2023, Antonio SJ Musumeci + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + +#pragma once + +#include "fs_pwrite.hpp" + + +namespace fs +{ + static + inline + ssize_t + pwriten(const int fd_, + const void *buf_, + const size_t count_, + const off_t offset_, + int *err_) + { + ssize_t rv; + ssize_t count = count_; + off_t offset = offset_; + char const *buf = (char const *)buf_; + + *err_ = 0; + while(count > 0) + { + rv = fs::pwrite(fd_,buf,count,offset); + if(rv == 0) + return (count_ - count); + if(rv < 0) + { + *err_ = rv; + return (count_ - count); + } + + buf += rv; + count -= rv; + offset += rv; + } + + return count_; + } +} diff --git a/src/fs_write.hpp b/src/fs_write.hpp index 750bef03..ac329b57 100644 --- a/src/fs_write.hpp +++ b/src/fs_write.hpp @@ -32,15 +32,4 @@ namespace fs { return ::write(fd_,buf_,count_); } - - static - inline - ssize_t - pwrite(const int fd_, - const void *buf_, - const size_t count_, - const off_t offset_) - { - return ::pwrite(fd_,buf_,count_,offset_); - } } diff --git a/src/fuse_create.cpp b/src/fuse_create.cpp index 88270294..11179118 100644 --- a/src/fuse_create.cpp +++ b/src/fuse_create.cpp @@ -102,6 +102,9 @@ namespace l } break; } + + if(cfg_->parallel_direct_writes == true) + ffi_->parallel_direct_writes = ffi_->direct_io; } static @@ -121,21 +124,23 @@ namespace l int create_core(const std::string &createpath_, const char *fusepath_, + fuse_file_info_t *ffi_, const mode_t mode_, - const mode_t umask_, - const int flags_, - uint64_t *fh_) + const mode_t umask_) { int rv; + FileInfo *fi; std::string fullpath; fullpath = fs::path::make(createpath_,fusepath_); - rv = l::create_core(fullpath,mode_,umask_,flags_); + rv = l::create_core(fullpath,mode_,umask_,ffi_->flags); if(rv == -1) return -errno; - *fh_ = reinterpret_cast(new FileInfo(rv,fusepath_)); + fi = new FileInfo(rv,fusepath_,ffi_->direct_io); + + ffi_->fh = reinterpret_cast(fi); return 0; } @@ -146,10 +151,9 @@ namespace l const Policy::Create &createFunc_, const Branches &branches_, const char *fusepath_, + fuse_file_info_t *ffi_, const mode_t mode_, - const mode_t umask_, - const int flags_, - uint64_t *fh_) + const mode_t umask_) { int rv; std::string fullpath; @@ -173,10 +177,9 @@ namespace l return l::create_core(createpaths[0], fusepath_, + ffi_, mode_, - umask_, - flags_, - fh_); + umask_); } } @@ -187,6 +190,7 @@ namespace FUSE mode_t mode_, fuse_file_info_t *ffi_) { + int rv; Config::Read cfg; const fuse_context *fc = fuse_get_context(); const ugid::Set ugid(fc->uid,fc->gid); @@ -196,13 +200,14 @@ namespace FUSE if(cfg->writeback_cache) l::tweak_flags_writeback_cache(&ffi_->flags); - return l::create(cfg->func.getattr.policy, - cfg->func.create.policy, - cfg->branches, - fusepath_, - mode_, - fc->umask, - ffi_->flags, - &ffi_->fh); + rv = l::create(cfg->func.getattr.policy, + cfg->func.create.policy, + cfg->branches, + fusepath_, + ffi_, + mode_, + fc->umask); + + return rv; } } diff --git a/src/fuse_open.cpp b/src/fuse_open.cpp index 147152dc..1cb083dc 100644 --- a/src/fuse_open.cpp +++ b/src/fuse_open.cpp @@ -165,32 +165,37 @@ namespace l } break; } + + if(cfg_->parallel_direct_writes == true) + ffi_->parallel_direct_writes = ffi_->direct_io; } static int open_core(const std::string &basepath_, const char *fusepath_, - const int flags_, + fuse_file_info_t *ffi_, const bool link_cow_, - const NFSOpenHack nfsopenhack_, - uint64_t *fh_) + const NFSOpenHack nfsopenhack_) { int fd; + FileInfo *fi; std::string fullpath; fullpath = fs::path::make(basepath_,fusepath_); - if(link_cow_ && fs::cow::is_eligible(fullpath.c_str(),flags_)) + if(link_cow_ && fs::cow::is_eligible(fullpath.c_str(),ffi_->flags)) fs::cow::break_link(fullpath.c_str()); - fd = fs::open(fullpath,flags_); + fd = fs::open(fullpath,ffi_->flags); if((fd == -1) && (errno == EACCES)) - fd = l::nfsopenhack(fullpath,flags_,nfsopenhack_); + fd = l::nfsopenhack(fullpath,ffi_->flags,nfsopenhack_); if(fd == -1) return -errno; - *fh_ = reinterpret_cast(new FileInfo(fd,fusepath_)); + fi = new FileInfo(fd,fusepath_,ffi_->direct_io); + + ffi_->fh = reinterpret_cast(fi); return 0; } @@ -200,10 +205,9 @@ namespace l open(const Policy::Search &searchFunc_, const Branches &branches_, const char *fusepath_, - const int flags_, + fuse_file_info_t *ffi_, const bool link_cow_, - const NFSOpenHack nfsopenhack_, - uint64_t *fh_) + const NFSOpenHack nfsopenhack_) { int rv; StrVec basepaths; @@ -212,7 +216,7 @@ namespace l if(rv == -1) return -errno; - return l::open_core(basepaths[0],fusepath_,flags_,link_cow_,nfsopenhack_,fh_); + return l::open_core(basepaths[0],fusepath_,ffi_,link_cow_,nfsopenhack_); } } @@ -222,6 +226,7 @@ namespace FUSE open(const char *fusepath_, fuse_file_info_t *ffi_) { + int rv; Config::Read cfg; const fuse_context *fc = fuse_get_context(); const ugid::Set ugid(fc->uid,fc->gid); @@ -231,12 +236,13 @@ namespace FUSE if(cfg->writeback_cache) l::tweak_flags_writeback_cache(&ffi_->flags); - return l::open(cfg->func.open.policy, - cfg->branches, - fusepath_, - ffi_->flags, - cfg->link_cow, - cfg->nfsopenhack, - &ffi_->fh); + rv = l::open(cfg->func.open.policy, + cfg->branches, + fusepath_, + ffi_, + cfg->link_cow, + cfg->nfsopenhack); + + return rv; } } diff --git a/src/fuse_read.cpp b/src/fuse_read.cpp index e3d29de5..8d6bf126 100644 --- a/src/fuse_read.cpp +++ b/src/fuse_read.cpp @@ -23,17 +23,29 @@ #include #include -typedef struct fuse_bufvec fuse_bufvec; - namespace l { static int - read(const int fd_, - char *buf_, - const size_t size_, - const off_t offset_) + read_direct_io(const int fd_, + char *buf_, + const size_t size_, + const off_t offset_) + { + int rv; + + rv = fs::pread(fd_,buf_,size_,offset_); + + return rv; + } + + static + int + read_cached(const int fd_, + char *buf_, + const size_t size_, + const off_t offset_) { int rv; @@ -53,10 +65,10 @@ namespace FUSE { FileInfo *fi = reinterpret_cast(ffi_->fh); - return l::read(fi->fd, - buf_, - size_, - offset_); + if(fi->direct_io) + return l::read_direct_io(fi->fd,buf_,size_,offset_); + + return l::read_cached(fi->fd,buf_,size_,offset_); } int diff --git a/src/fuse_write.cpp b/src/fuse_write.cpp index 2be75ce2..6b6fe59f 100644 --- a/src/fuse_write.cpp +++ b/src/fuse_write.cpp @@ -17,8 +17,11 @@ #include "config.hpp" #include "errno.hpp" #include "fileinfo.hpp" +#include "fs_close.hpp" +#include "fs_dup2.hpp" #include "fs_movefile.hpp" -#include "fs_write.hpp" +#include "fs_pwrite.hpp" +#include "fs_pwriten.hpp" #include "ugid.hpp" #include "fuse.h" @@ -36,37 +39,53 @@ namespace l { static bool - out_of_space(const int error_) + out_of_space(const ssize_t error_) { - return ((error_ == ENOSPC) || - (error_ == EDQUOT)); + return ((error_ == -ENOSPC) || + (error_ == -EDQUOT)); } static int - write(const int fd_, - const void *buf_, - const size_t count_, - const off_t offset_) + move_and_pwrite(const char *buf_, + const size_t count_, + const off_t offset_, + FileInfo *fi_, + int err_) { - int rv; + int err; + ssize_t rv; + Config::Read cfg; - rv = fs::pwrite(fd_,buf_,count_,offset_); - if(rv == -1) - return -errno; + if(cfg->moveonenospc.enabled == false) + return err_; - return rv; + rv = fs::movefile_as_root(cfg->moveonenospc.policy, + cfg->branches, + fi_->fusepath, + fi_->fd); + if(rv < 0) + return err_; + + err = fs::dup2(rv,fi_->fd); + fs::close(rv); + if(err < 0) + return err; + + return fs::pwrite(fi_->fd,buf_,count_,offset_); } static int - move_and_write(const char *buf_, - const size_t count_, - const off_t offset_, - FileInfo *fi_, - int err_) + move_and_pwriten(char const *buf_, + size_t const count_, + off_t const offset_, + FileInfo *fi_, + ssize_t const err_, + ssize_t const written_) { - int rv; + int err; + ssize_t rv; Config::Read cfg; if(cfg->moveonenospc.enabled == false) @@ -79,9 +98,70 @@ namespace l if(rv < 0) return err_; - fi_->fd = rv; + err = fs::dup2(rv,fi_->fd); + fs::close(rv); + if(err < 0) + return err; + + rv = fs::pwriten(fi_->fd, + buf_ + written_, + count_ - written_, + offset_ + written_, + &err); + if(err < 0) + return err; + + return (rv + written_); + } + + // When in direct_io mode write's return value should match that of + // the operation. + // 0 on EOF + // N bytes written (short writes included) + // -errno on error + // See libfuse/include/fuse.h for more details + static + int + write_direct_io(const char *buf_, + const size_t count_, + const off_t offset_, + FileInfo *fi_) + { + ssize_t rv; + + rv = fs::pwrite(fi_->fd,buf_,count_,offset_); + if(l::out_of_space(rv)) + rv = l::move_and_pwrite(buf_,count_,offset_,fi_,rv); + + return rv; + } + + // When not in direct_io mode write's return value is more complex. + // 0 or less than `count` on EOF + // `count` on non-errors + // -errno on error + // See libfuse/include/fuse.h for more details + static + int + write_cached(const char *buf_, + const size_t count_, + const off_t offset_, + FileInfo *fi_) + { + int err; + ssize_t rv; + + rv = fs::pwriten(fi_->fd,buf_,count_,offset_,&err); + if(rv == (ssize_t)count_) + return count_; + if(rv == 0) + return 0; + if(err && !l::out_of_space(err)) + return err; - return l::write(fi_->fd,buf_,count_,offset_); + rv = l::move_and_pwriten(buf_,count_,offset_,fi_,err,rv); + + return rv; } static @@ -91,16 +171,25 @@ namespace l const size_t count_, const off_t offset_) { - int rv; - FileInfo* fi; + FileInfo *fi; fi = reinterpret_cast(ffi_->fh); - rv = l::write(fi->fd,buf_,count_,offset_); - if(l::out_of_space(-rv)) - rv = l::move_and_write(buf_,count_,offset_,fi,rv); - - return rv; + // Concurrent writes can only happen if: + // 1) writeback-cache is enabled and using page caching + // 2) parallel_direct_writes is enabled and file has `direct_io=true` + // Will look into selectively locking in the future + // A reader/writer lock would probably be the best option given + // the expense of the write itself in comparison. Alternatively, + // could change the move file behavior to use a known target file + // and have threads use O_EXCL and back off and wait for the + // transfer to complete before retrying. + std::lock_guard guard(fi->mutex); + + if(fi->direct_io) + return l::write_direct_io(buf_,count_,offset_,fi); + + return l::write_cached(buf_,count_,offset_,fi); } }