Browse Source

Fix read/write behavior and return value depending on direct_io

Also add parallel direct write option for 6.2+ kernels.
pull/1203/head
Antonio SJ Musumeci 2 years ago
parent
commit
6a14a10e6c
  1. 4
      README.md
  2. 2
      libfuse/include/fuse_common.h
  3. 61
      libfuse/include/fuse_kernel.h
  4. 29
      libfuse/lib/fuse_lowlevel.c
  5. 6
      man/mergerfs.1
  6. 2
      src/config.cpp
  7. 2
      src/config.hpp
  8. 12
      src/fileinfo.hpp
  9. 4
      src/fs_copydata_readwrite.cpp
  10. 38
      src/fs_dup2.hpp
  11. 2
      src/fs_movefile.cpp
  12. 59
      src/fs_preadn.hpp
  13. 42
      src/fs_pwrite.hpp
  14. 59
      src/fs_pwriten.hpp
  15. 11
      src/fs_write.hpp
  16. 35
      src/fuse_create.cpp
  17. 36
      src/fuse_open.cpp
  18. 26
      src/fuse_read.cpp
  19. 139
      src/fuse_write.cpp

4
README.md

@ -279,6 +279,9 @@ These options are the same regardless of whether you use them with the
(default: false)
* **cache.readdir=BOOL**: Cache readdir (if supported by kernel)
(default: false)
* **parallel-direct-writes=BOOL**: Allow the kernel to dispatch
multiple, parallel (non-extending) write requests for files opened
with `direct_io=true` (if supported by the kernel)
* **direct_io**: deprecated - Bypass page cache. Use `cache.files=off`
instead. (default: false)
* **kernel_cache**: deprecated - Do not invalidate data cache on file
@ -1461,6 +1464,7 @@ understand what behaviors it may impact
* disable `security_capability` and/or `xattr`
* increase cache timeouts `cache.attr`, `cache.entry`, `cache.negative_entry`
* enable (or disable) page caching (`cache.files`)
* enable `parallel-direct-writes`
* enable `cache.writeback`
* enable `cache.statfs`
* enable `cache.symlinks`

2
libfuse/include/fuse_common.h

@ -84,6 +84,8 @@ struct fuse_file_info_t
uint32_t auto_cache : 1;
uint32_t parallel_direct_writes:1;
/** File handle. May be filled in by filesystem in open().
Available in all other file operations */
uint64_t fh;

61
libfuse/include/fuse_kernel.h

