Browse Source

Rework open file resource management

openfiles
Antonio SJ Musumeci 2 months ago
parent
commit
9200ecb00c
  1. 36
      src/fileinfo.hpp
  2. 157
      src/fuse_create.cpp
  3. 302
      src/fuse_open.cpp
  4. 164
      src/fuse_release.cpp
  5. 34
      src/fuse_release.hpp
  6. 72
      src/state.hpp
  7. 103
      tests/TEST_io_passthrough_create_open
  8. 132
      tests/TEST_io_passthrough_open_race
  9. 10
      vendored/libfuse/include/fuse.h
  10. 5
      vendored/libfuse/lib/fuse.cpp
  11. 2
      vendored/libfuse/lib/fuse_lowlevel.cpp

36
src/fileinfo.hpp

@ -16,6 +16,31 @@
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
/*
FILEINFO - OPEN FILE METADATA
=============================
This file defines the FileInfo class, which stores metadata for an open
file. Each created or opened file gets its own instance. The first
instance of any set of concurrently open files is also stored in the
open_files map for reference, but instances are always passed around
through fuse_file_info_t.fh.
FUSE FILE HANDLE CONVERSION
---------------------------
FUSE uses a 64-bit file handle (fuse_file_info_t.fh). We convert between
FileInfo* and fh using simple pointer casting:
- to_fh(FileInfo*): Casts pointer to uint64_t for FUSE
- from_fh(uint64_t): Casts back to FileInfo pointer
This is safe because FileInfo objects remain valid until the last close.
See fuse_open.cpp and fuse_release.cpp for how FileInfo is used in the
open/close flow.
*/
#pragma once
#include "assert.hpp"
@ -31,7 +56,8 @@
class FileInfo : public FH
{
public:
static FileInfo *from_fh(const u64 fh);
static FileInfo *from_fh(const u64);
static u64 to_fh(const FileInfo*);
public:
FileInfo(const int fd_,
@ -74,6 +100,14 @@ public:
Mutex mutex;
};
// Static conversion from a FileInfo pointer to the 64-bit handle form:
// the pointer value itself, reinterpreted as a u64. The pointer is
// checked with ASSERT_NOT_NULL before the cast.
inline
u64
FileInfo::to_fh(const FileInfo *fi_)
{
  ASSERT_NOT_NULL(fi_);

  const u64 fh = reinterpret_cast<u64>(fi_);

  return fh;
}
inline
u64
FileInfo::to_fh() const

157
src/fuse_create.cpp

@ -16,6 +16,33 @@
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
/*
FILE CREATION WITH FUSE PASSTHROUGH SUPPORT
===========================================
This file implements FUSE create() which creates a new file and opens it.
Unlike fuse_open.cpp, this file does NOT use a retry loop because the
kernel guarantees serialization of create operations.
WHY NO RETRY LOOP?
------------------
The fuse_open.cpp retry loop handles the case where multiple threads race
to open an EXISTING file. With create(), the kernel serializes namespace
operations at the VFS layer - only one create for a given path can succeed.
Additionally, when a file is created, FUSE allocates a completely new
nodeid (inode number). Since mergerfs never reuses nodeids, there should
never be a collision in the open_files map.
PASSTHROUGH MODE
----------------
Like fuse_open.cpp, this file supports FUSE passthrough mode for improved
I/O performance. The backing_id is obtained from fuse_passthrough_open()
and stored in the OpenFile entry.
*/
#include "fuse_create.hpp"
#include "state.hpp"
@ -227,29 +254,26 @@ _(const PassthroughIOEnum e_,
static
int
_create_for_insert_lambda(const fuse_req_ctx_t *ctx_,
const fs::path &fusepath_,
const mode_t mode_,
fuse_file_info_t *ffi_,
State::OpenFile *of_)
_create(const fuse_req_ctx_t *ctx_,
const fs::path &fusepath_,
mode_t mode_,
fuse_file_info_t *ffi_)
{
int rv;
FileInfo *fi;
auto &of = state.open_files;
::_config_to_ffi_flags(cfg,ctx_->pid,ffi_);
if(cfg.cache_writeback)
::_tweak_flags_cache_writeback(&ffi_->flags);
ffi_->noflush = !::_calculate_flush(cfg.flushonclose,
ffi_->flags);
rv = ::_create(ctx_,
cfg.func.getattr.policy,
cfg.func.create.policy,
cfg.branches,
fusepath_,
ffi_,
mode_,
ctx_->umask);
ffi_->noflush = !::_calculate_flush(cfg.flushonclose,ffi_->flags);
int rv = ::_create(ctx_,
cfg.func.getattr.policy,
cfg.func.create.policy,
cfg.branches,
fusepath_,
ffi_,
mode_,
ctx_->umask);
if(rv == -EROFS)
{
cfg.branches.find_and_set_mode_ro();
@ -266,10 +290,8 @@ _create_for_insert_lambda(const fuse_req_ctx_t *ctx_,
if(rv < 0)
return rv;
fi = FileInfo::from_fh(ffi_->fh);
of_->ref_count = 1;
of_->fi = fi;
FileInfo *fi = FileInfo::from_fh(ffi_->fh);
int backing_id = INVALID_BACKING_ID;
switch(_(cfg.passthrough_io,ffi_->flags))
{
@ -278,83 +300,40 @@ _create_for_insert_lambda(const fuse_req_ctx_t *ctx_,
case _(PassthroughIO::ENUM::RW,O_RDONLY):
case _(PassthroughIO::ENUM::RW,O_WRONLY):
case _(PassthroughIO::ENUM::RW,O_RDWR):
backing_id = FUSE::passthrough_open(fi->fd);
if(fuse_backing_id_is_valid(backing_id))
{
ffi_->backing_id = backing_id;
ffi_->passthrough = true;
ffi_->keep_cache = false;
}
break;
default:
return 0;
break;
}
of_->backing_id = FUSE::passthrough_open(fi->fd);
if(of_->backing_id < 0)
return 0;
bool inserted = of.try_emplace(ctx_->nodeid,
backing_id,
fi);
ffi_->backing_id = of_->backing_id;
ffi_->passthrough = true;
ffi_->keep_cache = false;
return 0;
}
static
inline
auto
_create_insert_lambda(const fuse_req_ctx_t *ctx_,
const fs::path &fusepath_,
const mode_t mode_,
fuse_file_info_t *ffi_,
int *_rv_)
{
return
[=](auto &val_)
{
*_rv_ = ::_create_for_insert_lambda(ctx_,
fusepath_,
mode_,
ffi_,
&val_.second);
};
}
// This function should never be called?
static
inline
auto
_create_update_lambda()
{
return
[](const auto &val_)
// Really. This should never happen. FUSE should not be allowing a
// create for an existing nodeid. mergerfs doesn't reuse nodeids. It
// always uses a fixed generation value and always uses new nodeids.
if(not inserted)
{
fmt::println(stderr,"CREATE_UPDATE_LAMBDA: THIS SHOULD NOT HAPPEN");
SysLog::crit("CREATE_UPDATE_LAMBDA: THIS SHOULD NOT HAPPEN");
abort();
};
}
const char *msg = "nodeid already exists in open_files map";
static
int
_create(const fuse_req_ctx_t *ctx_,
const fs::path &fusepath_,
mode_t mode_,
fuse_file_info_t *ffi_)
{
int rv;
auto &of = state.open_files;
FUSE::passthrough_close(backing_id);
fs::close(fi->fd);
delete fi;
rv = -EINVAL;
of.try_emplace_and_visit(ctx_->nodeid,
::_create_insert_lambda(ctx_,fusepath_,mode_,ffi_,&rv),
::_create_update_lambda());
SysLog::crit(msg);
fmt::println(stderr,"{}",msg);
// Can't abort an emplace_and_visit and can't assume another thread
// hasn't created an entry since this failure so erase only if
// ref_count is default (0).
if(rv < 0)
of.erase_if(ctx_->nodeid,
[](const auto &val_)
{
return (val_.second.ref_count <= 0);
});
return -EIO;
}
return rv;
return 0;
}
int

302
src/fuse_open.cpp

@ -16,6 +16,73 @@
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
/*
OPEN FILE MANAGEMENT AND CONCURRENCY CONTROL
============================================
This file implements FUSE open() with handling for concurrent access
to the same file. The key challenge is that multiple threads may
simultaneously attempt to open the same file (same nodeid), and we
need to:
1. Share a single FileInfo instance among all opens of the same file
(to be able to re-open the same file and use the same backing id for
concurrent open requests)
2. Properly reference count to know when to release resources
3. Handle race conditions without deadlocks or resource leaks
4. Support FUSE passthrough mode for performance
CONCURRENCY MODEL
-----------------
The open_files map (in state.hpp) tracks all currently open files:
- Key: nodeid (FUSE inode number)
- Value: OpenFile struct { ref_count, backing_id, fi }
When a file is opened:
1. First, we check if an entry already exists (using visit())
2. If it exists and is valid (fi != nullptr), we increment ref_count and
share the existing FileInfo by re-opening its file descriptor
3. If it doesn't exist, we create a new FileInfo and try to insert it
4. If insertion fails (another thread inserted first), we clean up
and retry (because passthrough requires every open to use the same
underlying file that the backing id was created for)
THE RETRY LOOP
--------------
The MAX_OPEN_RETRIES loop in _open() handles the race where multiple
threads try to open the same file simultaneously:
Thread A: visit() -> no entry -> open file -> try_emplace() -> SUCCESS
Thread B: visit() -> no entry -> open file -> try_emplace() -> FAIL
Thread B: (cleans up, retries) -> visit() -> found Thread A's entry -> share it
This ensures that all threads eventually succeed while sharing the same
underlying file resources.
ZOMBIE STATE PROTECTION
-----------------------
When a file is being released (in fuse_release.cpp), the entry enters a
"zombie" state where fi is set to nullptr. The check at line ~305 prevents
new opens from acquiring a reference to a dying file:
if(v_.second.fi == nullptr)
return; // Skip zombie entries
PASSTHROUGH MODE
----------------
FUSE passthrough (Linux 6.9+) allows the kernel to bypass mergerfs for I/O:
- We request a backing_id from the kernel via fuse_passthrough_open()
- If successful, subsequent reads/writes go directly to the backing file
- This achieves near native performance
The backing_id is stored in the OpenFile entry so all overlapping
open requests share the same passthrough handle.
*/
#include "fuse_open.hpp"
#include "state.hpp"
@ -23,6 +90,8 @@
#include "config.hpp"
#include "errno.hpp"
#include "fileinfo.hpp"
#include "fuse_release.hpp"
#include "fs_close.hpp"
#include "fs_cow.hpp"
#include "fs_fchmod.hpp"
#include "fs_lchmod.hpp"
@ -41,6 +110,8 @@
#include <string>
#include <vector>
#define MAX_OPEN_RETRIES 64
static
bool
@ -275,169 +346,106 @@ _(const PassthroughIOEnum e_,
static
int
_open_for_insert_lambda(const fuse_req_ctx_t *ctx_,
const fs::path &fusepath_,
fuse_file_info_t *ffi_,
State::OpenFile *of_)
_open(const fuse_req_ctx_t *ctx_,
const fs::path &fusepath_,
fuse_file_info_t *ffi_)
{
int rv;
FileInfo *fi;
auto &of = state.open_files;
::_config_to_ffi_flags(cfg,ctx_->pid,ffi_);
if(cfg.cache_writeback)
::_tweak_flags_cache_writeback(&ffi_->flags);
ffi_->noflush = !::_calculate_flush(cfg.flushonclose,ffi_->flags);
ffi_->noflush = !::_calculate_flush(cfg.flushonclose,
ffi_->flags);
rv = ::_open(cfg.func.open.policy,
cfg.branches,
fusepath_,
ffi_,
cfg.link_cow,
cfg.nfsopenhack);
if(rv < 0)
return rv;
fi = FileInfo::from_fh(ffi_->fh);
of_->ref_count = 1;
of_->fi = fi;
switch(_(cfg.passthrough_io,ffi_->flags))
for(size_t i = 0; i < MAX_OPEN_RETRIES; i++)
{
case _(PassthroughIO::ENUM::RO,O_RDONLY):
case _(PassthroughIO::ENUM::WO,O_WRONLY):
case _(PassthroughIO::ENUM::RW,O_RDONLY):
case _(PassthroughIO::ENUM::RW,O_WRONLY):
case _(PassthroughIO::ENUM::RW,O_RDWR):
break;
default:
return 0;
}
of_->backing_id = FUSE::passthrough_open(fi->fd);
if(of_->backing_id <= 0)
return 0;
ffi_->backing_id = of_->backing_id;
ffi_->passthrough = true;
ffi_->keep_cache = false;
return 0;
}
static
int
_open_for_update_lambda(const fuse_req_ctx_t *ctx_,
const fs::path &fusepath_,
fuse_file_info_t *ffi_,
State::OpenFile *of_)
{
int rv;
::_config_to_ffi_flags(cfg,ctx_->pid,ffi_);
if(cfg.cache_writeback)
::_tweak_flags_cache_writeback(&ffi_->flags);
ffi_->noflush = !::_calculate_flush(cfg.flushonclose,
ffi_->flags);
rv = ::_open_fd(of_->fi->fd,
&of_->fi->branch,
fusepath_,
ffi_);
if(rv < 0)
return rv;
of_->ref_count++;
if(of_->backing_id <= 0)
return 0;
ffi_->backing_id = of_->backing_id;
ffi_->passthrough = true;
ffi_->keep_cache = false;
FileInfo *fi = nullptr;
int backing_id = INVALID_BACKING_ID;
// The ref increment keeps fuse_release.cpp from erasing and
// freeing the entry.
of.visit(ctx_->nodeid,
[&](auto &v_)
{
v_.second.ref_count++;
fi = v_.second.fi;
backing_id = v_.second.backing_id;
});
// If the file is already open...
if(fi)
{
rv = ::_open_fd(fi->fd,
&fi->branch,
fusepath_,
ffi_);
// If we fail to reopen the already open file we need to
// treat it similarly to fuse_release. Since we increased
// the ref count without an actual new fileinfo obj we just
// pass nullptr which FUSE::release() can handle.
if(rv < 0)
{
FUSE::release(ctx_->nodeid,
FileInfo::from_fh(ffi_->fh));
return rv;
}
if(!fuse_backing_id_is_valid(backing_id))
return 0;
ffi_->backing_id = backing_id;
ffi_->passthrough = true;
ffi_->keep_cache = false;
return 0;
}
return rv;
}
// Was not open, do first open, try to insert, if someone beat us
// to it in another thread then throw it away and try again.
rv = ::_open(cfg.func.open.policy,
cfg.branches,
fusepath_,
ffi_,
cfg.link_cow,
cfg.nfsopenhack);
if(rv < 0)
return rv;
static
inline
auto
_open_insert_lambda(const fuse_req_ctx_t *ctx_,
const fs::path &fusepath_,
fuse_file_info_t *ffi_,
int *rv_)
{
return
[=](auto &val_)
{
*rv_ = ::_open_for_insert_lambda(ctx_,
fusepath_,
ffi_,
&val_.second);
};
}
fi = FileInfo::from_fh(ffi_->fh);
static
inline
auto
_open_update_lambda(const fuse_req_ctx_t *ctx_,
const fs::path &fusepath_,
fuse_file_info_t *ffi_,
int *rv_)
{
return
[=](auto &val_)
{
// For the edge case where insert succeeded but the open failed
// and hadn't been cleaned up yet. There unfortunately is no way
// to abort an insert.
if(val_.second.ref_count <= 0)
switch(_(cfg.passthrough_io,ffi_->flags))
{
*rv_ = ::_open_for_insert_lambda(ctx_,
fusepath_,
ffi_,
&val_.second);
return;
case _(PassthroughIO::ENUM::RO,O_RDONLY):
case _(PassthroughIO::ENUM::WO,O_WRONLY):
case _(PassthroughIO::ENUM::RW,O_RDONLY):
case _(PassthroughIO::ENUM::RW,O_WRONLY):
case _(PassthroughIO::ENUM::RW,O_RDWR):
backing_id = FUSE::passthrough_open(fi->fd);
if(fuse_backing_id_is_valid(backing_id))
{
ffi_->backing_id = backing_id;
ffi_->passthrough = true;
ffi_->keep_cache = false;
}
break;
default:
break;
}
*rv_ = ::_open_for_update_lambda(ctx_,
fusepath_,
ffi_,
&val_.second);
};
}
static
int
_open(const fuse_req_ctx_t *ctx_,
const fs::path &fusepath_,
fuse_file_info_t *ffi_)
{
int rv;
auto &of = state.open_files;
rv = -EINVAL;
of.try_emplace_and_visit(ctx_->nodeid,
::_open_insert_lambda(ctx_,fusepath_,ffi_,&rv),
::_open_update_lambda(ctx_,fusepath_,ffi_,&rv));
bool inserted = of.try_emplace(ctx_->nodeid,
backing_id,
fi);
if(inserted)
return 0;
// Can't abort an emplace_and_visit and can't assume another thread
// hasn't created an entry since this failure so erase only if
// ref_count is default (0).
if(rv < 0)
of.erase_if(ctx_->nodeid,
[](auto &val_)
{
return (val_.second.ref_count <= 0);
});
FUSE::passthrough_close(backing_id);
fs::close(fi->fd);
delete fi;
}
return rv;
return -EIO;
}

164
src/fuse_release.cpp

@ -16,6 +16,76 @@
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
/*
TWO-PHASE FILE RELEASE AND RESOURCE CLEANUP
===========================================
This file implements FUSE release() with a sophisticated two-phase cleanup
mechanism designed to handle concurrent access without deadlocks or leaks.
THE PROBLEM: CONCURRENT RELEASE AND OPEN
----------------------------------------
When multiple threads access the same file, we have reference counting to
track how many opens are active. When the last close happens, we need to:
1. Close the file descriptor
2. Close the passthrough backing_id (if any)
3. Delete the FileInfo object
4. Remove the entry from the open_files map
The challenge is that these operations can be slow (especially close()),
and we CANNOT hold the map lock during slow operations. Other threads
may need to open the same file or release other files.
THE SOLUTION: TWO-PHASE CLEANUP
-------------------------------
Phase 1: _release_ref() - Atomic Reference Management (lines ~35-65)
- Runs inside visit() callback (map is locked)
- Decrements ref_count
- If ref_count reaches 0, captures resources and sets fi = nullptr
- This "zombie" state signals that cleanup is needed
Phase 2: _release_cleanup() - Resource Destruction (lines ~69-88)
- Runs OUTSIDE visit() callback (map is unlocked)
- Performs slow operations: close(), passthrough_close(), delete
- Conditionally erases the map entry using erase_if
WHY TWO PHASES?
--------------
Without this split, a slow close() would block:
- Other threads trying to open the same file
- Other threads trying to release other files (global map lock)
- The entire FUSE request pipeline
By separating the fast reference counting from the slow cleanup, we achieve
both correctness and high concurrency.
THE ZOMBIE STATE
----------------
When ref_count hits 0, we set fi = nullptr. This "zombie" state means:
- The entry is marked for destruction
- New opens should skip this entry (see fuse_open.cpp line ~305)
- Cleanup is in progress or pending
The erase_if() in _release_cleanup() only erases if fi is still nullptr,
protecting against the ABA problem where another thread might have already
created a new entry with the same nodeid.
ORPHAN FILEINFO CLEANUP
-----------------------
The release() function (lines ~116-135) handles the case where a FileInfo
was created but never inserted into the map (e.g., open failure). The
fi_in_map flag distinguishes between:
- FileInfo in map: wait for normal cleanup
- FileInfo not in map (orphan): clean up immediately
See commit 446e9a7b for the complete rationale behind this design.
*/
#include "fuse_release.hpp"
#include "state.hpp"
@ -28,80 +98,64 @@
#include "fuse.h"
static
constexpr
auto
_erase_if_lambda(FileInfo *fi_,
bool *existed_in_map_)
{
return
[=](auto &val_)
{
*existed_in_map_ = true;
if(fi_ != val_.second.fi)
{
fs::close(fi_->fd);
delete fi_;
}
val_.second.ref_count--;
if(val_.second.ref_count > 0)
return false;
if(val_.second.backing_id > 0)
FUSE::passthrough_close(val_.second.backing_id);
fs::close(val_.second.fi->fd);
delete val_.second.fi;
return true;
};
}
static
int
_release(const fuse_req_ctx_t *ctx_,
FileInfo *fi_,
const bool dropcacheonclose_)
_release(cu64 nodeid_,
FileInfo *fi_,
const bool dropcacheonclose_)
{
bool existed_in_map;
// according to Feh of nocache calling it once doesn't always work
// https://github.com/Feh/nocache
if(dropcacheonclose_)
{
fs::fadvise_dontneed(fi_->fd);
fs::fadvise_dontneed(fi_->fd);
}
// Because of course the API doesn't tell you if the key
// existed. Just how many it erased and in this case I only want to
// erase if there are no more open files.
existed_in_map = false;
state.open_files.erase_if(ctx_->nodeid,
::_erase_if_lambda(fi_,&existed_in_map));
if(existed_in_map)
return 0;
fs::close(fi_->fd);
delete fi_;
FUSE::release(nodeid_,fi_);
return 0;
}
static
int
_release(const fuse_req_ctx_t *ctx_,
const fuse_file_info_t *ffi_)
// Drop one reference to the open_files entry keyed by `nodeid_` and
// free whatever is no longer needed.
//
// nodeid_: key of the entry in state.open_files.
// fi_:     FileInfo belonging to the handle being closed. May be
//          nullptr when the caller has no per-handle FileInfo.
//
// Only the ref_count update and pointer capture happen inside the
// erase_if() callback; close() and delete run after the map call has
// returned so no file syscalls execute inside the callback.
//
// Ownership rules applied after the map call:
//  - entry erased (last reference): the entry's FileInfo is freed and
//    its backing id closed.
//  - `fi_` is the entry's FileInfo and other references remain: it is
//    kept alive for them.
//  - otherwise `fi_` is private to this handle (or has no entry at
//    all) and is freed here. Previously a private `fi_` was leaked
//    when its release happened to be the one dropping the last
//    reference, and an `fi_` without an entry was never freed.
void
FUSE::release(cu64             nodeid_,
              FileInfo * const fi_)
{
  auto &of = state.open_files;
  int backing_id = INVALID_BACKING_ID;
  FileInfo *entry_fi = nullptr;   // entry's FileInfo, set only when erased
  bool fi_still_shared = false;   // fi_ is the entry's FileInfo and stays referenced

  of.erase_if(nodeid_,
              [&](auto &v_)
              {
                v_.second.ref_count--;
                if(v_.second.ref_count > 0)
                  {
                    fi_still_shared = (fi_ == v_.second.fi);
                    return false;
                  }

                entry_fi   = v_.second.fi;
                backing_id = v_.second.backing_id;
                return true;
              });

  FUSE::passthrough_close(backing_id);

  // Free the caller's own FileInfo unless it is the entry's instance
  // (either still in use, or freed just below).
  if(fi_ && !fi_still_shared && (fi_ != entry_fi))
    {
      fs::close(fi_->fd);
      delete fi_;
    }

  if(entry_fi)
    {
      fs::close(entry_fi->fd);
      delete entry_fi;
    }
}
int
FUSE::release(const fuse_req_ctx_t *ctx_,
const fuse_file_info_t *ffi_)
{
return ::_release(ctx_,ffi_);
FileInfo *fi = FileInfo::from_fh(ffi_->fh);
return ::_release(ctx_->nodeid,
fi,
cfg.dropcacheonclose);
}

34
src/fuse_release.hpp

@ -16,14 +16,46 @@
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
/*
FUSE RELEASE INTERFACE
======================
This header defines the two release() function overloads used for closing
open files. There are two variants because release can be initiated from
different contexts:
1. release(ctx, ffi) - Standard FUSE release callback
Called by libfuse when a file is closed. Extracts the FileInfo from
the fuse_file_info_t handle and calls the internal release function.
2. release(nodeid, fi) - Internal release function
Called internally when we need to release a file that failed to open
properly (e.g., in fuse_open.cpp when _open_fd fails after finding an
existing open). This allows the same cleanup logic to be used for both
normal closes and error cleanup.
The two-phase cleanup mechanism is implemented in fuse_release.cpp. See
that file for detailed documentation on the reference counting and
zombie state handling.
See commit 446e9a7b for the complete rationale behind this design.
*/
#pragma once
#include "base_types.h"
#include "fuse.h"
class FileInfo;
namespace FUSE
{
int
release(const fuse_req_ctx_t *ctx_,
const fuse_file_info_t *ffi);
const fuse_file_info_t *ffi_);
void
release(cu64 nodeid,
FileInfo * const fi);
}

72
src/state.hpp

@ -16,19 +16,76 @@
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
/*
GLOBAL STATE AND OPEN FILE TRACKING
===================================
This file defines the global State object that holds the open_files map,
which tracks all currently open files in the filesystem.
OPEN_FILE STRUCTURE
-------------------
The OpenFile struct represents a file that is currently open. It
contains:
- ref_count: Number of active opens sharing this file. When it reaches 0,
the file is eligible for cleanup.
- backing_id: Kernel handle for FUSE passthrough mode. INVALID_BACKING_ID
means passthrough is not enabled for this file.
- fi: Pointer to the FileInfo object containing file metadata and
the actual file descriptor of the **first** opened file. Since a
specific file descriptor has state associated with it, you cannot
simply reuse the FD for overlapping open requests. Instead it is
reopened using openat(/proc/self/fd/X). Additionally, the FUSE
passthrough feature requires that the same underlying file be used
for the nodeid and backing_id. All secondary opens are stored in
the fuse_file_info_t.fh only. When set to nullptr, the entry is in
"zombie" state (being destroyed but not yet removed from the map).
THREAD SAFETY
-------------
The open_files map uses boost::concurrent_flat_map which provides:
- Bucket-level locking for fine-grained concurrency
- visit() for atomic read-modify operations
- try_emplace() for atomic insertion
- erase_if() for conditional removal
While there is fine-grained locking, it isn't per entry. As a result
the code for opening and releasing is more complicated, since having
file I/O syscalls in the critical section lambda could cause
contention.
LIFECYCLE STATES
----------------
An OpenFile entry goes through these states:
1. NOT IN MAP - File is not open
2. ACTIVE (ref_count >= 1, fi != nullptr) - File is open and in use
3. ZOMBIE (ref_count = 0, fi = nullptr) - File is being destroyed
4. CLEANED UP - Resources freed, entry erased from map
See fuse_open.cpp and fuse_release.cpp for detailed documentation on the
concurrency patterns and two-phase cleanup mechanism.
*/
#pragma once
#include "boost/unordered/concurrent_flat_map.hpp"
#include "fileinfo.hpp"
#include "fuse.h"
#include <functional>
#include <map>
#include <string>
constexpr int INVALID_BACKING_ID = -1;
class State
{
public:
@ -37,10 +94,13 @@ public:
public:
struct OpenFile
{
OpenFile()
: ref_count(0),
backing_id(INVALID_BACKING_ID),
fi(nullptr)
OpenFile() = delete;
OpenFile(const int backing_id_,
FileInfo * const fi_)
: ref_count(1),
backing_id(backing_id_),
fi(fi_)
{
}

103
tests/TEST_io_passthrough_create_open

@ -0,0 +1,103 @@
#!/usr/bin/env python3

import os
import sys
import tempfile
import time

# Usage: sys.argv[1] is the directory in which the temporary test file
# is created. The script exits with status 1 on the first failed check.

# --- Phase 1: create + write + read (passthrough.io=rw) ---
(fd, filepath) = tempfile.mkstemp(dir=sys.argv[1])

test_data = b"passthrough io test data for create\n" * 100

# Fill the newly created file
written = os.write(fd, test_data)
if written != len(test_data):
    print("create write failed: expected {} bytes, wrote {}".format(len(test_data), written))
    sys.exit(1)

# Rewind and confirm the contents round-trip
os.lseek(fd, 0, os.SEEK_SET)
if os.read(fd, len(test_data)) != test_data:
    print("create read failed: data mismatch")
    sys.exit(1)

os.close(fd)

# --- Phase 2: open the existing file + write + read ---
fd = os.open(filepath, os.O_RDWR)

# The data written through the create handle must still be there
os.lseek(fd, 0, os.SEEK_SET)
if os.read(fd, len(test_data)) != test_data:
    print("open read failed: data mismatch")
    sys.exit(1)

# Append more data at the end of the file
os.lseek(fd, 0, os.SEEK_END)
more_data = b"additional passthrough data for open\n" * 50
written = os.write(fd, more_data)
if written != len(more_data):
    print("open write failed: expected {} bytes, wrote {}".format(len(more_data), written))
    sys.exit(1)

# Verify the whole file
os.lseek(fd, 0, os.SEEK_SET)
if os.read(fd, len(test_data) + len(more_data)) != test_data + more_data:
    print("open final read failed: data mismatch")
    sys.exit(1)

# --- Phase 3: multiple opens of the same file (single backing_id feature) ---
# When a file is already open with passthrough, subsequent opens
# must reuse the same backing_id rather than creating new ones
fd2 = os.open(filepath, os.O_RDWR)

# The second fd should see the same data
os.lseek(fd2, 0, os.SEEK_SET)
if os.read(fd2, len(test_data) + len(more_data)) != test_data + more_data:
    print("second open read failed: data mismatch")
    os.close(fd)
    os.close(fd2)
    sys.exit(1)

# Append through the second fd
os.lseek(fd2, 0, os.SEEK_END)
extra_data = b"data from second file descriptor\n" * 25
written = os.write(fd2, extra_data)
if written != len(extra_data):
    print("second open write failed: expected {} bytes, wrote {}".format(len(extra_data), written))
    os.close(fd)
    os.close(fd2)
    sys.exit(1)

# The write must be visible through the first fd (shared backing)
expected_data = test_data + more_data + extra_data
os.lseek(fd, 0, os.SEEK_SET)
combined_data = os.read(fd, len(test_data) + len(more_data) + len(extra_data))
if combined_data != expected_data:
    print("cross-fd read failed: data mismatch (writes from fd2 not visible on fd)")
    os.close(fd)
    os.close(fd2)
    sys.exit(1)

# Open a third, read-only fd while the others are still open
fd3 = os.open(filepath, os.O_RDONLY)
os.lseek(fd3, 0, os.SEEK_SET)
if os.read(fd3, len(expected_data)) != expected_data:
    print("third open read failed: data mismatch")
    os.close(fd)
    os.close(fd2)
    os.close(fd3)
    sys.exit(1)

os.close(fd3)
os.close(fd2)
os.close(fd)

# Cleanup
os.unlink(filepath)

132
tests/TEST_io_passthrough_open_race

@ -0,0 +1,132 @@
#!/usr/bin/env python3

import os
import sys
import tempfile
import threading
import time

NUM_THREADS = 50
TEST_DATA = b"race condition test data\n"


def open_and_operate(filepath, barrier, results, index):
    """
    Rendezvous at the barrier, open the file, write this thread's record
    at its own offset, read it back, and store the outcome in
    results[index].

    The stored value is a tuple: ("success", fd) with the fd left open
    for the caller, or (error_kind, message) on failure.
    """
    try:
        # Block until every thread is ready so the opens happen together
        barrier.wait()

        fd = os.open(filepath, os.O_RDWR)

        # Each thread owns a distinct slot in the file
        offset = index * len(TEST_DATA)

        os.lseek(fd, offset, os.SEEK_SET)
        bytes_written = os.write(fd, TEST_DATA)
        if bytes_written != len(TEST_DATA):
            results[index] = ("write_error", "expected {} bytes, wrote {}".format(len(TEST_DATA), bytes_written))
            os.close(fd)
            return

        # Read back what was just written
        os.lseek(fd, offset, os.SEEK_SET)
        read_data = os.read(fd, len(TEST_DATA))
        if read_data != TEST_DATA:
            results[index] = ("read_error", "data mismatch at offset {}".format(offset))
            os.close(fd)
            return

        results[index] = ("success", fd)
    except Exception as e:
        results[index] = ("exception", str(e))


def _cleanup_and_fail(open_fds, path):
    """Close every fd in open_fds, remove the test file and exit with status 1."""
    for open_fd in open_fds:
        os.close(open_fd)
    os.unlink(path)
    sys.exit(1)


# Create the test file; sys.argv[1] is the directory to create it in
(fd, filepath) = tempfile.mkstemp(dir=sys.argv[1])

# Pre-allocate file to avoid issues with concurrent writes extending file
total_size = NUM_THREADS * len(TEST_DATA)
os.ftruncate(fd, total_size)
os.close(fd)

# Set up synchronization
barrier = threading.Barrier(NUM_THREADS)
results = [None] * NUM_THREADS

# Create all threads, start them, then wait for all of them
threads = [
    threading.Thread(target=open_and_operate, args=(filepath, barrier, results, i))
    for i in range(NUM_THREADS)
]

for t in threads:
    t.start()

for t in threads:
    t.join()

# Check results, collecting the fds the successful threads left open
failed = False
fds_to_close = []

for i, result in enumerate(results):
    if result is None:
        print("thread {} returned no result".format(i))
        failed = True
    elif result[0] == "success":
        fds_to_close.append(result[1])
    else:
        print("thread {} failed: {} - {}".format(i, result[0], result[1]))
        failed = True

if failed:
    _cleanup_and_fail(fds_to_close, filepath)

# Verify all data is consistent by reading through one fd
if fds_to_close:
    verify_fd = fds_to_close[0]
    os.lseek(verify_fd, 0, os.SEEK_SET)
    all_data = os.read(verify_fd, total_size)
    expected_data = TEST_DATA * NUM_THREADS

    if all_data != expected_data:
        print("final verification failed: data mismatch")
        print("expected {} bytes, got {} bytes".format(len(expected_data), len(all_data)))
        _cleanup_and_fail(fds_to_close, filepath)

# Test cross-fd visibility: write from one fd, read from another
if len(fds_to_close) >= 2:
    fd_a = fds_to_close[0]
    fd_b = fds_to_close[1]

    # Write new data from fd_a
    new_data = b"cross-fd visibility test\n"
    os.lseek(fd_a, 0, os.SEEK_SET)
    os.write(fd_a, new_data)

    # Read from fd_b - should see the new data
    os.lseek(fd_b, 0, os.SEEK_SET)
    read_back = os.read(fd_b, len(new_data))

    if read_back != new_data:
        print("cross-fd visibility failed: write from fd_a not visible on fd_b")
        _cleanup_and_fail(fds_to_close, filepath)

# Close all file descriptors
for fd in fds_to_close:
    os.close(fd)

# Cleanup
os.unlink(filepath)

10
vendored/libfuse/include/fuse.h

@ -29,6 +29,16 @@ EXTERN_C_BEGIN
* Basic FUSE API *
* ----------------------------------------------------------- */
/* Value used to represent "no valid backing id". */
#define INVALID_BACKING_ID (0)

/*
 * Returns 1 when `backing_id_` is strictly greater than
 * INVALID_BACKING_ID and 0 otherwise (so zero and negative values are
 * reported as invalid).
 */
static
inline
int
fuse_backing_id_is_valid(const int backing_id_)
{
  if(backing_id_ <= INVALID_BACKING_ID)
    return 0;

  return 1;
}
struct fuse_dirents_t;
typedef struct fuse_dirents_t fuse_dirents_t;

5
vendored/libfuse/lib/fuse.cpp

@ -3800,7 +3800,7 @@ fuse_passthrough_open(const int fd_)
rv = ::ioctl(dev_fuse_fd,FUSE_DEV_IOC_BACKING_OPEN,&bm);
return rv;
return ((rv < 0) ? INVALID_BACKING_ID : rv);
}
int
@ -3808,6 +3808,9 @@ fuse_passthrough_close(const int backing_id_)
{
int dev_fuse_fd;
if(!fuse_backing_id_is_valid(backing_id_))
return 0;
dev_fuse_fd = f.se->fd;
return ::ioctl(dev_fuse_fd,FUSE_DEV_IOC_BACKING_CLOSE,&backing_id_);

2
vendored/libfuse/lib/fuse_lowlevel.cpp

@ -258,7 +258,7 @@ fill_open(struct fuse_open_out *arg_,
arg_->open_flags |= FOPEN_PARALLEL_DIRECT_WRITES;
if(ffi_->noflush)
arg_->open_flags |= FOPEN_NOFLUSH;
if(ffi_->passthrough && (ffi_->backing_id > 0))
if(ffi_->passthrough && fuse_backing_id_is_valid(ffi_->backing_id))
{
arg_->open_flags |= FOPEN_PASSTHROUGH;
arg_->backing_id = ffi_->backing_id;

Loading…
Cancel
Save