You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

491 lines
11 KiB

  1. #ifndef _GNU_SOURCE
  2. #define _GNU_SOURCE
  3. #endif
  4. #include "thread_pool.hpp"
  5. #include "cpu.hpp"
  6. #include "fmt/core.h"
  7. #include "fuse_i.h"
  8. #include "fuse_kernel.h"
  9. #include "fuse_lowlevel.h"
  10. #include "fuse_misc.h"
  11. #include "fuse_msgbuf.hpp"
  12. #include "fuse_ll.hpp"
  13. #include <errno.h>
  14. #include <pthread.h>
  15. #include <semaphore.h>
  16. #include <signal.h>
  17. #include <stdio.h>
  18. #include <stdlib.h>
  19. #include <string.h>
  20. #include <sys/time.h>
  21. #include <unistd.h>
  22. #include <cassert>
  23. #include <vector>
  24. struct fuse_worker_data_t
  25. {
  26. struct fuse_session *se;
  27. sem_t finished;
  28. std::function<void(fuse_worker_data_t*,fuse_msgbuf_t*)> msgbuf_processor;
  29. std::function<fuse_msgbuf_t*(void)> msgbuf_allocator;
  30. std::shared_ptr<ThreadPool> tp;
  31. };
  32. class WorkerCleanup
  33. {
  34. public:
  35. WorkerCleanup(fuse_worker_data_t *wd_)
  36. : _wd(wd_)
  37. {
  38. }
  39. ~WorkerCleanup()
  40. {
  41. fuse_session_exit(_wd->se);
  42. sem_post(&_wd->finished);
  43. }
  44. private:
  45. fuse_worker_data_t *_wd;
  46. };
  47. static
  48. bool
  49. retriable_receive_error(const int err_)
  50. {
  51. switch(err_)
  52. {
  53. case -EINTR:
  54. case -EAGAIN:
  55. case -ENOENT:
  56. return true;
  57. default:
  58. return false;
  59. }
  60. }
  61. static
  62. bool
  63. fatal_receive_error(const int err_)
  64. {
  65. return (err_ < 0);
  66. }
  67. static
  68. void*
  69. handle_receive_error(const int rv_,
  70. fuse_msgbuf_t *msgbuf_)
  71. {
  72. msgbuf_free(msgbuf_);
  73. fprintf(stderr,
  74. "mergerfs: error reading from /dev/fuse - %s (%d)\n",
  75. strerror(-rv_),
  76. -rv_);
  77. return NULL;
  78. }
  79. static
  80. void*
  81. fuse_do_work(void *data)
  82. {
  83. fuse_worker_data_t *wd = (fuse_worker_data_t*)data;
  84. fuse_session *se = wd->se;
  85. auto &process_msgbuf = wd->msgbuf_processor;
  86. auto &msgbuf_allocator = wd->msgbuf_allocator;
  87. WorkerCleanup workercleanup(wd);
  88. while(!fuse_session_exited(se))
  89. {
  90. int rv;
  91. fuse_msgbuf_t *msgbuf;
  92. msgbuf = msgbuf_allocator();
  93. do
  94. {
  95. pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
  96. rv = se->receive_buf(se,msgbuf);
  97. pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
  98. if(rv == 0)
  99. return NULL;
  100. if(retriable_receive_error(rv))
  101. continue;
  102. if(fatal_receive_error(rv))
  103. return handle_receive_error(rv,msgbuf);
  104. } while(false);
  105. process_msgbuf(wd,msgbuf);
  106. }
  107. return NULL;
  108. }
  109. int
  110. fuse_start_thread(pthread_t *thread_id,
  111. void *(*func)(void *),
  112. void *arg)
  113. {
  114. int res;
  115. sigset_t oldset;
  116. sigset_t newset;
  117. sigfillset(&newset);
  118. pthread_sigmask(SIG_BLOCK,&newset,&oldset);
  119. res = pthread_create(thread_id,NULL,func,arg);
  120. pthread_sigmask(SIG_SETMASK,&oldset,NULL);
  121. if(res != 0)
  122. {
  123. fprintf(stderr,
  124. "fuse: error creating thread: %s\n",
  125. strerror(res));
  126. return -1;
  127. }
  128. return 0;
  129. }
  130. static
  131. int
  132. calculate_thread_count(const int raw_thread_count_)
  133. {
  134. int thread_count;
  135. thread_count = 4;
  136. if(raw_thread_count_ == 0)
  137. thread_count = std::thread::hardware_concurrency();
  138. else if(raw_thread_count_ < 0)
  139. thread_count = (std::thread::hardware_concurrency() / -raw_thread_count_);
  140. else if(raw_thread_count_ > 0)
  141. thread_count = raw_thread_count_;
  142. if(thread_count <= 0)
  143. thread_count = 1;
  144. return thread_count;
  145. }
  146. static
  147. void
  148. calculate_thread_counts(int *read_thread_count_,
  149. int *process_thread_count_)
  150. {
  151. if((*read_thread_count_ == -1) && (*process_thread_count_ == -1))
  152. {
  153. int nproc;
  154. nproc = std::thread::hardware_concurrency();
  155. *read_thread_count_ = 2;
  156. *process_thread_count_ = std::max(2,(nproc - 2));
  157. }
  158. else
  159. {
  160. *read_thread_count_ = ::calculate_thread_count(*read_thread_count_);
  161. if(*process_thread_count_ != -1)
  162. *process_thread_count_ = ::calculate_thread_count(*process_thread_count_);
  163. }
  164. }
  165. static
  166. void
  167. process_msgbuf_sync(fuse_worker_data_t *wd_,
  168. fuse_msgbuf_t *msgbuf_)
  169. {
  170. wd_->se->process_buf(wd_->se,msgbuf_);
  171. msgbuf_free(msgbuf_);
  172. }
  173. static
  174. void
  175. process_msgbuf_async(fuse_worker_data_t *wd_,
  176. fuse_msgbuf_t *msgbuf_)
  177. {
  178. const auto func = [=] {
  179. process_msgbuf_sync(wd_,msgbuf_);
  180. };
  181. wd_->tp->enqueue_work(func);
  182. }
  183. static
  184. void
  185. pin_threads_R1L(const CPU::ThreadIdVec read_threads_)
  186. {
  187. CPU::CPUVec cpus;
  188. cpus = CPU::cpus();
  189. if(cpus.empty())
  190. return;
  191. for(auto const thread_id : read_threads_)
  192. CPU::setaffinity(thread_id,cpus.front());
  193. }
  194. static
  195. void
  196. pin_threads_R1P(const CPU::ThreadIdVec read_threads_)
  197. {
  198. CPU::Core2CPUsMap core2cpus;
  199. core2cpus = CPU::core2cpus();
  200. if(core2cpus.empty())
  201. return;
  202. for(auto const thread_id : read_threads_)
  203. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  204. }
  205. static
  206. void
  207. pin_threads_RP1L(const CPU::ThreadIdVec read_threads_,
  208. const CPU::ThreadIdVec process_threads_)
  209. {
  210. CPU::CPUVec cpus;
  211. cpus = CPU::cpus();
  212. if(cpus.empty())
  213. return;
  214. for(auto const thread_id : read_threads_)
  215. CPU::setaffinity(thread_id,cpus.front());
  216. for(auto const thread_id : process_threads_)
  217. CPU::setaffinity(thread_id,cpus.front());
  218. }
  219. static
  220. void
  221. pin_threads_RP1P(const CPU::ThreadIdVec read_threads_,
  222. const CPU::ThreadIdVec process_threads_)
  223. {
  224. CPU::Core2CPUsMap core2cpus;
  225. core2cpus = CPU::core2cpus();
  226. if(core2cpus.empty())
  227. return;
  228. for(auto const thread_id : read_threads_)
  229. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  230. for(auto const thread_id : process_threads_)
  231. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  232. }
  233. static
  234. void
  235. pin_threads_R1LP1L(const std::vector<pthread_t> read_threads_,
  236. const std::vector<pthread_t> process_threads_)
  237. {
  238. CPU::CPUVec cpus;
  239. cpus = CPU::cpus();
  240. if(cpus.empty())
  241. return;
  242. for(auto const thread_id : read_threads_)
  243. CPU::setaffinity(thread_id,cpus.front());
  244. for(auto const thread_id : process_threads_)
  245. CPU::setaffinity(thread_id,cpus.back());
  246. }
  247. static
  248. void
  249. pin_threads_R1PP1P(const std::vector<pthread_t> read_threads_,
  250. const std::vector<pthread_t> process_threads_)
  251. {
  252. CPU::Core2CPUsMap core2cpus;
  253. core2cpus = CPU::core2cpus();
  254. if(core2cpus.empty())
  255. return;
  256. for(auto const thread_id : read_threads_)
  257. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  258. if(core2cpus.size() > 1)
  259. core2cpus.erase(core2cpus.begin());
  260. for(auto const thread_id : process_threads_)
  261. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  262. }
  263. static
  264. void
  265. pin_threads_RPSL(const std::vector<pthread_t> read_threads_,
  266. const std::vector<pthread_t> process_threads_)
  267. {
  268. CPU::CPUVec cpus;
  269. cpus = CPU::cpus();
  270. if(cpus.empty())
  271. return;
  272. for(auto const thread_id : read_threads_)
  273. {
  274. if(cpus.empty())
  275. cpus = CPU::cpus();
  276. CPU::setaffinity(thread_id,cpus.back());
  277. cpus.pop_back();
  278. }
  279. for(auto const thread_id : process_threads_)
  280. {
  281. if(cpus.empty())
  282. cpus = CPU::cpus();
  283. CPU::setaffinity(thread_id,cpus.back());
  284. cpus.pop_back();
  285. }
  286. }
  287. static
  288. void
  289. pin_threads_RPSP(const std::vector<pthread_t> read_threads_,
  290. const std::vector<pthread_t> process_threads_)
  291. {
  292. CPU::Core2CPUsMap core2cpus;
  293. core2cpus = CPU::core2cpus();
  294. if(core2cpus.empty())
  295. return;
  296. for(auto const thread_id : read_threads_)
  297. {
  298. if(core2cpus.empty())
  299. core2cpus = CPU::core2cpus();
  300. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  301. core2cpus.erase(core2cpus.begin());
  302. }
  303. for(auto const thread_id : process_threads_)
  304. {
  305. if(core2cpus.empty())
  306. core2cpus = CPU::core2cpus();
  307. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  308. core2cpus.erase(core2cpus.begin());
  309. }
  310. }
  311. static
  312. void
  313. pin_threads_R1PPSP(const std::vector<pthread_t> read_threads_,
  314. const std::vector<pthread_t> process_threads_)
  315. {
  316. CPU::Core2CPUsMap core2cpus;
  317. CPU::Core2CPUsMap leftover;
  318. core2cpus = CPU::core2cpus();
  319. if(core2cpus.empty())
  320. return;
  321. for(auto const thread_id : read_threads_)
  322. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  323. core2cpus.erase(core2cpus.begin());
  324. if(core2cpus.empty())
  325. core2cpus = CPU::core2cpus();
  326. leftover = core2cpus;
  327. for(auto const thread_id : process_threads_)
  328. {
  329. if(core2cpus.empty())
  330. core2cpus = leftover;
  331. CPU::setaffinity(thread_id,core2cpus.begin()->second);
  332. core2cpus.erase(core2cpus.begin());
  333. }
  334. }
  335. static
  336. void
  337. pin_threads(const std::vector<pthread_t> read_threads_,
  338. const std::vector<pthread_t> process_threads_,
  339. const std::string type_)
  340. {
  341. if(type_ == "R1L")
  342. return ::pin_threads_R1L(read_threads_);
  343. if(type_ == "R1P")
  344. return ::pin_threads_R1P(read_threads_);
  345. if(type_ == "RP1L")
  346. return ::pin_threads_RP1L(read_threads_,process_threads_);
  347. if(type_ == "RP1P")
  348. return ::pin_threads_RP1P(read_threads_,process_threads_);
  349. if(type_ == "R1LP1L")
  350. return ::pin_threads_R1LP1L(read_threads_,process_threads_);
  351. if(type_ == "R1PP1P")
  352. return ::pin_threads_R1PP1P(read_threads_,process_threads_);
  353. if(type_ == "RPSL")
  354. return ::pin_threads_RPSL(read_threads_,process_threads_);
  355. if(type_ == "RPSP")
  356. return ::pin_threads_RPSP(read_threads_,process_threads_);
  357. if(type_ == "R1PPSP")
  358. return ::pin_threads_R1PPSP(read_threads_,process_threads_);
  359. }
  360. int
  361. fuse_session_loop_mt(struct fuse_session *se_,
  362. const int raw_read_thread_count_,
  363. const int raw_process_thread_count_,
  364. const char *pin_threads_type_)
  365. {
  366. int err;
  367. int read_thread_count;
  368. int process_thread_count;
  369. fuse_worker_data_t wd = {0};
  370. std::vector<pthread_t> read_threads;
  371. std::vector<pthread_t> process_threads;
  372. read_thread_count = raw_read_thread_count_;
  373. process_thread_count = raw_process_thread_count_;
  374. ::calculate_thread_counts(&read_thread_count,&process_thread_count);
  375. if(process_thread_count > 0)
  376. {
  377. wd.tp = std::make_shared<ThreadPool>(process_thread_count);
  378. wd.msgbuf_processor = process_msgbuf_async;
  379. process_threads = wd.tp->threads();
  380. }
  381. else
  382. {
  383. wd.msgbuf_processor = process_msgbuf_sync;
  384. }
  385. wd.msgbuf_allocator = ((se_->f->splice_read) ? msgbuf_alloc : msgbuf_alloc_memonly);
  386. wd.se = se_;
  387. sem_init(&wd.finished,0,0);
  388. err = 0;
  389. for(int i = 0; i < read_thread_count; i++)
  390. {
  391. pthread_t thread_id;
  392. err = fuse_start_thread(&thread_id,fuse_do_work,&wd);
  393. assert(err == 0);
  394. read_threads.push_back(thread_id);
  395. }
  396. if(pin_threads_type_ != NULL)
  397. ::pin_threads(read_threads,process_threads,pin_threads_type_);
  398. if(!err)
  399. {
  400. /* sem_wait() is interruptible */
  401. while(!fuse_session_exited(se_))
  402. sem_wait(&wd.finished);
  403. for(const auto &thread_id : read_threads)
  404. pthread_cancel(thread_id);
  405. for(const auto &thread_id : read_threads)
  406. pthread_join(thread_id,NULL);
  407. }
  408. sem_destroy(&wd.finished);
  409. return err;
  410. }