Browse Source

Use debug mutex more

pull/1502/merge
Antonio SJ Musumeci 2 weeks ago
committed by trapexit
parent
commit
a2860c3f1f
  1. 2
      Makefile
  2. 2
      libfuse/include/fuse_msgbuf.hpp
  3. 45
      libfuse/include/mutex.hpp
  4. 28
      libfuse/include/mutex_debug.hpp
  5. 24
      libfuse/include/mutex_ndebug.hpp
  6. 109
      libfuse/include/objpool.hpp
  7. 0
      libfuse/include/scope_guard.hpp
  8. 20
      libfuse/include/thread_pool.hpp
  9. 380
      libfuse/lib/fmp.h
  10. 142
      libfuse/lib/fuse.cpp
  11. 21
      libfuse/lib/fuse_lowlevel.cpp
  12. 214
      libfuse/lib/fuse_msgbuf.cpp
  13. 245
      libfuse/lib/lfmp.h
  14. 9
      libfuse/lib/maintenance_thread.cpp
  15. 14
      src/branches.cpp
  16. 10
      src/branches.hpp
  17. 9
      src/fileinfo.hpp
  18. 89
      src/fixed_mem_pool.hpp
  19. 8
      src/fs_statvfs_cache.cpp
  20. 2
      src/fuse_readdir_cor.cpp
  21. 6
      src/fuse_readdir_cor_getdents.icpp
  22. 4
      src/fuse_readdir_cor_readdir.icpp
  23. 5
      src/fuse_write.cpp
  24. 72
      src/locked_fixed_mem_pool.hpp
  25. 21
      src/mempools.cpp
  26. 23
      src/mempools.hpp

2
Makefile

@ -169,6 +169,8 @@ help:
@echo "USE_XATTR=0 - build program without xattrs functionality"
@echo "STATIC=1 - build static binary"
@echo "LTO=1 - build with link time optimization"
@echo "SANITIZE=1 - build with sanitizers (address,undefined,leak)"
@echo "SANITIZE=list - build with custom sanitizers (comma separated)"
@echo "CLEANUP=1 - cleanup images between release build process"
@echo "PKGDIR=/dir/ - location for release build pkgs"
@echo "GITREF=gitref - gitref to use for release builds"

2
libfuse/include/fuse_msgbuf.hpp

@ -29,8 +29,8 @@ fuse_msgbuf_t *msgbuf_alloc();
fuse_msgbuf_t *msgbuf_alloc_page_aligned();
void msgbuf_free(fuse_msgbuf_t *msgbuf);
void msgbuf_clear();
void msgbuf_gc();
void msgbuf_gc_10percent();
u64 msgbuf_alloc_count();
u64 msgbuf_avail_count();

45
libfuse/include/mutex.hpp

@ -5,3 +5,48 @@
#else
#include "mutex_ndebug.hpp"
#endif
typedef pthread_mutex_t mutex_t;

// Simple mutex_t wrapper to provide RAII init/destroy.
// Because macros are needed to record the location in the code where
// a lock is taken this is not std::mutex compatible; pair with
// LockGuard for scoped locking.
class Mutex
{
private:
  mutex_t _mutex;

public:
  Mutex()
  {
    mutex_init(_mutex);
  }

  ~Mutex()
  {
    mutex_destroy(_mutex);
  }

  // Non-copyable: copying would duplicate the underlying pthread
  // mutex state and cause mutex_destroy() to run twice on the same
  // native handle.
  Mutex(const Mutex&) = delete;
  Mutex& operator=(const Mutex&) = delete;

  // Implicit conversion so a Mutex can be handed straight to the
  // mutex_lock()/mutex_unlock() style APIs and to LockGuard.
  operator mutex_t&()
  {
    return _mutex;
  }
};
// RAII scope lock for the project's mutex API: locks on construction
// and unlocks on destruction.
class LockGuard
{
private:
  mutex_t &_mutex;

public:
  explicit
  LockGuard(mutex_t &mutex_)
    : _mutex(mutex_)
  {
    mutex_lock(_mutex);
  }

  ~LockGuard()
  {
    mutex_unlock(_mutex);
  }

  // Non-copyable: a copy would unlock the same mutex twice.
  LockGuard(const LockGuard&) = delete;
  LockGuard& operator=(const LockGuard&) = delete;
};

28
libfuse/include/mutex_debug.hpp

