You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

544 lines
13 KiB

  1. #ifndef _GNU_SOURCE
  2. #define _GNU_SOURCE
  3. #endif
  4. #include "bounded_thread_pool.hpp"
  5. #include "cpu.hpp"
  6. #include "fmt/core.h"
  7. #include "scope_guard.hpp"
  8. #include "syslog.h"
  9. #include "fuse_i.h"
  10. #include "fuse_kernel.h"
  11. #include "fuse_lowlevel.h"
  12. #include "fuse_misc.h"
  13. #include "fuse_config.hpp"
  14. #include "fuse_msgbuf.hpp"
  15. #include "fuse_ll.hpp"
  16. #include <errno.h>
  17. #include <pthread.h>
  18. #include <semaphore.h>
  19. #include <signal.h>
  20. #include <stdio.h>
  21. #include <stdlib.h>
  22. #include <string.h>
  23. #include <sys/time.h>
  24. #include <unistd.h>
  25. #include <cassert>
  26. #include <vector>
  // Reports whether a receive failure is one of the error codes the read
  // loop treats as "try again". `err_` is a negated errno value.
  static
  bool
  retriable_receive_error(const int err_)
  {
    return ((err_ == -EINTR)  ||
            (err_ == -EAGAIN) ||
            (err_ == -ENOENT));
  }
  // Any negative return value is considered a fatal receive error. Callers
  // are expected to have filtered out the retriable codes beforehand.
  static
  bool
  fatal_receive_error(const int err_)
  {
    const bool is_negative = (0 > err_);

    return is_negative;
  }
  47. static
  48. void
  49. handle_receive_error(const int rv_,
  50. fuse_msgbuf_t *msgbuf_)
  51. {
  52. msgbuf_free(msgbuf_);
  53. fmt::print(stderr,
  54. "mergerfs: error reading from /dev/fuse - {} ({})\n",
  55. strerror(-rv_),
  56. -rv_);
  57. }
  58. struct AsyncWorker
  59. {
  60. fuse_session *_se;
  61. sem_t *_finished;
  62. std::shared_ptr<BoundedThreadPool> _process_tp;
  63. AsyncWorker(fuse_session *se_,
  64. sem_t *finished_,
  65. std::shared_ptr<BoundedThreadPool> process_tp_)
  66. : _se(se_),
  67. _finished(finished_),
  68. _process_tp(process_tp_)
  69. {
  70. }
  71. inline
  72. void
  73. operator()() const
  74. {
  75. DEFER{ fuse_session_exit(_se); };
  76. DEFER{ sem_post(_finished); };
  77. while(!fuse_session_exited(_se))
  78. {
  79. int rv;
  80. fuse_msgbuf_t *msgbuf;
  81. msgbuf = msgbuf_alloc();
  82. do
  83. {
  84. pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
  85. rv = _se->receive_buf(_se,msgbuf);
  86. pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
  87. if(rv == 0)
  88. return;
  89. if(retriable_receive_error(rv))
  90. continue;
  91. if(fatal_receive_error(rv))
  92. return handle_receive_error(rv,msgbuf);
  93. } while(false);
  94. _process_tp->enqueue_work([=] {
  95. _se->process_buf(_se,msgbuf);
  96. msgbuf_free(msgbuf);
  97. });
  98. }
  99. }
  100. };
  101. struct SyncWorker
  102. {
  103. fuse_session *_se;
  104. sem_t *_finished;
  105. SyncWorker(fuse_session *se_,
  106. sem_t *finished_)
  107. : _se(se_),
  108. _finished(finished_)
  109. {
  110. }
  111. inline
  112. void
  113. operator()() const
  114. {
  115. DEFER{ fuse_session_exit(_se); };
  116. DEFER{ sem_post(_finished); };
  117. while(!fuse_session_exited(_se))
  118. {
  119. int rv;
  120. fuse_msgbuf_t *msgbuf;
  121. msgbuf = msgbuf_alloc();
  122. do
  123. {
  124. pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
  125. rv = _se->receive_buf(_se,msgbuf);
  126. pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
  127. if(rv == 0)
  128. return;
  129. if(retriable_receive_error(rv))
  130. continue;
  131. if(fatal_receive_error(rv))
  132. return handle_receive_error(rv,msgbuf);
  133. } while(false);
  134. _se->process_buf(_se,msgbuf);
  135. msgbuf_free(msgbuf);
  136. }
  137. }
  138. };
  // Creates a thread running func(arg). All signals are blocked in the
  // calling thread while the new thread is created (so the new thread starts
  // with them blocked), then the caller's original mask is restored.
  // Returns 0 on success; on failure logs to stderr and returns -1.
  int
  fuse_start_thread(pthread_t *thread_id,
                    void *(*func)(void *),
                    void *arg)
  {
    sigset_t all_signals;
    sigset_t saved_mask;

    sigfillset(&all_signals);

    pthread_sigmask(SIG_BLOCK,&all_signals,&saved_mask);
    const int create_rv = pthread_create(thread_id,NULL,func,arg);
    pthread_sigmask(SIG_SETMASK,&saved_mask,NULL);

    if(create_rv == 0)
      return 0;

    fprintf(stderr,
            "fuse: error creating thread: %s\n",
            strerror(create_rv));

    return -1;
  }
  // Translates a raw thread-count setting into a usable count:
  //   > 0 : used as given
  //   = 0 : the hardware concurrency
  //   < 0 : the hardware concurrency divided by the absolute value
  // The result is never less than 1.
  static
  int
  calculate_thread_count(const int raw_thread_count_)
  {
    int count;

    if(raw_thread_count_ > 0)
      count = raw_thread_count_;
    else if(raw_thread_count_ == 0)
      count = std::thread::hardware_concurrency();
    else
      count = (std::thread::hardware_concurrency() / -raw_thread_count_);

    return ((count <= 0) ? 1 : count);
  }
  176. static
  177. void
  178. calculate_thread_counts(int *read_thread_count_,
  179. int *process_thread_count_,
  180. int *process_thread_queue_depth_)
  181. {
  182. if((*read_thread_count_ == -1) && (*process_thread_count_ == -1))
  183. {
  184. int nproc;
  185. nproc = std::thread::hardware_concurrency();
  186. *read_thread_count_ = 2;
  187. *process_thread_count_ = std::max(2,(nproc - 2));
  188. }
  189. else
  190. {
  191. *read_thread_count_ = ::calculate_thread_count(*read_thread_count_);
  192. if(*process_thread_count_ != -1)
  193. *process_thread_count_ = ::calculate_thread_count(*process_thread_count_);
  194. }
  195. if(*process_thread_queue_depth_ <= 0)
  196. *process_thread_queue_depth_ = *process_thread_count_;
  197. }
  198. static
  199. void
  200. pin_threads_R1L(const CPU::ThreadIdVec read_threads_)
  201. {
  202. CPU::CPUVec cpus;
  203. cpus = CPU::cpus();
  204. if(cpus.empty())
  205. return;
  206. for(auto const thread_id : read_threads_)
  207. CPU::setaffinity(thread_id,cpus.front());
  208. }
  209. static
  210. void
  211. pin_threads_R1P(const CPU::ThreadIdVec read_threads_)
  212. {
  213. CPU::Core2CPUsMap core2cpus;
  214. core2cpus = CPU::core2cpus();
  215. if(core2cpus.empty())
  216. return;
  217. for(auto const thread_id : read_threads_)
  218. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  219. }
  220. static
  221. void
  222. pin_threads_RP1L(const CPU::ThreadIdVec read_threads_,
  223. const CPU::ThreadIdVec process_threads_)
  224. {
  225. CPU::CPUVec cpus;
  226. cpus = CPU::cpus();
  227. if(cpus.empty())
  228. return;
  229. for(auto const thread_id : read_threads_)
  230. CPU::setaffinity(thread_id,cpus.front());
  231. for(auto const thread_id : process_threads_)
  232. CPU::setaffinity(thread_id,cpus.front());
  233. }
  234. static
  235. void
  236. pin_threads_RP1P(const CPU::ThreadIdVec read_threads_,
  237. const CPU::ThreadIdVec process_threads_)
  238. {
  239. CPU::Core2CPUsMap core2cpus;
  240. core2cpus = CPU::core2cpus();
  241. if(core2cpus.empty())
  242. return;
  243. for(auto const thread_id : read_threads_)
  244. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  245. for(auto const thread_id : process_threads_)
  246. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  247. }
  248. static
  249. void
  250. pin_threads_R1LP1L(const std::vector<pthread_t> read_threads_,
  251. const std::vector<pthread_t> process_threads_)
  252. {
  253. CPU::CPUVec cpus;
  254. cpus = CPU::cpus();
  255. if(cpus.empty())
  256. return;
  257. for(auto const thread_id : read_threads_)
  258. CPU::setaffinity(thread_id,cpus.front());
  259. for(auto const thread_id : process_threads_)
  260. CPU::setaffinity(thread_id,cpus.back());
  261. }
  262. static
  263. void
  264. pin_threads_R1PP1P(const std::vector<pthread_t> read_threads_,
  265. const std::vector<pthread_t> process_threads_)
  266. {
  267. CPU::Core2CPUsMap core2cpus;
  268. core2cpus = CPU::core2cpus();
  269. if(core2cpus.empty())
  270. return;
  271. for(auto const thread_id : read_threads_)
  272. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  273. if(core2cpus.size() > 1)
  274. core2cpus.erase(core2cpus.begin());
  275. for(auto const thread_id : process_threads_)
  276. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  277. }
  278. static
  279. void
  280. pin_threads_RPSL(const std::vector<pthread_t> read_threads_,
  281. const std::vector<pthread_t> process_threads_)
  282. {
  283. CPU::CPUVec cpus;
  284. cpus = CPU::cpus();
  285. if(cpus.empty())
  286. return;
  287. for(auto const thread_id : read_threads_)
  288. {
  289. if(cpus.empty())
  290. cpus = CPU::cpus();
  291. CPU::setaffinity(thread_id,cpus.back());
  292. cpus.pop_back();
  293. }
  294. for(auto const thread_id : process_threads_)
  295. {
  296. if(cpus.empty())
  297. cpus = CPU::cpus();
  298. CPU::setaffinity(thread_id,cpus.back());
  299. cpus.pop_back();
  300. }
  301. }
  302. static
  303. void
  304. pin_threads_RPSP(const std::vector<pthread_t> read_threads_,
  305. const std::vector<pthread_t> process_threads_)
  306. {
  307. CPU::Core2CPUsMap core2cpus;
  308. core2cpus = CPU::core2cpus();
  309. if(core2cpus.empty())
  310. return;
  311. for(auto const thread_id : read_threads_)
  312. {
  313. if(core2cpus.empty())
  314. core2cpus = CPU::core2cpus();
  315. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  316. core2cpus.erase(core2cpus.begin());
  317. }
  318. for(auto const thread_id : process_threads_)
  319. {
  320. if(core2cpus.empty())
  321. core2cpus = CPU::core2cpus();
  322. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  323. core2cpus.erase(core2cpus.begin());
  324. }
  325. }
  326. static
  327. void
  328. pin_threads_R1PPSP(const std::vector<pthread_t> read_threads_,
  329. const std::vector<pthread_t> process_threads_)
  330. {
  331. CPU::Core2CPUsMap core2cpus;
  332. CPU::Core2CPUsMap leftover;
  333. core2cpus = CPU::core2cpus();
  334. if(core2cpus.empty())
  335. return;
  336. for(auto const thread_id : read_threads_)
  337. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  338. core2cpus.erase(core2cpus.begin());
  339. if(core2cpus.empty())
  340. core2cpus = CPU::core2cpus();
  341. leftover = core2cpus;
  342. for(auto const thread_id : process_threads_)
  343. {
  344. if(core2cpus.empty())
  345. core2cpus = leftover;
  346. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  347. core2cpus.erase(core2cpus.begin());
  348. }
  349. }
  350. static
  351. void
  352. pin_threads(const std::vector<pthread_t> read_threads_,
  353. const std::vector<pthread_t> process_threads_,
  354. const std::string type_)
  355. {
  356. if(type_.empty() || (type_ == "false"))
  357. return;
  358. if(type_ == "R1L")
  359. return ::pin_threads_R1L(read_threads_);
  360. if(type_ == "R1P")
  361. return ::pin_threads_R1P(read_threads_);
  362. if(type_ == "RP1L")
  363. return ::pin_threads_RP1L(read_threads_,process_threads_);
  364. if(type_ == "RP1P")
  365. return ::pin_threads_RP1P(read_threads_,process_threads_);
  366. if(type_ == "R1LP1L")
  367. return ::pin_threads_R1LP1L(read_threads_,process_threads_);
  368. if(type_ == "R1PP1P")
  369. return ::pin_threads_R1PP1P(read_threads_,process_threads_);
  370. if(type_ == "RPSL")
  371. return ::pin_threads_RPSL(read_threads_,process_threads_);
  372. if(type_ == "RPSP")
  373. return ::pin_threads_RPSP(read_threads_,process_threads_);
  374. if(type_ == "R1PPSP")
  375. return ::pin_threads_R1PPSP(read_threads_,process_threads_);
  376. syslog_warning("Invalid pin-threads value, ignoring: %s",type_.c_str());
  377. }
  378. static
  379. void
  380. wait(fuse_session *se_,
  381. sem_t *finished_sem_)
  382. {
  383. while(!fuse_session_exited(se_))
  384. sem_wait(finished_sem_);
  385. }
  386. int
  387. fuse_session_loop_mt(struct fuse_session *se_,
  388. const int raw_read_thread_count_,
  389. const int raw_process_thread_count_,
  390. const int raw_process_thread_queue_depth_,
  391. const std::string pin_threads_type_)
  392. {
  393. sem_t finished;
  394. int read_thread_count;
  395. int process_thread_count;
  396. int process_thread_queue_depth;
  397. std::vector<pthread_t> read_threads;
  398. std::vector<pthread_t> process_threads;
  399. std::unique_ptr<BoundedThreadPool> read_tp;
  400. std::shared_ptr<BoundedThreadPool> process_tp;
  401. sem_init(&finished,0,0);
  402. read_thread_count = raw_read_thread_count_;
  403. process_thread_count = raw_process_thread_count_;
  404. process_thread_queue_depth = raw_process_thread_queue_depth_;
  405. ::calculate_thread_counts(&read_thread_count,
  406. &process_thread_count,
  407. &process_thread_queue_depth);
  408. if(process_thread_count > 0)
  409. process_tp = std::make_shared<BoundedThreadPool>(process_thread_count,
  410. process_thread_queue_depth,
  411. "fuse.process");
  412. read_tp = std::make_unique<BoundedThreadPool>(read_thread_count,1,"fuse.read");
  413. if(process_tp)
  414. {
  415. for(auto i = 0; i < read_thread_count; i++)
  416. read_tp->enqueue_work(AsyncWorker(se_,&finished,process_tp));
  417. }
  418. else
  419. {
  420. for(auto i = 0; i < read_thread_count; i++)
  421. read_tp->enqueue_work(SyncWorker(se_,&finished));
  422. }
  423. if(read_tp)
  424. read_threads = read_tp->threads();
  425. if(process_tp)
  426. process_threads = process_tp->threads();
  427. ::pin_threads(read_threads,process_threads,pin_threads_type_);
  428. syslog_info("read-thread-count=%d; "
  429. "process-thread-count=%d; "
  430. "process-thread-queue-depth=%d; "
  431. "pin-threads=%s;"
  432. ,
  433. read_thread_count,
  434. process_thread_count,
  435. process_thread_queue_depth,
  436. pin_threads_type_.c_str());
  437. ::wait(se_,&finished);
  438. sem_destroy(&finished);
  439. return 0;
  440. }
  441. int
  442. fuse_loop_mt(struct fuse *f)
  443. {
  444. if(f == NULL)
  445. return -1;
  446. int res = fuse_start_maintenance_thread(f);
  447. if(res)
  448. return -1;
  449. res = fuse_session_loop_mt(fuse_get_session(f),
  450. fuse_config_get_read_thread_count(),
  451. fuse_config_get_process_thread_count(),
  452. fuse_config_get_process_thread_queue_depth(),
  453. fuse_config_get_pin_threads());
  454. fuse_stop_maintenance_thread(f);
  455. return res;
  456. }