Browse Source

Merge pull request #1203 from trapexit/append-move

Fix move when in append mode + fix read/write direct_io vs cached behavior
pull/1204/head
trapexit 2 years ago
committed by GitHub
parent
commit
42836e0961
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      README.md
  2. 2
      libfuse/include/fuse_common.h
  3. 61
      libfuse/include/fuse_kernel.h
  4. 29
      libfuse/lib/fuse_lowlevel.c
  5. 6
      man/mergerfs.1
  6. 2
      src/config.cpp
  7. 2
      src/config.hpp
  8. 12
      src/fileinfo.hpp
  9. 10
      src/fs_copydata_readwrite.cpp
  10. 38
      src/fs_dup2.hpp
  11. 2
      src/fs_mktemp.cpp
  12. 148
      src/fs_movefile.cpp
  13. 4
      src/fs_movefile.hpp
  14. 59
      src/fs_preadn.hpp
  15. 42
      src/fs_pwrite.hpp
  16. 59
      src/fs_pwriten.hpp
  17. 11
      src/fs_write.hpp
  18. 35
      src/fuse_create.cpp
  19. 36
      src/fuse_open.cpp
  20. 26
      src/fuse_read.cpp
  21. 167
      src/fuse_write.cpp

4
README.md

@ -279,6 +279,9 @@ These options are the same regardless of whether you use them with the
(default: false)
* **cache.readdir=BOOL**: Cache readdir (if supported by kernel)
(default: false)
* **parallel-direct-writes=BOOL**: Allow the kernel to dispatch
multiple, parallel (non-extending) write requests for files opened
with `direct_io=true` (if supported by the kernel)
* **direct_io**: deprecated - Bypass page cache. Use `cache.files=off`
instead. (default: false)
* **kernel_cache**: deprecated - Do not invalidate data cache on file
@ -1461,6 +1464,7 @@ understand what behaviors it may impact
* disable `security_capability` and/or `xattr`
* increase cache timeouts `cache.attr`, `cache.entry`, `cache.negative_entry`
* enable (or disable) page caching (`cache.files`)
* enable `parallel-direct-writes`
* enable `cache.writeback`
* enable `cache.statfs`
* enable `cache.symlinks`

2
libfuse/include/fuse_common.h

@ -84,6 +84,8 @@ struct fuse_file_info_t
uint32_t auto_cache : 1;
uint32_t parallel_direct_writes:1;
/** File handle. May be filled in by filesystem in open().
Available in all other file operations */
uint64_t fh;

61
libfuse/include/fuse_kernel.h

