From 51fabb38ea6da75ef1d3d991d8f1f11b4f4db7ae Mon Sep 17 00:00:00 2001 From: Rodolphe Breard Date: Thu, 23 Apr 2020 10:33:26 +0200 Subject: [PATCH] Remove threads The threading architecture was not good at all. It was built around the HTTP part, which isn't flexible enough. Hence, the decision has been made to refactor both of those parts. This will takes 4 steps : 1. remove threads and rate limits (this commit) 2. refactor the HTTP part (the tricky part) 3. reintroduce rate-limits (quite easy) 4. introduce a new threading system (the easiest part) --- acmed/src/acme_proto/http.rs | 4 - acmed/src/certificate.rs | 2 - acmed/src/config.rs | 22 +--- acmed/src/main.rs | 1 - acmed/src/main_event_loop.rs | 20 +--- acmed/src/rate_limits.rs | 188 ----------------------------------- 6 files changed, 2 insertions(+), 235 deletions(-) delete mode 100644 acmed/src/rate_limits.rs diff --git a/acmed/src/acme_proto/http.rs b/acmed/src/acme_proto/http.rs index ff19ead..923d12b 100644 --- a/acmed/src/acme_proto/http.rs +++ b/acmed/src/acme_proto/http.rs @@ -1,6 +1,5 @@ use crate::acme_proto::structs::{AcmeError, ApiError, Directory, HttpApiError}; use crate::certificate::Certificate; -use crate::rate_limits; use acme_common::error::Error; use http_req::request::{self, Method}; use http_req::response::Response; @@ -53,9 +52,6 @@ fn new_request<'a>(root_certs: &'a [String], uri: &'a Uri, method: Method) -> Re fn send_request(cert: &Certificate, request: &Request) -> Result<(Response, String), Error> { let mut buffer = Vec::new(); - cert.https_throttle - .send(rate_limits::Request::HttpsRequest) - .unwrap(); cert.debug(&format!("{}: {}", request.method, request.uri)); let res = request.r.send(&mut buffer)?; let res_str = String::from_utf8(buffer)?; diff --git a/acmed/src/certificate.rs b/acmed/src/certificate.rs index b871100..09e78c7 100644 --- a/acmed/src/certificate.rs +++ b/acmed/src/certificate.rs @@ -7,7 +7,6 @@ use acme_common::error::Error; use log::{debug, info, trace, warn}; use std::collections::{HashMap, HashSet}; use std::fmt; -use std::sync::mpsc::SyncSender; use std::time::Duration; #[derive(Clone, Debug)] @@ -50,7 +49,6 @@ pub struct Certificate { pub kp_reuse: bool, pub remote_url: String, pub tos_agreed: bool, - pub https_throttle: SyncSender, pub hooks: Vec, pub account_directory: String, pub crt_directory: String, diff --git a/acmed/src/config.rs b/acmed/src/config.rs index d36ea85..e76273c 100644 --- a/acmed/src/config.rs +++ b/acmed/src/config.rs @@ -1,6 +1,5 @@ use crate::certificate::Algorithm; use crate::hooks; -use crate::rate_limits; use acme_common::error::Error; use acme_common::to_idna; use log::info; @@ -9,8 +8,7 @@ use std::collections::HashMap; use std::fs::{self, File}; use std::io::prelude::*; use std::path::{Path, PathBuf}; -use std::sync::mpsc; -use std::{fmt, thread}; +use std::fmt; macro_rules! set_cfg_attr { ($to: expr, $from: expr) => { @@ -459,24 +457,6 @@ fn dispatch_global_env_vars(config: &mut Config) { } } -pub fn init_rate_limits( - config: &Config, -) -> Result>, Error> { - let mut corresp = HashMap::new(); - for endpoint in config.endpoint.iter() { - if endpoint.is_used(config) { - let mut limits = Vec::new(); - for l in endpoint.rate_limits.iter() { - limits.push(config.get_rate_limit(l)?); - } - let mut rl = rate_limits::RateLimit::new(&limits)?; - corresp.insert(endpoint.name.to_owned(), rl.get_sender()); - thread::spawn(move || rl.run()); - } - } - Ok(corresp) -} - pub fn from_file(file_name: &str) -> Result { let path = PathBuf::from(file_name); let mut config = read_cnf(&path)?; diff --git a/acmed/src/main.rs b/acmed/src/main.rs index fd351e0..3ea922c 100644 --- a/acmed/src/main.rs +++ b/acmed/src/main.rs @@ -9,7 +9,6 @@ mod config; mod hooks; mod jws; mod main_event_loop; -mod rate_limits; mod storage; pub const APP_NAME: &str = "ACMEd"; diff --git a/acmed/src/main_event_loop.rs b/acmed/src/main_event_loop.rs index b3e1238..a9a312f 100644 --- a/acmed/src/main_event_loop.rs +++ b/acmed/src/main_event_loop.rs @@ -32,20 +32,10 @@ pub struct MainEventLoop { impl MainEventLoop { pub fn new(config_file: &str, root_certs: &[&str]) -> Result { let cnf = config::from_file(config_file)?; - let rate_limits_corresp = config::init_rate_limits(&cnf)?; let mut certs = Vec::new(); for (i, crt) in cnf.certificate.iter().enumerate() { let ep_name = crt.get_endpoint_name(&cnf)?; - let https_throttle = rate_limits_corresp - .get(&ep_name) - .ok_or_else(|| { - Error::from(format!( - "{}: rate limit not found for this endpoint", - ep_name - )) - })? - .to_owned(); let cert = Certificate { account: crt.get_account(&cnf)?, domains: crt.get_domains()?, @@ -53,7 +43,6 @@ impl MainEventLoop { kp_reuse: crt.get_kp_reuse(), remote_url: crt.get_remote_url(&cnf)?, tos_agreed: crt.get_tos_agreement(&cnf)?, - https_throttle, hooks: crt.get_hooks(&cnf)?, account_directory: cnf.get_account_dir(), crt_directory: crt.get_crt_dir(&cnf), @@ -86,16 +75,12 @@ impl MainEventLoop { } fn renew_certificates(&self) { - let mut handles = vec![]; for crt in self.certs.iter() { match crt.should_renew() { Ok(true) => { let root_certs = self.root_certs.clone(); let cert = (*crt).clone(); - let handler = thread::spawn(move || { - renew_certificate(&cert, &root_certs); - }); - handles.push(handler); + renew_certificate(&cert, &root_certs); } Ok(false) => {} Err(e) => { @@ -103,8 +88,5 @@ impl MainEventLoop { } }; } - for handler in handles { - let _ = handler.join(); - } } } diff --git a/acmed/src/rate_limits.rs b/acmed/src/rate_limits.rs deleted file mode 100644 index ecf028a..0000000 --- a/acmed/src/rate_limits.rs +++ /dev/null @@ -1,188 +0,0 @@ -use acme_common::error::Error; -use nom::bytes::complete::take_while_m_n; -use nom::character::complete::digit1; -use nom::combinator::map_res; -use nom::multi::fold_many1; -use nom::IResult; -use std::cmp; -use std::sync::mpsc; -use std::thread; -use std::time::{Duration, Instant}; - -pub enum Request { - HttpsRequest, -} - -pub struct RateLimit { - limits: Vec<(usize, Duration)>, - sender: mpsc::SyncSender, - receiver: mpsc::Receiver, - log: Vec, -} - -impl RateLimit { - pub fn new(limits: &[(usize, String)]) -> Result { - let mut max_size = 0; - let mut parsed_limits = Vec::new(); - for (nb, raw_duration) in limits.iter() { - if *nb > max_size { - max_size = *nb; - } - let parsed_duration = parse_duration(raw_duration)?; - parsed_limits.push((*nb, parsed_duration)); - } - parsed_limits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap()); - parsed_limits.reverse(); - let (sender, receiver) = mpsc::sync_channel::(0); - Ok(RateLimit { - limits: parsed_limits, - sender, - receiver, - log: Vec::with_capacity(max_size), - }) - } - - pub fn get_sender(&self) -> mpsc::SyncSender { - self.sender.clone() - } - - pub fn run(&mut self) -> Result<(), Error> { - let sleep_duration = self.get_sleep_duration(); - loop { - self.prune_log(); - if self.request_allowed() { - match self.receiver.recv()? { - Request::HttpsRequest => { - if !self.limits.is_empty() { - self.log.push(Instant::now()); - } - } - } - } else { - // TODO: find a better sleep duration - thread::sleep(sleep_duration); - } - } - } - - fn get_sleep_duration(&self) -> Duration { - let (nb_req, min_duration) = match self.limits.last() { - Some((n, d)) => (*n as u64, *d), - None => { - return Duration::from_millis(0); - } - }; - let nb_mili = match min_duration.as_secs() { - 0 | 1 => crate::MIN_RATE_LIMIT_SLEEP_MILISEC, - n => { - let a = n * 200 / nb_req; - let a = cmp::min(a, crate::MAX_RATE_LIMIT_SLEEP_MILISEC); - cmp::max(a, crate::MIN_RATE_LIMIT_SLEEP_MILISEC) - } - }; - Duration::from_millis(nb_mili) - } - - fn request_allowed(&self) -> bool { - for (max_allowed, duration) in self.limits.iter() { - let max_date = Instant::now() - *duration; - let nb_req = self.log.iter().filter(move |x| **x > max_date).count(); - if nb_req >= *max_allowed { - return false; - } - } - true - } - - fn prune_log(&mut self) { - if let Some((_, max_limit)) = self.limits.first() { - let prune_date = Instant::now() - *max_limit; - self.log.retain(move |&d| d > prune_date); - } - } -} - -fn is_duration_chr(c: char) -> bool { - c == 's' || c == 'm' || c == 'h' || c == 'd' || c == 'w' -} - -fn get_multiplicator(input: &str) -> IResult<&str, u64> { - let (input, nb) = take_while_m_n(1, 1, is_duration_chr)(input)?; - let mult = match nb.chars().nth(0) { - Some('s') => 1, - Some('m') => 60, - Some('h') => 3_600, - Some('d') => 86_400, - Some('w') => 604_800, - _ => 0, - }; - Ok((input, mult)) -} - -fn get_duration_part(input: &str) -> IResult<&str, Duration> { - let (input, nb) = map_res(digit1, |s: &str| s.parse::())(input)?; - let (input, mult) = get_multiplicator(input)?; - Ok((input, Duration::from_secs(nb * mult))) -} - -fn get_duration(input: &str) -> IResult<&str, Duration> { - fold_many1( - get_duration_part, - Duration::new(0, 0), - |mut acc: Duration, item| { - acc += item; - acc - }, - )(input) -} - -fn parse_duration(input: &str) -> Result { - match get_duration(input) { - Ok((r, d)) => match r.len() { - 0 => Ok(d), - _ => Err(format!("{}: invalid duration", input).into()), - }, - Err(_) => Err(format!("{}: invalid duration", input).into()), - } -} - -#[cfg(test)] -mod tests { - use super::{parse_duration, RateLimit}; - - #[test] - fn test_rate_limit_build() { - let l = vec![ - (5, String::from("5s")), - (12, String::from("2m")), - (8, String::from("5m")), - (1, String::from("1s")), - (2, String::from("1m")), - ]; - let rl = RateLimit::new(l.as_slice()).unwrap(); - let ref_t = (8_usize, parse_duration("5m").unwrap()); - assert_eq!(rl.limits.first(), Some(&ref_t)); - assert_eq!(rl.log.len(), 0); - assert_eq!(rl.log.capacity(), 12); - } - - #[test] - fn test_parse_duration() { - let lst = [ - ("42s", 42), - ("21m", 1_260), - ("3h", 10_800), - ("2d", 172_800), - ("1w", 604_800), - ("42m30s", 2_550), - ("30s42m", 2_550), - ("3h5m12s", 11_112), - ("40s2s", 42), - ]; - for (fmt, ref_sec) in lst.iter() { - let d = parse_duration(fmt); - assert!(d.is_ok()); - assert_eq!(d.unwrap().as_secs(), *ref_sec); - } - } -}