You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

487 lines
11 KiB

  1. #ifndef _GNU_SOURCE
  2. #define _GNU_SOURCE
  3. #endif
  4. #include "thread_pool.hpp"
  5. #include "cpu.hpp"
  6. #include "fmt/core.h"
  7. #include "fuse_i.h"
  8. #include "fuse_kernel.h"
  9. #include "fuse_lowlevel.h"
  10. #include "fuse_misc.h"
  11. #include "fuse_msgbuf.hpp"
  12. #include "fuse_ll.hpp"
  13. #include <errno.h>
  14. #include <pthread.h>
  15. #include <semaphore.h>
  16. #include <signal.h>
  17. #include <stdio.h>
  18. #include <stdlib.h>
  19. #include <string.h>
  20. #include <sys/time.h>
  21. #include <unistd.h>
  22. #include <cassert>
  23. #include <vector>
  24. struct fuse_worker_data_t
  25. {
  26. struct fuse_session *se;
  27. sem_t finished;
  28. std::function<void(fuse_worker_data_t*,fuse_msgbuf_t*)> msgbuf_processor;
  29. std::shared_ptr<ThreadPool> tp;
  30. };
  31. class WorkerCleanup
  32. {
  33. public:
  34. WorkerCleanup(fuse_worker_data_t *wd_)
  35. : _wd(wd_)
  36. {
  37. }
  38. ~WorkerCleanup()
  39. {
  40. fuse_session_exit(_wd->se);
  41. sem_post(&_wd->finished);
  42. }
  43. private:
  44. fuse_worker_data_t *_wd;
  45. };
  46. static
  47. bool
  48. retriable_receive_error(const int err_)
  49. {
  50. switch(err_)
  51. {
  52. case -EINTR:
  53. case -EAGAIN:
  54. case -ENOENT:
  55. return true;
  56. default:
  57. return false;
  58. }
  59. }
  60. static
  61. bool
  62. fatal_receive_error(const int err_)
  63. {
  64. return (err_ < 0);
  65. }
  66. static
  67. void*
  68. handle_receive_error(const int rv_,
  69. fuse_msgbuf_t *msgbuf_)
  70. {
  71. msgbuf_free(msgbuf_);
  72. fprintf(stderr,
  73. "mergerfs: error reading from /dev/fuse - %s (%d)\n",
  74. strerror(-rv_),
  75. -rv_);
  76. return NULL;
  77. }
  78. static
  79. void*
  80. fuse_do_work(void *data)
  81. {
  82. fuse_worker_data_t *wd = (fuse_worker_data_t*)data;
  83. fuse_session *se = wd->se;
  84. auto &process_msgbuf = wd->msgbuf_processor;
  85. WorkerCleanup workercleanup(wd);
  86. while(!fuse_session_exited(se))
  87. {
  88. int rv;
  89. fuse_msgbuf_t *msgbuf;
  90. msgbuf = msgbuf_alloc();
  91. do
  92. {
  93. pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
  94. rv = se->receive_buf(se,msgbuf);
  95. pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
  96. if(rv == 0)
  97. return NULL;
  98. if(retriable_receive_error(rv))
  99. continue;
  100. if(fatal_receive_error(rv))
  101. return handle_receive_error(rv,msgbuf);
  102. } while(false);
  103. process_msgbuf(wd,msgbuf);
  104. }
  105. return NULL;
  106. }
  107. int
  108. fuse_start_thread(pthread_t *thread_id,
  109. void *(*func)(void *),
  110. void *arg)
  111. {
  112. int res;
  113. sigset_t oldset;
  114. sigset_t newset;
  115. sigfillset(&newset);
  116. pthread_sigmask(SIG_BLOCK,&newset,&oldset);
  117. res = pthread_create(thread_id,NULL,func,arg);
  118. pthread_sigmask(SIG_SETMASK,&oldset,NULL);
  119. if(res != 0)
  120. {
  121. fprintf(stderr,
  122. "fuse: error creating thread: %s\n",
  123. strerror(res));
  124. return -1;
  125. }
  126. return 0;
  127. }
  128. static
  129. int
  130. calculate_thread_count(const int raw_thread_count_)
  131. {
  132. int thread_count;
  133. thread_count = 4;
  134. if(raw_thread_count_ == 0)
  135. thread_count = std::thread::hardware_concurrency();
  136. else if(raw_thread_count_ < 0)
  137. thread_count = (std::thread::hardware_concurrency() / -raw_thread_count_);
  138. else if(raw_thread_count_ > 0)
  139. thread_count = raw_thread_count_;
  140. if(thread_count <= 0)
  141. thread_count = 1;
  142. return thread_count;
  143. }
  144. static
  145. void
  146. calculate_thread_counts(int *read_thread_count_,
  147. int *process_thread_count_)
  148. {
  149. if((*read_thread_count_ == -1) && (*process_thread_count_ == -1))
  150. {
  151. int nproc;
  152. nproc = std::thread::hardware_concurrency();
  153. *read_thread_count_ = 2;
  154. *process_thread_count_ = std::max(2,(nproc - 2));
  155. }
  156. else
  157. {
  158. *read_thread_count_ = ::calculate_thread_count(*read_thread_count_);
  159. if(*process_thread_count_ != -1)
  160. *process_thread_count_ = ::calculate_thread_count(*process_thread_count_);
  161. }
  162. }
  163. static
  164. void
  165. process_msgbuf_sync(fuse_worker_data_t *wd_,
  166. fuse_msgbuf_t *msgbuf_)
  167. {
  168. wd_->se->process_buf(wd_->se,msgbuf_);
  169. msgbuf_free(msgbuf_);
  170. }
  171. static
  172. void
  173. process_msgbuf_async(fuse_worker_data_t *wd_,
  174. fuse_msgbuf_t *msgbuf_)
  175. {
  176. const auto func = [=] {
  177. process_msgbuf_sync(wd_,msgbuf_);
  178. };
  179. wd_->tp->enqueue_work(func);
  180. }
  181. static
  182. void
  183. pin_threads_R1L(const CPU::ThreadIdVec read_threads_)
  184. {
  185. CPU::CPUVec cpus;
  186. cpus = CPU::cpus();
  187. if(cpus.empty())
  188. return;
  189. for(auto const thread_id : read_threads_)
  190. CPU::setaffinity(thread_id,cpus.front());
  191. }
  192. static
  193. void
  194. pin_threads_R1P(const CPU::ThreadIdVec read_threads_)
  195. {
  196. CPU::Core2CPUsMap core2cpus;
  197. core2cpus = CPU::core2cpus();
  198. if(core2cpus.empty())
  199. return;
  200. for(auto const thread_id : read_threads_)
  201. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  202. }
  203. static
  204. void
  205. pin_threads_RP1L(const CPU::ThreadIdVec read_threads_,
  206. const CPU::ThreadIdVec process_threads_)
  207. {
  208. CPU::CPUVec cpus;
  209. cpus = CPU::cpus();
  210. if(cpus.empty())
  211. return;
  212. for(auto const thread_id : read_threads_)
  213. CPU::setaffinity(thread_id,cpus.front());
  214. for(auto const thread_id : process_threads_)
  215. CPU::setaffinity(thread_id,cpus.front());
  216. }
  217. static
  218. void
  219. pin_threads_RP1P(const CPU::ThreadIdVec read_threads_,
  220. const CPU::ThreadIdVec process_threads_)
  221. {
  222. CPU::Core2CPUsMap core2cpus;
  223. core2cpus = CPU::core2cpus();
  224. if(core2cpus.empty())
  225. return;
  226. for(auto const thread_id : read_threads_)
  227. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  228. for(auto const thread_id : process_threads_)
  229. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  230. }
  231. static
  232. void
  233. pin_threads_R1LP1L(const std::vector<pthread_t> read_threads_,
  234. const std::vector<pthread_t> process_threads_)
  235. {
  236. CPU::CPUVec cpus;
  237. cpus = CPU::cpus();
  238. if(cpus.empty())
  239. return;
  240. for(auto const thread_id : read_threads_)
  241. CPU::setaffinity(thread_id,cpus.front());
  242. for(auto const thread_id : process_threads_)
  243. CPU::setaffinity(thread_id,cpus.back());
  244. }
  245. static
  246. void
  247. pin_threads_R1PP1P(const std::vector<pthread_t> read_threads_,
  248. const std::vector<pthread_t> process_threads_)
  249. {
  250. CPU::Core2CPUsMap core2cpus;
  251. core2cpus = CPU::core2cpus();
  252. if(core2cpus.empty())
  253. return;
  254. for(auto const thread_id : read_threads_)
  255. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  256. if(core2cpus.size() > 1)
  257. core2cpus.erase(core2cpus.begin());
  258. for(auto const thread_id : process_threads_)
  259. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  260. }
  261. static
  262. void
  263. pin_threads_RPSL(const std::vector<pthread_t> read_threads_,
  264. const std::vector<pthread_t> process_threads_)
  265. {
  266. CPU::CPUVec cpus;
  267. cpus = CPU::cpus();
  268. if(cpus.empty())
  269. return;
  270. for(auto const thread_id : read_threads_)
  271. {
  272. if(cpus.empty())
  273. cpus = CPU::cpus();
  274. CPU::setaffinity(thread_id,cpus.back());
  275. cpus.pop_back();
  276. }
  277. for(auto const thread_id : process_threads_)
  278. {
  279. if(cpus.empty())
  280. cpus = CPU::cpus();
  281. CPU::setaffinity(thread_id,cpus.back());
  282. cpus.pop_back();
  283. }
  284. }
  285. static
  286. void
  287. pin_threads_RPSP(const std::vector<pthread_t> read_threads_,
  288. const std::vector<pthread_t> process_threads_)
  289. {
  290. CPU::Core2CPUsMap core2cpus;
  291. core2cpus = CPU::core2cpus();
  292. if(core2cpus.empty())
  293. return;
  294. for(auto const thread_id : read_threads_)
  295. {
  296. if(core2cpus.empty())
  297. core2cpus = CPU::core2cpus();
  298. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  299. core2cpus.erase(core2cpus.begin());
  300. }
  301. for(auto const thread_id : process_threads_)
  302. {
  303. if(core2cpus.empty())
  304. core2cpus = CPU::core2cpus();
  305. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  306. core2cpus.erase(core2cpus.begin());
  307. }
  308. }
  309. static
  310. void
  311. pin_threads_R1PPSP(const std::vector<pthread_t> read_threads_,
  312. const std::vector<pthread_t> process_threads_)
  313. {
  314. CPU::Core2CPUsMap core2cpus;
  315. CPU::Core2CPUsMap leftover;
  316. core2cpus = CPU::core2cpus();
  317. if(core2cpus.empty())
  318. return;
  319. for(auto const thread_id : read_threads_)
  320. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  321. core2cpus.erase(core2cpus.begin());
  322. if(core2cpus.empty())
  323. core2cpus = CPU::core2cpus();
  324. leftover = core2cpus;
  325. for(auto const thread_id : process_threads_)
  326. {
  327. if(core2cpus.empty())
  328. core2cpus = leftover;
  329. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  330. core2cpus.erase(core2cpus.begin());
  331. }
  332. }
  333. static
  334. void
  335. pin_threads(const std::vector<pthread_t> read_threads_,
  336. const std::vector<pthread_t> process_threads_,
  337. const std::string type_)
  338. {
  339. if(type_ == "R1L")
  340. return ::pin_threads_R1L(read_threads_);
  341. if(type_ == "R1P")
  342. return ::pin_threads_R1P(read_threads_);
  343. if(type_ == "RP1L")
  344. return ::pin_threads_RP1L(read_threads_,process_threads_);
  345. if(type_ == "RP1P")
  346. return ::pin_threads_RP1P(read_threads_,process_threads_);
  347. if(type_ == "R1LP1L")
  348. return ::pin_threads_R1LP1L(read_threads_,process_threads_);
  349. if(type_ == "R1PP1P")
  350. return ::pin_threads_R1PP1P(read_threads_,process_threads_);
  351. if(type_ == "RPSL")
  352. return ::pin_threads_RPSL(read_threads_,process_threads_);
  353. if(type_ == "RPSP")
  354. return ::pin_threads_RPSP(read_threads_,process_threads_);
  355. if(type_ == "R1PPSP")
  356. return ::pin_threads_R1PPSP(read_threads_,process_threads_);
  357. }
  358. int
  359. fuse_session_loop_mt(struct fuse_session *se_,
  360. const int raw_read_thread_count_,
  361. const int raw_process_thread_count_,
  362. const char *pin_threads_type_)
  363. {
  364. int err;
  365. int read_thread_count;
  366. int process_thread_count;
  367. fuse_worker_data_t wd = {0};
  368. std::vector<pthread_t> read_threads;
  369. std::vector<pthread_t> process_threads;
  370. read_thread_count = raw_read_thread_count_;
  371. process_thread_count = raw_process_thread_count_;
  372. ::calculate_thread_counts(&read_thread_count,&process_thread_count);
  373. if(process_thread_count > 0)
  374. {
  375. wd.tp = std::make_shared<ThreadPool>(process_thread_count);
  376. wd.msgbuf_processor = process_msgbuf_async;
  377. process_threads = wd.tp->threads();
  378. }
  379. else
  380. {
  381. wd.msgbuf_processor = process_msgbuf_sync;
  382. }
  383. wd.se = se_;
  384. sem_init(&wd.finished,0,0);
  385. err = 0;
  386. for(int i = 0; i < read_thread_count; i++)
  387. {
  388. pthread_t thread_id;
  389. err = fuse_start_thread(&thread_id,fuse_do_work,&wd);
  390. assert(err == 0);
  391. read_threads.push_back(thread_id);
  392. }
  393. if(pin_threads_type_ != NULL)
  394. ::pin_threads(read_threads,process_threads,pin_threads_type_);
  395. if(!err)
  396. {
  397. /* sem_wait() is interruptible */
  398. while(!fuse_session_exited(se_))
  399. sem_wait(&wd.finished);
  400. for(const auto &thread_id : read_threads)
  401. pthread_cancel(thread_id);
  402. for(const auto &thread_id : read_threads)
  403. pthread_join(thread_id,NULL);
  404. }
  405. sem_destroy(&wd.finished);
  406. return err;
  407. }