Browse Source

Remove threads

The threading architecture was not good at all. It was built around the HTTP part, which isn't flexible enough. Hence, the decision has been made to refactor both of those parts. This will takes 4 steps :
1. remove threads and rate limits (this commit)
2. refactor the HTTP part (the tricky part)
3. reintroduce rate-limits (quite easy)
4. introduce a new threading system (the easiest part)
pull/31/head
Rodolphe Breard 5 years ago
parent
commit
51fabb38ea
  1. 4
      acmed/src/acme_proto/http.rs
  2. 2
      acmed/src/certificate.rs
  3. 22
      acmed/src/config.rs
  4. 1
      acmed/src/main.rs
  5. 20
      acmed/src/main_event_loop.rs
  6. 188
      acmed/src/rate_limits.rs

4
acmed/src/acme_proto/http.rs

@ -1,6 +1,5 @@
use crate::acme_proto::structs::{AcmeError, ApiError, Directory, HttpApiError}; use crate::acme_proto::structs::{AcmeError, ApiError, Directory, HttpApiError};
use crate::certificate::Certificate; use crate::certificate::Certificate;
use crate::rate_limits;
use acme_common::error::Error; use acme_common::error::Error;
use http_req::request::{self, Method}; use http_req::request::{self, Method};
use http_req::response::Response; use http_req::response::Response;
@ -53,9 +52,6 @@ fn new_request<'a>(root_certs: &'a [String], uri: &'a Uri, method: Method) -> Re
fn send_request(cert: &Certificate, request: &Request) -> Result<(Response, String), Error> { fn send_request(cert: &Certificate, request: &Request) -> Result<(Response, String), Error> {
let mut buffer = Vec::new(); let mut buffer = Vec::new();
cert.https_throttle
.send(rate_limits::Request::HttpsRequest)
.unwrap();
cert.debug(&format!("{}: {}", request.method, request.uri)); cert.debug(&format!("{}: {}", request.method, request.uri));
let res = request.r.send(&mut buffer)?; let res = request.r.send(&mut buffer)?;
let res_str = String::from_utf8(buffer)?; let res_str = String::from_utf8(buffer)?;

2
acmed/src/certificate.rs

@ -7,7 +7,6 @@ use acme_common::error::Error;
use log::{debug, info, trace, warn}; use log::{debug, info, trace, warn};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::fmt; use std::fmt;
use std::sync::mpsc::SyncSender;
use std::time::Duration; use std::time::Duration;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -50,7 +49,6 @@ pub struct Certificate {
pub kp_reuse: bool, pub kp_reuse: bool,
pub remote_url: String, pub remote_url: String,
pub tos_agreed: bool, pub tos_agreed: bool,
pub https_throttle: SyncSender<crate::rate_limits::Request>,
pub hooks: Vec<Hook>, pub hooks: Vec<Hook>,
pub account_directory: String, pub account_directory: String,
pub crt_directory: String, pub crt_directory: String,

22
acmed/src/config.rs

@ -1,6 +1,5 @@
use crate::certificate::Algorithm; use crate::certificate::Algorithm;
use crate::hooks; use crate::hooks;
use crate::rate_limits;
use acme_common::error::Error; use acme_common::error::Error;
use acme_common::to_idna; use acme_common::to_idna;
use log::info; use log::info;
@ -9,8 +8,7 @@ use std::collections::HashMap;
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::prelude::*; use std::io::prelude::*;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::{fmt, thread};
use std::fmt;
macro_rules! set_cfg_attr { macro_rules! set_cfg_attr {
($to: expr, $from: expr) => { ($to: expr, $from: expr) => {
@ -459,24 +457,6 @@ fn dispatch_global_env_vars(config: &mut Config) {
} }
} }
pub fn init_rate_limits(
config: &Config,
) -> Result<HashMap<String, mpsc::SyncSender<rate_limits::Request>>, Error> {
let mut corresp = HashMap::new();
for endpoint in config.endpoint.iter() {
if endpoint.is_used(config) {
let mut limits = Vec::new();
for l in endpoint.rate_limits.iter() {
limits.push(config.get_rate_limit(l)?);
}
let mut rl = rate_limits::RateLimit::new(&limits)?;
corresp.insert(endpoint.name.to_owned(), rl.get_sender());
thread::spawn(move || rl.run());
}
}
Ok(corresp)
}
pub fn from_file(file_name: &str) -> Result<Config, Error> { pub fn from_file(file_name: &str) -> Result<Config, Error> {
let path = PathBuf::from(file_name); let path = PathBuf::from(file_name);
let mut config = read_cnf(&path)?; let mut config = read_cnf(&path)?;

1
acmed/src/main.rs

@ -9,7 +9,6 @@ mod config;
mod hooks; mod hooks;
mod jws; mod jws;
mod main_event_loop; mod main_event_loop;
mod rate_limits;
mod storage; mod storage;
pub const APP_NAME: &str = "ACMEd"; pub const APP_NAME: &str = "ACMEd";

20
acmed/src/main_event_loop.rs

@ -32,20 +32,10 @@ pub struct MainEventLoop {
impl MainEventLoop { impl MainEventLoop {
pub fn new(config_file: &str, root_certs: &[&str]) -> Result<Self, Error> { pub fn new(config_file: &str, root_certs: &[&str]) -> Result<Self, Error> {
let cnf = config::from_file(config_file)?; let cnf = config::from_file(config_file)?;
let rate_limits_corresp = config::init_rate_limits(&cnf)?;
let mut certs = Vec::new(); let mut certs = Vec::new();
for (i, crt) in cnf.certificate.iter().enumerate() { for (i, crt) in cnf.certificate.iter().enumerate() {
let ep_name = crt.get_endpoint_name(&cnf)?; let ep_name = crt.get_endpoint_name(&cnf)?;
let https_throttle = rate_limits_corresp
.get(&ep_name)
.ok_or_else(|| {
Error::from(format!(
"{}: rate limit not found for this endpoint",
ep_name
))
})?
.to_owned();
let cert = Certificate { let cert = Certificate {
account: crt.get_account(&cnf)?, account: crt.get_account(&cnf)?,
domains: crt.get_domains()?, domains: crt.get_domains()?,
@ -53,7 +43,6 @@ impl MainEventLoop {
kp_reuse: crt.get_kp_reuse(), kp_reuse: crt.get_kp_reuse(),
remote_url: crt.get_remote_url(&cnf)?, remote_url: crt.get_remote_url(&cnf)?,
tos_agreed: crt.get_tos_agreement(&cnf)?, tos_agreed: crt.get_tos_agreement(&cnf)?,
https_throttle,
hooks: crt.get_hooks(&cnf)?, hooks: crt.get_hooks(&cnf)?,
account_directory: cnf.get_account_dir(), account_directory: cnf.get_account_dir(),
crt_directory: crt.get_crt_dir(&cnf), crt_directory: crt.get_crt_dir(&cnf),
@ -86,16 +75,12 @@ impl MainEventLoop {
} }
fn renew_certificates(&self) { fn renew_certificates(&self) {
let mut handles = vec![];
for crt in self.certs.iter() { for crt in self.certs.iter() {
match crt.should_renew() { match crt.should_renew() {
Ok(true) => { Ok(true) => {
let root_certs = self.root_certs.clone(); let root_certs = self.root_certs.clone();
let cert = (*crt).clone(); let cert = (*crt).clone();
let handler = thread::spawn(move || {
renew_certificate(&cert, &root_certs);
});
handles.push(handler);
renew_certificate(&cert, &root_certs);
} }
Ok(false) => {} Ok(false) => {}
Err(e) => { Err(e) => {
@ -103,8 +88,5 @@ impl MainEventLoop {
} }
}; };
} }
for handler in handles {
let _ = handler.join();
}
} }
} }

188
acmed/src/rate_limits.rs

@ -1,188 +0,0 @@
use acme_common::error::Error;
use nom::bytes::complete::take_while_m_n;
use nom::character::complete::digit1;
use nom::combinator::map_res;
use nom::multi::fold_many1;
use nom::IResult;
use std::cmp;
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};
pub enum Request {
HttpsRequest,
}
pub struct RateLimit {
limits: Vec<(usize, Duration)>,
sender: mpsc::SyncSender<Request>,
receiver: mpsc::Receiver<Request>,
log: Vec<Instant>,
}
impl RateLimit {
pub fn new(limits: &[(usize, String)]) -> Result<Self, Error> {
let mut max_size = 0;
let mut parsed_limits = Vec::new();
for (nb, raw_duration) in limits.iter() {
if *nb > max_size {
max_size = *nb;
}
let parsed_duration = parse_duration(raw_duration)?;
parsed_limits.push((*nb, parsed_duration));
}
parsed_limits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());
parsed_limits.reverse();
let (sender, receiver) = mpsc::sync_channel::<Request>(0);
Ok(RateLimit {
limits: parsed_limits,
sender,
receiver,
log: Vec::with_capacity(max_size),
})
}
pub fn get_sender(&self) -> mpsc::SyncSender<Request> {
self.sender.clone()
}
pub fn run(&mut self) -> Result<(), Error> {
let sleep_duration = self.get_sleep_duration();
loop {
self.prune_log();
if self.request_allowed() {
match self.receiver.recv()? {
Request::HttpsRequest => {
if !self.limits.is_empty() {
self.log.push(Instant::now());
}
}
}
} else {
// TODO: find a better sleep duration
thread::sleep(sleep_duration);
}
}
}
fn get_sleep_duration(&self) -> Duration {
let (nb_req, min_duration) = match self.limits.last() {
Some((n, d)) => (*n as u64, *d),
None => {
return Duration::from_millis(0);
}
};
let nb_mili = match min_duration.as_secs() {
0 | 1 => crate::MIN_RATE_LIMIT_SLEEP_MILISEC,
n => {
let a = n * 200 / nb_req;
let a = cmp::min(a, crate::MAX_RATE_LIMIT_SLEEP_MILISEC);
cmp::max(a, crate::MIN_RATE_LIMIT_SLEEP_MILISEC)
}
};
Duration::from_millis(nb_mili)
}
fn request_allowed(&self) -> bool {
for (max_allowed, duration) in self.limits.iter() {
let max_date = Instant::now() - *duration;
let nb_req = self.log.iter().filter(move |x| **x > max_date).count();
if nb_req >= *max_allowed {
return false;
}
}
true
}
fn prune_log(&mut self) {
if let Some((_, max_limit)) = self.limits.first() {
let prune_date = Instant::now() - *max_limit;
self.log.retain(move |&d| d > prune_date);
}
}
}
fn is_duration_chr(c: char) -> bool {
c == 's' || c == 'm' || c == 'h' || c == 'd' || c == 'w'
}
fn get_multiplicator(input: &str) -> IResult<&str, u64> {
let (input, nb) = take_while_m_n(1, 1, is_duration_chr)(input)?;
let mult = match nb.chars().nth(0) {
Some('s') => 1,
Some('m') => 60,
Some('h') => 3_600,
Some('d') => 86_400,
Some('w') => 604_800,
_ => 0,
};
Ok((input, mult))
}
fn get_duration_part(input: &str) -> IResult<&str, Duration> {
let (input, nb) = map_res(digit1, |s: &str| s.parse::<u64>())(input)?;
let (input, mult) = get_multiplicator(input)?;
Ok((input, Duration::from_secs(nb * mult)))
}
fn get_duration(input: &str) -> IResult<&str, Duration> {
fold_many1(
get_duration_part,
Duration::new(0, 0),
|mut acc: Duration, item| {
acc += item;
acc
},
)(input)
}
fn parse_duration(input: &str) -> Result<Duration, Error> {
match get_duration(input) {
Ok((r, d)) => match r.len() {
0 => Ok(d),
_ => Err(format!("{}: invalid duration", input).into()),
},
Err(_) => Err(format!("{}: invalid duration", input).into()),
}
}
#[cfg(test)]
mod tests {
use super::{parse_duration, RateLimit};
#[test]
fn test_rate_limit_build() {
let l = vec![
(5, String::from("5s")),
(12, String::from("2m")),
(8, String::from("5m")),
(1, String::from("1s")),
(2, String::from("1m")),
];
let rl = RateLimit::new(l.as_slice()).unwrap();
let ref_t = (8_usize, parse_duration("5m").unwrap());
assert_eq!(rl.limits.first(), Some(&ref_t));
assert_eq!(rl.log.len(), 0);
assert_eq!(rl.log.capacity(), 12);
}
#[test]
fn test_parse_duration() {
let lst = [
("42s", 42),
("21m", 1_260),
("3h", 10_800),
("2d", 172_800),
("1w", 604_800),
("42m30s", 2_550),
("30s42m", 2_550),
("3h5m12s", 11_112),
("40s2s", 42),
];
for (fmt, ref_sec) in lst.iter() {
let d = parse_duration(fmt);
assert!(d.is_ok());
assert_eq!(d.unwrap().as_secs(), *ref_sec);
}
}
}
Loading…
Cancel
Save