mirror of https://github.com/trapexit/mergerfs.git
				
				
			
			You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							347 lines
						
					
					
						
							8.2 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							347 lines
						
					
					
						
							8.2 KiB
						
					
					
				| #ifndef _GNU_SOURCE | |
| #define _GNU_SOURCE | |
| #endif | |
|  | |
| #include "cpu.hpp" | |
| #include "pin_threads.hpp" | |
| #include "fmt/core.h" | |
| #include "scope_guard.hpp" | |
| #include "thread_pool.hpp" | |
| #include "syslog.hpp" | |
| #include "maintenance_thread.hpp" | |
|  | |
| #include "fuse_i.h" | |
| #include "fuse_kernel.h" | |
| #include "fuse_lowlevel.h" | |
|  | |
| #include "fuse_cfg.hpp" | |
| #include "fuse_msgbuf.hpp" | |
| #include "fuse_ll.hpp" | |
|  | |
| #include <cassert> | |
| #include <memory> | |
| #include <vector> | |
|  | |
| #include <errno.h> | |
| #include <pthread.h> | |
| #include <semaphore.h> | |
| #include <signal.h> | |
| #include <stdio.h> | |
| #include <stdlib.h> | |
| #include <string.h> | |
| #include <sys/time.h> | |
| #include <unistd.h> | |
|  | |
| 
 | |
| static | |
| bool | |
| _retriable_receive_error(const int err_) | |
| { | |
|   switch(err_) | |
|     { | |
|     case -EINTR: | |
|     case -EAGAIN: | |
|     case -ENOENT: | |
|       return true; | |
|     default: | |
|       return false; | |
|     } | |
| } | |
| 
 | |
| static | |
| void | |
| _print_error(int rv_) | |
| { | |
|   fmt::print(stderr, | |
|              "mergerfs: error reading from /dev/fuse - {} ({})\n", | |
|              strerror(-rv_), | |
|              -rv_); | |
| } | |
| 
 | |
| struct AsyncWorker | |
| { | |
|   fuse_session *_se; | |
|   sem_t *_finished; | |
|   std::shared_ptr<ThreadPool> _process_tp; | |
| 
 | |
|   AsyncWorker(fuse_session                *se_, | |
|               sem_t                       *finished_, | |
|               std::shared_ptr<ThreadPool>  process_tp_) | |
|     : _se(se_), | |
|       _finished(finished_), | |
|       _process_tp(process_tp_) | |
|   { | |
|   } | |
| 
 | |
|   inline | |
|   void | |
|   operator()() const | |
|   { | |
|     DEFER{ fuse_session_exit(_se); }; | |
|     DEFER{ sem_post(_finished); }; | |
| 
 | |
|     ThreadPool::PToken ptok(_process_tp->ptoken()); | |
|     while(!fuse_session_exited(_se)) | |
|       { | |
|         int rv; | |
|         fuse_msgbuf_t *msgbuf; | |
| 
 | |
|         msgbuf = msgbuf_alloc(); | |
| 
 | |
|         while(true) | |
|           { | |
|             pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL); | |
|             rv = _se->receive_buf(_se,msgbuf); | |
|             pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL); | |
| 
 | |
|             if(rv > 0) | |
|               break; | |
|             if(::_retriable_receive_error(rv)) | |
|               continue; | |
| 
 | |
|             msgbuf_free(msgbuf); | |
|             if((rv == 0) || (rv == -ENODEV)) | |
|               return; | |
| 
 | |
|             return ::_print_error(rv); | |
|           } | |
| 
 | |
|         _process_tp->enqueue_work(ptok, | |
|                                   [=]() | |
|                                   { | |
|                                     _se->process_buf(_se,msgbuf); | |
|                                     msgbuf_free(msgbuf); | |
|                                   }); | |
|       } | |
|   } | |
| }; | |
| 
 | |
