Browse Source

Merge pull request #424 from trapexit/threading

Threading
pull/428/head 2.23.0
Antonio SJ Musumeci 7 years ago
committed by GitHub
parent
commit
c35a4dadee
  1. 1
      README.md
  2. 2
      libfuse/include/fuse.h
  3. 2
      libfuse/include/fuse_lowlevel.h
  4. 2
      libfuse/lib/cuse_lowlevel.c
  5. 7
      libfuse/lib/fuse.c
  6. 87
      libfuse/lib/fuse_loop_mt.c
  7. 6
      libfuse/lib/fuse_mt.c
  8. 12
      man/mergerfs.1
  9. 5
      src/option_parser.cpp

1
README.md

@ -42,6 +42,7 @@ mergerfs -o<options> <srcmounts> <mountpoint>
* **symlinkify_timeout**: time to wait, in seconds, to activate the **symlinkify** behavior. (default: 3600) * **symlinkify_timeout**: time to wait, in seconds, to activate the **symlinkify** behavior. (default: 3600)
* **nullrw**: turns reads and writes into no-ops. The request will succeed but do nothing. Useful for benchmarking mergerfs. (default: false) * **nullrw**: turns reads and writes into no-ops. The request will succeed but do nothing. Useful for benchmarking mergerfs. (default: false)
* **ignorepponrename**: ignore path preserving on rename. Typically rename and link act differently depending on the policy of `create` (read below). Enabling this will cause rename and link to always use the non-path preserving behavior. This means files, when renamed or linked, will stay on the same drive. * **ignorepponrename**: ignore path preserving on rename. Typically rename and link act differently depending on the policy of `create` (read below). Enabling this will cause rename and link to always use the non-path preserving behavior. This means files, when renamed or linked, will stay on the same drive.
* **threads**: number of threads to use in multithreaded mode. When set to zero (the default) it will attempt to discover and use the number of logical cores. If the lookup fails it will fall back to using 4. If the thread count is set negative it will look up the number of cores then divide by the absolute value. ie. threads=-2 on an 8 core machine will result in 8 / 2 = 4 threads. There will always be at least 1 thread. NOTE: higher number of threads increases parallelism but usually decreases throughput.
* **fsname**: sets the name of the filesystem as seen in **mount**, **df**, etc. Defaults to a list of the source paths concatenated together with the longest common prefix removed. * **fsname**: sets the name of the filesystem as seen in **mount**, **df**, etc. Defaults to a list of the source paths concatenated together with the longest common prefix removed.
* **func.<func>=<policy>**: sets the specific FUSE function's policy. See below for the list of value types. Example: **func.getattr=newest** * **func.<func>=<policy>**: sets the specific FUSE function's policy. See below for the list of value types. Example: **func.getattr=newest**
* **category.<category>=<policy>**: Sets policy of all FUSE functions in the provided category. Example: **category.create=mfs** * **category.<category>=<policy>**: Sets policy of all FUSE functions in the provided category. Example: **category.create=mfs**

2
libfuse/include/fuse.h

