You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

555 lines
13 KiB

  1. #ifndef _GNU_SOURCE
  2. #define _GNU_SOURCE
  3. #endif
  4. #include "cpu.hpp"
  5. #include "fmt/core.h"
  6. #include "make_unique.hpp"
  7. #include "scope_guard.hpp"
  8. #include "thread_pool.hpp"
  9. #include "fuse_i.h"
  10. #include "fuse_kernel.h"
  11. #include "fuse_lowlevel.h"
  12. #include "fuse_misc.h"
  13. #include "fuse_config.hpp"
  14. #include "fuse_msgbuf.hpp"
  15. #include "fuse_ll.hpp"
  16. #include <errno.h>
  17. #include <pthread.h>
  18. #include <semaphore.h>
  19. #include <signal.h>
  20. #include <stdio.h>
  21. #include <stdlib.h>
  22. #include <string.h>
  23. #include <sys/time.h>
  24. #include <syslog.h>
  25. #include <unistd.h>
  26. #include <cassert>
  27. #include <vector>
  // Reports whether a (negated errno) result from the session's
  // receive_buf call is one of the values this file treats as
  // retriable: -EINTR, -EAGAIN or -ENOENT.
  static
  bool
  retriable_receive_error(const int err_)
  {
    return ((err_ == -EINTR)  ||
            (err_ == -EAGAIN) ||
            (err_ == -ENOENT));
  }
  // Every negative result counts as fatal here. The workers in this
  // file test retriable_receive_error() first, so only the remaining
  // negative values end up being treated as fatal.
  static
  bool
  fatal_receive_error(const int err_)
  {
    return !(err_ >= 0);
  }
  48. static
  49. void
  50. handle_receive_error(const int rv_,
  51. fuse_msgbuf_t *msgbuf_)
  52. {
  53. msgbuf_free(msgbuf_);
  54. fmt::print(stderr,
  55. "mergerfs: error reading from /dev/fuse - {} ({})\n",
  56. strerror(-rv_),
  57. -rv_);
  58. }
  59. struct AsyncWorker
  60. {
  61. fuse_session *_se;
  62. sem_t *_finished;
  63. std::shared_ptr<ThreadPool> _process_tp;
  64. AsyncWorker(fuse_session *se_,
  65. sem_t *finished_,
  66. std::shared_ptr<ThreadPool> process_tp_)
  67. : _se(se_),
  68. _finished(finished_),
  69. _process_tp(process_tp_)
  70. {
  71. }
  72. inline
  73. void
  74. operator()() const
  75. {
  76. DEFER{ fuse_session_exit(_se); };
  77. DEFER{ sem_post(_finished); };
  78. moodycamel::ProducerToken ptok(_process_tp->ptoken());
  79. while(!fuse_session_exited(_se))
  80. {
  81. int rv;
  82. fuse_msgbuf_t *msgbuf;
  83. msgbuf = msgbuf_alloc();
  84. do
  85. {
  86. pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
  87. rv = _se->receive_buf(_se,msgbuf);
  88. pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
  89. if(rv == 0)
  90. return;
  91. if(retriable_receive_error(rv))
  92. continue;
  93. if(fatal_receive_error(rv))
  94. return handle_receive_error(rv,msgbuf);
  95. } while(false);
  96. auto const func = [=]
  97. {
  98. _se->process_buf(_se,msgbuf);
  99. msgbuf_free(msgbuf);
  100. };
  101. _process_tp->enqueue_work(ptok,func);
  102. }
  103. }
  104. };
  105. struct SyncWorker
  106. {
  107. fuse_session *_se;
  108. sem_t *_finished;
  109. SyncWorker(fuse_session *se_,
  110. sem_t *finished_)
  111. : _se(se_),
  112. _finished(finished_)
  113. {
  114. }
  115. inline
  116. void
  117. operator()() const
  118. {
  119. DEFER{ fuse_session_exit(_se); };
  120. DEFER{ sem_post(_finished); };
  121. while(!fuse_session_exited(_se))
  122. {
  123. int rv;
  124. fuse_msgbuf_t *msgbuf;
  125. msgbuf = msgbuf_alloc();
  126. do
  127. {
  128. pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
  129. rv = _se->receive_buf(_se,msgbuf);
  130. pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
  131. if(rv == 0)
  132. return;
  133. if(retriable_receive_error(rv))
  134. continue;
  135. if(fatal_receive_error(rv))
  136. return handle_receive_error(rv,msgbuf);
  137. } while(false);
  138. _se->process_buf(_se,msgbuf);
  139. msgbuf_free(msgbuf);
  140. }
  141. }
  142. };
  // Creates a thread with every signal blocked in it.
  //
  // All signals are blocked in the calling thread around the
  // pthread_create call (a new thread starts with its creator's signal
  // mask) and the caller's original mask is restored afterwards.
  //
  // thread_id - receives the id of the new thread
  // func      - thread entry point
  // arg       - argument passed to func
  //
  // Returns 0 on success; on failure prints a message to stderr and
  // returns -1.
  int
  fuse_start_thread(pthread_t *thread_id,
                    void      *(*func)(void *),
                    void      *arg)
  {
    sigset_t all_signals;
    sigset_t saved_mask;

    sigfillset(&all_signals);

    pthread_sigmask(SIG_BLOCK,&all_signals,&saved_mask);
    const int err = pthread_create(thread_id,NULL,func,arg);
    pthread_sigmask(SIG_SETMASK,&saved_mask,NULL);

    if(err == 0)
      return 0;

    fprintf(stderr,
            "fuse: error creating thread: %s\n",
            strerror(err));

    return -1;
  }
  // Turns a raw configuration value into a usable thread count.
  //
  //   raw  > 0 : used as-is
  //   raw == 0 : std::thread::hardware_concurrency()
  //   raw  < 0 : hardware_concurrency() divided by |raw|
  //
  // The result is never less than 1.
  static
  int
  calculate_thread_count(const int raw_thread_count_)
  {
    int count;

    if(raw_thread_count_ > 0)
      count = raw_thread_count_;
    else if(raw_thread_count_ < 0)
      count = (std::thread::hardware_concurrency() / -raw_thread_count_);
    else
      count = std::thread::hardware_concurrency();

    return ((count <= 0) ? 1 : count);
  }
  180. static
  181. void
  182. calculate_thread_counts(int *read_thread_count_,
  183. int *process_thread_count_,
  184. int *process_thread_queue_depth_)
  185. {
  186. if((*read_thread_count_ == -1) && (*process_thread_count_ == -1))
  187. {
  188. int nproc;
  189. nproc = std::thread::hardware_concurrency();
  190. *read_thread_count_ = 2;
  191. *process_thread_count_ = std::max(2,(nproc - 2));
  192. }
  193. else
  194. {
  195. *read_thread_count_ = ::calculate_thread_count(*read_thread_count_);
  196. if(*process_thread_count_ != -1)
  197. *process_thread_count_ = ::calculate_thread_count(*process_thread_count_);
  198. }
  199. if(*process_thread_queue_depth_ <= 0)
  200. *process_thread_queue_depth_ = *process_thread_count_;
  201. }
  202. static
  203. void
  204. pin_threads_R1L(const CPU::ThreadIdVec read_threads_)
  205. {
  206. CPU::CPUVec cpus;
  207. cpus = CPU::cpus();
  208. if(cpus.empty())
  209. return;
  210. for(auto const thread_id : read_threads_)
  211. CPU::setaffinity(thread_id,cpus.front());
  212. }
  213. static
  214. void
  215. pin_threads_R1P(const CPU::ThreadIdVec read_threads_)
  216. {
  217. CPU::Core2CPUsMap core2cpus;
  218. core2cpus = CPU::core2cpus();
  219. if(core2cpus.empty())
  220. return;
  221. for(auto const thread_id : read_threads_)
  222. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  223. }
  224. static
  225. void
  226. pin_threads_RP1L(const CPU::ThreadIdVec read_threads_,
  227. const CPU::ThreadIdVec process_threads_)
  228. {
  229. CPU::CPUVec cpus;
  230. cpus = CPU::cpus();
  231. if(cpus.empty())
  232. return;
  233. for(auto const thread_id : read_threads_)
  234. CPU::setaffinity(thread_id,cpus.front());
  235. for(auto const thread_id : process_threads_)
  236. CPU::setaffinity(thread_id,cpus.front());
  237. }
  238. static
  239. void
  240. pin_threads_RP1P(const CPU::ThreadIdVec read_threads_,
  241. const CPU::ThreadIdVec process_threads_)
  242. {
  243. CPU::Core2CPUsMap core2cpus;
  244. core2cpus = CPU::core2cpus();
  245. if(core2cpus.empty())
  246. return;
  247. for(auto const thread_id : read_threads_)
  248. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  249. for(auto const thread_id : process_threads_)
  250. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  251. }
  252. static
  253. void
  254. pin_threads_R1LP1L(const std::vector<pthread_t> read_threads_,
  255. const std::vector<pthread_t> process_threads_)
  256. {
  257. CPU::CPUVec cpus;
  258. cpus = CPU::cpus();
  259. if(cpus.empty())
  260. return;
  261. for(auto const thread_id : read_threads_)
  262. CPU::setaffinity(thread_id,cpus.front());
  263. for(auto const thread_id : process_threads_)
  264. CPU::setaffinity(thread_id,cpus.back());
  265. }
  266. static
  267. void
  268. pin_threads_R1PP1P(const std::vector<pthread_t> read_threads_,
  269. const std::vector<pthread_t> process_threads_)
  270. {
  271. CPU::Core2CPUsMap core2cpus;
  272. core2cpus = CPU::core2cpus();
  273. if(core2cpus.empty())
  274. return;
  275. for(auto const thread_id : read_threads_)
  276. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  277. if(core2cpus.size() > 1)
  278. core2cpus.erase(core2cpus.begin());
  279. for(auto const thread_id : process_threads_)
  280. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  281. }
  282. static
  283. void
  284. pin_threads_RPSL(const std::vector<pthread_t> read_threads_,
  285. const std::vector<pthread_t> process_threads_)
  286. {
  287. CPU::CPUVec cpus;
  288. cpus = CPU::cpus();
  289. if(cpus.empty())
  290. return;
  291. for(auto const thread_id : read_threads_)
  292. {
  293. if(cpus.empty())
  294. cpus = CPU::cpus();
  295. CPU::setaffinity(thread_id,cpus.back());
  296. cpus.pop_back();
  297. }
  298. for(auto const thread_id : process_threads_)
  299. {
  300. if(cpus.empty())
  301. cpus = CPU::cpus();
  302. CPU::setaffinity(thread_id,cpus.back());
  303. cpus.pop_back();
  304. }
  305. }
  306. static
  307. void
  308. pin_threads_RPSP(const std::vector<pthread_t> read_threads_,
  309. const std::vector<pthread_t> process_threads_)
  310. {
  311. CPU::Core2CPUsMap core2cpus;
  312. core2cpus = CPU::core2cpus();
  313. if(core2cpus.empty())
  314. return;
  315. for(auto const thread_id : read_threads_)
  316. {
  317. if(core2cpus.empty())
  318. core2cpus = CPU::core2cpus();
  319. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  320. core2cpus.erase(core2cpus.begin());
  321. }
  322. for(auto const thread_id : process_threads_)
  323. {
  324. if(core2cpus.empty())
  325. core2cpus = CPU::core2cpus();
  326. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  327. core2cpus.erase(core2cpus.begin());
  328. }
  329. }
  330. static
  331. void
  332. pin_threads_R1PPSP(const std::vector<pthread_t> read_threads_,
  333. const std::vector<pthread_t> process_threads_)
  334. {
  335. CPU::Core2CPUsMap core2cpus;
  336. CPU::Core2CPUsMap leftover;
  337. core2cpus = CPU::core2cpus();
  338. if(core2cpus.empty())
  339. return;
  340. for(auto const thread_id : read_threads_)
  341. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  342. core2cpus.erase(core2cpus.begin());
  343. if(core2cpus.empty())
  344. core2cpus = CPU::core2cpus();
  345. leftover = core2cpus;
  346. for(auto const thread_id : process_threads_)
  347. {
  348. if(core2cpus.empty())
  349. core2cpus = leftover;
  350. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  351. core2cpus.erase(core2cpus.begin());
  352. }
  353. }
  354. static
  355. void
  356. pin_threads(const std::vector<pthread_t> read_threads_,
  357. const std::vector<pthread_t> process_threads_,
  358. const std::string type_)
  359. {
  360. if(type_.empty() || (type_ == "false"))
  361. return;
  362. if(type_ == "R1L")
  363. return ::pin_threads_R1L(read_threads_);
  364. if(type_ == "R1P")
  365. return ::pin_threads_R1P(read_threads_);
  366. if(type_ == "RP1L")
  367. return ::pin_threads_RP1L(read_threads_,process_threads_);
  368. if(type_ == "RP1P")
  369. return ::pin_threads_RP1P(read_threads_,process_threads_);
  370. if(type_ == "R1LP1L")
  371. return ::pin_threads_R1LP1L(read_threads_,process_threads_);
  372. if(type_ == "R1PP1P")
  373. return ::pin_threads_R1PP1P(read_threads_,process_threads_);
  374. if(type_ == "RPSL")
  375. return ::pin_threads_RPSL(read_threads_,process_threads_);
  376. if(type_ == "RPSP")
  377. return ::pin_threads_RPSP(read_threads_,process_threads_);
  378. if(type_ == "R1PPSP")
  379. return ::pin_threads_R1PPSP(read_threads_,process_threads_);
  380. syslog(LOG_WARNING,
  381. "Invalid pin-threads value, ignoring: %s",
  382. type_.c_str());
  383. }
  384. static
  385. void
  386. wait(fuse_session *se_,
  387. sem_t *finished_sem_)
  388. {
  389. while(!fuse_session_exited(se_))
  390. sem_wait(finished_sem_);
  391. }
  392. int
  393. fuse_session_loop_mt(struct fuse_session *se_,
  394. const int raw_read_thread_count_,
  395. const int raw_process_thread_count_,
  396. const int raw_process_thread_queue_depth_,
  397. const std::string pin_threads_type_)
  398. {
  399. sem_t finished;
  400. int read_thread_count;
  401. int process_thread_count;
  402. int process_thread_queue_depth;
  403. std::vector<pthread_t> read_threads;
  404. std::vector<pthread_t> process_threads;
  405. std::unique_ptr<ThreadPool> read_tp;
  406. std::shared_ptr<ThreadPool> process_tp;
  407. sem_init(&finished,0,0);
  408. read_thread_count = raw_read_thread_count_;
  409. process_thread_count = raw_process_thread_count_;
  410. process_thread_queue_depth = raw_process_thread_queue_depth_;
  411. ::calculate_thread_counts(&read_thread_count,
  412. &process_thread_count,
  413. &process_thread_queue_depth);
  414. if(process_thread_count > 0)
  415. process_tp = std::make_shared<ThreadPool>(process_thread_count,
  416. (process_thread_count *
  417. process_thread_queue_depth),
  418. "fuse.process");
  419. read_tp = std::make_unique<ThreadPool>(read_thread_count,
  420. read_thread_count,
  421. "fuse.read");
  422. if(process_tp)
  423. {
  424. for(auto i = 0; i < read_thread_count; i++)
  425. read_tp->enqueue_work(AsyncWorker(se_,&finished,process_tp));
  426. }
  427. else
  428. {
  429. for(auto i = 0; i < read_thread_count; i++)
  430. read_tp->enqueue_work(SyncWorker(se_,&finished));
  431. }
  432. if(read_tp)
  433. read_threads = read_tp->threads();
  434. if(process_tp)
  435. process_threads = process_tp->threads();
  436. ::pin_threads(read_threads,process_threads,pin_threads_type_);
  437. syslog(LOG_INFO,
  438. "read-thread-count=%d; "
  439. "process-thread-count=%d; "
  440. "process-thread-queue-depth=%d; "
  441. "pin-threads=%s;"
  442. ,
  443. read_thread_count,
  444. process_thread_count,
  445. process_thread_queue_depth,
  446. pin_threads_type_.c_str());
  447. ::wait(se_,&finished);
  448. sem_destroy(&finished);
  449. return 0;
  450. }
  451. int
  452. fuse_loop_mt(struct fuse *f)
  453. {
  454. if(f == NULL)
  455. return -1;
  456. int res = fuse_start_maintenance_thread(f);
  457. if(res)
  458. return -1;
  459. res = fuse_session_loop_mt(fuse_get_session(f),
  460. fuse_config_get_read_thread_count(),
  461. fuse_config_get_process_thread_count(),
  462. fuse_config_get_process_thread_queue_depth(),
  463. fuse_config_get_pin_threads());
  464. fuse_stop_maintenance_thread(f);
  465. return res;
  466. }