@ -2,19 +2,19 @@
#include "fmt/core.h"
#include <ctime>
#include <cstdlib>
#include <pthread.h>
#include <time.h>
#ifndef PTHREAD_MUTEX_ADAPTIVE_NP
# define PTHREAD_MUTEX_ADAPTIVE_NP PTHREAD_MUTEX_NORMAL
#endif
#define mutex_init(M) _mutex_init(M,__FILE__,__func__,__LINE__)
#define mutex_destroy(M) _mutex_destroy(M,__FILE__,__func__,__LINE__)
#define mutex_lock(M) _mutex_lock(M,__FILE__,__func__,__LINE__)
#define mutex_unlock(M) _mutex_unlock(M,__FILE__,__func__,__LINE__)
#define mutex_init(M) _mutex_init((M),__FILE__,__func__,__LINE__)
#define mutex_destroy(M) _mutex_destroy((M),__FILE__,__func__,__LINE__)
#define mutex_lock(M) _mutex_lock((M),__FILE__,__func__,__LINE__)
#define mutex_unlock(M) _mutex_unlock((M),__FILE__,__func__,__LINE__)
static
@ -34,13 +34,13 @@ _print_error_and_abort(const char *func1_,
file_,
func2_,
linenum_);
abort();
std::abort();
}
static
inline
void
_mutex_init(pthread_mutex_t *mutex_,
_mutex_init(pthread_mutex_t &mutex_,
const char *file_,
const char *func_,
const int linenum_)
@ -51,7 +51,7 @@ _mutex_init(pthread_mutex_t *mutex_,
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_ADAPTIVE_NP);
rv = pthread_mutex_init(mutex_,&attr);
rv = pthread_mutex_init(&mutex_,&attr);
if(rv != 0)
_print_error_and_abort(__func__,rv,file_,func_,linenum_);
@ -61,14 +61,14 @@ _mutex_init(pthread_mutex_t *mutex_,
static
inline
void
_mutex_destroy(pthread_mutex_t *mutex_,
_mutex_destroy(pthread_mutex_t &mutex_,
const char *file_,
const char *func_,
const int linenum_)
{
int rv;
rv = pthread_mutex_destroy(mutex_);
rv = pthread_mutex_destroy(&mutex_);
if(rv != 0)
_print_error_and_abort(__func__,rv,file_,func_,linenum_);
}
@ -76,7 +76,7 @@ _mutex_destroy(pthread_mutex_t *mutex_,
static
inline
void
_mutex_lock(pthread_mutex_t *mutex_,
_mutex_lock(pthread_mutex_t &mutex_,
const char *file_,
const char *func_,
const int linenum_)
@ -95,7 +95,7 @@ _mutex_lock(pthread_mutex_t *mutex_,
timeout.tv_nsec -= 1000000000;
}
rv = pthread_mutex_timedlock(mutex_,&timeout);
rv = pthread_mutex_timedlock(&mutex_,&timeout);
switch(rv)
{
case 0:
@ -119,14 +119,14 @@ _mutex_lock(pthread_mutex_t *mutex_,
static
inline
void
_mutex_unlock(pthread_mutex_t *mutex_,
_mutex_unlock(pthread_mutex_t &mutex_,
const char *file_,
const char *func_,
const int linenum_)
{
int rv;
rv = pthread_mutex_unlock(mutex_);
rv = pthread_mutex_unlock(&mutex_);
if(rv != 0)
_print_error_and_abort(__func__,rv,file_,func_,linenum_);
}

24
libfuse/include/mutex_ndebug.hpp

@ -12,7 +12,7 @@
static
inline
void
mutex_init(pthread_mutex_t *mutex_)
mutex_init(pthread_mutex_t &mutex_)
{
int rv;
pthread_mutexattr_t attr;
@ -20,9 +20,9 @@ mutex_init(pthread_mutex_t *mutex_)
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_ADAPTIVE_NP);
rv = pthread_mutex_init(mutex_,&attr);
rv = pthread_mutex_init(&mutex_,&attr);
if(rv != 0)
abort();
std::abort();
pthread_mutexattr_destroy(&attr);
}
@ -30,35 +30,35 @@ mutex_init(pthread_mutex_t *mutex_)
static
inline
void
mutex_lock(pthread_mutex_t *mutex_)
mutex_lock(pthread_mutex_t &mutex_)
{
int rv;
rv = pthread_mutex_lock(mutex_);
rv = pthread_mutex_lock(&mutex_);
if(rv != 0)
abort();
std::abort();
}
static
inline
void
mutex_unlock(pthread_mutex_t *mutex_)
mutex_unlock(pthread_mutex_t &mutex_)
{
int rv;
rv = pthread_mutex_unlock(mutex_);
rv = pthread_mutex_unlock(&mutex_);
if(rv != 0)
abort();
std::abort();
}
static
inline
void
mutex_destroy(pthread_mutex_t *mutex_)
mutex_destroy(pthread_mutex_t &mutex_)
{
int rv;
rv = pthread_mutex_destroy(mutex_);
rv = pthread_mutex_destroy(&mutex_);
if(rv != 0)
abort();
std::abort();
}

109
libfuse/include/objpool.hpp

@ -16,14 +16,43 @@
#pragma once
#include "mutex.hpp"
#include <algorithm>
#include <atomic>
#include <memory>
#include <mutex>
#include <new>
#include <utility>
#include <type_traits>
template<typename T>
struct DefaultAllocator
{
void*
allocate(size_t size_, size_t align_)
{
return ::operator new(size_, std::align_val_t{align_});
}
void
deallocate(void *ptr_, size_t /*size_*/, size_t align_)
{
::operator delete(ptr_, std::align_val_t{align_});
}
};
struct DefaultShouldPool
{
template<typename T>
bool operator()(T*) const noexcept
{
return true;
}
};
template<typename T,
typename Allocator = DefaultAllocator,
typename ShouldPool = DefaultShouldPool>
class ObjPool
{
private:
@ -45,11 +74,34 @@ private:
}
private:
std::mutex _mtx;
mutex_t _mtx;
Node *_head = nullptr;
std::atomic<size_t> _pooled_count{0};
std::atomic<size_t> _pool_count{0};
Allocator _allocator;
ShouldPool _should_pool;
public:
template<typename AllocPred,
typename = std::enable_if_t<
!std::is_same_v<std::decay_t<AllocPred>, ObjPool> &&
!std::is_same_v<std::decay_t<AllocPred>, Allocator>
>>
explicit
ObjPool(AllocPred&& allocator_pred_)
: _allocator(allocator_pred_)
{}
template<typename ShouldPoolPred,
typename = std::enable_if_t<
!std::is_same_v<std::decay_t<ShouldPoolPred>, ObjPool> &&
!std::is_same_v<std::decay_t<ShouldPoolPred>, ShouldPool>
>>
explicit
ObjPool(Allocator allocator_, ShouldPoolPred&& should_pool_pred_)
: _allocator(allocator_),
_should_pool(std::forward<ShouldPoolPred>(should_pool_pred_))
{}
ObjPool() = default;
ObjPool(const ObjPool&) = delete;
ObjPool& operator=(const ObjPool&) = delete;
@ -65,26 +117,31 @@ private:
// Pop one cached node off the freelist, or return NULL if the pool
// is empty. The project mutex API takes the mutex by reference (see
// mutex_debug.hpp/mutex_ndebug.hpp), so pass `_mtx` directly — the
// same convention _push_node() uses — not `&_mtx`.
Node*
_pop_node()
{
  mutex_lock(_mtx);
  Node *node = _head;
  if(node)
    {
      _head = node->next;
      _pool_count.fetch_sub(1, std::memory_order_relaxed);
    }
  mutex_unlock(_mtx);
  return node;
}
void
_push_node(Node *node_)
{
std::lock_guard<std::mutex> lock(_mtx);
mutex_lock(_mtx);
node_->next = _head;
_head = node_;
_pooled_count.fetch_add(1, std::memory_order_relaxed);
mutex_unlock(_mtx);
_pool_count.fetch_add(1, std::memory_order_relaxed);
}
public:
@ -93,24 +150,26 @@ public:
{
Node *head;
{
std::lock_guard<std::mutex> lock(_mtx);
head = _head;
_head = nullptr;
_pooled_count.store(0, std::memory_order_relaxed);
}
mutex_lock(_mtx);
head = _head;
_head = nullptr;
mutex_unlock(_mtx);
_pool_count.store(0, std::memory_order_relaxed);
while(head)
{
Node *next = head->next;
::operator delete(head, std::align_val_t{alignof(T)});
_allocator.deallocate(head, sizeof(Node), alignof(Node));
head = next;
}
}
template<typename... Args>
T*
alloc(Args&&... args_)
alloc(size_t size_ = sizeof(T), Args&&... args_)
{
void *mem;
Node *node;
@ -118,7 +177,7 @@ public:
node = _pop_node();
mem = (node ?
static_cast<void*>(node) :
::operator new(sizeof(T), std::align_val_t{alignof(T)}));
_allocator.allocate(size_, alignof(T)));
try
{
@ -126,32 +185,38 @@ public:
}
catch(...)
{
_push_node(static_cast<Node*>(mem));
if(!node)
_allocator.deallocate(mem, size_, alignof(T));
throw;
}
}
void
free(T *obj_) noexcept
free(T *obj_, size_t size_ = sizeof(T)) noexcept
{
if(not obj_)
return;
bool should_pool = _should_pool(obj_);
obj_->~T();
_push_node(to_node(obj_));
if(should_pool)
_push_node(to_node(obj_));
else
_allocator.deallocate(obj_, size_, alignof(T));
}
size_t
size() const noexcept
{
return _pooled_count.load(std::memory_order_relaxed);
return _pool_count.load(std::memory_order_relaxed);
}
void
gc() noexcept
{
size_t count = _pooled_count.load(std::memory_order_relaxed);
size_t count = _pool_count.load(std::memory_order_relaxed);
size_t to_free = std::max(count / 10, size_t{1});
for(size_t i = 0; i < to_free; ++i)
@ -159,7 +224,7 @@ public:
Node *node = _pop_node();
if(not node)
break;
::operator delete(node, std::align_val_t{alignof(T)});
_allocator.deallocate(node, sizeof(Node), alignof(Node));
}
}
};

0
src/scope_guard.hpp → libfuse/include/scope_guard.hpp

20
libfuse/include/thread_pool.hpp

@ -4,13 +4,14 @@
#include "moodycamel/lightweightsemaphore.h"
#include "invocable.h"
#include "mutex.hpp"
#include <algorithm>
#include <atomic>
#include <csignal>
#include <cstring>
#include <future>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
@ -91,7 +92,7 @@ private:
private:
std::string const _name;
std::vector<pthread_t> _threads;
mutable std::mutex _threads_mutex;
mutable Mutex _threads_mutex;
};
@ -230,7 +231,8 @@ ThreadPool::add_thread(const std::string name_)
pthread_setname_np(t,name.c_str());
{
std::lock_guard<std::mutex> lg(_threads_mutex);
LockGuard lg(_threads_mutex);
_threads.push_back(t);
}
@ -243,7 +245,8 @@ int
ThreadPool::remove_thread(void)
{
{
std::lock_guard<std::mutex> lg(_threads_mutex);
LockGuard lg(_threads_mutex);
if(_threads.size() <= 1)
return -EINVAL;
}
@ -258,7 +261,7 @@ ThreadPool::remove_thread(void)
promise.set_value(t);
{
std::lock_guard<std::mutex> lg(_threads_mutex);
LockGuard lg(_threads_mutex);
for(auto i = _threads.begin(); i != _threads.end(); ++i)
{
@ -285,8 +288,9 @@ int
ThreadPool::set_threads(std::size_t const count_)
{
int diff;
{
std::lock_guard<std::mutex> lg(_threads_mutex);
LockGuard lg(_threads_mutex);
diff = ((int)count_ - (int)_threads.size());
}
@ -304,7 +308,7 @@ inline
void
ThreadPool::shutdown(void)
{
std::lock_guard<std::mutex> lg(_threads_mutex);
LockGuard lg(_threads_mutex);
for(pthread_t tid : _threads)
pthread_cancel(tid);
@ -375,7 +379,7 @@ inline
std::vector<pthread_t>
ThreadPool::threads() const
{
std::lock_guard<std::mutex> lg(_threads_mutex);
LockGuard lg(_threads_mutex);
return _threads;
}

380
libfuse/lib/fmp.h

@ -1,380 +0,0 @@
/*
ISC License
Copyright (c) 2021, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include "kvec.h"
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>
// Round N up to the nearest multiple of S (S must be non-zero).
#define ROUND_UP(N,S) ((((N) + (S) - 1) / (S)) * (S))

// Growable vector of slab base pointers (kvec.h).
typedef kvec_t(void*) slab_kvec_t;

// Intrusive singly linked free-list node; stored inside each free
// object's own memory, so objects must be at least pointer sized.
typedef struct mem_stack_t mem_stack_t;
struct mem_stack_t
{
  mem_stack_t *next;
};

// Fixed-size-object memory pool backed by page aligned slabs.
typedef struct fmp_t fmp_t;
struct fmp_t
{
  mem_stack_t *objs;       // head of the free object list
  slab_kvec_t  slabs;      // every slab currently mapped
  uint64_t     avail_objs; // objects currently on the free list
  uint64_t     obj_size;   // object size rounded up to sizeof(void*)
  uint64_t     page_size;  // system page size
  uint64_t     slab_size;  // page_size * page_multiple
};
// Query the system page size in bytes.
static
inline
uint64_t
fmp_page_size()
{
  const long pagesize = sysconf(_SC_PAGESIZE);

  return (uint64_t)pagesize;
}
// Initialize a pool. obj_size_ is rounded up to pointer size so a
// free object can hold the intrusive free-list node; the slab size is
// page_multiple_ system pages. No memory is mapped until first alloc.
static
inline
void
fmp_init(fmp_t *fmp_,
         const uint64_t obj_size_,
         const uint64_t page_multiple_)
{
  kv_init(fmp_->slabs);
  fmp_->objs       = NULL;
  fmp_->avail_objs = 0;
  fmp_->obj_size   = ROUND_UP(obj_size_,sizeof(void*));
  fmp_->page_size  = fmp_page_size();
  fmp_->slab_size  = (fmp_->page_size * page_multiple_);
}
// Number of slabs currently owned by the pool.
static
inline
uint64_t
fmp_slab_count(fmp_t *fmp_)
{
  return kv_size(fmp_->slabs);
}
// Allocate one page aligned slab via posix_memalign(). Returns the
// allocation on success or NULL on failure.
//
// Fix: the success path previously executed `return NULL` instead of
// `return mem`, so this helper always reported failure.
static
inline
void*
fmp_slab_alloc_posix_memalign(fmp_t *fmp_)
{
  int rv;
  void *mem;
  const size_t alignment = fmp_->page_size;
  const size_t size      = fmp_->slab_size;

  rv = posix_memalign(&mem,alignment,size);
  if(rv != 0)
    return NULL;

  return mem;
}
// Allocate one slab via anonymous, private mmap (page aligned by
// definition). Returns NULL on failure.
static
inline
void*
fmp_slab_alloc_mmap(fmp_t *fmp_)
{
  void *mem;
  void *address        = NULL;
  const size_t length  = fmp_->slab_size;
  const int    protect = PROT_READ|PROT_WRITE;
  const int    flags   = MAP_PRIVATE|MAP_ANONYMOUS;
  const int    filedes = -1;
  const off_t  offset  = 0;

  mem = mmap(address,length,protect,flags,filedes,offset);
  if(mem == MAP_FAILED)
    return NULL;

  return mem;
}
// Release a slab obtained from fmp_slab_alloc_posix_memalign().
static
inline
void
fmp_slab_free_posix_memalign(fmp_t* fmp_,
                             void *mem_)
{
  (void)fmp_;
  free(mem_);
}
// Release a slab obtained from fmp_slab_alloc_mmap().
static
inline
void
fmp_slab_free_mmap(fmp_t* fmp_,
                   void *mem_)
{
  void *addr = mem_;
  size_t length = fmp_->slab_size;

  // munmap failure intentionally ignored; nothing useful to do here.
  (void)munmap(addr,length);
}
// Map a new slab and carve it into obj_size sized objects, pushing
// each onto the free list. The slab is walked back-to-front so the
// resulting free list hands out objects front-to-back.
// Returns 0 on success, -ENOMEM if the mapping fails.
static
inline
int
fmp_slab_alloc(fmp_t *fmp_)
{
  char *i;
  void *mem;

  mem = fmp_slab_alloc_mmap(fmp_);
  if(mem == NULL)
    return -ENOMEM;

  kv_push(void*,fmp_->slabs,mem);
  // Start at the last whole object that fits and walk backwards.
  i = ((char*)mem + fmp_->slab_size - fmp_->obj_size);
  while(i >= (char*)mem)
    {
      mem_stack_t *obj = (mem_stack_t*)i;
      obj->next = fmp_->objs;
      fmp_->objs = obj;
      fmp_->avail_objs++;
      i -= fmp_->obj_size;
    }

  return 0;
}
// Pop one object off the free list, growing the pool by one slab if
// the list is empty. Returns NULL only when slab allocation fails
// (fmp_slab_alloc()'s return value is ignored on purpose; the
// follow-up NULL check on the free list covers the failure case).
static
inline
void*
fmp_alloc(fmp_t *fmp_)
{
  void *rv;

  if(fmp_->objs == NULL)
    fmp_slab_alloc(fmp_);
  if(fmp_->objs == NULL)
    return NULL;

  rv = fmp_->objs;
  fmp_->objs = fmp_->objs->next;
  fmp_->avail_objs--;

  return rv;
}
// Same as fmp_alloc() but the returned object is zero filled.
static
inline
void*
fmp_calloc(fmp_t *fmp_)
{
  void *mem = fmp_alloc(fmp_);

  if(mem != NULL)
    memset(mem,0,fmp_->obj_size);

  return mem;
}
// Return an object to the pool by pushing it onto the free list; the
// object's own memory is reused as the list node. obj_ must have come
// from this pool; NULL is not checked.
static
inline
void
fmp_free(fmp_t *fmp_,
         void *obj_)
{
  mem_stack_t *obj = (mem_stack_t*)obj_;

  obj->next = fmp_->objs;
  fmp_->objs = obj;
  fmp_->avail_objs++;
}
// Unmap every slab and reset the free list. Any objects still handed
// out become dangling — callers must ensure all objects are returned
// or unused before clearing.
static
inline
void
fmp_clear(fmp_t *fmp_)
{
  while(kv_size(fmp_->slabs))
    {
      void *slab = kv_pop(fmp_->slabs);
      fmp_slab_free_mmap(fmp_,slab);
    }

  fmp_->objs = NULL;
  fmp_->avail_objs = 0;
}
// Release all slabs plus the slab vector itself. The pool must be
// re-initialized with fmp_init() before any reuse.
static
inline
void
fmp_destroy(fmp_t *fmp_)
{
  fmp_clear(fmp_);
  kv_destroy(fmp_->slabs);
}
// Number of objects currently available on the free list.
static
inline
uint64_t
fmp_avail_objs(fmp_t *fmp_)
{
  return fmp_->avail_objs;
}
// Maximum number of whole objects a single slab can hold.
static
inline
uint64_t
fmp_objs_per_slab(fmp_t *fmp_)
{
  return (fmp_->slab_size / fmp_->obj_size);
}
// Count how many free-list objects live inside the given slab. Walks
// the whole free list (O(avail_objs)), stopping early once the slab's
// full capacity has been accounted for.
static
inline
uint64_t
fmp_objs_in_slab(fmp_t *fmp_,
                 void *slab_)
{
  char *slab;
  uint64_t objs_per_slab;
  uint64_t objs_in_slab;

  slab = (char*)slab_;
  objs_in_slab = 0;
  objs_per_slab = fmp_objs_per_slab(fmp_);
  for(mem_stack_t *stack = fmp_->objs; stack != NULL; stack = stack->next)
    {
      char *obj = (char*)stack;

      // Object pointer within [slab, slab + slab_size)?
      if((obj >= slab) && (obj < (slab + fmp_->slab_size)))
        objs_in_slab++;
      if(objs_in_slab >= objs_per_slab)
        break;
    }

  return objs_in_slab;
}
// Unlink from the free list every object that lives inside the given
// slab, decrementing avail_objs for each. Uses a pointer-to-pointer
// walk so head and interior removals are handled uniformly, and stops
// early once a full slab's worth of objects has been removed.
static
inline
void
fmp_remove_objs_in_slab(fmp_t *fmp_,
                        void *slab_)
{
  char *slab;
  uint64_t objs_per_slab;
  uint64_t objs_in_slab;
  mem_stack_t **p;

  p = &fmp_->objs;
  slab = (char*)slab_;
  objs_in_slab = 0;
  objs_per_slab = fmp_objs_per_slab(fmp_);
  while((*p) != NULL)
    {
      char *obj = (char*)*p;

      if((obj >= slab) && (obj < (slab + fmp_->slab_size)))
        {
          objs_in_slab++;
          // Unlink *p; do not advance — the next node slid into *p.
          *p = (*p)->next;
          fmp_->avail_objs--;
          if(objs_in_slab >= objs_per_slab)
            break;
          continue;
        }

      p = &(*p)->next;
    }
}
// Try to release the slab at slab_idx_ (wrapped modulo the slab
// count). Only succeeds when every object the slab can hold is on the
// free list, i.e. the slab is fully unused. Returns 1 if the slab was
// released, 0 otherwise.
//
// NOTE(review): assumes at least one slab exists — the modulo is
// undefined when the slab vector is empty; callers must check first.
static
inline
int
fmp_gc_slab(fmp_t *fmp_,
            uint64_t slab_idx_)
{
  char *slab;
  uint64_t objs_in_slab;
  uint64_t objs_per_slab;

  slab_idx_ = (slab_idx_ % kv_size(fmp_->slabs));
  slab = (char*)kv_A(fmp_->slabs,slab_idx_);
  objs_per_slab = fmp_objs_per_slab(fmp_);
  objs_in_slab = fmp_objs_in_slab(fmp_,slab);
  if(objs_in_slab != objs_per_slab)
    return 0;

  fmp_remove_objs_in_slab(fmp_,slab);
  kv_delete(fmp_->slabs,slab_idx_);
  fmp_slab_free_mmap(fmp_,slab);

  return 1;
}
// Garbage collect at most one randomly chosen slab. Returns 1 if a
// slab was released, 0 otherwise.
//
// Fix: guard against an empty slab vector — fmp_gc_slab() computes
// `slab_idx_ % kv_size(fmp_->slabs)`, which is a division by zero
// when no slabs exist.
static
inline
int
fmp_gc(fmp_t *fmp_)
{
  uint64_t slab_idx;

  if(kv_size(fmp_->slabs) == 0)
    return 0;

  slab_idx = rand();

  return fmp_gc_slab(fmp_,slab_idx);
}
// Ratio of free objects to total object capacity across all slabs.
// NOTE(review): with zero slabs this divides by 0.0 and yields
// NaN/inf — confirm callers only invoke it with at least one slab.
static
inline
double
fmp_slab_usage_ratio(fmp_t *fmp_)
{
  double avail_objs;
  double objs_per_slab;
  double nums_of_slabs;

  avail_objs = fmp_->avail_objs;
  objs_per_slab = fmp_objs_per_slab(fmp_);
  nums_of_slabs = kv_size(fmp_->slabs);

  return (avail_objs / (objs_per_slab * nums_of_slabs));
}
// Total bytes of slab memory currently mapped by the pool.
static
inline
uint64_t
fmp_total_allocated_memory(fmp_t *fmp_)
{
  return (fmp_->slab_size * kv_size(fmp_->slabs));
}

142
libfuse/lib/fuse.cpp

@ -123,7 +123,7 @@ struct fuse
struct node_table id_table;
nodeid_gen_t nodeid_gen;
unsigned int hidectr;
pthread_mutex_t lock;
mutex_t lock;
struct fuse_config conf;
fuse_operations ops;
struct lock_queue_element *lockq;
@ -146,9 +146,9 @@ struct lock
struct fuse_dh
{
pthread_mutex_t lock;
uint64_t fh;
fuse_dirents_t d;
mutex_t lock;
uint64_t fh;
fuse_dirents_t d;
};
static struct fuse f = {};
@ -638,7 +638,7 @@ find_node(uint64_t parent,
{
node_t *node;
mutex_lock(&f.lock);
mutex_lock(f.lock);
if(!name)
node = get_node(parent);
else
@ -668,7 +668,7 @@ find_node(uint64_t parent,
}
inc_nlookup(node);
out_err:
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
return node;
}
@ -973,7 +973,7 @@ get_path_common(uint64_t nodeid,
{
int err;
mutex_lock(&f.lock);
mutex_lock(f.lock);
err = try_get_path(nodeid,name,path,wnode,true);
if(err == -EAGAIN)
{
@ -986,7 +986,7 @@ get_path_common(uint64_t nodeid,
err = wait_path(&qe);
}
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
return err;
}
@ -1031,7 +1031,7 @@ get_path2(uint64_t nodeid1,
{
int err;
mutex_lock(&f.lock);
mutex_lock(f.lock);
err = try_get_path2(nodeid1,name1,nodeid2,name2,
path1,path2,wnode1,wnode2);
if(err == -EAGAIN)
@ -1049,7 +1049,7 @@ get_path2(uint64_t nodeid1,
err = wait_path(&qe);
}
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
return err;
}
@ -1060,11 +1060,11 @@ free_path_wrlock(uint64_t nodeid,
node_t *wnode,
char *path)
{
mutex_lock(&f.lock);
mutex_lock(f.lock);
unlock_path(nodeid,wnode,NULL);
if(f.lockq)
wake_up_queued();
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
free(path);
}
@ -1086,11 +1086,11 @@ free_path2(uint64_t nodeid1,
char *path1,
char *path2)
{
mutex_lock(&f.lock);
mutex_lock(f.lock);
unlock_path(nodeid1,wnode1,NULL);
unlock_path(nodeid2,wnode2,NULL);
wake_up_queued();
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
free(path1);
free(path2);
}
@ -1105,7 +1105,7 @@ forget_node(const uint64_t nodeid,
if(nodeid == FUSE_ROOT_ID)
return;
mutex_lock(&f.lock);
mutex_lock(f.lock);
node = get_node(nodeid);
/*
@ -1146,7 +1146,7 @@ forget_node(const uint64_t nodeid,
kv_push(remembered_node_t,f.remembered_nodes,fn);
}
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
}
static
@ -1168,11 +1168,11 @@ remove_node(uint64_t dir,
{
node_t *node;
mutex_lock(&f.lock);
mutex_lock(f.lock);
node = lookup_node(dir,name);
if(node != NULL)
unlink_node(node);
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
}
static
@ -1186,7 +1186,7 @@ rename_node(uint64_t olddir,
node_t *node;
node_t *newnode;
mutex_lock(&f.lock);
mutex_lock(f.lock);
node = lookup_node(olddir,oldname);
newnode = lookup_node(newdir,newname);
if(node == NULL)
@ -1203,7 +1203,7 @@ rename_node(uint64_t olddir,
}
out:
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
return err;
}
@ -1257,9 +1257,9 @@ set_path_info(uint64_t nodeid,
e->ino = node->nodeid;
e->generation = ((e->ino == FUSE_ROOT_ID) ? 0 : f.nodeid_gen.generation);
mutex_lock(&f.lock);
mutex_lock(f.lock);
update_stat(node,&e->attr);
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
set_stat(e->ino,&e->attr);
@ -1356,16 +1356,16 @@ fuse_lib_lookup(fuse_req_t *req_,
if(name[1] == '\0')
{
name = NULL;
mutex_lock(&f.lock);
mutex_lock(f.lock);
dot = get_node_nocheck(nodeid);
if(dot == NULL)
{
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
reply_entry(req_,&e,-ESTALE);
return;
}
dot->refctr++;
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
}
else if((name[1] == '.') && (name[2] == '\0'))
{
@ -1376,9 +1376,9 @@ fuse_lib_lookup(fuse_req_t *req_,
}
name = NULL;
mutex_lock(&f.lock);
mutex_lock(f.lock);
nodeid = get_node(nodeid)->parent->nodeid;
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
}
}
@ -1396,9 +1396,9 @@ fuse_lib_lookup(fuse_req_t *req_,
if(dot)
{
mutex_lock(&f.lock);
mutex_lock(f.lock);
unref_node(dot);
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
}
reply_entry(req_,&e,err);
@ -1488,10 +1488,10 @@ fuse_lib_getattr(fuse_req_t *req_,
if(!err)
{
mutex_lock(&f.lock);
mutex_lock(f.lock);
node = get_node(hdr_->nodeid);
update_stat(node,&buf);
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
set_stat(hdr_->nodeid,&buf);
fuse_reply_attr(req_,&buf,timeout.attr);
}
@ -1668,9 +1668,9 @@ fuse_lib_setattr(fuse_req_t *req_,
if(!err)
{
mutex_lock(&f.lock);
mutex_lock(f.lock);
update_stat(get_node(hdr_->nodeid),&stbuf);
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
set_stat(hdr_->nodeid,&stbuf);
fuse_reply_attr(req_,&stbuf,timeout.attr);
}
@ -2011,7 +2011,7 @@ fuse_do_release(fuse_req_ctx_t *req_ctx_,
f.ops.release(req_ctx_,
ffi_);
mutex_lock(&f.lock);
mutex_lock(f.lock);
{
node_t *node;
@ -2019,7 +2019,7 @@ fuse_do_release(fuse_req_ctx_t *req_ctx_,
assert(node->open_count > 0);
node->open_count--;
}
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
}
static
@ -2078,9 +2078,9 @@ fuse_lib_create(fuse_req_t *req_,
if(!err)
{
mutex_lock(&f.lock);
mutex_lock(f.lock);
get_node(e.ino)->open_count++;
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
if(fuse_reply_create(req_,&e,&ffi) == -ENOENT)
{
@ -2109,7 +2109,7 @@ open_auto_cache(fuse_req_ctx_t *req_ctx_,
node_t *node;
fuse_timeouts_t timeout;
mutex_lock(&f.lock);
mutex_lock(f.lock);
node = get_node(ino);
if(node->is_stat_cache_valid)
@ -2117,12 +2117,12 @@ open_auto_cache(fuse_req_ctx_t *req_ctx_,
int err;
struct stat stbuf;
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
err = f.ops.fgetattr(req_ctx_,
fi->fh,
&stbuf,
&timeout);
mutex_lock(&f.lock);
mutex_lock(f.lock);
if(!err)
update_stat(node,&stbuf);
@ -2135,7 +2135,7 @@ open_auto_cache(fuse_req_ctx_t *req_ctx_,
node->is_stat_cache_valid = 1;
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
}
static
@ -2167,9 +2167,9 @@ fuse_lib_open(fuse_req_t *req_,
if(!err)
{
mutex_lock(&f.lock);
mutex_lock(f.lock);
get_node(hdr_->nodeid)->open_count++;
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
/* The open syscall was interrupted,so it must be cancelled */
if(fuse_reply_open(req_,&ffi) == -ENOENT)
fuse_do_release(&req_->ctx,hdr_->nodeid,&ffi);
@ -2307,7 +2307,7 @@ fuse_lib_opendir(fuse_req_t *req_,
}
fuse_dirents_init(&dh->d);
mutex_init(&dh->lock);
mutex_init(dh->lock);
llffi.fh = (uintptr_t)dh;
ffi.flags = llffi.flags;
@ -2331,14 +2331,14 @@ fuse_lib_opendir(fuse_req_t *req_,
must be cancelled */
f.ops.releasedir(&req_->ctx,
&ffi);
mutex_destroy(&dh->lock);
mutex_destroy(dh->lock);
free(dh);
}
}
else
{
fuse_reply_err(req_,err);
mutex_destroy(&dh->lock);
mutex_destroy(dh->lock);
free(dh);
}
@ -2390,7 +2390,7 @@ fuse_lib_readdir(fuse_req_t *req_,
dh = get_dirhandle(&llffi,&ffi);
d = &dh->d;
mutex_lock(&dh->lock);
mutex_lock(dh->lock);
rv = 0;
if((arg->offset == 0) || (kv_size(d->data) == 0))
@ -2411,7 +2411,7 @@ fuse_lib_readdir(fuse_req_t *req_,
size);
out:
mutex_unlock(&dh->lock);
mutex_unlock(dh->lock);
}
static
@ -2434,7 +2434,7 @@ fuse_lib_readdir_plus(fuse_req_t *req_,
dh = get_dirhandle(&llffi,&ffi);
d = &dh->d;
mutex_lock(&dh->lock);
mutex_lock(dh->lock);
rv = 0;
if((arg->offset == 0) || (kv_size(d->data) == 0))
@ -2455,7 +2455,7 @@ fuse_lib_readdir_plus(fuse_req_t *req_,
size);
out:
mutex_unlock(&dh->lock);
mutex_unlock(dh->lock);
}
static
@ -2478,9 +2478,9 @@ fuse_lib_releasedir(fuse_req_t *req_,
&ffi);
/* Done to keep race condition between last readdir reply and the unlock */
mutex_lock(&dh->lock);
mutex_unlock(&dh->lock);
mutex_destroy(&dh->lock);
mutex_lock(dh->lock);
mutex_unlock(dh->lock);
mutex_destroy(dh->lock);
fuse_dirents_free(&dh->d);
free(dh);
fuse_reply_err(req_,0);
@ -2823,9 +2823,9 @@ fuse_lib_tmpfile(fuse_req_t *req_,
if(!err)
{
mutex_lock(&f.lock);
mutex_lock(f.lock);
get_node(e.ino)->open_count++;
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
if(fuse_reply_create(req_,&e,&ffi) == -ENOENT)
{
@ -3010,9 +3010,9 @@ fuse_flush_common(fuse_req_t *req_,
{
flock_to_lock(&lock,&l);
l.owner = ffi_->lock_owner;
mutex_lock(&f.lock);
mutex_lock(f.lock);
locks_insert(get_node(ino_),&l);
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
/* if op.lock() is defined FLUSH is needed regardless
of op.flush() */
@ -3135,11 +3135,11 @@ fuse_lib_getlk(fuse_req_t *req_,
flock_to_lock(&flk,&lk);
lk.owner = ffi.lock_owner;
mutex_lock(&f.lock);
mutex_lock(f.lock);
conflict = locks_conflict(get_node(hdr_->nodeid),&lk);
if(conflict)
lock_to_flock(conflict,&flk);
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
if(!conflict)
err = fuse_lock_common(req_,hdr_->nodeid,&ffi,&flk,F_GETLK);
else
@ -3166,9 +3166,9 @@ fuse_lib_setlk(fuse_req_t *req_,
lock_t l;
flock_to_lock(lock,&l);
l.owner = fi->lock_owner;
mutex_lock(&f.lock);
mutex_lock(f.lock);
locks_insert(get_node(ino),&l);
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
}
fuse_reply_err(req_,err);
@ -3361,12 +3361,12 @@ static
void
remembered_nodes_sort()
{
mutex_lock(&f.lock);
mutex_lock(f.lock);
qsort(&kv_first(f.remembered_nodes),
kv_size(f.remembered_nodes),
sizeof(remembered_node_t),
remembered_node_cmp);
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
}
#define MAX_PRUNE 100
@ -3379,7 +3379,7 @@ fuse_prune_some_remembered_nodes(int *offset_)
int pruned;
int checked;
mutex_lock(&f.lock);
mutex_lock(f.lock);
pruned = 0;
checked = 0;
@ -3414,7 +3414,7 @@ fuse_prune_some_remembered_nodes(int *offset_)
pruned++;
}
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
if((pruned < MAX_PRUNE) && (checked < MAX_CHECK))
*offset_ = -1;
@ -3662,7 +3662,7 @@ fuse_invalidate_all_nodes()
{
std::vector<std::string> names;
mutex_lock(&f.lock);
mutex_lock(f.lock);
for(size_t i = 0; i < f.id_table.size; i++)
{
node_t *node;
@ -3681,7 +3681,7 @@ fuse_invalidate_all_nodes()
names.emplace_back(node->name);
}
}
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
SysLog::info("invalidating {} file entries",
names.size());
@ -3699,7 +3699,7 @@ fuse_gc()
{
SysLog::info("running thorough garbage collection");
node_clear();
msgbuf_gc();
msgbuf_clear();
fuse_malloc_trim();
}
@ -3708,7 +3708,7 @@ fuse_gc1()
{
SysLog::info("running basic garbage collection");
node_gc();
msgbuf_gc_10percent();
msgbuf_gc();
fuse_malloc_trim();
}
@ -3773,7 +3773,7 @@ fuse_new(int fd_,
if(node_table_init(&f.id_table) == -1)
goto out_free_name_table;
mutex_init(&f.lock);
mutex_init(f.lock);
kv_init(f.remembered_nodes);
@ -3827,7 +3827,7 @@ fuse_destroy(struct fuse *)
free(f.id_table.array);
free(f.name_table.array);
mutex_destroy(&f.lock);
mutex_destroy(f.lock);
fuse_session_destroy(f.se);
kv_destroy(f.remembered_nodes);
}

21
libfuse/lib/fuse_lowlevel.cpp

@ -47,7 +47,7 @@ struct fuse_ll
void *userdata;
uid_t owner;
fuse_conn_info_t conn;
pthread_mutex_t lock;
mutex_t lock;
int got_init;
int got_destroy;
uint64_t notify_ctr;
@ -1339,7 +1339,7 @@ do_notify_reply(fuse_req_t *req,
struct fuse_notify_req *nreq;
struct fuse_notify_req *head;
mutex_lock(&f.lock);
mutex_lock(f.lock);
head = &f.notify_list;
for(nreq = head->next; nreq != head; nreq = nreq->next)
{
@ -1349,7 +1349,7 @@ do_notify_reply(fuse_req_t *req,
break;
}
}
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
if(nreq != head)
nreq->reply(nreq, req, hdr_->nodeid, &hdr_[1]);
@ -1579,12 +1579,12 @@ fuse_lowlevel_notify_retrieve(struct fuse_session *se,
if(rreq == NULL)
return -ENOMEM;
mutex_lock(&f.lock);
mutex_lock(f.lock);
rreq->cookie = cookie;
rreq->nreq.unique = f.notify_ctr++;
rreq->nreq.reply = fuse_ll_retrieve_reply;
list_add_nreq(&rreq->nreq, &f.notify_list);
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
outarg.notify_unique = rreq->nreq.unique;
outarg.nodeid = ino;
@ -1597,9 +1597,9 @@ fuse_lowlevel_notify_retrieve(struct fuse_session *se,
err = send_notify_iov(se, FUSE_NOTIFY_RETRIEVE, iov, 2);
if(err)
{
mutex_lock(&f.lock);
mutex_lock(f.lock);
list_del_nreq(&rreq->nreq);
mutex_unlock(&f.lock);
mutex_unlock(f.lock);
free(rreq);
}
@ -1680,7 +1680,8 @@ fuse_ll_destroy(void *data)
f.op.destroy(f.userdata);
}
mutex_destroy(&f.lock);
mutex_destroy(f.lock);
msgbuf_gc();
}
static
@ -1868,7 +1869,7 @@ fuse_lowlevel_new_common(struct fuse_args *args,
list_init_nreq(&f.notify_list);
f.notify_ctr = 1;
mutex_init(&f.lock);
mutex_init(f.lock);
if(fuse_opt_parse(args,NULL,fuse_ll_opts,fuse_ll_opt_proc) == -1)
goto out_free;
@ -1888,7 +1889,7 @@ fuse_lowlevel_new_common(struct fuse_args *args,
return se;
out_free:
mutex_destroy(&f.lock);
mutex_destroy(f.lock);
return nullptr;
}

214
libfuse/lib/fuse_msgbuf.cpp

@ -19,53 +19,69 @@
#include "fuse.h"
#include "fuse_kernel.h"
#include "objpool.hpp"
#include "fmt/core.h"
#include <unistd.h>
#include <cassert>
#include <atomic>
#include <cstdlib>
#include <mutex>
#include <vector>
static u32 g_PAGESIZE = 0;
static u64 g_BUFSIZE = 0;
static u32 g_pagesize = 0;
static u64 g_bufsize = 0;
static std::atomic<std::uint_fast64_t> g_MSGBUF_ALLOC_COUNT = 0;
static
__attribute__((constructor))
void
_constructor()
{
long pagesize = sysconf(_SC_PAGESIZE);
static std::mutex g_MUTEX;
static std::vector<fuse_msgbuf_t*> g_MSGBUF_STACK;
assert(pagesize > 0);
g_pagesize = pagesize;
u64
msgbuf_get_bufsize()
{
return g_BUFSIZE;
assert((sizeof(struct fuse_in_header) + sizeof(struct fuse_write_in))
< g_pagesize);
msgbuf_set_bufsize(FUSE_DEFAULT_MAX_MAX_PAGES);
}
u32
msgbuf_get_pagesize()
struct PageAlignedAllocator
{
return g_PAGESIZE;
}
void*
allocate(size_t size_, size_t align_)
{
void *buf = NULL;
int rv = posix_memalign(&buf, align_, size_);
return (rv == 0) ? buf : NULL;
}
// +1 page so that it is used for the "header" of the allocation as
// well as to allow for offseting for write requests to be page
// aligned. +1 again for fuse header as the max_pages value is for the
// body.
void
msgbuf_set_bufsize(const u64 size_in_pages_)
void
deallocate(void *ptr_, size_t /*size_*/, size_t /*align_*/)
{
free(ptr_);
}
};
struct ShouldPoolMsgbuf
{
g_BUFSIZE = ((size_in_pages_ + 2) * g_PAGESIZE);
}
bool operator()(const fuse_msgbuf_t *msgbuf) const noexcept
{
return msgbuf->size == (g_bufsize - g_pagesize);
}
};
static ObjPool<fuse_msgbuf_t, PageAlignedAllocator, ShouldPoolMsgbuf> g_msgbuf_pool;
static
void
_msgbuf_page_align(fuse_msgbuf_t *msgbuf_)
{
msgbuf_->mem = (char*)msgbuf_;
msgbuf_->mem += g_PAGESIZE;
msgbuf_->size = (g_BUFSIZE - g_PAGESIZE);
msgbuf_->mem += g_pagesize;
msgbuf_->size = (g_bufsize - g_pagesize);
}
static
@ -73,49 +89,10 @@ void
_msgbuf_write_align(fuse_msgbuf_t *msgbuf_)
{
msgbuf_->mem = (char*)msgbuf_;
msgbuf_->mem += g_PAGESIZE;
msgbuf_->mem += g_pagesize;
msgbuf_->mem -= sizeof(struct fuse_in_header);
msgbuf_->mem -= sizeof(struct fuse_write_in);
msgbuf_->size = (g_BUFSIZE - g_PAGESIZE);
}
static
__attribute__((constructor))
void
_msgbuf_constructor()
{
long pagesize = sysconf(_SC_PAGESIZE);
assert(pagesize > 0);
g_PAGESIZE = pagesize;
// Ensure fuse headers fit within a single page for O_DIRECT alignment
assert((sizeof(struct fuse_in_header) + sizeof(struct fuse_write_in))
< g_PAGESIZE);
msgbuf_set_bufsize(FUSE_DEFAULT_MAX_MAX_PAGES);
}
static
__attribute__((destructor))
void
_msgbuf_destructor()
{
msgbuf_gc();
}
static
void*
_page_aligned_malloc(const u64 size_in_bytes_)
{
int rv;
void *buf = NULL;
rv = posix_memalign(&buf,g_PAGESIZE,size_in_bytes_);
if(rv != 0)
return NULL;
return buf;
msgbuf_->size = (g_bufsize - g_pagesize);
}
typedef void (*msgbuf_setup_func_t)(fuse_msgbuf_t*);
@ -126,32 +103,15 @@ _msgbuf_alloc(msgbuf_setup_func_t setup_func_)
{
fuse_msgbuf_t *msgbuf;
g_MUTEX.lock();
if(g_MSGBUF_STACK.empty())
{
g_MUTEX.unlock();
msgbuf = (fuse_msgbuf_t*)_page_aligned_malloc(g_BUFSIZE);
if(msgbuf == NULL)
return NULL;
g_MSGBUF_ALLOC_COUNT.fetch_add(1,std::memory_order_relaxed);
}
else
{
msgbuf = g_MSGBUF_STACK.back();
g_MSGBUF_STACK.pop_back();
g_MUTEX.unlock();
}
msgbuf = g_msgbuf_pool.alloc(g_bufsize);
if(msgbuf == NULL)
return NULL;
setup_func_(msgbuf);
return msgbuf;
}
// Offset the memory so write request payload will be placed on page
// boundry so O_DIRECT can work. No impact on other message types
// except for `read` which will require using `msgbuf_page_align`.
fuse_msgbuf_t*
msgbuf_alloc()
{
@ -164,88 +124,50 @@ msgbuf_alloc_page_aligned()
return _msgbuf_alloc(_msgbuf_page_align);
}
static
void
msgbuf_destroy(fuse_msgbuf_t *msgbuf_)
msgbuf_free(fuse_msgbuf_t *msgbuf_)
{
free(msgbuf_);
g_msgbuf_pool.free(msgbuf_, g_bufsize);
}
void
msgbuf_free(fuse_msgbuf_t *msgbuf_)
u64
msgbuf_get_bufsize()
{
bool destroy;
{
std::lock_guard<std::mutex> lck(g_MUTEX);
return g_bufsize;
}
destroy = (msgbuf_->size != (g_BUFSIZE - g_PAGESIZE));
if(!destroy)
g_MSGBUF_STACK.emplace_back(msgbuf_);
}
u32
msgbuf_get_pagesize()
{
return g_pagesize;
}
if(destroy)
{
msgbuf_destroy(msgbuf_);
g_MSGBUF_ALLOC_COUNT.fetch_sub(1,std::memory_order_relaxed);
}
void
msgbuf_set_bufsize(const u64 size_in_pages_)
{
g_bufsize = ((size_in_pages_ + 2) * g_pagesize);
}
u64
msgbuf_alloc_count()
{
return g_MSGBUF_ALLOC_COUNT;
return g_msgbuf_pool.size();
}
u64
msgbuf_avail_count()
{
std::lock_guard<std::mutex> lck(g_MUTEX);
return g_MSGBUF_STACK.size();
return g_msgbuf_pool.size();
}
void
msgbuf_gc_10percent()
msgbuf_gc()
{
std::vector<fuse_msgbuf_t*> togc;
{
std::size_t size;
std::size_t ten_percent;
std::lock_guard<std::mutex> lck(g_MUTEX);
size = g_MSGBUF_STACK.size();
ten_percent = (size / 10);
for(std::size_t i = 0; i < ten_percent; i++)
{
togc.push_back(g_MSGBUF_STACK.back());
g_MSGBUF_STACK.pop_back();
}
}
for(auto msgbuf : togc)
{
msgbuf_destroy(msgbuf);
g_MSGBUF_ALLOC_COUNT.fetch_sub(1,std::memory_order_relaxed);
}
g_msgbuf_pool.gc();
}
void
msgbuf_gc()
msgbuf_clear()
{
std::vector<fuse_msgbuf_t*> oldstack;
{
std::lock_guard<std::mutex> lck(g_MUTEX);
oldstack.swap(g_MSGBUF_STACK);
}
for(auto msgbuf: oldstack)
{
msgbuf_destroy(msgbuf);
g_MSGBUF_ALLOC_COUNT.fetch_sub(1,std::memory_order_relaxed);
}
g_msgbuf_pool.clear();
}

245
libfuse/lib/lfmp.h

@ -1,245 +0,0 @@
/*
ISC License
Copyright (c) 2020, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include "fmp.h"
#include "mutex.hpp"
#include <pthread.h>
typedef struct lfmp_t lfmp_t;
struct lfmp_t
{
fmp_t fmp;
pthread_mutex_t lock;
};
static
inline
void
lfmp_init(lfmp_t *lfmp_,
const uint64_t obj_size_,
const uint64_t page_multiple_)
{
fmp_init(&lfmp_->fmp,obj_size_,page_multiple_);
mutex_init(&lfmp_->lock);
}
static
inline
void
lfmp_lock(lfmp_t *lfmp_)
{
mutex_lock(&lfmp_->lock);
}
static
inline
void
lfmp_unlock(lfmp_t *lfmp_)
{
mutex_unlock(&lfmp_->lock);
}
static
inline
uint64_t
lfmp_slab_count(lfmp_t *lfmp_)
{
uint64_t rv;
mutex_lock(&lfmp_->lock);
rv = fmp_slab_count(&lfmp_->fmp);
mutex_unlock(&lfmp_->lock);
return rv;
}
static
inline
int
lfmp_slab_alloc(lfmp_t *lfmp_)
{
int rv;
mutex_lock(&lfmp_->lock);
rv = fmp_slab_alloc(&lfmp_->fmp);
mutex_unlock(&lfmp_->lock);
return rv;
}
static
inline
void*
lfmp_alloc(lfmp_t *lfmp_)
{
void *rv;
mutex_lock(&lfmp_->lock);
rv = fmp_alloc(&lfmp_->fmp);
mutex_unlock(&lfmp_->lock);
return rv;
}
static
inline
void*
lfmp_calloc(lfmp_t *lfmp_)
{
void *rv;
mutex_lock(&lfmp_->lock);
rv = fmp_calloc(&lfmp_->fmp);
mutex_unlock(&lfmp_->lock);
return rv;
}
static
inline
void
lfmp_free(lfmp_t *lfmp_,
void *obj_)
{
mutex_lock(&lfmp_->lock);
fmp_free(&lfmp_->fmp,obj_);
mutex_unlock(&lfmp_->lock);
}
static
inline
void
lfmp_clear(lfmp_t *lfmp_)
{
mutex_lock(&lfmp_->lock);
fmp_clear(&lfmp_->fmp);
mutex_unlock(&lfmp_->lock);
}
static
inline
void
lfmp_destroy(lfmp_t *lfmp_)
{
mutex_lock(&lfmp_->lock);
fmp_destroy(&lfmp_->fmp);
mutex_unlock(&lfmp_->lock);
mutex_destroy(&lfmp_->lock);
}
static
inline
uint64_t
lfmp_avail_objs(lfmp_t *lfmp_)
{
uint64_t rv;
mutex_lock(&lfmp_->lock);
rv = fmp_avail_objs(&lfmp_->fmp);
mutex_unlock(&lfmp_->lock);
return rv;
}
static
inline
uint64_t
lfmp_objs_in_slab(lfmp_t *lfmp_,
void *slab_)
{
uint64_t rv;
mutex_lock(&lfmp_->lock);
rv = fmp_objs_in_slab(&lfmp_->fmp,slab_);
mutex_unlock(&lfmp_->lock);
return rv;
}
static
inline
void
lfmp_remove_objs_in_slab(lfmp_t *lfmp_,
void *slab_)
{
mutex_lock(&lfmp_->lock);
fmp_remove_objs_in_slab(&lfmp_->fmp,slab_);
mutex_unlock(&lfmp_->lock);
}
static
inline
int
lfmp_gc(lfmp_t *lfmp_)
{
int rv;
mutex_lock(&lfmp_->lock);
rv = fmp_gc(&lfmp_->fmp);
mutex_unlock(&lfmp_->lock);
return rv;
}
static
inline
uint64_t
lfmp_objs_per_slab(lfmp_t *lfmp_)
{
uint64_t rv;
mutex_lock(&lfmp_->lock);
rv = fmp_objs_per_slab(&lfmp_->fmp);
mutex_unlock(&lfmp_->lock);
return rv;
}
static
inline
double
lfmp_slab_usage_ratio(lfmp_t *lfmp_)
{
double rv;
mutex_lock(&lfmp_->lock);
rv = fmp_slab_usage_ratio(&lfmp_->fmp);
mutex_unlock(&lfmp_->lock);
return rv;
}
static
inline
uint64_t
lfmp_total_allocated_memory(lfmp_t *lfmp_)
{
uint64_t rv;
mutex_lock(&lfmp_->lock);
rv = fmp_total_allocated_memory(&lfmp_->fmp);
mutex_unlock(&lfmp_->lock);
return rv;
}

9
libfuse/lib/maintenance_thread.cpp

@ -1,9 +1,10 @@
#include "maintenance_thread.hpp"
#include "mutex.hpp"
#include "fmt/core.h"
#include <cassert>
#include <mutex>
#include <vector>
#include <pthread.h>
@ -12,7 +13,7 @@
pthread_t g_thread;
std::vector<std::function<void(int)>> g_funcs;
std::mutex g_mutex;
mutex_t g_mutex;
static
void*
@ -27,7 +28,7 @@ _thread_loop(void *)
{
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
{
std::lock_guard<std::mutex> lg(g_mutex);
LockGuard lg(g_mutex);
for(auto &func : g_funcs)
func(count);
}
@ -52,7 +53,7 @@ MaintenanceThread::setup()
void
MaintenanceThread::push_job(const std::function<void(int)> &func_)
{
std::lock_guard<std::mutex> lg(g_mutex);
LockGuard lg(g_mutex);
g_funcs.emplace_back(func_);
}

14
src/branches.cpp

@ -409,10 +409,7 @@ Branches::from_string(const std::string_view str_)
Branches::Ptr impl;
Branches::Ptr new_impl;
{
std::lock_guard<std::mutex> lock_guard(_mutex);
impl = _impl;
}
impl = std::atomic_load(&_impl);
new_impl = std::make_shared<Branches::Impl>(impl->minfreespace());
*new_impl = *impl;
@ -421,10 +418,7 @@ Branches::from_string(const std::string_view str_)
if(rv < 0)
return rv;
{
std::lock_guard<std::mutex> lock_guard(_mutex);
_impl = new_impl;
}
std::atomic_store(&_impl,new_impl);
return 0;
}
@ -432,9 +426,7 @@ Branches::from_string(const std::string_view str_)
std::string
Branches::to_string(void) const
{
std::lock_guard<std::mutex> lock_guard(_mutex);
return _impl->to_string();
return std::atomic_load(&_impl)->to_string();
}
void

10
src/branches.hpp

@ -23,12 +23,11 @@
#include "strvec.hpp"
#include "tofrom_string.hpp"
#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include <memory>
#define MINFREESPACE_DEFAULT (4294967295ULL)
@ -70,8 +69,7 @@ public:
u64 minfreespace = MINFREESPACE_DEFAULT;
private:
mutable std::mutex _mutex;
Ptr _impl;
mutable std::shared_ptr<Impl> _impl;
public:
Branches()
@ -84,8 +82,8 @@ public:
std::string to_string(void) const final;
public:
operator Ptr() const { std::lock_guard<std::mutex> lg(_mutex); return _impl; }
Ptr operator->() const { std::lock_guard<std::mutex> lg(_mutex); return _impl; }
operator Ptr() const { return std::atomic_load(&_impl); }
Ptr operator->() const { return std::atomic_load(&_impl); }
public:
Impl::iterator begin() { return _impl->begin(); }

9
src/fileinfo.hpp

@ -20,9 +20,9 @@
#include "fh.hpp"
#include "fs_path.hpp"
#include "base_types.h"
#include "mutex.hpp"
#include <mutex>
#include "base_types.h"
class FileInfo : public FH
@ -40,6 +40,7 @@ public:
branch(*branch_),
direct_io(direct_io_)
{
mutex_init(mutex);
}
FileInfo(const int fd_,
@ -51,6 +52,7 @@ public:
branch(branch_),
direct_io(direct_io_)
{
mutex_init(mutex);
}
FileInfo(const FileInfo *fi_)
@ -59,6 +61,7 @@ public:
branch(fi_->branch),
direct_io(fi_->direct_io)
{
mutex_init(mutex);
}
public:
@ -68,7 +71,7 @@ public:
int fd;
Branch branch;
u32 direct_io:1;
std::mutex mutex;
mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
};
inline

89
src/fixed_mem_pool.hpp

@ -1,89 +0,0 @@
/*
ISC License
Copyright (c) 2020, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include <cstdint>
#include <stdlib.h>
typedef struct fixed_mem_pool_t fixed_mem_pool_t;
struct fixed_mem_pool_t
{
fixed_mem_pool_t *next;
};
template<uint64_t SIZE>
class FixedMemPool
{
public:
FixedMemPool()
{
list.next = NULL;
}
~FixedMemPool()
{
void *mem;
while(!empty())
{
mem = alloc();
::free(mem);
}
}
bool
empty(void)
{
return (list.next == NULL);
}
uint64_t
size(void)
{
return SIZE;
}
void*
alloc(void)
{
void *rv;
if(list.next == NULL)
return malloc(SIZE);
rv = (void*)list.next;
list.next = list.next->next;
return rv;
}
void
free(void *mem_)
{
fixed_mem_pool_t *next;
next = (fixed_mem_pool_t*)mem_;
next->next = list.next;
list.next = next;
}
private:
fixed_mem_pool_t list;
};

8
src/fs_statvfs_cache.cpp

@ -27,7 +27,6 @@
#include <map>
#include <string>
#include <pthread.h>
#include <sys/statvfs.h>
#include <time.h>
@ -42,7 +41,8 @@ typedef std::map<std::string,Element> statvfs_cache;
static uint64_t g_timeout = 0;
static statvfs_cache g_cache;
static pthread_mutex_t g_cache_lock = PTHREAD_MUTEX_INITIALIZER;
static Mutex g_cache_lock;
static
uint64_t
@ -81,7 +81,7 @@ fs::statvfs_cache(const char *path_,
rv = 0;
now = ::_get_time();
mutex_lock(&g_cache_lock);
mutex_lock(g_cache_lock);
e = &g_cache[path_];
@ -93,7 +93,7 @@ fs::statvfs_cache(const char *path_,
*st_ = e->st;
mutex_unlock(&g_cache_lock);
mutex_unlock(g_cache_lock);
return rv;
}

2
src/fuse_readdir_cor.cpp

@ -51,7 +51,7 @@ _concurrent_readdir(ThreadPool &tp_,
fuse_dirents_t *dirents_)
{
HashSet names;
std::mutex mutex;
Mutex mutex;
std::vector<std::future<int>> futures;
fuse_dirents_reset(dirents_);

6
src/fuse_readdir_cor_getdents.icpp

@ -6,6 +6,8 @@
#include "fs_open.hpp"
#include "fs_path.hpp"
#include "hashset.hpp"
#include "mutex.hpp"
#include "scope_guard.hpp"
#include "fuse_msgbuf.hpp"
@ -19,7 +21,7 @@ _readdir(const fs::path &branch_path_,
const fs::path &rel_dirpath_,
HashSet &names_,
fuse_dirents_t *dirents_,
std::mutex &mutex_)
mutex_t &mutex_)
{
int fd;
fuse_msgbuf_t *buf;
@ -45,7 +47,7 @@ _readdir(const fs::path &branch_path_,
if(nread <= 0)
return nread;
std::lock_guard<std::mutex> lk(mutex_);
LockGuard lk(mutex_);
for(ssize_t pos = 0; pos < nread;)
{
int rv;

4
src/fuse_readdir_cor_readdir.icpp

@ -36,7 +36,7 @@ _readdir(const fs::path &branch_path_,
const fs::path &rel_dirpath_,
HashSet &names_,
fuse_dirents_t *dirents_,
std::mutex &mutex_)
mutex_t &mutex_)
{
DIR *dir;
fs::path rel_filepath;
@ -53,7 +53,7 @@ _readdir(const fs::path &branch_path_,
rel_filepath = rel_dirpath_ / "dummy";
while(true)
{
std::lock_guard<std::mutex> lk(mutex_);
LockGuard lk(mutex_);
for(int i = 0; i < MAX_ENTRIES_PER_LOOP; i++)
{
int rv;

5
src/fuse_write.cpp

@ -26,6 +26,8 @@
#include "fs_pwriten.hpp"
#include "ioprio.hpp"
#include "scope_guard.hpp"
#include "fuse.h"
@ -174,7 +176,8 @@ _write(const fuse_file_info_t *ffi_,
// could change the move file behavior to use a known target file
// and have threads use O_EXCL and back off and wait for the
// transfer to complete before retrying.
std::lock_guard<std::mutex> guard(fi->mutex);
mutex_lock(fi->mutex);
DEFER { mutex_unlock(fi->mutex); };
if(fi->direct_io)
return ::_write_direct_io(buf_,count_,offset_,fi);

72
src/locked_fixed_mem_pool.hpp

@ -1,72 +0,0 @@
/*
ISC License
Copyright (c) 2020, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include "fixed_mem_pool.hpp"
#include "mutex.hpp"
#include <pthread.h>
template<size_t SIZE>
class LockedFixedMemPool
{
public:
LockedFixedMemPool()
{
mutex_init(&_mutex);
}
~LockedFixedMemPool()
{
mutex_destroy(&_mutex);
}
public:
void*
alloc(void)
{
void *mem;
mutex_lock(&_mutex);
mem = _fmp.alloc();
mutex_unlock(&_mutex);
return mem;
}
void
free(void *mem_)
{
mutex_lock(&_mutex);
_fmp.free(mem_);
mutex_unlock(&_mutex);
}
uint64_t
size(void)
{
return _fmp.size();
}
private:
FixedMemPool<SIZE> _fmp;
pthread_mutex_t _mutex;
};

21
src/mempools.cpp

@ -1,21 +0,0 @@
/*
ISC License
Copyright (c) 2020, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include "locked_fixed_mem_pool.hpp"
LockedFixedMemPool<128 * 1024> g_DENTS_BUF_POOL;

23
src/mempools.hpp

@ -1,23 +0,0 @@
/*
ISC License
Copyright (c) 2020, Antonio SJ Musumeci <trapexit@spawn.link>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include "locked_fixed_mem_pool.hpp"
extern LockedFixedMemPool<128 * 1024> g_DENTS_BUF_POOL;
Loading…
Cancel
Save