You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

266 lines
5.5 KiB

  1. #include "thread_pool.hpp"
  2. #include "fuse_i.h"
  3. #include "fuse_kernel.h"
  4. #include "fuse_lowlevel.h"
  5. #include "fuse_misc.h"
  6. #include "fuse_msgbuf.hpp"
  7. #include "fuse_ll.hpp"
  8. #include <errno.h>
  9. #include <semaphore.h>
  10. #include <signal.h>
  11. #include <stdio.h>
  12. #include <stdlib.h>
  13. #include <string.h>
  14. #include <sys/time.h>
  15. #include <unistd.h>
  16. #include <cassert>
  17. #include <vector>
  18. struct fuse_worker_data_t
  19. {
  20. struct fuse_session *se;
  21. sem_t finished;
  22. std::function<void(fuse_worker_data_t*,fuse_msgbuf_t*)> msgbuf_processor;
  23. std::function<fuse_msgbuf_t*(void)> msgbuf_allocator;
  24. std::shared_ptr<ThreadPool> tp;
  25. };
  26. class WorkerCleanup
  27. {
  28. public:
  29. WorkerCleanup(fuse_worker_data_t *wd_)
  30. : _wd(wd_)
  31. {
  32. }
  33. ~WorkerCleanup()
  34. {
  35. fuse_session_exit(_wd->se);
  36. sem_post(&_wd->finished);
  37. }
  38. private:
  39. fuse_worker_data_t *_wd;
  40. };
  41. static
  42. bool
  43. retriable_receive_error(const int err_)
  44. {
  45. switch(err_)
  46. {
  47. case -EINTR:
  48. case -EAGAIN:
  49. case -ENOENT:
  50. return true;
  51. default:
  52. return false;
  53. }
  54. }
  55. static
  56. bool
  57. fatal_receive_error(const int err_)
  58. {
  59. return (err_ < 0);
  60. }
  61. static
  62. void*
  63. handle_receive_error(const int rv_,
  64. fuse_msgbuf_t *msgbuf_)
  65. {
  66. msgbuf_free(msgbuf_);
  67. fprintf(stderr,
  68. "mergerfs: error reading from /dev/fuse - %s (%d)\n",
  69. strerror(-rv_),
  70. -rv_);
  71. return NULL;
  72. }
  73. static
  74. void*
  75. fuse_do_work(void *data)
  76. {
  77. fuse_worker_data_t *wd = (fuse_worker_data_t*)data;
  78. fuse_session *se = wd->se;
  79. auto &process_msgbuf = wd->msgbuf_processor;
  80. auto &msgbuf_allocator = wd->msgbuf_allocator;
  81. WorkerCleanup workercleanup(wd);
  82. while(!fuse_session_exited(se))
  83. {
  84. int rv;
  85. fuse_msgbuf_t *msgbuf;
  86. msgbuf = msgbuf_allocator();
  87. do
  88. {
  89. pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
  90. rv = se->receive_buf(se,msgbuf);
  91. pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
  92. if(rv == 0)
  93. return NULL;
  94. if(retriable_receive_error(rv))
  95. continue;
  96. if(fatal_receive_error(rv))
  97. return handle_receive_error(rv,msgbuf);
  98. } while(false);
  99. process_msgbuf(wd,msgbuf);
  100. }
  101. return NULL;
  102. }
  103. int
  104. fuse_start_thread(pthread_t *thread_id,
  105. void *(*func)(void *),
  106. void *arg)
  107. {
  108. int res;
  109. sigset_t oldset;
  110. sigset_t newset;
  111. sigfillset(&newset);
  112. pthread_sigmask(SIG_BLOCK,&newset,&oldset);
  113. res = pthread_create(thread_id,NULL,func,arg);
  114. pthread_sigmask(SIG_SETMASK,&oldset,NULL);
  115. if(res != 0)
  116. {
  117. fprintf(stderr,
  118. "fuse: error creating thread: %s\n",
  119. strerror(res));
  120. return -1;
  121. }
  122. return 0;
  123. }
  124. static
  125. int
  126. calculate_thread_count(const int raw_thread_count_)
  127. {
  128. int thread_count;
  129. thread_count = 4;
  130. if(raw_thread_count_ == 0)
  131. thread_count = std::thread::hardware_concurrency();
  132. else if(raw_thread_count_ < 0)
  133. thread_count = (std::thread::hardware_concurrency() / -raw_thread_count_);
  134. else if(raw_thread_count_ > 0)
  135. thread_count = raw_thread_count_;
  136. if(thread_count <= 0)
  137. thread_count = 1;
  138. return thread_count;
  139. }
  140. static
  141. void
  142. calculate_thread_counts(int *read_thread_count_,
  143. int *process_thread_count_)
  144. {
  145. if((*read_thread_count_ == -1) && (*process_thread_count_ == -1))
  146. {
  147. int nproc;
  148. nproc = std::thread::hardware_concurrency();
  149. *read_thread_count_ = 2;
  150. *process_thread_count_ = std::max(2,(nproc - 2));
  151. }
  152. else
  153. {
  154. *read_thread_count_ = ::calculate_thread_count(*read_thread_count_);
  155. if(*process_thread_count_ != -1)
  156. *process_thread_count_ = ::calculate_thread_count(*process_thread_count_);
  157. }
  158. }
  159. static
  160. void
  161. process_msgbuf_sync(fuse_worker_data_t *wd_,
  162. fuse_msgbuf_t *msgbuf_)
  163. {
  164. wd_->se->process_buf(wd_->se,msgbuf_);
  165. msgbuf_free(msgbuf_);
  166. }
  167. static
  168. void
  169. process_msgbuf_async(fuse_worker_data_t *wd_,
  170. fuse_msgbuf_t *msgbuf_)
  171. {
  172. const auto func = [=] {
  173. process_msgbuf_sync(wd_,msgbuf_);
  174. };
  175. wd_->tp->enqueue_work(func);
  176. }
  177. int
  178. fuse_session_loop_mt(struct fuse_session *se_,
  179. const int raw_read_thread_count_,
  180. const int raw_process_thread_count_)
  181. {
  182. int err;
  183. int read_thread_count;
  184. int process_thread_count;
  185. fuse_worker_data_t wd = {0};
  186. std::vector<pthread_t> threads;
  187. read_thread_count = raw_read_thread_count_;
  188. process_thread_count = raw_process_thread_count_;
  189. ::calculate_thread_counts(&read_thread_count,&process_thread_count);
  190. if(process_thread_count > 0)
  191. {
  192. wd.tp = std::make_shared<ThreadPool>(process_thread_count);
  193. wd.msgbuf_processor = process_msgbuf_async;
  194. }
  195. else
  196. {
  197. wd.msgbuf_processor = process_msgbuf_sync;
  198. }
  199. wd.msgbuf_allocator = ((se_->f->splice_read) ? msgbuf_alloc : msgbuf_alloc_memonly);
  200. wd.se = se_;
  201. sem_init(&wd.finished,0,0);
  202. err = 0;
  203. for(int i = 0; i < read_thread_count; i++)
  204. {
  205. pthread_t thread_id;
  206. err = fuse_start_thread(&thread_id,fuse_do_work,&wd);
  207. assert(err == 0);
  208. threads.push_back(thread_id);
  209. }
  210. if(!err)
  211. {
  212. /* sem_wait() is interruptible */
  213. while(!fuse_session_exited(se_))
  214. sem_wait(&wd.finished);
  215. for(const auto &thread_id : threads)
  216. pthread_cancel(thread_id);
  217. for(const auto &thread_id : threads)
  218. pthread_join(thread_id,NULL);
  219. }
  220. sem_destroy(&wd.finished);
  221. return err;
  222. }