Browse Source
Merge pull request #1228 from trapexit/threading
Add thread names for easier debugging
pull/1231/head
trapexit
1 year ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with
46 additions and
21 deletions
-
libfuse/lib/bounded_thread_pool.hpp
-
libfuse/lib/fuse.c
-
libfuse/lib/fuse_loop.cpp
-
src/fuse_readdir_cor.cpp
-
src/fuse_readdir_cosr.cpp
-
src/unbounded_thread_pool.hpp
|
|
@ -3,17 +3,17 @@ |
|
|
|
#include "bounded_queue.hpp"
|
|
|
|
#include "make_unique.hpp"
|
|
|
|
|
|
|
|
#include <signal.h>
|
|
|
|
|
|
|
|
#include <tuple>
|
|
|
|
#include <atomic>
|
|
|
|
#include <vector>
|
|
|
|
#include <thread>
|
|
|
|
#include <memory>
|
|
|
|
#include <future>
|
|
|
|
#include <utility>
|
|
|
|
#include <csignal>
|
|
|
|
#include <functional>
|
|
|
|
#include <future>
|
|
|
|
#include <memory>
|
|
|
|
#include <string>
|
|
|
|
#include <thread>
|
|
|
|
#include <tuple>
|
|
|
|
#include <type_traits>
|
|
|
|
#include <utility>
|
|
|
|
#include <vector>
|
|
|
|
|
|
|
|
|
|
|
|
class BoundedThreadPool |
|
|
@ -25,8 +25,9 @@ private: |
|
|
|
|
|
|
|
public: |
|
|
|
explicit |
|
|
|
BoundedThreadPool(const std::size_t thread_count_ = std::thread::hardware_concurrency(), |
|
|
|
const std::size_t queue_depth_ = 1) |
|
|
|
BoundedThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(), |
|
|
|
std::size_t const queue_depth_ = 1, |
|
|
|
std::string const name_ = {}) |
|
|
|
: _queues(), |
|
|
|
_count(thread_count_) |
|
|
|
{ |
|
|
@ -61,6 +62,11 @@ public: |
|
|
|
_threads.reserve(thread_count_); |
|
|
|
for(std::size_t i = 0; i < thread_count_; ++i) |
|
|
|
_threads.emplace_back(worker, i); |
|
|
|
if(!name_.empty()) |
|
|
|
{ |
|
|
|
for(auto &t : _threads) |
|
|
|
pthread_setname_np(t.native_handle(),name_.c_str()); |
|
|
|
} |
|
|
|
|
|
|
|
pthread_sigmask(SIG_SETMASK,&oldset,NULL); |
|
|
|
} |
|
|
|
|
|
@ -3948,6 +3948,8 @@ fuse_maintenance_loop(void *fuse_) |
|
|
|
int sleep_time; |
|
|
|
struct fuse *f = (struct fuse*)fuse_; |
|
|
|
|
|
|
|
pthread_setname_np(pthread_self(),"fuse.maint"); |
|
|
|
|
|
|
|
loops = 0; |
|
|
|
sleep_time = 60; |
|
|
|
while(1) |
|
|
|
|
|
@ -482,9 +482,11 @@ fuse_session_loop_mt(struct fuse_session *se_, |
|
|
|
&process_thread_queue_depth); |
|
|
|
|
|
|
|
if(process_thread_count > 0) |
|
|
|
process_tp = std::make_shared<BoundedThreadPool>(process_thread_count,process_thread_queue_depth); |
|
|
|
process_tp = std::make_shared<BoundedThreadPool>(process_thread_count, |
|
|
|
process_thread_queue_depth, |
|
|
|
"fuse.process"); |
|
|
|
|
|
|
|
read_tp = std::make_unique<BoundedThreadPool>(read_thread_count); |
|
|
|
read_tp = std::make_unique<BoundedThreadPool>(read_thread_count,1,"fuse.read"); |
|
|
|
if(process_tp) |
|
|
|
{ |
|
|
|
for(auto i = 0; i < read_thread_count; i++) |
|
|
|
|
|
@ -32,7 +32,7 @@ |
|
|
|
|
|
|
|
|
|
|
|
FUSE::ReadDirCOR::ReadDirCOR(unsigned concurrency_) |
|
|
|
: _tp(concurrency_) |
|
|
|
: _tp(concurrency_,"readdir.cor") |
|
|
|
{ |
|
|
|
|
|
|
|
} |
|
|
|
|
|
@ -34,7 +34,7 @@ |
|
|
|
|
|
|
|
|
|
|
|
FUSE::ReadDirCOSR::ReadDirCOSR(unsigned concurrency_) |
|
|
|
: _tp(concurrency_) |
|
|
|
: _tp(concurrency_,"readdir.cosr") |
|
|
|
{ |
|
|
|
|
|
|
|
} |
|
|
|
|
|
@ -2,22 +2,24 @@ |
|
|
|
|
|
|
|
#include "unbounded_queue.hpp"
|
|
|
|
|
|
|
|
#include <tuple>
|
|
|
|
#include <atomic>
|
|
|
|
#include <vector>
|
|
|
|
#include <thread>
|
|
|
|
#include <memory>
|
|
|
|
#include <future>
|
|
|
|
#include <utility>
|
|
|
|
#include <csignal>
|
|
|
|
#include <functional>
|
|
|
|
#include <future>
|
|
|
|
#include <memory>
|
|
|
|
#include <thread>
|
|
|
|
#include <tuple>
|
|
|
|
#include <type_traits>
|
|
|
|
#include <utility>
|
|
|
|
#include <vector>
|
|
|
|
|
|
|
|
|
|
|
|
class ThreadPool |
|
|
|
{ |
|
|
|
public: |
|
|
|
explicit |
|
|
|
ThreadPool(const std::size_t thread_count_ = std::thread::hardware_concurrency()) |
|
|
|
ThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(), |
|
|
|
std::string const name_ = {}) |
|
|
|
: _queues(thread_count_), |
|
|
|
_count(thread_count_) |
|
|
|
{ |
|
|
@ -40,9 +42,22 @@ public: |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
sigset_t oldset; |
|
|
|
sigset_t newset; |
|
|
|
|
|
|
|
sigfillset(&newset); |
|
|
|
pthread_sigmask(SIG_BLOCK,&newset,&oldset); |
|
|
|
|
|
|
|
_threads.reserve(thread_count_); |
|
|
|
for(std::size_t i = 0; i < thread_count_; ++i) |
|
|
|
_threads.emplace_back(worker, i); |
|
|
|
if(!name_.empty()) |
|
|
|
{ |
|
|
|
for(auto &t : _threads) |
|
|
|
pthread_setname_np(t.native_handle(),name_.c_str()); |
|
|
|
} |
|
|
|
|
|
|
|
pthread_sigmask(SIG_SETMASK,&oldset,NULL); |
|
|
|
} |
|
|
|
|
|
|
|
~ThreadPool() |
|
|
|