| struct SyncWorker | |
| { | |
|   fuse_session *_se; | |
|   sem_t *_finished; | |
| 
 | |
|   SyncWorker(fuse_session *se_, | |
|              sem_t        *finished_) | |
| 
 | |
|     : _se(se_), | |
|       _finished(finished_) | |
|   { | |
|   } | |
| 
 | |
|   inline | |
|   void | |
|   operator()() const | |
|   { | |
|     DEFER{ fuse_session_exit(_se); }; | |
|     DEFER{ sem_post(_finished); }; | |
| 
 | |
|     while(!fuse_session_exited(_se)) | |
|       { | |
|         int rv; | |
|         fuse_msgbuf_t *msgbuf; | |
| 
 | |
|         msgbuf = msgbuf_alloc(); | |
| 
 | |
|         while(true) | |
|           { | |
|             pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL); | |
|             rv = _se->receive_buf(_se,msgbuf); | |
|             pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL); | |
|             if(rv > 0) | |
|               break; | |
|             if(::_retriable_receive_error(rv)) | |
|               continue; | |
| 
 | |
|             msgbuf_free(msgbuf); | |
|             if((rv == 0) || (rv == -ENODEV)) | |
|               return; | |
| 
 | |
|             return ::_print_error(rv); | |
|           } | |
| 
 | |
|         _se->process_buf(_se,msgbuf); | |
| 
 | |
|         msgbuf_free(msgbuf); | |
|       } | |
|   } | |
| }; | |
| 
 | |
| int | |
| fuse_start_thread(pthread_t *thread_id, | |
|                   void      *(*func)(void *), | |
|                   void      *arg) | |
| { | |
|   int res; | |
|   sigset_t oldset; | |
|   sigset_t newset; | |
| 
 | |
|   sigfillset(&newset); | |
|   pthread_sigmask(SIG_BLOCK,&newset,&oldset); | |
|   res = pthread_create(thread_id,NULL,func,arg); | |
|   pthread_sigmask(SIG_SETMASK,&oldset,NULL); | |
| 
 | |
|   if(res != 0) | |
|     { | |
|       fprintf(stderr, | |
|               "fuse: error creating thread: %s\n", | |
|               strerror(res)); | |
|       return -1; | |
|     } | |
| 
 | |
|   return 0; | |
| } | |
| 
 | |
| static | |
| int | |
| _calculate_thread_count(const int raw_thread_count_) | |
| { | |
|   int thread_count = 1; | |
| 
 | |
|   if(raw_thread_count_ == 0) | |
|     { | |
|       thread_count = std::thread::hardware_concurrency(); | |
|       thread_count = std::min(8,thread_count); | |
|     } | |
|   else if(raw_thread_count_ < 0) | |
|     { | |
|       thread_count = (std::thread::hardware_concurrency() / -raw_thread_count_); | |
|       thread_count = std::min(1,thread_count); | |
|     } | |
|   else if(raw_thread_count_ > 0) | |
|     { | |
|       thread_count = raw_thread_count_; | |
|     } | |
| 
 | |
|   return thread_count; | |
| } | |
| 
 | |
| static | |
| void | |
| _calculate_thread_counts(int *read_thread_count_, | |
|                          int *process_thread_count_, | |
|                          int *process_thread_queue_depth_) | |
| { | |
|   if((*read_thread_count_ == 0) && (*process_thread_count_ == -1)) | |
|     { | |
|       int nproc; | |
| 
 | |
|       nproc = std::thread::hardware_concurrency(); | |
|       *read_thread_count_ = std::min(8,nproc); | |
|     } | |
|   else if((*read_thread_count_ == 0) && (*process_thread_count_ == 0)) | |
|     { | |
|       int nproc; | |
| 
 | |
|       nproc = std::thread::hardware_concurrency(); | |
|       *read_thread_count_ = 2; | |
|       *process_thread_count_ = std::max(2,(nproc - 2)); | |
|       *process_thread_count_ = std::min(8,*process_thread_count_); | |
|     } | |
|   else if((*read_thread_count_ == 0) && (*process_thread_count_ != -1)) | |
|     { | |
|       *read_thread_count_    = 2; | |
|       *process_thread_count_ = ::_calculate_thread_count(*process_thread_count_); | |
|     } | |
|   else | |
|     { | |
|       *read_thread_count_    = ::_calculate_thread_count(*read_thread_count_); | |
|       if(*process_thread_count_ != -1) | |
|         *process_thread_count_ = ::_calculate_thread_count(*process_thread_count_); | |
|     } | |
| 
 | |
|   if(*process_thread_queue_depth_ <= 0) | |
|     *process_thread_queue_depth_ = 2; | |
| 
 | |
|   *process_thread_queue_depth_ *= std::abs(*process_thread_count_); | |
| } | |
| 
 | |