@ -696,6 +696,8 @@ int fuse_loop(struct fuse *f);
*/ */
void fuse_exit(struct fuse *f); void fuse_exit(struct fuse *f);
int fuse_config_num_threads(const struct fuse *f);
/** /**
* FUSE event loop with multiple threads * FUSE event loop with multiple threads
* *

2
libfuse/include/fuse_lowlevel.h

@ -1700,7 +1700,7 @@ int fuse_session_loop(struct fuse_session *se);
* @param se the session * @param se the session
* @return 0 on success, -1 on error * @return 0 on success, -1 on error
*/ */
int fuse_session_loop_mt(struct fuse_session *se);
int fuse_session_loop_mt(struct fuse_session *se, const int threads);
/* ----------------------------------------------------------- * /* ----------------------------------------------------------- *
* Channel interface * * Channel interface *

2
libfuse/lib/cuse_lowlevel.c

@ -359,7 +359,7 @@ int cuse_lowlevel_main(int argc, char *argv[], const struct cuse_info *ci,
return 1; return 1;
if (multithreaded) if (multithreaded)
res = fuse_session_loop_mt(se);
res = fuse_session_loop_mt(se, 0);
else else
res = fuse_session_loop(se); res = fuse_session_loop(se);

7
libfuse/lib/fuse.c

@ -77,6 +77,7 @@ struct fuse_config {
int intr_signal; int intr_signal;
int help; int help;
char *modules; char *modules;
int threads;
}; };
struct fuse_fs { struct fuse_fs {
@ -4405,6 +4406,7 @@ static const struct fuse_opt fuse_lib_opts[] = {
FUSE_LIB_OPT("intr", intr, 1), FUSE_LIB_OPT("intr", intr, 1),
FUSE_LIB_OPT("intr_signal=%d", intr_signal, 0), FUSE_LIB_OPT("intr_signal=%d", intr_signal, 0),
FUSE_LIB_OPT("modules=%s", modules, 0), FUSE_LIB_OPT("modules=%s", modules, 0),
FUSE_LIB_OPT("threads=%d", threads, 0),
FUSE_OPT_END FUSE_OPT_END
}; };
@ -4900,3 +4902,8 @@ struct fuse *fuse_new_compat25(int fd, struct fuse_args *args,
} }
FUSE_SYMVER(".symver fuse_new_compat25,fuse_new@FUSE_2.5"); FUSE_SYMVER(".symver fuse_new_compat25,fuse_new@FUSE_2.5");
int fuse_config_num_threads(const struct fuse *f)
{
return f->conf.threads;
}

87
libfuse/lib/fuse_loop_mt.c

@ -19,6 +19,7 @@
#include <semaphore.h> #include <semaphore.h>
#include <errno.h> #include <errno.h>
#include <sys/time.h> #include <sys/time.h>
#include <unistd.h>
/* Environment var controlling the thread stack size */ /* Environment var controlling the thread stack size */
#define ENVNAME_THREAD_STACK "FUSE_THREAD_STACK" #define ENVNAME_THREAD_STACK "FUSE_THREAD_STACK"
@ -33,9 +34,6 @@ struct fuse_worker {
}; };
struct fuse_mt { struct fuse_mt {
pthread_mutex_t lock;
int numworker;
int numavail;
struct fuse_session *se; struct fuse_session *se;
struct fuse_chan *prevch; struct fuse_chan *prevch;
struct fuse_worker main; struct fuse_worker main;
@ -69,7 +67,6 @@ static void *fuse_do_work(void *data)
struct fuse_mt *mt = w->mt; struct fuse_mt *mt = w->mt;
while (!fuse_session_exited(mt->se)) { while (!fuse_session_exited(mt->se)) {
int isforget = 0;
struct fuse_chan *ch = mt->prevch; struct fuse_chan *ch = mt->prevch;
struct fuse_buf fbuf = { struct fuse_buf fbuf = {
.mem = w->buf, .mem = w->buf,
@ -90,51 +87,10 @@ static void *fuse_do_work(void *data)
break; break;
} }
pthread_mutex_lock(&mt->lock);
if (mt->exit) {
pthread_mutex_unlock(&mt->lock);
if (mt->exit)
return NULL; return NULL;
}
/*
* This disgusting hack is needed so that zillions of threads
* are not created on a burst of FORGET messages
*/
if (!(fbuf.flags & FUSE_BUF_IS_FD)) {
struct fuse_in_header *in = fbuf.mem;
if (in->opcode == FUSE_FORGET ||
in->opcode == FUSE_BATCH_FORGET)
isforget = 1;
}
if (!isforget)
mt->numavail--;
if (mt->numavail == 0)
fuse_loop_start_thread(mt);
pthread_mutex_unlock(&mt->lock);
fuse_session_process_buf(mt->se, &fbuf, ch); fuse_session_process_buf(mt->se, &fbuf, ch);
pthread_mutex_lock(&mt->lock);
if (!isforget)
mt->numavail++;
if (mt->numavail > 10) {
if (mt->exit) {
pthread_mutex_unlock(&mt->lock);
return NULL;
}
list_del_worker(w);
mt->numavail--;
mt->numworker--;
pthread_mutex_unlock(&mt->lock);
pthread_detach(w->thread_id);
free(w->buf);
free(w);
return NULL;
}
pthread_mutex_unlock(&mt->lock);
} }
sem_post(&mt->finish); sem_post(&mt->finish);
@ -200,25 +156,33 @@ static int fuse_loop_start_thread(struct fuse_mt *mt)
return -1; return -1;
} }
list_add_worker(w, &mt->main); list_add_worker(w, &mt->main);
mt->numavail ++;
mt->numworker ++;
return 0; return 0;
} }
static void fuse_join_worker(struct fuse_mt *mt, struct fuse_worker *w)
static void fuse_join_worker(struct fuse_worker *w)
{ {
pthread_join(w->thread_id, NULL); pthread_join(w->thread_id, NULL);
pthread_mutex_lock(&mt->lock);
list_del_worker(w); list_del_worker(w);
pthread_mutex_unlock(&mt->lock);
free(w->buf); free(w->buf);
free(w); free(w);
} }
int fuse_session_loop_mt(struct fuse_session *se)
static int number_of_threads(void)
{ {
#ifdef _SC_NPROCESSORS_ONLN
return sysconf(_SC_NPROCESSORS_ONLN);
#endif
return 4;
}
int fuse_session_loop_mt(struct fuse_session *se,
const int _threads)
{
int i;
int err; int err;
int threads;
struct fuse_mt mt; struct fuse_mt mt;
struct fuse_worker *w; struct fuse_worker *w;
@ -226,34 +190,35 @@ int fuse_session_loop_mt(struct fuse_session *se)
mt.se = se; mt.se = se;
mt.prevch = fuse_session_next_chan(se, NULL); mt.prevch = fuse_session_next_chan(se, NULL);
mt.error = 0; mt.error = 0;
mt.numworker = 0;
mt.numavail = 0;
mt.main.thread_id = pthread_self(); mt.main.thread_id = pthread_self();
mt.main.prev = mt.main.next = &mt.main; mt.main.prev = mt.main.next = &mt.main;
sem_init(&mt.finish, 0, 0); sem_init(&mt.finish, 0, 0);
fuse_mutex_init(&mt.lock);
pthread_mutex_lock(&mt.lock);
threads = ((_threads > 0) ? _threads : number_of_threads());
if(_threads < 0)
threads /= -_threads;
if(threads == 0)
threads = 1;
err = 0;
for(i = 0; (i < threads) && !err; i++)
err = fuse_loop_start_thread(&mt); err = fuse_loop_start_thread(&mt);
pthread_mutex_unlock(&mt.lock);
if (!err) { if (!err) {
/* sem_wait() is interruptible */ /* sem_wait() is interruptible */
while (!fuse_session_exited(se)) while (!fuse_session_exited(se))
sem_wait(&mt.finish); sem_wait(&mt.finish);
pthread_mutex_lock(&mt.lock);
for (w = mt.main.next; w != &mt.main; w = w->next) for (w = mt.main.next; w != &mt.main; w = w->next)
pthread_cancel(w->thread_id); pthread_cancel(w->thread_id);
mt.exit = 1; mt.exit = 1;
pthread_mutex_unlock(&mt.lock);
while (mt.main.next != &mt.main) while (mt.main.next != &mt.main)
fuse_join_worker(&mt, mt.main.next);
fuse_join_worker(mt.main.next);
err = mt.error; err = mt.error;
} }
pthread_mutex_destroy(&mt.lock);
sem_destroy(&mt.finish); sem_destroy(&mt.finish);
fuse_session_reset(se); fuse_session_reset(se);
return err; return err;

6
libfuse/lib/fuse_mt.c

@ -100,7 +100,8 @@ int fuse_loop_mt_proc(struct fuse *f, fuse_processor_t proc, void *data)
return -1; return -1;
} }
fuse_session_add_chan(se, ch); fuse_session_add_chan(se, ch);
res = fuse_session_loop_mt(se);
res = fuse_session_loop_mt(se,
fuse_config_num_threads(f));
fuse_session_destroy(se); fuse_session_destroy(se);
return res; return res;
} }
@ -114,7 +115,8 @@ int fuse_loop_mt(struct fuse *f)
if (res) if (res)
return -1; return -1;
res = fuse_session_loop_mt(fuse_get_session(f));
res = fuse_session_loop_mt(fuse_get_session(f),
fuse_config_num_threads(f));
fuse_stop_cleanup_thread(f); fuse_stop_cleanup_thread(f);
return res; return res;
} }

