You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

231 lines
4.8 KiB

8 years ago
  1. /*
  2. FUSE: Filesystem in Userspace
  3. Copyright (C) 2001-2007 Miklos Szeredi <miklos@szeredi.hu>
  4. This program can be distributed under the terms of the GNU LGPLv2.
  5. See the file COPYING.LIB.
  6. */
  7. #include "fuse_i.h"
  8. #include "fuse_kernel.h"
  9. #include "fuse_lowlevel.h"
  10. #include "fuse_misc.h"
  11. #include <errno.h>
  12. #include <semaphore.h>
  13. #include <signal.h>
  14. #include <stdio.h>
  15. #include <stdlib.h>
  16. #include <string.h>
  17. #include <sys/time.h>
  18. #include <unistd.h>
  19. #include <unistd.h>
  20. /* Environment var controlling the thread stack size */
  21. #define ENVNAME_THREAD_STACK "FUSE_THREAD_STACK"
  22. struct fuse_worker
  23. {
  24. struct fuse_worker *prev;
  25. struct fuse_worker *next;
  26. pthread_t thread_id;
  27. size_t bufsize;
  28. char *buf;
  29. struct fuse_mt *mt;
  30. };
  31. struct fuse_mt
  32. {
  33. struct fuse_session *se;
  34. struct fuse_worker main;
  35. sem_t finish;
  36. int exit;
  37. int error;
  38. };
  39. static
  40. void
  41. list_add_worker(struct fuse_worker *w,
  42. struct fuse_worker *next)
  43. {
  44. struct fuse_worker *prev = next->prev;
  45. w->next = next;
  46. w->prev = prev;
  47. prev->next = w;
  48. next->prev = w;
  49. }
  50. static void list_del_worker(struct fuse_worker *w)
  51. {
  52. struct fuse_worker *prev = w->prev;
  53. struct fuse_worker *next = w->next;
  54. prev->next = next;
  55. next->prev = prev;
  56. }
  57. static int fuse_loop_start_thread(struct fuse_mt *mt);
  58. static
  59. void*
  60. fuse_do_work(void *data)
  61. {
  62. struct fuse_worker *w = (struct fuse_worker *) data;
  63. struct fuse_mt *mt = w->mt;
  64. while(!fuse_session_exited(mt->se))
  65. {
  66. int res;
  67. struct fuse_buf fbuf;
  68. fbuf = (struct fuse_buf){ .mem = w->buf,
  69. .size = w->bufsize };
  70. pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
  71. res = fuse_session_receive(mt->se,&fbuf);
  72. pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
  73. if(res == -EINTR)
  74. continue;
  75. if(res <= 0)
  76. {
  77. if(res < 0)
  78. {
  79. mt->se->exited = 1;
  80. mt->error = -1;
  81. }
  82. break;
  83. }
  84. if(mt->exit)
  85. return NULL;
  86. fuse_session_process(mt->se,&fbuf);
  87. }
  88. sem_post(&mt->finish);
  89. return NULL;
  90. }
  91. int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg)
  92. {
  93. sigset_t oldset;
  94. sigset_t newset;
  95. int res;
  96. pthread_attr_t attr;
  97. char *stack_size;
  98. /* Override default stack size */
  99. pthread_attr_init(&attr);
  100. stack_size = getenv(ENVNAME_THREAD_STACK);
  101. if(stack_size && pthread_attr_setstacksize(&attr, atoi(stack_size)))
  102. fprintf(stderr, "fuse: invalid stack size: %s\n", stack_size);
  103. /* Disallow signal reception in worker threads */
  104. sigfillset(&newset);
  105. pthread_sigmask(SIG_BLOCK,&newset,&oldset);
  106. res = pthread_create(thread_id, &attr, func, arg);
  107. pthread_sigmask(SIG_SETMASK,&oldset,NULL);
  108. pthread_attr_destroy(&attr);
  109. if(res != 0) {
  110. fprintf(stderr, "fuse: error creating thread: %s\n",
  111. strerror(res));
  112. return -1;
  113. }
  114. return 0;
  115. }
  116. static int fuse_loop_start_thread(struct fuse_mt *mt)
  117. {
  118. int res;
  119. struct fuse_worker *w = malloc(sizeof(struct fuse_worker));
  120. if(!w) {
  121. fprintf(stderr, "fuse: failed to allocate worker structure\n");
  122. return -1;
  123. }
  124. memset(w, 0, sizeof(struct fuse_worker));
  125. w->bufsize = fuse_chan_bufsize(mt->se->ch);
  126. w->buf = calloc(w->bufsize,1);
  127. w->mt = mt;
  128. if(!w->buf) {
  129. fprintf(stderr, "fuse: failed to allocate read buffer\n");
  130. free(w);
  131. return -1;
  132. }
  133. res = fuse_start_thread(&w->thread_id, fuse_do_work, w);
  134. if(res == -1) {
  135. free(w->buf);
  136. free(w);
  137. return -1;
  138. }
  139. list_add_worker(w, &mt->main);
  140. return 0;
  141. }
  142. static void fuse_join_worker(struct fuse_worker *w)
  143. {
  144. pthread_join(w->thread_id, NULL);
  145. list_del_worker(w);
  146. free(w->buf);
  147. free(w);
  148. }
  149. static int number_of_threads(void)
  150. {
  151. #ifdef _SC_NPROCESSORS_ONLN
  152. return sysconf(_SC_NPROCESSORS_ONLN);
  153. #endif
  154. return 4;
  155. }
  156. int
  157. fuse_session_loop_mt(struct fuse_session *se_,
  158. const int threads_)
  159. {
  160. int i;
  161. int err;
  162. int threads;
  163. struct fuse_mt mt;
  164. struct fuse_worker *w;
  165. memset(&mt,0,sizeof(struct fuse_mt));
  166. mt.se = se_;
  167. mt.error = 0;
  168. mt.main.thread_id = pthread_self();
  169. mt.main.prev = mt.main.next = &mt.main;
  170. sem_init(&mt.finish,0,0);
  171. threads = ((threads_ > 0) ? threads_ : number_of_threads());
  172. if(threads_ < 0)
  173. threads /= -threads_;
  174. if(threads == 0)
  175. threads = 1;
  176. err = 0;
  177. for(i = 0; (i < threads) && !err; i++)
  178. err = fuse_loop_start_thread(&mt);
  179. if(!err)
  180. {
  181. /* sem_wait() is interruptible */
  182. while(!fuse_session_exited(se_))
  183. sem_wait(&mt.finish);
  184. for(w = mt.main.next; w != &mt.main; w = w->next)
  185. pthread_cancel(w->thread_id);
  186. mt.exit = 1;
  187. while(mt.main.next != &mt.main)
  188. fuse_join_worker(mt.main.next);
  189. err = mt.error;
  190. }
  191. sem_destroy(&mt.finish);
  192. fuse_session_reset(se_);
  193. return err;
  194. }