You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

232 lines
4.9 KiB

8 years ago
  1. /*
  2. FUSE: Filesystem in Userspace
  3. Copyright (C) 2001-2007 Miklos Szeredi <miklos@szeredi.hu>
  4. This program can be distributed under the terms of the GNU LGPLv2.
  5. See the file COPYING.LIB.
  6. */
  7. #include "fuse_i.h"
  8. #include "fuse_kernel.h"
  9. #include "fuse_lowlevel.h"
  10. #include "fuse_misc.h"
  11. #include <errno.h>
  12. #include <semaphore.h>
  13. #include <signal.h>
  14. #include <stdio.h>
  15. #include <stdlib.h>
  16. #include <string.h>
  17. #include <sys/time.h>
  18. #include <unistd.h>
  19. #include <unistd.h>
  20. /* Environment var controlling the thread stack size */
  21. #define ENVNAME_THREAD_STACK "FUSE_THREAD_STACK"
  22. struct fuse_worker
  23. {
  24. struct fuse_worker *prev;
  25. struct fuse_worker *next;
  26. pthread_t thread_id;
  27. size_t bufsize;
  28. char *buf;
  29. struct fuse_mt *mt;
  30. };
  31. struct fuse_mt
  32. {
  33. struct fuse_session *se;
  34. struct fuse_chan *prevch;
  35. struct fuse_worker main;
  36. sem_t finish;
  37. int exit;
  38. int error;
  39. };
  40. static
  41. void
  42. list_add_worker(struct fuse_worker *w,
  43. struct fuse_worker *next)
  44. {
  45. struct fuse_worker *prev = next->prev;
  46. w->next = next;
  47. w->prev = prev;
  48. prev->next = w;
  49. next->prev = w;
  50. }
  51. static void list_del_worker(struct fuse_worker *w)
  52. {
  53. struct fuse_worker *prev = w->prev;
  54. struct fuse_worker *next = w->next;
  55. prev->next = next;
  56. next->prev = prev;
  57. }
  58. static int fuse_loop_start_thread(struct fuse_mt *mt);
  59. static
  60. void*
  61. fuse_do_work(void *data)
  62. {
  63. struct fuse_worker *w = (struct fuse_worker *) data;
  64. struct fuse_mt *mt = w->mt;
  65. while(!fuse_session_exited(mt->se))
  66. {
  67. int res;
  68. struct fuse_buf fbuf;
  69. struct fuse_chan *ch = mt->prevch;
  70. fbuf = (struct fuse_buf){ .mem = w->buf,
  71. .size = w->bufsize };
  72. pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
  73. res = fuse_session_receive_buf(mt->se, &fbuf, &ch);
  74. pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
  75. if(res == -EINTR)
  76. continue;
  77. if(res <= 0) {
  78. if(res < 0) {
  79. fuse_session_exit(mt->se);
  80. mt->error = -1;
  81. }
  82. break;
  83. }
  84. if(mt->exit)
  85. return NULL;
  86. fuse_session_process_buf(mt->se, &fbuf, ch);
  87. }
  88. sem_post(&mt->finish);
  89. return NULL;
  90. }
  91. int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg)
  92. {
  93. sigset_t oldset;
  94. sigset_t newset;
  95. int res;
  96. pthread_attr_t attr;
  97. char *stack_size;
  98. /* Override default stack size */
  99. pthread_attr_init(&attr);
  100. stack_size = getenv(ENVNAME_THREAD_STACK);
  101. if(stack_size && pthread_attr_setstacksize(&attr, atoi(stack_size)))
  102. fprintf(stderr, "fuse: invalid stack size: %s\n", stack_size);
  103. /* Disallow signal reception in worker threads */
  104. sigfillset(&newset);
  105. pthread_sigmask(SIG_BLOCK,&newset,&oldset);
  106. res = pthread_create(thread_id, &attr, func, arg);
  107. pthread_sigmask(SIG_SETMASK,&oldset,NULL);
  108. pthread_attr_destroy(&attr);
  109. if(res != 0) {
  110. fprintf(stderr, "fuse: error creating thread: %s\n",
  111. strerror(res));
  112. return -1;
  113. }
  114. return 0;
  115. }
  116. static int fuse_loop_start_thread(struct fuse_mt *mt)
  117. {
  118. int res;
  119. struct fuse_worker *w = malloc(sizeof(struct fuse_worker));
  120. if(!w) {
  121. fprintf(stderr, "fuse: failed to allocate worker structure\n");
  122. return -1;
  123. }
  124. memset(w, 0, sizeof(struct fuse_worker));
  125. w->bufsize = fuse_chan_bufsize(mt->prevch);
  126. w->buf = calloc(w->bufsize,1);
  127. w->mt = mt;
  128. if(!w->buf) {
  129. fprintf(stderr, "fuse: failed to allocate read buffer\n");
  130. free(w);
  131. return -1;
  132. }
  133. res = fuse_start_thread(&w->thread_id, fuse_do_work, w);
  134. if(res == -1) {
  135. free(w->buf);
  136. free(w);
  137. return -1;
  138. }
  139. list_add_worker(w, &mt->main);
  140. return 0;
  141. }
  142. static void fuse_join_worker(struct fuse_worker *w)
  143. {
  144. pthread_join(w->thread_id, NULL);
  145. list_del_worker(w);
  146. free(w->buf);
  147. free(w);
  148. }
  149. static int number_of_threads(void)
  150. {
  151. #ifdef _SC_NPROCESSORS_ONLN
  152. return sysconf(_SC_NPROCESSORS_ONLN);
  153. #endif
  154. return 4;
  155. }
  156. int
  157. fuse_session_loop_mt(struct fuse_session *se_,
  158. const int threads_)
  159. {
  160. int i;
  161. int err;
  162. int threads;
  163. struct fuse_mt mt;
  164. struct fuse_worker *w;
  165. memset(&mt,0,sizeof(struct fuse_mt));
  166. mt.se = se_;
  167. mt.prevch = fuse_session_next_chan(se_,NULL);
  168. mt.error = 0;
  169. mt.main.thread_id = pthread_self();
  170. mt.main.prev = mt.main.next = &mt.main;
  171. sem_init(&mt.finish,0,0);
  172. threads = ((threads_ > 0) ? threads_ : number_of_threads());
  173. if(threads_ < 0)
  174. threads /= -threads_;
  175. if(threads == 0)
  176. threads = 1;
  177. err = 0;
  178. for(i = 0; (i < threads) && !err; i++)
  179. err = fuse_loop_start_thread(&mt);
  180. if(!err)
  181. {
  182. /* sem_wait() is interruptible */
  183. while(!fuse_session_exited(se_))
  184. sem_wait(&mt.finish);
  185. for(w = mt.main.next; w != &mt.main; w = w->next)
  186. pthread_cancel(w->thread_id);
  187. mt.exit = 1;
  188. while(mt.main.next != &mt.main)
  189. fuse_join_worker(mt.main.next);
  190. err = mt.error;
  191. }
  192. sem_destroy(&mt.finish);
  193. fuse_session_reset(se_);
  194. return err;
  195. }