@ -197,6 +197,15 @@
*
* 7.37
* - add FUSE_TMPFILE
*
* 7.38
* - add FUSE_EXPIRE_ONLY flag to fuse_notify_inval_entry
* - add FOPEN_PARALLEL_DIRECT_WRITES
* - add total_extlen to fuse_in_header
* - add FUSE_MAX_NR_SECCTX
* - add extension header
* - add FUSE_EXT_GROUPS
* - add FUSE_CREATE_SUPP_GROUP
*/
#ifndef _LINUX_FUSE_H
@ -232,7 +241,7 @@
#define FUSE_KERNEL_VERSION 7
/** Minor version number of this interface */
#define FUSE_KERNEL_MINOR_VERSION 37
#define FUSE_KERNEL_MINOR_VERSION 38
/** The node ID of the root inode */
#define FUSE_ROOT_ID 1
@ -304,6 +313,7 @@ struct fuse_file_lock {
* FOPEN_CACHE_DIR: allow caching this directory
* FOPEN_STREAM: the file is stream-like (no file position at all)
* FOPEN_NOFLUSH: don't flush data cache on close (unless FUSE_WRITEBACK_CACHE)
* FOPEN_PARALLEL_DIRECT_WRITES: Allow concurrent direct writes on the same inode
*/
#define FOPEN_DIRECT_IO (1 << 0)
#define FOPEN_KEEP_CACHE (1 << 1)
@ -311,6 +321,7 @@ struct fuse_file_lock {
#define FOPEN_CACHE_DIR (1 << 3)
#define FOPEN_STREAM (1 << 4)
#define FOPEN_NOFLUSH (1 << 5)
#define FOPEN_PARALLEL_DIRECT_WRITES (1 << 6)
/**
* INIT request/reply flags
@ -356,6 +367,8 @@ struct fuse_file_lock {
* FUSE_SECURITY_CTX: add security context to create, mkdir, symlink, and
* mknod
* FUSE_HAS_INODE_DAX: use per inode DAX
* FUSE_CREATE_SUPP_GROUP: add supplementary group info to create, mkdir,
* symlink and mknod (single group that matches parent)
*/
#define FUSE_ASYNC_READ (1 << 0)
#define FUSE_POSIX_LOCKS (1 << 1)
@ -392,6 +405,7 @@ struct fuse_file_lock {
/* bits 32..63 get shifted down 32 bits into the flags2 field */
#define FUSE_SECURITY_CTX (1ULL << 32)
#define FUSE_HAS_INODE_DAX (1ULL << 33)
#define FUSE_CREATE_SUPP_GROUP (1ULL << 34)
/**
* CUSE INIT request/reply flags
@ -491,6 +505,23 @@ struct fuse_file_lock {
*/
#define FUSE_SETXATTR_ACL_KILL_SGID (1 << 0)
/**
* notify_inval_entry flags
* FUSE_EXPIRE_ONLY
*/
#define FUSE_EXPIRE_ONLY (1 << 0)
/**
* extension type
* FUSE_MAX_NR_SECCTX: maximum value of &fuse_secctx_header.nr_secctx
* FUSE_EXT_GROUPS: &fuse_supp_groups extension
*/
enum fuse_ext_type {
/* Types 0..31 are reserved for fuse_secctx_header */
FUSE_MAX_NR_SECCTX = 31,
FUSE_EXT_GROUPS = 32,
};
enum fuse_opcode {
FUSE_LOOKUP = 1,
FUSE_FORGET = 2, /* no reply */
@ -874,7 +905,8 @@ struct fuse_in_header {
uint32_t uid;
uint32_t gid;
uint32_t pid;
uint32_t padding;
uint16_t total_extlen; /* length of extensions in 8byte units */
uint16_t padding;
};
struct fuse_out_header {
@ -919,7 +951,7 @@ struct fuse_notify_inval_inode_out {
struct fuse_notify_inval_entry_out {
uint64_t parent;
uint32_t namelen;
uint32_t padding;
uint32_t flags;
};
struct fuse_notify_delete_out {
@ -1035,4 +1067,27 @@ struct fuse_secctx_header {
uint32_t nr_secctx;
};
/**
* struct fuse_ext_header - extension header
* @size: total size of this extension including this header
* @type: type of extension
*
* This is made compatible with fuse_secctx_header by using type values >
* FUSE_MAX_NR_SECCTX
*/
struct fuse_ext_header {
uint32_t size;
uint32_t type;
};
/**
* struct fuse_supp_groups - Supplementary group extension
* @nr_groups: number of supplementary groups
* @groups: flexible array of group IDs
*/
struct fuse_supp_groups {
uint32_t nr_groups;
uint32_t groups[];
};
#endif /* _LINUX_FUSE_H */

29
libfuse/lib/fuse_lowlevel.c

@ -261,18 +261,20 @@ fill_entry(struct fuse_entry_out *arg,
static
void
fill_open(struct fuse_open_out *arg,
const fuse_file_info_t *f)
{
arg->fh = f->fh;
if(f->direct_io)
arg->open_flags |= FOPEN_DIRECT_IO;
if(f->keep_cache)
arg->open_flags |= FOPEN_KEEP_CACHE;
if(f->nonseekable)
arg->open_flags |= FOPEN_NONSEEKABLE;
if(f->cache_readdir)
arg->open_flags |= FOPEN_CACHE_DIR;
fill_open(struct fuse_open_out *arg_,
const fuse_file_info_t *ffi_)
{
arg_->fh = ffi_->fh;
if(ffi_->direct_io)
arg_->open_flags |= FOPEN_DIRECT_IO;
if(ffi_->keep_cache)
arg_->open_flags |= FOPEN_KEEP_CACHE;
if(ffi_->nonseekable)
arg_->open_flags |= FOPEN_NONSEEKABLE;
if(ffi_->cache_readdir)
arg_->open_flags |= FOPEN_CACHE_DIR;
if(ffi_->parallel_direct_writes)
arg_->open_flags |= FOPEN_PARALLEL_DIRECT_WRITES;
}
int
@ -1416,7 +1418,8 @@ fuse_lowlevel_notify_inval_entry(struct fuse_chan *ch,
outarg.parent = parent;
outarg.namelen = namelen;
outarg.padding = 0;
// TODO: Add ability to set `flags`
outarg.flags = 0;
iov[1].iov_base = &outarg;
iov[1].iov_len = sizeof(outarg);

6
man/mergerfs.1

@ -377,6 +377,10 @@ to enable page caching for when \f[C]cache.files=per-process\f[R].
\f[B]cache.readdir=BOOL\f[R]: Cache readdir (if supported by kernel)
(default: false)
.IP \[bu] 2
\f[B]parallel-direct-writes=BOOL\f[R]: Allow the kernel to dispatch
multiple, parallel (non-extending) write requests for files opened with
\f[C]direct_io=true\f[R] (if supported by the kernel)
.IP \[bu] 2
\f[B]direct_io\f[R]: deprecated - Bypass page cache.
Use \f[C]cache.files=off\f[R] instead.
(default: false)
@ -1902,6 +1906,8 @@ increase cache timeouts \f[C]cache.attr\f[R], \f[C]cache.entry\f[R],
.IP \[bu] 2
enable (or disable) page caching (\f[C]cache.files\f[R])
.IP \[bu] 2
enable \f[C]parallel-direct-writes\f[R]
.IP \[bu] 2
enable \f[C]cache.writeback\f[R]
.IP \[bu] 2
enable \f[C]cache.statfs\f[R]

2
src/config.cpp

@ -107,6 +107,7 @@ Config::Config()
moveonenospc(false),
nfsopenhack(NFSOpenHack::ENUM::OFF),
nullrw(false),
parallel_direct_writes(false),
pid(::getpid()),
posix_acl(false),
readahead(0),
@ -182,6 +183,7 @@ Config::Config()
_map["nfsopenhack"] = &nfsopenhack;
_map["nullrw"] = &nullrw;
_map["pid"] = &pid;
_map["parallel-direct-writes"] = &parallel_direct_writes;
_map["pin-threads"] = &fuse_pin_threads;
_map["posix_acl"] = &posix_acl;
_map["readahead"] = &readahead;

2
src/config.hpp

@ -131,6 +131,7 @@ public:
MoveOnENOSPC moveonenospc;
NFSOpenHack nfsopenhack;
ConfigBOOL nullrw;
ConfigBOOL parallel_direct_writes;
ConfigUINT64 pid;
ConfigBOOL posix_acl;
ConfigUINT64 readahead;
@ -151,7 +152,6 @@ public:
ConfigBOOL writeback_cache;
XAttr xattr;
private:
bool _initialized;

12
src/fileinfo.hpp

@ -18,19 +18,25 @@
#include "fh.hpp"
#include <cstdint>
#include <string>
#include <mutex>
class FileInfo : public FH
{
public:
FileInfo(const int fd_,
const char *fusepath_)
FileInfo(int const fd_,
char const *fusepath_,
bool const direct_io_)
: FH(fusepath_),
fd(fd_)
fd(fd_),
direct_io(direct_io_)
{
}
public:
int fd;
uint32_t direct_io:1;
std::mutex mutex;
};

10
src/fs_copydata_readwrite.cpp

@ -57,8 +57,7 @@ namespace l
int
copydata_readwrite(const int src_fd_,
const int dst_fd_,
const size_t count_,
const size_t blocksize_)
const size_t count_)
{
ssize_t nr;
ssize_t nw;
@ -66,11 +65,9 @@ namespace l
size_t totalwritten;
vector<char> buf;
bufsize = (blocksize_ * 16);
bufsize = (1024 * 1024);
buf.resize(bufsize);
fs::lseek(src_fd_,0,SEEK_SET);
totalwritten = 0;
while(totalwritten < count_)
{
@ -108,7 +105,6 @@ namespace fs
return l::copydata_readwrite(src_fd_,
dst_fd_,
st.st_size,
st.st_blksize);
st.st_size);
}
}

38
src/fs_dup2.hpp

@ -0,0 +1,38 @@
/*
ISC License
Copyright (c) 2023, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include <unistd.h>
namespace fs
{
static
inline
int
dup2(int const oldfd_,
int const newfd_)
{
int rv;
rv = ::dup2(oldfd_,newfd_);
return ((rv == -1) ? -errno : rv);
}
}

2
src/fs_mktemp.cpp

@ -59,7 +59,7 @@ namespace fs
fd = -1;
count = MAX_ATTEMPTS;
flags = (flags_ | O_EXCL | O_CREAT);
flags = (flags_ | O_EXCL | O_CREAT | O_TRUNC);
while(count-- > 0)
{
tmppath = generate_tmp_path(*base_);

148
src/fs_movefile.cpp

@ -44,6 +44,21 @@ using std::string;
using std::vector;
static
int
cleanup_flags(const int flags_)
{
int rv;
rv = flags_;
rv = (rv & ~O_TRUNC);
rv = (rv & ~O_CREAT);
rv = (rv & ~O_EXCL);
return rv;
}
namespace l
{
static
@ -51,82 +66,97 @@ namespace l
movefile(const Policy::Create &createFunc_,
const Branches::CPtr &branches_,
const string &fusepath_,
int *origfd_)
int origfd_)
{
int rv;
int fdin;
int fdout;
int fdin_flags;
int64_t fdin_size;
int srcfd;
int dstfd;
int dstfd_flags;
int origfd_flags;
int64_t srcfd_size;
string fusedir;
string fdin_path;
string fdout_temp;
vector<string> fdout_path;
fdin = *origfd_;
string srcfd_branch;
string srcfd_filepath;
string dstfd_filepath;
string dstfd_tmp_filepath;
vector<string> dstfd_branch;
fdin_flags = fs::getfl(fdin);
if(fdin_flags == -1)
return -1;
srcfd = -1;
dstfd = -1;
rv = fs::findonfs(branches_,fusepath_,fdin,&fdin_path);
rv = fs::findonfs(branches_,fusepath_,origfd_,&srcfd_branch);
if(rv == -1)
return -1;
return -errno;
rv = createFunc_(branches_,fusepath_,&fdout_path);
rv = createFunc_(branches_,fusepath_,&dstfd_branch);
if(rv == -1)
return -1;
return -errno;
origfd_flags = fs::getfl(origfd_);
if(origfd_flags == -1)
return -errno;
fdin_size = fs::file_size(fdin);
if(fdin_size == -1)
return -1;
srcfd_size = fs::file_size(origfd_);
if(srcfd_size == -1)
return -errno;
if(fs::has_space(fdout_path[0],fdin_size) == false)
return (errno=ENOSPC,-1);
if(fs::has_space(dstfd_branch[0],srcfd_size) == false)
return -ENOSPC;
fusedir = fs::path::dirname(fusepath_);
rv = fs::clonepath(fdin_path,fdout_path[0],fusedir);
rv = fs::clonepath(srcfd_branch,dstfd_branch[0],fusedir);
if(rv == -1)
return -1;
fs::path::append(fdin_path,fusepath_);
fdin = fs::open(fdin_path,O_RDONLY);
if(fdin == -1)
return -1;
return -ENOSPC;
srcfd_filepath = srcfd_branch;
fs::path::append(srcfd_filepath,fusepath_);
srcfd = fs::open(srcfd_filepath,O_RDONLY);
if(srcfd == -1)
return -ENOSPC;
dstfd_filepath = dstfd_branch[0];
fs::path::append(dstfd_filepath,fusepath_);
dstfd_tmp_filepath = dstfd_filepath;
dstfd = fs::mktemp(&dstfd_tmp_filepath,O_WRONLY);
if(dstfd == -1)
{
fs::close(srcfd);
return -ENOSPC;
}
fs::path::append(fdout_path[0],fusepath_);
fdout_temp = fdout_path[0];
fdout = fs::mktemp(&fdout_temp,fdin_flags);
if(fdout == -1)
return -1;
rv = fs::clonefile(srcfd,dstfd);
if(rv == -1)
{
fs::close(srcfd);
fs::close(dstfd);
fs::unlink(dstfd_tmp_filepath);
return -ENOSPC;
}
rv = fs::clonefile(fdin,fdout);
rv = fs::rename(dstfd_tmp_filepath,dstfd_filepath);
if(rv == -1)
goto cleanup;
{
fs::close(srcfd);
fs::close(dstfd);
fs::unlink(dstfd_tmp_filepath);
return -ENOSPC;
}
rv = fs::rename(fdout_temp,fdout_path[0]);
fs::close(srcfd);
fs::close(dstfd);
dstfd_flags = ::cleanup_flags(origfd_flags);
rv = fs::open(dstfd_filepath,dstfd_flags);
if(rv == -1)
goto cleanup;
// should we care if it fails?
fs::unlink(fdin_path);
std::swap(*origfd_,fdout);
fs::close(fdin);
fs::close(fdout);
return 0;
cleanup:
rv = errno;
if(fdin != -1)
fs::close(fdin);
if(fdout != -1)
fs::close(fdout);
fs::unlink(fdout_temp);
errno = rv;
return -1;
{
fs::unlink(dstfd_tmp_filepath);
return -ENOSPC;
}
fs::unlink(srcfd_filepath);
return rv;
}
}
@ -136,7 +166,7 @@ namespace fs
movefile(const Policy::Create &policy_,
const Branches::CPtr &basepaths_,
const string &fusepath_,
int *origfd_)
int origfd_)
{
return l::movefile(policy_,basepaths_,fusepath_,origfd_);
}
@ -145,7 +175,7 @@ namespace fs
movefile_as_root(const Policy::Create &policy_,
const Branches::CPtr &basepaths_,
const string &fusepath_,
int *origfd_)
int origfd_)
{
const ugid::Set ugid(0,0);

4
src/fs_movefile.hpp

@ -28,11 +28,11 @@ namespace fs
movefile(const Policy::Create &policy,
const Branches::CPtr &branches,
const std::string &fusepath,
int *origfd);
int origfd);
int
movefile_as_root(const Policy::Create &policy,
const Branches::CPtr &branches,
const std::string &fusepath,
int *origfd);
int origfd);
}

59
src/fs_preadn.hpp

@ -0,0 +1,59 @@
/*
ISC License
Copyright (c) 2023, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include "fs_pread.hpp"
namespace fs
{
static
inline
ssize_t
preadn(int const fd_,
void *buf_,
size_t const count_,
off_t const offset_,
int *err_)
{
ssize_t rv;
ssize_t count = count_;
off_t offset = offset_;
char const *buf = (char const *)buf_;
*err_ = 0;
while(count > 0)
{
rv = fs::pread(fd_,buf,count,offset);
if(rv == 0)
return (count_ - count);
if(rv < 0)
{
*err_ = rv;
return (count_ - count);
}
buf += rv;
count -= rv;
offset += rv;
}
return count_;
}
}

42
src/fs_pwrite.hpp

@ -0,0 +1,42 @@
/*
ISC License
Copyright (c) 2016, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include <unistd.h>
namespace fs
{
static
inline
ssize_t
pwrite(int const fd_,
void const *buf_,
size_t const count_,
off_t const offset_)
{
ssize_t rv;
rv = ::pwrite(fd_,buf_,count_,offset_);
if(rv == -1)
return -errno;
return rv;
}
}

59
src/fs_pwriten.hpp

@ -0,0 +1,59 @@
/*
ISC License
Copyright (c) 2023, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include "fs_pwrite.hpp"
namespace fs
{
static
inline
ssize_t
pwriten(const int fd_,
const void *buf_,
const size_t count_,
const off_t offset_,
int *err_)
{
ssize_t rv;
ssize_t count = count_;
off_t offset = offset_;
char const *buf = (char const *)buf_;
*err_ = 0;
while(count > 0)
{
rv = fs::pwrite(fd_,buf,count,offset);
if(rv == 0)
return (count_ - count);
if(rv < 0)
{
*err_ = rv;
return (count_ - count);
}
buf += rv;
count -= rv;
offset += rv;
}
return count_;
}
}

11
src/fs_write.hpp

@ -32,15 +32,4 @@ namespace fs
{
return ::write(fd_,buf_,count_);
}
static
inline
ssize_t
pwrite(const int fd_,
const void *buf_,
const size_t count_,
const off_t offset_)
{
return ::pwrite(fd_,buf_,count_,offset_);
}
}

35
src/fuse_create.cpp

@ -102,6 +102,9 @@ namespace l
}
break;
}
if(cfg_->parallel_direct_writes == true)
ffi_->parallel_direct_writes = ffi_->direct_io;
}
static
@ -121,21 +124,23 @@ namespace l
int
create_core(const std::string &createpath_,
const char *fusepath_,
fuse_file_info_t *ffi_,
const mode_t mode_,
const mode_t umask_,
const int flags_,
uint64_t *fh_)
const mode_t umask_)
{
int rv;
FileInfo *fi;
std::string fullpath;
fullpath = fs::path::make(createpath_,fusepath_);
rv = l::create_core(fullpath,mode_,umask_,flags_);
rv = l::create_core(fullpath,mode_,umask_,ffi_->flags);
if(rv == -1)
return -errno;
*fh_ = reinterpret_cast<uint64_t>(new FileInfo(rv,fusepath_));
fi = new FileInfo(rv,fusepath_,ffi_->direct_io);
ffi_->fh = reinterpret_cast<uint64_t>(fi);
return 0;
}
@ -146,10 +151,9 @@ namespace l
const Policy::Create &createFunc_,
const Branches &branches_,
const char *fusepath_,
fuse_file_info_t *ffi_,
const mode_t mode_,
const mode_t umask_,
const int flags_,
uint64_t *fh_)
const mode_t umask_)
{
int rv;
std::string fullpath;
@ -173,10 +177,9 @@ namespace l
return l::create_core(createpaths[0],
fusepath_,
ffi_,
mode_,
umask_,
flags_,
fh_);
umask_);
}
}
@ -187,6 +190,7 @@ namespace FUSE
mode_t mode_,
fuse_file_info_t *ffi_)
{
int rv;
Config::Read cfg;
const fuse_context *fc = fuse_get_context();
const ugid::Set ugid(fc->uid,fc->gid);
@ -196,13 +200,14 @@ namespace FUSE
if(cfg->writeback_cache)
l::tweak_flags_writeback_cache(&ffi_->flags);
return l::create(cfg->func.getattr.policy,
rv = l::create(cfg->func.getattr.policy,
cfg->func.create.policy,
cfg->branches,
fusepath_,
ffi_,
mode_,
fc->umask,
ffi_->flags,
&ffi_->fh);
fc->umask);
return rv;
}
}

36
src/fuse_open.cpp

@ -165,32 +165,37 @@ namespace l
}
break;
}
if(cfg_->parallel_direct_writes == true)
ffi_->parallel_direct_writes = ffi_->direct_io;
}
static
int
open_core(const std::string &basepath_,
const char *fusepath_,
const int flags_,
fuse_file_info_t *ffi_,
const bool link_cow_,
const NFSOpenHack nfsopenhack_,
uint64_t *fh_)
const NFSOpenHack nfsopenhack_)
{
int fd;
FileInfo *fi;
std::string fullpath;
fullpath = fs::path::make(basepath_,fusepath_);
if(link_cow_ && fs::cow::is_eligible(fullpath.c_str(),flags_))
if(link_cow_ && fs::cow::is_eligible(fullpath.c_str(),ffi_->flags))
fs::cow::break_link(fullpath.c_str());
fd = fs::open(fullpath,flags_);
fd = fs::open(fullpath,ffi_->flags);
if((fd == -1) && (errno == EACCES))
fd = l::nfsopenhack(fullpath,flags_,nfsopenhack_);
fd = l::nfsopenhack(fullpath,ffi_->flags,nfsopenhack_);
if(fd == -1)
return -errno;
*fh_ = reinterpret_cast<uint64_t>(new FileInfo(fd,fusepath_));
fi = new FileInfo(fd,fusepath_,ffi_->direct_io);
ffi_->fh = reinterpret_cast<uint64_t>(fi);
return 0;
}
@ -200,10 +205,9 @@ namespace l
open(const Policy::Search &searchFunc_,
const Branches &branches_,
const char *fusepath_,
const int flags_,
fuse_file_info_t *ffi_,
const bool link_cow_,
const NFSOpenHack nfsopenhack_,
uint64_t *fh_)
const NFSOpenHack nfsopenhack_)
{
int rv;
StrVec basepaths;
@ -212,7 +216,7 @@ namespace l
if(rv == -1)
return -errno;
return l::open_core(basepaths[0],fusepath_,flags_,link_cow_,nfsopenhack_,fh_);
return l::open_core(basepaths[0],fusepath_,ffi_,link_cow_,nfsopenhack_);
}
}
@ -222,6 +226,7 @@ namespace FUSE
open(const char *fusepath_,
fuse_file_info_t *ffi_)
{
int rv;
Config::Read cfg;
const fuse_context *fc = fuse_get_context();
const ugid::Set ugid(fc->uid,fc->gid);
@ -231,12 +236,13 @@ namespace FUSE
if(cfg->writeback_cache)
l::tweak_flags_writeback_cache(&ffi_->flags);
return l::open(cfg->func.open.policy,
rv = l::open(cfg->func.open.policy,
cfg->branches,
fusepath_,
ffi_->flags,
ffi_,
cfg->link_cow,
cfg->nfsopenhack,
&ffi_->fh);
cfg->nfsopenhack);
return rv;
}
}

26
src/fuse_read.cpp

@ -23,14 +23,26 @@
#include <stdlib.h>
#include <string.h>
typedef struct fuse_bufvec fuse_bufvec;
namespace l
{
static
int
read(const int fd_,
read_direct_io(const int fd_,
char *buf_,
const size_t size_,
const off_t offset_)
{
int rv;
rv = fs::pread(fd_,buf_,size_,offset_);
return rv;
}
static
int
read_cached(const int fd_,
char *buf_,
const size_t size_,
const off_t offset_)
@ -53,10 +65,10 @@ namespace FUSE
{
FileInfo *fi = reinterpret_cast<FileInfo*>(ffi_->fh);
return l::read(fi->fd,
buf_,
size_,
offset_);
if(fi->direct_io)
return l::read_direct_io(fi->fd,buf_,size_,offset_);
return l::read_cached(fi->fd,buf_,size_,offset_);
}
int

167
src/fuse_write.cpp

@ -17,8 +17,11 @@
#include "config.hpp"
#include "errno.hpp"
#include "fileinfo.hpp"
#include "fs_close.hpp"
#include "fs_dup2.hpp"
#include "fs_movefile.hpp"
#include "fs_write.hpp"
#include "fs_pwrite.hpp"
#include "fs_pwriten.hpp"
#include "ugid.hpp"
#include "fuse.h"
@ -36,89 +39,157 @@ namespace l
{
static
bool
out_of_space(const int error_)
out_of_space(const ssize_t error_)
{
return ((error_ == ENOSPC) ||
(error_ == EDQUOT));
return ((error_ == -ENOSPC) ||
(error_ == -EDQUOT));
}
static
int
write_regular(const int fd_,
const void *buf_,
move_and_pwrite(const char *buf_,
const size_t count_,
const off_t offset_)
const off_t offset_,
FileInfo *fi_,
int err_)
{
int rv;
int err;
ssize_t rv;
Config::Read cfg;
rv = fs::pwrite(fd_,buf_,count_,offset_);
if(rv == -1)
return -errno;
if(rv == 0)
return 0;
if(cfg->moveonenospc.enabled == false)
return err_;
return count_;
rv = fs::movefile_as_root(cfg->moveonenospc.policy,
cfg->branches,
fi_->fusepath,
fi_->fd);
if(rv < 0)
return err_;
err = fs::dup2(rv,fi_->fd);
fs::close(rv);
if(err < 0)
return err;
return fs::pwrite(fi_->fd,buf_,count_,offset_);
}
static
int
move_and_pwriten(char const *buf_,
size_t const count_,
off_t const offset_,
FileInfo *fi_,
ssize_t const err_,
ssize_t const written_)
{
int err;
ssize_t rv;
Config::Read cfg;
if(cfg->moveonenospc.enabled == false)
return err_;
rv = fs::movefile_as_root(cfg->moveonenospc.policy,
cfg->branches,
fi_->fusepath,
fi_->fd);
if(rv < 0)
return err_;
err = fs::dup2(rv,fi_->fd);
fs::close(rv);
if(err < 0)
return err;
rv = fs::pwriten(fi_->fd,
buf_ + written_,
count_ - written_,
offset_ + written_,
&err);
if(err < 0)
return err;
return (rv + written_);
}
// When in direct_io mode write's return value should match that of
// the operation.
// 0 on EOF
// N bytes written (short writes included)
// -errno on error
// See libfuse/include/fuse.h for more details
static
int
write_direct_io(const int fd_,
const void *buf_,
write_direct_io(const char *buf_,
const size_t count_,
const off_t offset_)
const off_t offset_,
FileInfo *fi_)
{
int rv;
ssize_t rv;
rv = fs::pwrite(fd_,buf_,count_,offset_);
if(rv == -1)
return -errno;
rv = fs::pwrite(fi_->fd,buf_,count_,offset_);
if(l::out_of_space(rv))
rv = l::move_and_pwrite(buf_,count_,offset_,fi_,rv);
return rv;
}
// When not in direct_io mode write's return value is more complex.
// 0 or less than `count` on EOF
// `count` on non-errors
// -errno on error
// See libfuse/include/fuse.h for more details
static
int
move_and_write(WriteFunc func_,
const char *buf_,
write_cached(const char *buf_,
const size_t count_,
const off_t offset_,
FileInfo *fi_,
int err_)
FileInfo *fi_)
{
int rv;
Config::Read cfg;
int err;
ssize_t rv;
if(cfg->moveonenospc.enabled == false)
return err_;
rv = fs::pwriten(fi_->fd,buf_,count_,offset_,&err);
if(rv == (ssize_t)count_)
return count_;
if(rv == 0)
return 0;
if(err && !l::out_of_space(err))
return err;
rv = fs::movefile_as_root(cfg->moveonenospc.policy,
cfg->branches,
fi_->fusepath,
&fi_->fd);
if(rv == -1)
return err_;
rv = l::move_and_pwriten(buf_,count_,offset_,fi_,err,rv);
return func_(fi_->fd,buf_,count_,offset_);
return rv;
}
static
int
write(const fuse_file_info_t *ffi_,
WriteFunc func_,
const char *buf_,
const size_t count_,
const off_t offset_)
{
int rv;
FileInfo* fi;
FileInfo *fi;
fi = reinterpret_cast<FileInfo*>(ffi_->fh);
rv = func_(fi->fd,buf_,count_,offset_);
if(l::out_of_space(-rv))
rv = l::move_and_write(func_,buf_,count_,offset_,fi,rv);
return rv;
// Concurrent writes can only happen if:
// 1) writeback-cache is enabled and using page caching
// 2) parallel_direct_writes is enabled and file has `direct_io=true`
// Will look into selectively locking in the future
// A reader/writer lock would probably be the best option given
// the expense of the write itself in comparison. Alternatively,
// could change the move file behavior to use a known target file
// and have threads use O_EXCL and back off and wait for the
// transfer to complete before retrying.
std::lock_guard<std::mutex> guard(fi->mutex);
if(fi->direct_io)
return l::write_direct_io(buf_,count_,offset_,fi);
return l::write_cached(buf_,count_,offset_,fi);
}
}
@ -130,13 +201,7 @@ namespace FUSE
size_t count_,
off_t offset_)
{
WriteFunc wf;
wf = ((ffi_->direct_io) ?
l::write_direct_io :
l::write_regular);
return l::write(ffi_,wf,buf_,count_,offset_);
return l::write(ffi_,buf_,count_,offset_);
}
int

Loading…
Cancel
Save