You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

512 lines
12 KiB

  1. #ifndef _GNU_SOURCE
  2. #define _GNU_SOURCE
  3. #endif
  4. #include "bounded_thread_pool.hpp"
  5. #include "cpu.hpp"
  6. #include "fmt/core.h"
  7. #include "scope_guard.hpp"
  8. #include "syslog.h"
  9. #include "fuse_i.h"
  10. #include "fuse_kernel.h"
  11. #include "fuse_lowlevel.h"
  12. #include "fuse_misc.h"
  13. #include "fuse_msgbuf.hpp"
  14. #include "fuse_ll.hpp"
  15. #include <errno.h>
  16. #include <pthread.h>
  17. #include <semaphore.h>
  18. #include <signal.h>
  19. #include <stdio.h>
  20. #include <stdlib.h>
  21. #include <string.h>
  22. #include <sys/time.h>
  23. #include <unistd.h>
  24. #include <cassert>
  25. #include <vector>
  // Returns true when a receive result is one of the negated errno
  // values this file treats as retriable: -EINTR, -EAGAIN or -ENOENT.
  // Every other value yields false.
  static
  bool
  retriable_receive_error(const int err_)
  {
    return ((err_ == -EINTR)  ||
            (err_ == -EAGAIN) ||
            (err_ == -ENOENT));
  }
  // Classifies a receive result as fatal. Any negative value counts;
  // zero and positive values do not. The workers in this file test
  // for retriable codes before consulting this predicate.
  static
  bool
  fatal_receive_error(const int err_)
  {
    const bool is_negative = (0 > err_);

    return is_negative;
  }
  46. static
  47. void
  48. handle_receive_error(const int rv_,
  49. fuse_msgbuf_t *msgbuf_)
  50. {
  51. msgbuf_free(msgbuf_);
  52. fmt::print(stderr,
  53. "mergerfs: error reading from /dev/fuse - {} ({})\n",
  54. strerror(-rv_),
  55. -rv_);
  56. }
  57. struct AsyncWorker
  58. {
  59. fuse_session *_se;
  60. sem_t *_finished;
  61. std::shared_ptr<BoundedThreadPool> _process_tp;
  62. AsyncWorker(fuse_session *se_,
  63. sem_t *finished_,
  64. std::shared_ptr<BoundedThreadPool> process_tp_)
  65. : _se(se_),
  66. _finished(finished_),
  67. _process_tp(process_tp_)
  68. {
  69. }
  70. inline
  71. void
  72. operator()() const
  73. {
  74. DEFER{ fuse_session_exit(_se); };
  75. DEFER{ sem_post(_finished); };
  76. while(!fuse_session_exited(_se))
  77. {
  78. int rv;
  79. fuse_msgbuf_t *msgbuf;
  80. msgbuf = msgbuf_alloc();
  81. do
  82. {
  83. pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
  84. rv = _se->receive_buf(_se,msgbuf);
  85. pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
  86. if(rv == 0)
  87. return;
  88. if(retriable_receive_error(rv))
  89. continue;
  90. if(fatal_receive_error(rv))
  91. return handle_receive_error(rv,msgbuf);
  92. } while(false);
  93. _process_tp->enqueue_work([=] {
  94. _se->process_buf(_se,msgbuf);
  95. msgbuf_free(msgbuf);
  96. });
  97. }
  98. }
  99. };
  100. struct SyncWorker
  101. {
  102. fuse_session *_se;
  103. sem_t *_finished;
  104. SyncWorker(fuse_session *se_,
  105. sem_t *finished_)
  106. : _se(se_),
  107. _finished(finished_)
  108. {
  109. }
  110. inline
  111. void
  112. operator()() const
  113. {
  114. DEFER{ fuse_session_exit(_se); };
  115. DEFER{ sem_post(_finished); };
  116. while(!fuse_session_exited(_se))
  117. {
  118. int rv;
  119. fuse_msgbuf_t *msgbuf;
  120. msgbuf = msgbuf_alloc();
  121. do
  122. {
  123. pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
  124. rv = _se->receive_buf(_se,msgbuf);
  125. pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
  126. if(rv == 0)
  127. return;
  128. if(retriable_receive_error(rv))
  129. continue;
  130. if(fatal_receive_error(rv))
  131. return handle_receive_error(rv,msgbuf);
  132. } while(false);
  133. _se->process_buf(_se,msgbuf);
  134. msgbuf_free(msgbuf);
  135. }
  136. }
  137. };
  // Starts a thread running func(arg) with default attributes.
  //
  // All signals are blocked in the calling thread while the new thread
  // is created, and the caller's previous mask is restored afterwards.
  //
  // thread_id - receives the id of the created thread
  // func      - thread entry point
  // arg       - opaque argument passed to func
  //
  // Returns 0 on success. On failure a message is written to stderr
  // and -1 is returned.
  int
  fuse_start_thread(pthread_t *thread_id,
                    void      *(*func)(void *),
                    void      *arg)
  {
    sigset_t all_signals;
    sigset_t saved_mask;

    sigfillset(&all_signals);

    pthread_sigmask(SIG_BLOCK,&all_signals,&saved_mask);
    const int rc = pthread_create(thread_id,NULL,func,arg);
    pthread_sigmask(SIG_SETMASK,&saved_mask,NULL);

    if(rc == 0)
      return 0;

    // pthread_create reports failure through its return value.
    fprintf(stderr,
            "fuse: error creating thread: %s\n",
            strerror(rc));

    return -1;
  }
  // Converts a raw, user supplied thread count into a usable one.
  //
  //   raw > 0  : used as given
  //   raw == 0 : std::thread::hardware_concurrency()
  //   raw < 0  : std::thread::hardware_concurrency() divided by -raw
  //
  // The result is never less than 1.
  static
  int
  calculate_thread_count(const int raw_thread_count_)
  {
    int count;

    if(raw_thread_count_ > 0)
      count = raw_thread_count_;
    else if(raw_thread_count_ == 0)
      count = std::thread::hardware_concurrency();
    else
      count = (std::thread::hardware_concurrency() / -raw_thread_count_);

    // hardware_concurrency() may report 0 and the division may round
    // down to 0; always hand back at least one thread.
    return ((count > 0) ? count : 1);
  }
  175. static
  176. void
  177. calculate_thread_counts(int *read_thread_count_,
  178. int *process_thread_count_,
  179. int *process_thread_queue_depth_)
  180. {
  181. if((*read_thread_count_ == -1) && (*process_thread_count_ == -1))
  182. {
  183. int nproc;
  184. nproc = std::thread::hardware_concurrency();
  185. *read_thread_count_ = 2;
  186. *process_thread_count_ = std::max(2,(nproc - 2));
  187. }
  188. else
  189. {
  190. *read_thread_count_ = ::calculate_thread_count(*read_thread_count_);
  191. if(*process_thread_count_ != -1)
  192. *process_thread_count_ = ::calculate_thread_count(*process_thread_count_);
  193. }
  194. if(*process_thread_queue_depth_ <= 0)
  195. *process_thread_queue_depth_ = *process_thread_count_;
  196. }
  197. static
  198. void
  199. pin_threads_R1L(const CPU::ThreadIdVec read_threads_)
  200. {
  201. CPU::CPUVec cpus;
  202. cpus = CPU::cpus();
  203. if(cpus.empty())
  204. return;
  205. for(auto const thread_id : read_threads_)
  206. CPU::setaffinity(thread_id,cpus.front());
  207. }
  208. static
  209. void
  210. pin_threads_R1P(const CPU::ThreadIdVec read_threads_)
  211. {
  212. CPU::Core2CPUsMap core2cpus;
  213. core2cpus = CPU::core2cpus();
  214. if(core2cpus.empty())
  215. return;
  216. for(auto const thread_id : read_threads_)
  217. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  218. }
  219. static
  220. void
  221. pin_threads_RP1L(const CPU::ThreadIdVec read_threads_,
  222. const CPU::ThreadIdVec process_threads_)
  223. {
  224. CPU::CPUVec cpus;
  225. cpus = CPU::cpus();
  226. if(cpus.empty())
  227. return;
  228. for(auto const thread_id : read_threads_)
  229. CPU::setaffinity(thread_id,cpus.front());
  230. for(auto const thread_id : process_threads_)
  231. CPU::setaffinity(thread_id,cpus.front());
  232. }
  233. static
  234. void
  235. pin_threads_RP1P(const CPU::ThreadIdVec read_threads_,
  236. const CPU::ThreadIdVec process_threads_)
  237. {
  238. CPU::Core2CPUsMap core2cpus;
  239. core2cpus = CPU::core2cpus();
  240. if(core2cpus.empty())
  241. return;
  242. for(auto const thread_id : read_threads_)
  243. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  244. for(auto const thread_id : process_threads_)
  245. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  246. }
  247. static
  248. void
  249. pin_threads_R1LP1L(const std::vector<pthread_t> read_threads_,
  250. const std::vector<pthread_t> process_threads_)
  251. {
  252. CPU::CPUVec cpus;
  253. cpus = CPU::cpus();
  254. if(cpus.empty())
  255. return;
  256. for(auto const thread_id : read_threads_)
  257. CPU::setaffinity(thread_id,cpus.front());
  258. for(auto const thread_id : process_threads_)
  259. CPU::setaffinity(thread_id,cpus.back());
  260. }
  261. static
  262. void
  263. pin_threads_R1PP1P(const std::vector<pthread_t> read_threads_,
  264. const std::vector<pthread_t> process_threads_)
  265. {
  266. CPU::Core2CPUsMap core2cpus;
  267. core2cpus = CPU::core2cpus();
  268. if(core2cpus.empty())
  269. return;
  270. for(auto const thread_id : read_threads_)
  271. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  272. if(core2cpus.size() > 1)
  273. core2cpus.erase(core2cpus.begin());
  274. for(auto const thread_id : process_threads_)
  275. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  276. }
  277. static
  278. void
  279. pin_threads_RPSL(const std::vector<pthread_t> read_threads_,
  280. const std::vector<pthread_t> process_threads_)
  281. {
  282. CPU::CPUVec cpus;
  283. cpus = CPU::cpus();
  284. if(cpus.empty())
  285. return;
  286. for(auto const thread_id : read_threads_)
  287. {
  288. if(cpus.empty())
  289. cpus = CPU::cpus();
  290. CPU::setaffinity(thread_id,cpus.back());
  291. cpus.pop_back();
  292. }
  293. for(auto const thread_id : process_threads_)
  294. {
  295. if(cpus.empty())
  296. cpus = CPU::cpus();
  297. CPU::setaffinity(thread_id,cpus.back());
  298. cpus.pop_back();
  299. }
  300. }
  301. static
  302. void
  303. pin_threads_RPSP(const std::vector<pthread_t> read_threads_,
  304. const std::vector<pthread_t> process_threads_)
  305. {
  306. CPU::Core2CPUsMap core2cpus;
  307. core2cpus = CPU::core2cpus();
  308. if(core2cpus.empty())
  309. return;
  310. for(auto const thread_id : read_threads_)
  311. {
  312. if(core2cpus.empty())
  313. core2cpus = CPU::core2cpus();
  314. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  315. core2cpus.erase(core2cpus.begin());
  316. }
  317. for(auto const thread_id : process_threads_)
  318. {
  319. if(core2cpus.empty())
  320. core2cpus = CPU::core2cpus();
  321. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  322. core2cpus.erase(core2cpus.begin());
  323. }
  324. }
  325. static
  326. void
  327. pin_threads_R1PPSP(const std::vector<pthread_t> read_threads_,
  328. const std::vector<pthread_t> process_threads_)
  329. {
  330. CPU::Core2CPUsMap core2cpus;
  331. CPU::Core2CPUsMap leftover;
  332. core2cpus = CPU::core2cpus();
  333. if(core2cpus.empty())
  334. return;
  335. for(auto const thread_id : read_threads_)
  336. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  337. core2cpus.erase(core2cpus.begin());
  338. if(core2cpus.empty())
  339. core2cpus = CPU::core2cpus();
  340. leftover = core2cpus;
  341. for(auto const thread_id : process_threads_)
  342. {
  343. if(core2cpus.empty())
  344. core2cpus = leftover;
  345. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  346. core2cpus.erase(core2cpus.begin());
  347. }
  348. }
  349. static
  350. void
  351. pin_threads(const std::vector<pthread_t> read_threads_,
  352. const std::vector<pthread_t> process_threads_,
  353. const std::string type_)
  354. {
  355. if(type_ == "R1L")
  356. return ::pin_threads_R1L(read_threads_);
  357. if(type_ == "R1P")
  358. return ::pin_threads_R1P(read_threads_);
  359. if(type_ == "RP1L")
  360. return ::pin_threads_RP1L(read_threads_,process_threads_);
  361. if(type_ == "RP1P")
  362. return ::pin_threads_RP1P(read_threads_,process_threads_);
  363. if(type_ == "R1LP1L")
  364. return ::pin_threads_R1LP1L(read_threads_,process_threads_);
  365. if(type_ == "R1PP1P")
  366. return ::pin_threads_R1PP1P(read_threads_,process_threads_);
  367. if(type_ == "RPSL")
  368. return ::pin_threads_RPSL(read_threads_,process_threads_);
  369. if(type_ == "RPSP")
  370. return ::pin_threads_RPSP(read_threads_,process_threads_);
  371. if(type_ == "R1PPSP")
  372. return ::pin_threads_R1PPSP(read_threads_,process_threads_);
  373. }
  374. static
  375. void
  376. wait(fuse_session *se_,
  377. sem_t *finished_sem_)
  378. {
  379. while(!fuse_session_exited(se_))
  380. sem_wait(finished_sem_);
  381. }
  382. int
  383. fuse_session_loop_mt(struct fuse_session *se_,
  384. const int raw_read_thread_count_,
  385. const int raw_process_thread_count_,
  386. const int raw_process_thread_queue_depth_,
  387. const char *pin_threads_type_)
  388. {
  389. sem_t finished;
  390. int read_thread_count;
  391. int process_thread_count;
  392. int process_thread_queue_depth;
  393. std::vector<pthread_t> read_threads;
  394. std::vector<pthread_t> process_threads;
  395. std::unique_ptr<BoundedThreadPool> read_tp;
  396. std::shared_ptr<BoundedThreadPool> process_tp;
  397. sem_init(&finished,0,0);
  398. read_thread_count = raw_read_thread_count_;
  399. process_thread_count = raw_process_thread_count_;
  400. process_thread_queue_depth = raw_process_thread_queue_depth_;
  401. ::calculate_thread_counts(&read_thread_count,
  402. &process_thread_count,
  403. &process_thread_queue_depth);
  404. if(process_thread_count > 0)
  405. process_tp = std::make_shared<BoundedThreadPool>(process_thread_count,process_thread_queue_depth);
  406. read_tp = std::make_unique<BoundedThreadPool>(read_thread_count);
  407. if(process_tp)
  408. {
  409. for(auto i = 0; i < read_thread_count; i++)
  410. read_tp->enqueue_work(AsyncWorker(se_,&finished,process_tp));
  411. }
  412. else
  413. {
  414. for(auto i = 0; i < read_thread_count; i++)
  415. read_tp->enqueue_work(SyncWorker(se_,&finished));
  416. }
  417. if(read_tp)
  418. read_threads = read_tp->threads();
  419. if(process_tp)
  420. process_threads = process_tp->threads();
  421. if(pin_threads_type_ != nullptr)
  422. ::pin_threads(read_threads,process_threads,pin_threads_type_);
  423. syslog_info("read-thread-count=%d; process-thread-count=%d; process-thread-queue-depth=%d",
  424. read_thread_count,
  425. process_thread_count,
  426. process_thread_queue_depth);
  427. ::wait(se_,&finished);
  428. sem_destroy(&finished);
  429. return 0;
  430. }