From 9200ecb00c902eecb7784a4a5ac92380215daf33 Mon Sep 17 00:00:00 2001 From: Antonio SJ Musumeci Date: Mon, 19 Jan 2026 15:16:01 -0600 Subject: [PATCH] Rework open file resource management --- src/fileinfo.hpp | 36 ++- src/fuse_create.cpp | 157 ++++++------- src/fuse_open.cpp | 302 +++++++++++++------------ src/fuse_release.cpp | 164 +++++++++----- src/fuse_release.hpp | 34 ++- src/state.hpp | 72 +++++- tests/TEST_io_passthrough_create_open | 103 +++++++++ tests/TEST_io_passthrough_open_race | 132 +++++++++++ vendored/libfuse/include/fuse.h | 10 + vendored/libfuse/lib/fuse.cpp | 5 +- vendored/libfuse/lib/fuse_lowlevel.cpp | 2 +- 11 files changed, 716 insertions(+), 301 deletions(-) create mode 100755 tests/TEST_io_passthrough_create_open create mode 100755 tests/TEST_io_passthrough_open_race diff --git a/src/fileinfo.hpp b/src/fileinfo.hpp index f30013b5..e905007a 100644 --- a/src/fileinfo.hpp +++ b/src/fileinfo.hpp @@ -16,6 +16,31 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ +/* + FILEINFO - OPEN FILE METADATA + ============================= + + This file defines the FileInfo class which stores metadata for an open + file. Each create and opened file get its own instance while the + first one of any set of concurrently open files will be stored in + open_files map for reference but always pass around through + fuse_file_info_t.fh. + + FUSE FILE HANDLE CONVERSION + --------------------------- + + FUSE uses a 64-bit file handle (fuse_file_info_t.fh). We convert between + FileInfo* and fh using simple pointer casting: + + - to_fh(FileInfo*): Casts pointer to uint64_t for FUSE + - from_fh(uint64_t): Casts back to FileInfo pointer + + This is safe because FileInfo objects remain valid until the last close. + + See fuse_open.cpp and fuse_release.cpp for how FileInfo is used in the + open/close flow. +*/ + #pragma once #include "assert.hpp" @@ -31,7 +56,8 @@ class FileInfo : public FH { public: - static FileInfo *from_fh(const u64 fh); + static FileInfo *from_fh(const u64); + static u64 to_fh(const FileInfo*); public: FileInfo(const int fd_, @@ -74,6 +100,14 @@ public: Mutex mutex; }; +inline +u64 +FileInfo::to_fh(const FileInfo *fi_) +{ + ASSERT_NOT_NULL(fi_); + return reinterpret_cast(fi_); +} + inline u64 FileInfo::to_fh() const diff --git a/src/fuse_create.cpp b/src/fuse_create.cpp index 6ebcbd3c..ac5dee63 100644 --- a/src/fuse_create.cpp +++ b/src/fuse_create.cpp @@ -16,6 +16,33 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ +/* + FILE CREATION WITH FUSE PASSTHROUGH SUPPORT + =========================================== + + This file implements FUSE create() which creates a new file and opens it. + Unlike fuse_open.cpp, this file does NOT use a retry loop because the + kernel guarantees serialization of create operations. + + WHY NO RETRY LOOP? + ------------------ + + The fuse_open.cpp retry loop handles the case where multiple threads race + to open an EXISTING file. With create(), the kernel serializes namespace + operations at the VFS layer - only one create for a given path can succeed. + + Additionally, when a file is created, FUSE allocates a completely new + nodeid (inode number). Since mergerfs never reuses nodeids, there should + never be a collision in the open_files map. + + PASSTHROUGH MODE + ---------------- + + Like fuse_open.cpp, this file supports FUSE passthrough mode for improved + I/O performance. The backing_id is obtained from fuse_passthrough_open() + and stored in the OpenFile entry. +*/ + #include "fuse_create.hpp" #include "state.hpp" @@ -227,29 +254,26 @@ _(const PassthroughIOEnum e_, static int -_create_for_insert_lambda(const fuse_req_ctx_t *ctx_, - const fs::path &fusepath_, - const mode_t mode_, - fuse_file_info_t *ffi_, - State::OpenFile *of_) +_create(const fuse_req_ctx_t *ctx_, + const fs::path &fusepath_, + mode_t mode_, + fuse_file_info_t *ffi_) { - int rv; - FileInfo *fi; + auto &of = state.open_files; ::_config_to_ffi_flags(cfg,ctx_->pid,ffi_); if(cfg.cache_writeback) ::_tweak_flags_cache_writeback(&ffi_->flags); - ffi_->noflush = !::_calculate_flush(cfg.flushonclose, - ffi_->flags); - - rv = ::_create(ctx_, - cfg.func.getattr.policy, - cfg.func.create.policy, - cfg.branches, - fusepath_, - ffi_, - mode_, - ctx_->umask); + ffi_->noflush = !::_calculate_flush(cfg.flushonclose,ffi_->flags); + + int rv = ::_create(ctx_, + cfg.func.getattr.policy, + cfg.func.create.policy, + cfg.branches, + fusepath_, + ffi_, + mode_, + ctx_->umask); if(rv == -EROFS) { cfg.branches.find_and_set_mode_ro(); @@ -266,10 +290,8 @@ _create_for_insert_lambda(const fuse_req_ctx_t *ctx_, if(rv < 0) return rv; - fi = FileInfo::from_fh(ffi_->fh); - - of_->ref_count = 1; - of_->fi = fi; + FileInfo *fi = FileInfo::from_fh(ffi_->fh); + int backing_id = INVALID_BACKING_ID; switch(_(cfg.passthrough_io,ffi_->flags)) { @@ -278,83 +300,40 @@ _create_for_insert_lambda(const fuse_req_ctx_t *ctx_, case _(PassthroughIO::ENUM::RW,O_RDONLY): case _(PassthroughIO::ENUM::RW,O_WRONLY): case _(PassthroughIO::ENUM::RW,O_RDWR): + backing_id = FUSE::passthrough_open(fi->fd); + if(fuse_backing_id_is_valid(backing_id)) + { + ffi_->backing_id = backing_id; + ffi_->passthrough = true; + ffi_->keep_cache = false; + } break; default: - return 0; + break; } - of_->backing_id = FUSE::passthrough_open(fi->fd); - if(of_->backing_id < 0) - return 0; + bool inserted = of.try_emplace(ctx_->nodeid, + backing_id, + fi); - ffi_->backing_id = of_->backing_id; - ffi_->passthrough = true; - ffi_->keep_cache = false; - - return 0; -} - -static -inline -auto -_create_insert_lambda(const fuse_req_ctx_t *ctx_, - const fs::path &fusepath_, - const mode_t mode_, - fuse_file_info_t *ffi_, - int *_rv_) -{ - return - [=](auto &val_) - { - *_rv_ = ::_create_for_insert_lambda(ctx_, - fusepath_, - mode_, - ffi_, - &val_.second); - }; -} - -// This function should never be called? -static -inline -auto -_create_update_lambda() -{ - return - [](const auto &val_) + // Really. This should never happen. FUSE should not be allowing a + // create for an existing nodeid. mergerfs doesn't reuse nodeids. It + // always uses a fixed generation value and always uses new nodeids. + if(not inserted) { - fmt::println(stderr,"CREATE_UPDATE_LAMBDA: THIS SHOULD NOT HAPPEN"); - SysLog::crit("CREATE_UPDATE_LAMBDA: THIS SHOULD NOT HAPPEN"); - abort(); - }; -} + const char *msg = "nodeid already exists in open_files map"; -static -int -_create(const fuse_req_ctx_t *ctx_, - const fs::path &fusepath_, - mode_t mode_, - fuse_file_info_t *ffi_) -{ - int rv; - auto &of = state.open_files; + FUSE::passthrough_close(backing_id); + fs::close(fi->fd); + delete fi; - rv = -EINVAL; - of.try_emplace_and_visit(ctx_->nodeid, - ::_create_insert_lambda(ctx_,fusepath_,mode_,ffi_,&rv), - ::_create_update_lambda()); + SysLog::crit(msg); + fmt::println(stderr,"{}",msg); - // Can't abort an emplace_and_visit and can't assume another thread - // hasn't created an entry since this failure so erase only if - // ref_count is default (0). - if(rv < 0) - of.erase_if(ctx_->nodeid, - [](const auto &val_) - { - return (val_.second.ref_count <= 0); - }); + return -EIO; + } - return rv; + return 0; } int diff --git a/src/fuse_open.cpp b/src/fuse_open.cpp index 086c24fa..17da9828 100644 --- a/src/fuse_open.cpp +++ b/src/fuse_open.cpp @@ -16,6 +16,73 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ +/* + OPEN FILE MANAGEMENT AND CONCURRENCY CONTROL + ============================================ + + This file implements FUSE open() with handling for concurrent access + to the same file. The key challenge is that multiple threads may + simultaneously attempt to open the same file (same nodeid), and we + need to: + + 1. Share a single FileInfo instance among all opens of the same file + (to be able to re-open the same file and use the same backing id for + concurrent open requests) + 2. Properly reference count to know when to release resources + 3. Handle race conditions without deadlocks or resource leaks + 4. Support FUSE passthrough mode for performance + + CONCURRENCY MODEL + ----------------- + + The open_files map (in state.hpp) tracks all currently open files: + - Key: nodeid (FUSE inode number) + - Value: OpenFile struct { ref_count, backing_id, fi } + + When a file is opened: + 1. First, we check if an entry already exists (using visit()) + 2. If it exists and is valid (fi != nullptr), we increment ref_count and + share the existing FileInfo by re-opening its file descriptor + 3. If it doesn't exist, we create a new FileInfo and try to insert it + 4. If insertion fails (another thread inserted first), we clean up + and retry (because to use passthrough it MUST be the same underlying + file as the backing id was created) + + THE RETRY LOOP + -------------- + + The MAX_OPEN_RETRIES loop (lines ~294-380) handles the race where multiple + threads try to open the same file simultaneously: + + Thread A: visit() -> no entry -> open file -> try_emplace() -> SUCCESS + Thread B: visit() -> no entry -> open file -> try_emplace() -> FAIL + Thread B: (cleans up, retries) -> visit() -> found Thread A's entry -> share it + + This ensures that all threads eventually succeed while sharing the same + underlying file resources. + + ZOMBIE STATE PROTECTION + ----------------------- + + When a file is being released (in fuse_release.cpp), the entry enters a + "zombie" state where fi is set to nullptr. The check at line ~305 prevents + new opens from acquiring a reference to a dying file: + + if(v_.second.fi == nullptr) + return; // Skip zombie entries + + PASSTHROUGH MODE + ---------------- + + FUSE passthrough (Linux 6.9+) allows the kernel to bypass mergerfs for I/O: + - We request a backing_id from the kernel via fuse_passthrough_open() + - If successful, subsequent reads/writes go directly to the backing file + - This achieves near native performance + + The backing_id is stored in the OpenFile entry so all overlapping + open requests share the same passthrough handle. +*/ + #include "fuse_open.hpp" #include "state.hpp" @@ -23,6 +90,8 @@ #include "config.hpp" #include "errno.hpp" #include "fileinfo.hpp" +#include "fuse_release.hpp" +#include "fs_close.hpp" #include "fs_cow.hpp" #include "fs_fchmod.hpp" #include "fs_lchmod.hpp" @@ -41,6 +110,8 @@ #include #include +#define MAX_OPEN_RETRIES 64 + static bool @@ -275,169 +346,106 @@ _(const PassthroughIOEnum e_, static int -_open_for_insert_lambda(const fuse_req_ctx_t *ctx_, - const fs::path &fusepath_, - fuse_file_info_t *ffi_, - State::OpenFile *of_) +_open(const fuse_req_ctx_t *ctx_, + const fs::path &fusepath_, + fuse_file_info_t *ffi_) { int rv; - FileInfo *fi; + auto &of = state.open_files; ::_config_to_ffi_flags(cfg,ctx_->pid,ffi_); - if(cfg.cache_writeback) ::_tweak_flags_cache_writeback(&ffi_->flags); + ffi_->noflush = !::_calculate_flush(cfg.flushonclose,ffi_->flags); - ffi_->noflush = !::_calculate_flush(cfg.flushonclose, - ffi_->flags); - - rv = ::_open(cfg.func.open.policy, - cfg.branches, - fusepath_, - ffi_, - cfg.link_cow, - cfg.nfsopenhack); - - if(rv < 0) - return rv; - - fi = FileInfo::from_fh(ffi_->fh); - - of_->ref_count = 1; - of_->fi = fi; - - switch(_(cfg.passthrough_io,ffi_->flags)) + for(size_t i = 0; i < MAX_OPEN_RETRIES; i++) { - case _(PassthroughIO::ENUM::RO,O_RDONLY): - case _(PassthroughIO::ENUM::WO,O_WRONLY): - case _(PassthroughIO::ENUM::RW,O_RDONLY): - case _(PassthroughIO::ENUM::RW,O_WRONLY): - case _(PassthroughIO::ENUM::RW,O_RDWR): - break; - default: - return 0; - } - - of_->backing_id = FUSE::passthrough_open(fi->fd); - if(of_->backing_id <= 0) - return 0; - - ffi_->backing_id = of_->backing_id; - ffi_->passthrough = true; - ffi_->keep_cache = false; - - return 0; -} - -static -int -_open_for_update_lambda(const fuse_req_ctx_t *ctx_, - const fs::path &fusepath_, - fuse_file_info_t *ffi_, - State::OpenFile *of_) -{ - int rv; - - ::_config_to_ffi_flags(cfg,ctx_->pid,ffi_); - - if(cfg.cache_writeback) - ::_tweak_flags_cache_writeback(&ffi_->flags); - - ffi_->noflush = !::_calculate_flush(cfg.flushonclose, - ffi_->flags); - - rv = ::_open_fd(of_->fi->fd, - &of_->fi->branch, - fusepath_, - ffi_); - if(rv < 0) - return rv; - - of_->ref_count++; - - if(of_->backing_id <= 0) - return 0; - - ffi_->backing_id = of_->backing_id; - ffi_->passthrough = true; - ffi_->keep_cache = false; + FileInfo *fi = nullptr; + int backing_id = INVALID_BACKING_ID; + + // The ref increment keeps fuse_release.cpp from erasing and + // freeing the entry. + of.visit(ctx_->nodeid, + [&](auto &v_) + { + v_.second.ref_count++; + fi = v_.second.fi; + backing_id = v_.second.backing_id; + }); + + // If the file is already open... + if(fi) + { + rv = ::_open_fd(fi->fd, + &fi->branch, + fusepath_, + ffi_); + + // If we fail to reopen the already open file we need to + // treat it similarly to fuse_release. Since we increased + // the ref count without an actual new fileinfo obj we just + // pass nullptr which FUSE::release() can handle. + if(rv < 0) + { + FUSE::release(ctx_->nodeid, + FileInfo::from_fh(ffi_->fh)); + return rv; + } + + if(!fuse_backing_id_is_valid(backing_id)) + return 0; + + ffi_->backing_id = backing_id; + ffi_->passthrough = true; + ffi_->keep_cache = false; + + return 0; + } - return rv; -} + // Was not open, do first open, try to insert, if someone beat us + // to it in another thread then throw it away and try again. + rv = ::_open(cfg.func.open.policy, + cfg.branches, + fusepath_, + ffi_, + cfg.link_cow, + cfg.nfsopenhack); + if(rv < 0) + return rv; -static -inline -auto -_open_insert_lambda(const fuse_req_ctx_t *ctx_, - const fs::path &fusepath_, - fuse_file_info_t *ffi_, - int *rv_) -{ - return - [=](auto &val_) - { - *rv_ = ::_open_for_insert_lambda(ctx_, - fusepath_, - ffi_, - &val_.second); - }; -} + fi = FileInfo::from_fh(ffi_->fh); -static -inline -auto -_open_update_lambda(const fuse_req_ctx_t *ctx_, - const fs::path &fusepath_, - fuse_file_info_t *ffi_, - int *rv_) -{ - return - [=](auto &val_) - { - // For the edge case where insert succeeded but the open failed - // and hadn't been cleaned up yet. There unfortunately is no way - // to abort an insert. - if(val_.second.ref_count <= 0) + switch(_(cfg.passthrough_io,ffi_->flags)) { - *rv_ = ::_open_for_insert_lambda(ctx_, - fusepath_, - ffi_, - &val_.second); - return; + case _(PassthroughIO::ENUM::RO,O_RDONLY): + case _(PassthroughIO::ENUM::WO,O_WRONLY): + case _(PassthroughIO::ENUM::RW,O_RDONLY): + case _(PassthroughIO::ENUM::RW,O_WRONLY): + case _(PassthroughIO::ENUM::RW,O_RDWR): + backing_id = FUSE::passthrough_open(fi->fd); + if(fuse_backing_id_is_valid(backing_id)) + { + ffi_->backing_id = backing_id; + ffi_->passthrough = true; + ffi_->keep_cache = false; + } + break; + default: + break; } - *rv_ = ::_open_for_update_lambda(ctx_, - fusepath_, - ffi_, - &val_.second); - }; -} - -static -int -_open(const fuse_req_ctx_t *ctx_, - const fs::path &fusepath_, - fuse_file_info_t *ffi_) -{ - int rv; - auto &of = state.open_files; - - rv = -EINVAL; - of.try_emplace_and_visit(ctx_->nodeid, - ::_open_insert_lambda(ctx_,fusepath_,ffi_,&rv), - ::_open_update_lambda(ctx_,fusepath_,ffi_,&rv)); + bool inserted = of.try_emplace(ctx_->nodeid, + backing_id, + fi); + if(inserted) + return 0; - // Can't abort an emplace_and_visit and can't assume another thread - // hasn't created an entry since this failure so erase only if - // ref_count is default (0). - if(rv < 0) - of.erase_if(ctx_->nodeid, - [](auto &val_) - { - return (val_.second.ref_count <= 0); - }); + FUSE::passthrough_close(backing_id); + fs::close(fi->fd); + delete fi; + } - return rv; + return -EIO; } diff --git a/src/fuse_release.cpp b/src/fuse_release.cpp index 91e1dceb..3f07f427 100644 --- a/src/fuse_release.cpp +++ b/src/fuse_release.cpp @@ -16,6 +16,76 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ +/* + TWO-PHASE FILE RELEASE AND RESOURCE CLEANUP + =========================================== + + This file implements FUSE release() with a sophisticated two-phase cleanup + mechanism designed to handle concurrent access without deadlocks or leaks. + + THE PROBLEM: CONCURRENT RELEASE AND OPEN + ---------------------------------------- + + When multiple threads access the same file, we have reference counting to + track how many opens are active. When the last close happens, we need to: + 1. Close the file descriptor + 2. Close the passthrough backing_id (if any) + 3. Delete the FileInfo object + 4. Remove the entry from the open_files map + + The challenge is that these operations can be slow (especially close()), + and we CANNOT hold the map lock during slow operations. Other threads + may need to open the same file or release other files. + + THE SOLUTION: TWO-PHASE CLEANUP + ------------------------------- + + Phase 1: _release_ref() - Atomic Reference Management (lines ~35-65) + - Runs inside visit() callback (map is locked) + - Decrements ref_count + - If ref_count reaches 0, captures resources and sets fi = nullptr + - This "zombie" state signals that cleanup is needed + + Phase 2: _release_cleanup() - Resource Destruction (lines ~69-88) + - Runs OUTSIDE visit() callback (map is unlocked) + - Performs slow operations: close(), passthrough_close(), delete + - Conditionally erases the map entry using erase_if + + WHY TWO PHASES? + -------------- + + Without this split, a slow close() would block: + - Other threads trying to open the same file + - Other threads trying to release other files (global map lock) + - The entire FUSE request pipeline + + By separating the fast reference counting from the slow cleanup, we achieve + both correctness and high concurrency. + + THE ZOMBIE STATE + ---------------- + + When ref_count hits 0, we set fi = nullptr. This "zombie" state means: + - The entry is marked for destruction + - New opens should skip this entry (see fuse_open.cpp line ~305) + - Cleanup is in progress or pending + + The erase_if() in _release_cleanup() only erases if fi is still nullptr, + protecting against the ABA problem where another thread might have already + created a new entry with the same nodeid. + + ORPHAN FILEINFO CLEANUP + ----------------------- + + The release() function (lines ~116-135) handles the case where a FileInfo + was created but never inserted into the map (e.g., open failure). The + fi_in_map flag distinguishes between: + - FileInfo in map: wait for normal cleanup + - FileInfo not in map (orphan): clean up immediately + + See commit 446e9a7b for the complete rationale behind this design. +*/ + #include "fuse_release.hpp" #include "state.hpp" @@ -28,80 +98,64 @@ #include "fuse.h" -static -constexpr -auto -_erase_if_lambda(FileInfo *fi_, - bool *existed_in_map_) -{ - return - [=](auto &val_) - { - *existed_in_map_ = true; - - if(fi_ != val_.second.fi) - { - fs::close(fi_->fd); - delete fi_; - } - - val_.second.ref_count--; - if(val_.second.ref_count > 0) - return false; - - if(val_.second.backing_id > 0) - FUSE::passthrough_close(val_.second.backing_id); - fs::close(val_.second.fi->fd); - delete val_.second.fi; - - return true; - }; -} static int -_release(const fuse_req_ctx_t *ctx_, - FileInfo *fi_, - const bool dropcacheonclose_) +_release(cu64 nodeid_, + FileInfo *fi_, + const bool dropcacheonclose_) { - bool existed_in_map; - - // according to Feh of nocache calling it once doesn't always work - // https://github.com/Feh/nocache if(dropcacheonclose_) { fs::fadvise_dontneed(fi_->fd); fs::fadvise_dontneed(fi_->fd); } - // Because of course the API doesn't tell you if the key - // existed. Just how many it erased and in this case I only want to - // erase if there are no more open files. - existed_in_map = false; - state.open_files.erase_if(ctx_->nodeid, - ::_erase_if_lambda(fi_,&existed_in_map)); - if(existed_in_map) - return 0; - - fs::close(fi_->fd); - delete fi_; + FUSE::release(nodeid_,fi_); return 0; } -static -int -_release(const fuse_req_ctx_t *ctx_, - const fuse_file_info_t *ffi_) +void +FUSE::release(cu64 nodeid_, + FileInfo * const fi_) { - FileInfo *fi = FileInfo::from_fh(ffi_->fh); - - return ::_release(ctx_,fi,cfg.dropcacheonclose); + auto &of = state.open_files; + int backing_id = INVALID_BACKING_ID; + FileInfo *fi_to_free = nullptr; + + of.erase_if(nodeid_, + [&](auto &v_) + { + v_.second.ref_count--; + if(v_.second.ref_count > 0) + { + if(fi_ != v_.second.fi) + fi_to_free = fi_; + return false; + } + + fi_to_free = v_.second.fi; + backing_id = v_.second.backing_id; + + return true; + }); + + FUSE::passthrough_close(backing_id); + if(fi_to_free) + { + fs::close(fi_to_free->fd); + delete fi_to_free; + } } int FUSE::release(const fuse_req_ctx_t *ctx_, const fuse_file_info_t *ffi_) { - return ::_release(ctx_,ffi_); + FileInfo *fi = FileInfo::from_fh(ffi_->fh); + + return ::_release(ctx_->nodeid, + fi, + cfg.dropcacheonclose); } diff --git a/src/fuse_release.hpp b/src/fuse_release.hpp index 29794a9c..64771bea 100644 --- a/src/fuse_release.hpp +++ b/src/fuse_release.hpp @@ -16,14 +16,46 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ +/* + FUSE RELEASE INTERFACE + ====================== + + This header defines the two release() function overloads used for closing + open files. There are two variants because release can be initiated from + different contexts: + + 1. release(ctx, ffi) - Standard FUSE release callback + Called by libfuse when a file is closed. Extracts the FileInfo from + the fuse_file_info_t handle and calls the internal release function. + + 2. release(nodeid, fi) - Internal release function + Called internally when we need to release a file that failed to open + properly (e.g., in fuse_open.cpp when _open_fd fails after finding an + existing open). This allows the same cleanup logic to be used for both + normal closes and error cleanup. + + The two-phase cleanup mechanism is implemented in fuse_release.cpp. See + that file for detailed documentation on the reference counting and + zombie state handling. + + See commit 446e9a7b for the complete rationale behind this design. +*/ + #pragma once +#include "base_types.h" #include "fuse.h" +class FileInfo; + namespace FUSE { int release(const fuse_req_ctx_t *ctx_, - const fuse_file_info_t *ffi); + const fuse_file_info_t *ffi_); + + void + release(cu64 nodeid, + FileInfo * const fi); } diff --git a/src/state.hpp b/src/state.hpp index 89f1edc9..12bc8eb8 100644 --- a/src/state.hpp +++ b/src/state.hpp @@ -16,19 +16,76 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ +/* + GLOBAL STATE AND OPEN FILE TRACKING + =================================== + + This file defines the global State object that holds the open_files map, + which tracks all currently open files in the filesystem. + + OPEN_FILE STRUCTURE + ------------------- + + The OpenFile struct represents a file that is currently open. It + contains: + + - ref_count: Number of active opens sharing this file. When it reaches 0, + the file is eligible for cleanup. + + - backing_id: Kernel handle for FUSE passthrough mode. INVALID_BACKING_ID + means passthrough is not enabled for this file. + + - fi: Pointer to the FileInfo object containing file metadata and + the actual file descriptor to the **first** opened file. Since a + specific file descriptor has state associated with it you can not + simply reuse the FD for overlapping open requests. Instead it is + reopened using openat(/proc/self/fd/X). Additionally, FUSE + passthrough feature requires that the same underlying file be used + for the nodeid and backing_id. All secondary opens are stored in + the fuse_file_info_t.fh only. When set to nullptr, the entry is in + "zombie" state (being destroyed but not yet removed from the map). + + THREAD SAFETY + ------------- + + The open_files map uses boost::concurrent_flat_map which provides: + - Bucket-level locking for fine-grained concurrency + - visit() for atomic read-modify operations + - try_emplace() for atomic insertion + - erase_if() for conditional removal + + While there is fine-grained locking it isn't per entry and as a + result the code for opening and releasing is more complicated since + having file io syscalls in the critical section lambda could cause + contention. + + LIFECYCLE STATES + ---------------- + + An OpenFile entry goes through these states: + + 1. NOT IN MAP - File is not open + 2. ACTIVE (ref_count >= 1, fi != nullptr) - File is open and in use + 3. ZOMBIE (ref_count = 0, fi = nullptr) - File is being destroyed + 4. CLEANED UP - Resources freed, entry erased from map + + See fuse_open.cpp and fuse_release.cpp for detailed documentation on the + concurrency patterns and two-phase cleanup mechanism. +*/ + #pragma once #include "boost/unordered/concurrent_flat_map.hpp" #include "fileinfo.hpp" +#include "fuse.h" + #include #include #include -constexpr int INVALID_BACKING_ID = -1; - class State { public: @@ -37,10 +94,13 @@ public: public: struct OpenFile { - OpenFile() - : ref_count(0), - backing_id(INVALID_BACKING_ID), - fi(nullptr) + OpenFile() = delete; + + OpenFile(const int backing_id_, + FileInfo * const fi_) + : ref_count(1), + backing_id(backing_id_), + fi(fi_) { } diff --git a/tests/TEST_io_passthrough_create_open b/tests/TEST_io_passthrough_create_open new file mode 100755 index 00000000..5943523c --- /dev/null +++ b/tests/TEST_io_passthrough_create_open @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 + +import os +import sys +import tempfile +import time + +# Test create + write + read (passthrough.io=rw) +(fd, filepath) = tempfile.mkstemp(dir=sys.argv[1]) + +test_data = b"passthrough io test data for create\n" * 100 + +# Write to newly created file +bytes_written = os.write(fd, test_data) +if bytes_written != len(test_data): + print("create write failed: expected {} bytes, wrote {}".format(len(test_data), bytes_written)) + sys.exit(1) + +# Seek and read back +os.lseek(fd, 0, os.SEEK_SET) +read_data = os.read(fd, len(test_data)) +if read_data != test_data: + print("create read failed: data mismatch") + sys.exit(1) + +os.close(fd) + +# Test open existing file + write + read +fd = os.open(filepath, os.O_RDWR) + +# Read existing data +os.lseek(fd, 0, os.SEEK_SET) +read_data = os.read(fd, len(test_data)) +if read_data != test_data: + print("open read failed: data mismatch") + sys.exit(1) + +# Write more data at end +os.lseek(fd, 0, os.SEEK_END) +more_data = b"additional passthrough data for open\n" * 50 +bytes_written = os.write(fd, more_data) +if bytes_written != len(more_data): + print("open write failed: expected {} bytes, wrote {}".format(len(more_data), bytes_written)) + sys.exit(1) + +# Verify all data +os.lseek(fd, 0, os.SEEK_SET) +all_data = os.read(fd, len(test_data) + len(more_data)) +if all_data != test_data + more_data: + print("open final read failed: data mismatch") + sys.exit(1) + +# Test multiple opens of same file (single backing_id feature) +# When a file is already open with passthrough, subsequent opens +# must reuse the same backing_id rather than creating new ones +fd2 = os.open(filepath, os.O_RDWR) + +# Read from second fd - should see same data +os.lseek(fd2, 0, os.SEEK_SET) +read_data2 = os.read(fd2, len(test_data) + len(more_data)) +if read_data2 != test_data + more_data: + print("second open read failed: data mismatch") + os.close(fd) + os.close(fd2) + sys.exit(1) + +# Write from second fd +os.lseek(fd2, 0, os.SEEK_END) +extra_data = b"data from second file descriptor\n" * 25 +bytes_written = os.write(fd2, extra_data) +if bytes_written != len(extra_data): + print("second open write failed: expected {} bytes, wrote {}".format(len(extra_data), bytes_written)) + os.close(fd) + os.close(fd2) + sys.exit(1) + +# Verify write is visible from first fd (shared backing) +os.lseek(fd, 0, os.SEEK_SET) +combined_data = os.read(fd, len(test_data) + len(more_data) + len(extra_data)) +expected_data = test_data + more_data + extra_data +if combined_data != expected_data: + print("cross-fd read failed: data mismatch (writes from fd2 not visible on fd)") + os.close(fd) + os.close(fd2) + sys.exit(1) + +# Open a third fd while others are still open +fd3 = os.open(filepath, os.O_RDONLY) +os.lseek(fd3, 0, os.SEEK_SET) +read_data3 = os.read(fd3, len(expected_data)) +if read_data3 != expected_data: + print("third open read failed: data mismatch") + os.close(fd) + os.close(fd2) + os.close(fd3) + sys.exit(1) + +os.close(fd3) +os.close(fd2) +os.close(fd) + +# Cleanup +os.unlink(filepath) diff --git a/tests/TEST_io_passthrough_open_race b/tests/TEST_io_passthrough_open_race new file mode 100755 index 00000000..5a0c6a14 --- /dev/null +++ b/tests/TEST_io_passthrough_open_race @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 + +import os +import sys +import tempfile +import threading +import time + +NUM_THREADS = 50 +TEST_DATA = b"race condition test data\n" + +def open_and_operate(filepath, barrier, results, index): + """ + Wait at barrier, then open file, write, read, and store result. + """ + try: + # Wait for all threads to be ready + barrier.wait() + + # All threads try to open simultaneously + fd = os.open(filepath, os.O_RDWR) + + # Write thread-specific data at a unique offset + offset = index * len(TEST_DATA) + os.lseek(fd, offset, os.SEEK_SET) + bytes_written = os.write(fd, TEST_DATA) + + if bytes_written != len(TEST_DATA): + results[index] = ("write_error", "expected {} bytes, wrote {}".format(len(TEST_DATA), bytes_written)) + os.close(fd) + return + + # Read back what we wrote + os.lseek(fd, offset, os.SEEK_SET) + read_data = os.read(fd, len(TEST_DATA)) + + if read_data != TEST_DATA: + results[index] = ("read_error", "data mismatch at offset {}".format(offset)) + os.close(fd) + return + + results[index] = ("success", fd) + + except Exception as e: + results[index] = ("exception", str(e)) + +# Create test file with enough space for all threads +(fd, filepath) = tempfile.mkstemp(dir=sys.argv[1]) + +# Pre-allocate file to avoid issues with concurrent writes extending file +total_size = NUM_THREADS * len(TEST_DATA) +os.ftruncate(fd, total_size) +os.close(fd) + +# Set up synchronization +barrier = threading.Barrier(NUM_THREADS) +results = [None] * NUM_THREADS +threads = [] + +# Create and start all threads +for i in range(NUM_THREADS): + t = threading.Thread(target=open_and_operate, args=(filepath, barrier, results, i)) + threads.append(t) + +for t in threads: + t.start() + +for t in threads: + t.join() + +# Check results +failed = False +fds_to_close = [] + +for i, result in enumerate(results): + if result is None: + print("thread {} returned no result".format(i)) + failed = True + elif result[0] == "success": + fds_to_close.append(result[1]) + else: + print("thread {} failed: {} - {}".format(i, result[0], result[1])) + failed = True + +if failed: + for fd in fds_to_close: + os.close(fd) + os.unlink(filepath) + sys.exit(1) + +# Verify all data is consistent by reading through one fd +if fds_to_close: + verify_fd = fds_to_close[0] + os.lseek(verify_fd, 0, os.SEEK_SET) + all_data = os.read(verify_fd, total_size) + + expected_data = TEST_DATA * NUM_THREADS + if all_data != expected_data: + print("final verification failed: data mismatch") + print("expected {} bytes, got {} bytes".format(len(expected_data), len(all_data))) + for fd in fds_to_close: + os.close(fd) + os.unlink(filepath) + sys.exit(1) + +# Test cross-fd visibility: write from one fd, read from another +if len(fds_to_close) >= 2: + fd_a = fds_to_close[0] + fd_b = fds_to_close[1] + + # Write new data from fd_a + new_data = b"cross-fd visibility test\n" + os.lseek(fd_a, 0, os.SEEK_SET) + os.write(fd_a, new_data) + + # Read from fd_b - should see the new data + os.lseek(fd_b, 0, os.SEEK_SET) + read_back = os.read(fd_b, len(new_data)) + + if read_back != new_data: + print("cross-fd visibility failed: write from fd_a not visible on fd_b") + for fd in fds_to_close: + os.close(fd) + os.unlink(filepath) + sys.exit(1) + +# Close all file descriptors +for fd in fds_to_close: + os.close(fd) + +# Cleanup +os.unlink(filepath) diff --git a/vendored/libfuse/include/fuse.h b/vendored/libfuse/include/fuse.h index 1c788f43..1064e1a9 100644 --- a/vendored/libfuse/include/fuse.h +++ b/vendored/libfuse/include/fuse.h @@ -29,6 +29,16 @@ EXTERN_C_BEGIN * Basic FUSE API * * ----------------------------------------------------------- */ +#define INVALID_BACKING_ID (0) + +static +inline +int +fuse_backing_id_is_valid(const int backing_id_) +{ + return (backing_id_ > INVALID_BACKING_ID); +} + struct fuse_dirents_t; typedef struct fuse_dirents_t fuse_dirents_t; diff --git a/vendored/libfuse/lib/fuse.cpp b/vendored/libfuse/lib/fuse.cpp index 69191605..7b7b458b 100644 --- a/vendored/libfuse/lib/fuse.cpp +++ b/vendored/libfuse/lib/fuse.cpp @@ -3800,7 +3800,7 @@ fuse_passthrough_open(const int fd_) rv = ::ioctl(dev_fuse_fd,FUSE_DEV_IOC_BACKING_OPEN,&bm); - return rv; + return ((rv < 0) ? INVALID_BACKING_ID : rv); } int @@ -3808,6 +3808,9 @@ fuse_passthrough_close(const int backing_id_) { int dev_fuse_fd; + if(!fuse_backing_id_is_valid(backing_id_)) + return 0; + dev_fuse_fd = f.se->fd; return ::ioctl(dev_fuse_fd,FUSE_DEV_IOC_BACKING_CLOSE,&backing_id_); diff --git a/vendored/libfuse/lib/fuse_lowlevel.cpp b/vendored/libfuse/lib/fuse_lowlevel.cpp index fdf316d2..227959f1 100644 --- a/vendored/libfuse/lib/fuse_lowlevel.cpp +++ b/vendored/libfuse/lib/fuse_lowlevel.cpp @@ -258,7 +258,7 @@ fill_open(struct fuse_open_out *arg_, arg_->open_flags |= FOPEN_PARALLEL_DIRECT_WRITES; if(ffi_->noflush) arg_->open_flags |= FOPEN_NOFLUSH; - if(ffi_->passthrough && (ffi_->backing_id > 0)) + if(ffi_->passthrough && fuse_backing_id_is_valid(ffi_->backing_id)) { arg_->open_flags |= FOPEN_PASSTHROUGH; arg_->backing_id = ffi_->backing_id;