12
man/mergerfs.1

@ -103,6 +103,18 @@ Enabling this will cause rename and link to always use the non\-path
preserving behavior. preserving behavior.
This means files, when renamed or linked, will stay on the same drive. This means files, when renamed or linked, will stay on the same drive.
.IP \[bu] 2 .IP \[bu] 2
\f[B]threads\f[]: number of threads to use in multithreaded mode.
When set to zero (the default) it will attempt to discover and use the
number of logical cores.
If the lookup fails it will fall back to using 4.
If the thread count is set negative it will look up the number of cores
then divide by the absolute value.
ie.
threads=\-2 on an 8 core machine will result in 8 / 2 = 4 threads.
There will always be at least 1 thread.
NOTE: higher number of threads increases parallelism but usually
decreases throughput.
.IP \[bu] 2
\f[B]fsname\f[]: sets the name of the filesystem as seen in \f[B]fsname\f[]: sets the name of the filesystem as seen in
\f[B]mount\f[], \f[B]df\f[], etc. \f[B]mount\f[], \f[B]df\f[], etc.
Defaults to a list of the source paths concatenated together with the Defaults to a list of the source paths concatenated together with the

5
src/option_parser.cpp

@ -298,7 +298,10 @@ usage(void)
" -o nullrw=<bool> Disables reads and writes. For benchmarking.\n" " -o nullrw=<bool> Disables reads and writes. For benchmarking.\n"
" -o ignorepponrename=<bool>\n" " -o ignorepponrename=<bool>\n"
" Ignore path preserving when performing renames\n" " Ignore path preserving when performing renames\n"
" and links. default = false"
" and links. default = false\n"
" -o threads=<int> number of worker threads. 0 = autodetect.\n"
" Negative values autodetect then divide by\n"
" absolute value. default = 0\n"
<< std::endl; << std::endl;
} }

Loading…
Cancel
Save