@ -197,6 +197,15 @@
*
* 7.37
* - add FUSE_TMPFILE
*
* 7.38
* - add FUSE_EXPIRE_ONLY flag to fuse_notify_inval_entry
* - add FOPEN_PARALLEL_DIRECT_WRITES
* - add total_extlen to fuse_in_header
* - add FUSE_MAX_NR_SECCTX
* - add extension header
* - add FUSE_EXT_GROUPS
* - add FUSE_CREATE_SUPP_GROUP
*/
#ifndef _LINUX_FUSE_H
@ -232,7 +241,7 @@
#define FUSE_KERNEL_VERSION 7
/** Minor version number of this interface */
#define FUSE_KERNEL_MINOR_VERSION 37
#define FUSE_KERNEL_MINOR_VERSION 38
/** The node ID of the root inode */
#define FUSE_ROOT_ID 1
@ -304,6 +313,7 @@ struct fuse_file_lock {
* FOPEN_CACHE_DIR: allow caching this directory
* FOPEN_STREAM: the file is stream-like (no file position at all)
* FOPEN_NOFLUSH: don't flush data cache on close (unless FUSE_WRITEBACK_CACHE)
* FOPEN_PARALLEL_DIRECT_WRITES: Allow concurrent direct writes on the same inode
*/
#define FOPEN_DIRECT_IO (1 << 0)
#define FOPEN_KEEP_CACHE (1 << 1)
@ -311,6 +321,7 @@ struct fuse_file_lock {
#define FOPEN_CACHE_DIR (1 << 3)
#define FOPEN_STREAM (1 << 4)
#define FOPEN_NOFLUSH (1 << 5)
#define FOPEN_PARALLEL_DIRECT_WRITES (1 << 6)
/**
* INIT request/reply flags
@ -356,6 +367,8 @@ struct fuse_file_lock {
* FUSE_SECURITY_CTX: add security context to create, mkdir, symlink, and
* mknod
* FUSE_HAS_INODE_DAX: use per inode DAX
* FUSE_CREATE_SUPP_GROUP: add supplementary group info to create, mkdir,
* symlink and mknod (single group that matches parent)
*/
#define FUSE_ASYNC_READ (1 << 0)
#define FUSE_POSIX_LOCKS (1 << 1)
@ -392,6 +405,7 @@ struct fuse_file_lock {
/* bits 32..63 get shifted down 32 bits into the flags2 field */
#define FUSE_SECURITY_CTX (1ULL << 32)
#define FUSE_HAS_INODE_DAX (1ULL << 33)
#define FUSE_CREATE_SUPP_GROUP (1ULL << 34)
/**
* CUSE INIT request/reply flags
@ -491,6 +505,23 @@ struct fuse_file_lock {
*/
#define FUSE_SETXATTR_ACL_KILL_SGID (1 << 0)
/**
* notify_inval_entry flags
* FUSE_EXPIRE_ONLY
*/
#define FUSE_EXPIRE_ONLY (1 << 0)
/**
* extension type
* FUSE_MAX_NR_SECCTX: maximum value of &fuse_secctx_header.nr_secctx
* FUSE_EXT_GROUPS: &fuse_supp_groups extension
*/
enum fuse_ext_type {
/* Types 0..31 are reserved for fuse_secctx_header */
FUSE_MAX_NR_SECCTX = 31,
FUSE_EXT_GROUPS = 32,
};
enum fuse_opcode {
FUSE_LOOKUP = 1,
FUSE_FORGET = 2, /* no reply */
@ -874,7 +905,8 @@ struct fuse_in_header {
uint32_t uid;
uint32_t gid;
uint32_t pid;
uint32_t padding;
uint16_t total_extlen; /* length of extensions in 8byte units */
uint16_t padding;
};
struct fuse_out_header {
@ -919,7 +951,7 @@ struct fuse_notify_inval_inode_out {
struct fuse_notify_inval_entry_out {
uint64_t parent;
uint32_t namelen;
uint32_t padding;
uint32_t flags;
};
struct fuse_notify_delete_out {
@ -1035,4 +1067,27 @@ struct fuse_secctx_header {
uint32_t nr_secctx;
};
/**
* struct fuse_ext_header - extension header
* @size: total size of this extension including this header
* @type: type of extension
*
* This is made compatible with fuse_secctx_header by using type values >
* FUSE_MAX_NR_SECCTX
*/
struct fuse_ext_header {
uint32_t size;
uint32_t type;
};
/**
* struct fuse_supp_groups - Supplementary group extension
* @nr_groups: number of supplementary groups
* @groups: flexible array of group IDs
*/
struct fuse_supp_groups {
uint32_t nr_groups;
uint32_t groups[];
};
#endif /* _LINUX_FUSE_H */

29
libfuse/lib/fuse_lowlevel.c

@ -261,18 +261,20 @@ fill_entry(struct fuse_entry_out *arg,
static
void
fill_open(struct fuse_open_out *arg,
const fuse_file_info_t *f)
{
arg->fh = f->fh;
if(f->direct_io)
arg->open_flags |= FOPEN_DIRECT_IO;
if(f->keep_cache)
arg->open_flags |= FOPEN_KEEP_CACHE;
if(f->nonseekable)
arg->open_flags |= FOPEN_NONSEEKABLE;
if(f->cache_readdir)
arg->open_flags |= FOPEN_CACHE_DIR;
fill_open(struct fuse_open_out *arg_,
const fuse_file_info_t *ffi_)
{
arg_->fh = ffi_->fh;
if(ffi_->direct_io)
arg_->open_flags |= FOPEN_DIRECT_IO;
if(ffi_->keep_cache)
arg_->open_flags |= FOPEN_KEEP_CACHE;
if(ffi_->nonseekable)
arg_->open_flags |= FOPEN_NONSEEKABLE;
if(ffi_->cache_readdir)
arg_->open_flags |= FOPEN_CACHE_DIR;
if(ffi_->parallel_direct_writes)
arg_->open_flags |= FOPEN_PARALLEL_DIRECT_WRITES;
}
int
@ -1416,7 +1418,8 @@ fuse_lowlevel_notify_inval_entry(struct fuse_chan *ch,
outarg.parent = parent;
outarg.namelen = namelen;
outarg.padding = 0;
// TODO: Add ability to set `flags`
outarg.flags = 0;
iov[1].iov_base = &outarg;
iov[1].iov_len = sizeof(outarg);

6
man/mergerfs.1

@ -377,6 +377,10 @@ to enable page caching for when \f[C]cache.files=per-process\f[R].
\f[B]cache.readdir=BOOL\f[R]: Cache readdir (if supported by kernel)
(default: false)
.IP \[bu] 2
\f[B]parallel-direct-writes=BOOL\f[R]: Allow the kernel to dispatch
multiple, parallel (non-extending) write requests for files opened with
\f[C]direct_io=true\f[R] (if supported by the kernel)
.IP \[bu] 2
\f[B]direct_io\f[R]: deprecated - Bypass page cache.
Use \f[C]cache.files=off\f[R] instead.
(default: false)
@ -1902,6 +1906,8 @@ increase cache timeouts \f[C]cache.attr\f[R], \f[C]cache.entry\f[R],
.IP \[bu] 2
enable (or disable) page caching (\f[C]cache.files\f[R])
.IP \[bu] 2
enable \f[C]parallel-direct-writes\f[R]
.IP \[bu] 2
enable \f[C]cache.writeback\f[R]
.IP \[bu] 2
enable \f[C]cache.statfs\f[R]

2
src/config.cpp

@ -107,6 +107,7 @@ Config::Config()
moveonenospc(false),
nfsopenhack(NFSOpenHack::ENUM::OFF),
nullrw(false),
parallel_direct_writes(false),
pid(::getpid()),
posix_acl(false),
readahead(0),
@ -182,6 +183,7 @@ Config::Config()
_map["nfsopenhack"] = &nfsopenhack;
_map["nullrw"] = &nullrw;
_map["pid"] = &pid;
_map["parallel-direct-writes"] = &parallel_direct_writes;
_map["pin-threads"] = &fuse_pin_threads;
_map["posix_acl"] = &posix_acl;
_map["readahead"] = &readahead;

2
src/config.hpp

@ -131,6 +131,7 @@ public:
MoveOnENOSPC moveonenospc;
NFSOpenHack nfsopenhack;
ConfigBOOL nullrw;
ConfigBOOL parallel_direct_writes;
ConfigUINT64 pid;
ConfigBOOL posix_acl;
ConfigUINT64 readahead;
@ -151,7 +152,6 @@ public:
ConfigBOOL writeback_cache;
XAttr xattr;
private:
bool _initialized;

12
src/fileinfo.hpp

@ -18,19 +18,25 @@
#include "fh.hpp"
#include <cstdint>
#include <string>
#include <mutex>
class FileInfo : public FH
{
public:
FileInfo(const int fd_,
const char *fusepath_)
FileInfo(int const fd_,
char const *fusepath_,
bool const direct_io_)
: FH(fusepath_),
fd(fd_)
fd(fd_),
direct_io(direct_io_)
{
}
public:
int fd;
uint32_t direct_io:1;
std::mutex mutex;
};

4
src/fs_copydata_readwrite.cpp

@ -65,11 +65,9 @@ namespace l
size_t totalwritten;
vector<char> buf;
bufsize = (128 * 1024);
bufsize = (1024 * 1024);
buf.resize(bufsize);
fs::lseek(src_fd_,0,SEEK_SET);
totalwritten = 0;
while(totalwritten < count_)
{

38
src/fs_dup2.hpp

@ -0,0 +1,38 @@
/*
ISC License
Copyright (c) 2023, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include <unistd.h>
namespace fs
{
static
inline
int
dup2(int const oldfd_,
int const newfd_)
{
int rv;
rv = ::dup2(oldfd_,newfd_);
return ((rv == -1) ? -errno : rv);
}
}

2
src/fs_movefile.cpp

@ -156,8 +156,6 @@ namespace l
fs::unlink(srcfd_filepath);
fs::close(origfd_);
return rv;
}
}

59
src/fs_preadn.hpp

@ -0,0 +1,59 @@
/*
ISC License
Copyright (c) 2023, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include "fs_pread.hpp"
namespace fs
{
static
inline
ssize_t
preadn(int const fd_,
void *buf_,
size_t const count_,
off_t const offset_,
int *err_)
{
ssize_t rv;
ssize_t count = count_;
off_t offset = offset_;
char const *buf = (char const *)buf_;
*err_ = 0;
while(count > 0)
{
rv = fs::pread(fd_,buf,count,offset);
if(rv == 0)
return (count_ - count);
if(rv < 0)
{
*err_ = rv;
return (count_ - count);
}
buf += rv;
count -= rv;
offset += rv;
}
return count_;
}
}

42
src/fs_pwrite.hpp

@ -0,0 +1,42 @@
/*
ISC License
Copyright (c) 2016, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include <unistd.h>
namespace fs
{
static
inline
ssize_t
pwrite(int const fd_,
void const *buf_,
size_t const count_,
off_t const offset_)
{
ssize_t rv;
rv = ::pwrite(fd_,buf_,count_,offset_);
if(rv == -1)
return -errno;
return rv;
}
}

59
src/fs_pwriten.hpp

@ -0,0 +1,59 @@
/*
ISC License
Copyright (c) 2023, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include "fs_pwrite.hpp"
namespace fs
{
static
inline
ssize_t
pwriten(const int fd_,
const void *buf_,
const size_t count_,
const off_t offset_,
int *err_)
{
ssize_t rv;
ssize_t count = count_;
off_t offset = offset_;
char const *buf = (char const *)buf_;
*err_ = 0;
while(count > 0)
{
rv = fs::pwrite(fd_,buf,count,offset);
if(rv == 0)
return (count_ - count);
if(rv < 0)
{
*err_ = rv;
return (count_ - count);
}
buf += rv;
count -= rv;
offset += rv;
}
return count_;
}
}

11
src/fs_write.hpp

@ -32,15 +32,4 @@ namespace fs
{
return ::write(fd_,buf_,count_);
}
static
inline
ssize_t
pwrite(const int fd_,
const void *buf_,
const size_t count_,
const off_t offset_)
{
return ::pwrite(fd_,buf_,count_,offset_);
}
}

35
src/fuse_create.cpp

@ -102,6 +102,9 @@ namespace l
}
break;
}
if(cfg_->parallel_direct_writes == true)
ffi_->parallel_direct_writes = ffi_->direct_io;
}
static
@ -121,21 +124,23 @@ namespace l
int
create_core(const std::string &createpath_,
const char *fusepath_,
fuse_file_info_t *ffi_,
const mode_t mode_,
const mode_t umask_,
const int flags_,
uint64_t *fh_)
const mode_t umask_)
{
int rv;
FileInfo *fi;
std::string fullpath;
fullpath = fs::path::make(createpath_,fusepath_);
rv = l::create_core(fullpath,mode_,umask_,flags_);
rv = l::create_core(fullpath,mode_,umask_,ffi_->flags);
if(rv == -1)
return -errno;
*fh_ = reinterpret_cast<uint64_t>(new FileInfo(rv,fusepath_));
fi = new FileInfo(rv,fusepath_,ffi_->direct_io);
ffi_->fh = reinterpret_cast<uint64_t>(fi);
return 0;
}
@ -146,10 +151,9 @@ namespace l
const Policy::Create &createFunc_,
const Branches &branches_,
const char *fusepath_,
fuse_file_info_t *ffi_,
const mode_t mode_,
const mode_t umask_,
const int flags_,
uint64_t *fh_)
const mode_t umask_)
{
int rv;
std::string fullpath;
@ -173,10 +177,9 @@ namespace l
return l::create_core(createpaths[0],
fusepath_,
ffi_,
mode_,
umask_,
flags_,
fh_);
umask_);
}
}
@ -187,6 +190,7 @@ namespace FUSE
mode_t mode_,
fuse_file_info_t *ffi_)
{
int rv;
Config::Read cfg;
const fuse_context *fc = fuse_get_context();
const ugid::Set ugid(fc->uid,fc->gid);
@ -196,13 +200,14 @@ namespace FUSE
if(cfg->writeback_cache)
l::tweak_flags_writeback_cache(&ffi_->flags);
return l::create(cfg->func.getattr.policy,
rv = l::create(cfg->func.getattr.policy,
cfg->func.create.policy,
cfg->branches,
fusepath_,
ffi_,
mode_,
fc->umask,
ffi_->flags,
&ffi_->fh);
fc->umask);
return rv;
}
}

36
src/fuse_open.cpp

@ -165,32 +165,37 @@ namespace l
}
break;
}
if(cfg_->parallel_direct_writes == true)
ffi_->parallel_direct_writes = ffi_->direct_io;
}
static
int
open_core(const std::string &basepath_,
const char *fusepath_,
const int flags_,
fuse_file_info_t *ffi_,
const bool link_cow_,
const NFSOpenHack nfsopenhack_,
uint64_t *fh_)
const NFSOpenHack nfsopenhack_)
{
int fd;
FileInfo *fi;
std::string fullpath;
fullpath = fs::path::make(basepath_,fusepath_);
if(link_cow_ && fs::cow::is_eligible(fullpath.c_str(),flags_))
if(link_cow_ && fs::cow::is_eligible(fullpath.c_str(),ffi_->flags))
fs::cow::break_link(fullpath.c_str());
fd = fs::open(fullpath,flags_);
fd = fs::open(fullpath,ffi_->flags);
if((fd == -1) && (errno == EACCES))
fd = l::nfsopenhack(fullpath,flags_,nfsopenhack_);
fd = l::nfsopenhack(fullpath,ffi_->flags,nfsopenhack_);
if(fd == -1)
return -errno;
*fh_ = reinterpret_cast<uint64_t>(new FileInfo(fd,fusepath_));
fi = new FileInfo(fd,fusepath_,ffi_->direct_io);
ffi_->fh = reinterpret_cast<uint64_t>(fi);
return 0;
}
@ -200,10 +205,9 @@ namespace l
open(const Policy::Search &searchFunc_,
const Branches &branches_,
const char *fusepath_,
const int flags_,
fuse_file_info_t *ffi_,
const bool link_cow_,
const NFSOpenHack nfsopenhack_,
uint64_t *fh_)
const NFSOpenHack nfsopenhack_)
{
int rv;
StrVec basepaths;
@ -212,7 +216,7 @@ namespace l
if(rv == -1)
return -errno;
return l::open_core(basepaths[0],fusepath_,flags_,link_cow_,nfsopenhack_,fh_);
return l::open_core(basepaths[0],fusepath_,ffi_,link_cow_,nfsopenhack_);
}
}
@ -222,6 +226,7 @@ namespace FUSE
open(const char *fusepath_,
fuse_file_info_t *ffi_)
{
int rv;
Config::Read cfg;
const fuse_context *fc = fuse_get_context();
const ugid::Set ugid(fc->uid,fc->gid);
@ -231,12 +236,13 @@ namespace FUSE
if(cfg->writeback_cache)
l::tweak_flags_writeback_cache(&ffi_->flags);
return l::open(cfg->func.open.policy,
rv = l::open(cfg->func.open.policy,
cfg->branches,
fusepath_,
ffi_->flags,
ffi_,
cfg->link_cow,
cfg->nfsopenhack,
&ffi_->fh);
cfg->nfsopenhack);
return rv;
}
}

26
src/fuse_read.cpp

@ -23,14 +23,26 @@
#include <stdlib.h>
#include <string.h>
typedef struct fuse_bufvec fuse_bufvec;
namespace l
{
static
int
read(const int fd_,
read_direct_io(const int fd_,
char *buf_,
const size_t size_,
const off_t offset_)
{
int rv;
rv = fs::pread(fd_,buf_,size_,offset_);
return rv;
}
static
int
read_cached(const int fd_,
char *buf_,
const size_t size_,
const off_t offset_)
@ -53,10 +65,10 @@ namespace FUSE
{
FileInfo *fi = reinterpret_cast<FileInfo*>(ffi_->fh);
return l::read(fi->fd,
buf_,
size_,
offset_);
if(fi->direct_io)
return l::read_direct_io(fi->fd,buf_,size_,offset_);
return l::read_cached(fi->fd,buf_,size_,offset_);
}
int

139
src/fuse_write.cpp

@ -17,8 +17,11 @@
#include "config.hpp"
#include "errno.hpp"
#include "fileinfo.hpp"
#include "fs_close.hpp"
#include "fs_dup2.hpp"
#include "fs_movefile.hpp"
#include "fs_write.hpp"
#include "fs_pwrite.hpp"
#include "fs_pwriten.hpp"
#include "ugid.hpp"
#include "fuse.h"
@ -36,37 +39,53 @@ namespace l
{
static
bool
out_of_space(const int error_)
out_of_space(const ssize_t error_)
{
return ((error_ == ENOSPC) ||
(error_ == EDQUOT));
return ((error_ == -ENOSPC) ||
(error_ == -EDQUOT));
}
static
int
write(const int fd_,
const void *buf_,
move_and_pwrite(const char *buf_,
const size_t count_,
const off_t offset_)
const off_t offset_,
FileInfo *fi_,
int err_)
{
int rv;
int err;
ssize_t rv;
Config::Read cfg;
rv = fs::pwrite(fd_,buf_,count_,offset_);
if(rv == -1)
return -errno;
if(cfg->moveonenospc.enabled == false)
return err_;
return rv;
rv = fs::movefile_as_root(cfg->moveonenospc.policy,
cfg->branches,
fi_->fusepath,
fi_->fd);
if(rv < 0)
return err_;
err = fs::dup2(rv,fi_->fd);
fs::close(rv);
if(err < 0)
return err;
return fs::pwrite(fi_->fd,buf_,count_,offset_);
}
static
int
move_and_write(const char *buf_,
const size_t count_,
const off_t offset_,
move_and_pwriten(char const *buf_,
size_t const count_,
off_t const offset_,
FileInfo *fi_,
int err_)
ssize_t const err_,
ssize_t const written_)
{
int rv;
int err;
ssize_t rv;
Config::Read cfg;
if(cfg->moveonenospc.enabled == false)
@ -79,9 +98,70 @@ namespace l
if(rv < 0)
return err_;
fi_->fd = rv;
err = fs::dup2(rv,fi_->fd);
fs::close(rv);
if(err < 0)
return err;
rv = fs::pwriten(fi_->fd,
buf_ + written_,
count_ - written_,
offset_ + written_,
&err);
if(err < 0)
return err;
return (rv + written_);
}
// When in direct_io mode write's return value should match that of
// the operation.
// 0 on EOF
// N bytes written (short writes included)
// -errno on error
// See libfuse/include/fuse.h for more details
static
int
write_direct_io(const char *buf_,
const size_t count_,
const off_t offset_,
FileInfo *fi_)
{
ssize_t rv;
rv = fs::pwrite(fi_->fd,buf_,count_,offset_);
if(l::out_of_space(rv))
rv = l::move_and_pwrite(buf_,count_,offset_,fi_,rv);
return rv;
}
// When not in direct_io mode write's return value is more complex.
// 0 or less than `count` on EOF
// `count` on non-errors
// -errno on error
// See libfuse/include/fuse.h for more details
static
int
write_cached(const char *buf_,
const size_t count_,
const off_t offset_,
FileInfo *fi_)
{
int err;
ssize_t rv;
return l::write(fi_->fd,buf_,count_,offset_);
rv = fs::pwriten(fi_->fd,buf_,count_,offset_,&err);
if(rv == (ssize_t)count_)
return count_;
if(rv == 0)
return 0;
if(err && !l::out_of_space(err))
return err;
rv = l::move_and_pwriten(buf_,count_,offset_,fi_,err,rv);
return rv;
}
static
@ -91,16 +171,25 @@ namespace l
const size_t count_,
const off_t offset_)
{
int rv;
FileInfo *fi;
fi = reinterpret_cast<FileInfo*>(ffi_->fh);
rv = l::write(fi->fd,buf_,count_,offset_);
if(l::out_of_space(-rv))
rv = l::move_and_write(buf_,count_,offset_,fi,rv);
return rv;
// Concurrent writes can only happen if:
// 1) writeback-cache is enabled and using page caching
// 2) parallel_direct_writes is enabled and file has `direct_io=true`
// Will look into selectively locking in the future
// A reader/writer lock would probably be the best option given
// the expense of the write itself in comparison. Alternatively,
// could change the move file behavior to use a known target file
// and have threads use O_EXCL and back off and wait for the
// transfer to complete before retrying.
std::lock_guard<std::mutex> guard(fi->mutex);
if(fi->direct_io)
return l::write_direct_io(buf_,count_,offset_,fi);
return l::write_cached(buf_,count_,offset_,fi);
}
}

Loading…
Cancel
Save