| int | |
| fuse_session_loop_mt(struct fuse_session *se_, | |
|                      const int            raw_read_thread_count_, | |
|                      const int            raw_process_thread_count_, | |
|                      const int            raw_process_thread_queue_depth_, | |
|                      const std::string    pin_threads_type_) | |
| { | |
|   sem_t finished; | |
|   int read_thread_count; | |
|   int process_thread_count; | |
|   int process_thread_queue_depth; | |
|   std::vector<pthread_t> read_threads; | |
|   std::vector<pthread_t> process_threads; | |
|   std::unique_ptr<ThreadPool> read_tp; | |
|   std::shared_ptr<ThreadPool> process_tp; | |
| 
 | |
|   sem_init(&finished,0,0); | |
| 
 | |
|   read_thread_count          = raw_read_thread_count_; | |
|   process_thread_count       = raw_process_thread_count_; | |
|   process_thread_queue_depth = raw_process_thread_queue_depth_; | |
|   ::_calculate_thread_counts(&read_thread_count, | |
|                              &process_thread_count, | |
|                              &process_thread_queue_depth); | |
| 
 | |
|   if(process_thread_count > 0) | |
|     process_tp = std::make_shared<ThreadPool>(process_thread_count, | |
|                                               process_thread_queue_depth, | |
|                                               "fuse.process"); | |
| 
 | |
|   read_tp = std::make_unique<ThreadPool>(read_thread_count, | |
|                                          read_thread_count, | |
|                                          "fuse.read"); | |
|   if(process_tp) | |
|     { | |
|       for(auto i = 0; i < read_thread_count; i++) | |
|         read_tp->enqueue_work(AsyncWorker(se_,&finished,process_tp)); | |
|     } | |
|   else | |
|     { | |
|       for(auto i = 0; i < read_thread_count; i++) | |
|         read_tp->enqueue_work(SyncWorker(se_,&finished)); | |
|     } | |
| 
 | |
|   if(read_tp) | |
|     read_threads = read_tp->threads(); | |
|   if(process_tp) | |
|     process_threads = process_tp->threads(); | |
| 
 | |
|   PinThreads::pin(read_threads,process_threads,pin_threads_type_); | |
| 
 | |
|   SysLog::info("read-thread-count={}; " | |
|                "process-thread-count={}; " | |
|                "process-thread-queue-depth={}; " | |
|                "pin-threads={};", | |
|                read_thread_count, | |
|                process_thread_count, | |
|                process_thread_queue_depth, | |
|                pin_threads_type_); | |
| 
 | |
|   while(!fuse_session_exited(se_)) | |
|     sem_wait(&finished); | |
| 
 | |
|   sem_destroy(&finished); | |
| 
 | |
|   return 0; | |
| } | |
| 
 | |
| int | |
| fuse_loop_mt(struct fuse *f_) | |
| { | |
|   int res; | |
| 
 | |
|   if(f_ == NULL) | |
|     return -1; | |
| 
 | |
|   MaintenanceThread::setup(); | |
|   fuse_populate_maintenance_thread(f_); | |
| 
 | |
|   res = fuse_session_loop_mt(fuse_get_session(), | |
|                              fuse_cfg.read_thread_count, | |
|                              fuse_cfg.process_thread_count, | |
|                              fuse_cfg.process_thread_queue_depth, | |
|                              fuse_cfg.pin_threads); | |
| 
 | |
|   MaintenanceThread::stop(); | |
| 
 | |
|   return res; | |
| }
 |