Browse Source

Rewrite the main event loop using async

Manual threads have some huge drawbacks and are therefore not well
suited for this task. Using async with a multi-threaded runtime,
however, does not have those drawbacks and keep the advantage of a
multi-threaded environment.

This is only the first part of the switch to async, the next step being
to use it in file operation, HTTP requests and sleeps.
pull/88/head
Rodolphe Bréard 2 years ago
parent
commit
4fc1f0cb8b
  1. 26
      README.md
  2. 3
      acmed/Cargo.toml
  3. 65
      acmed/src/certificate_manager.rs
  4. 19
      acmed/src/main.rs
  5. 111
      acmed/src/main_event_loop.rs

26
README.md

@ -197,29 +197,15 @@ The `contrib/systemd` contains examples of a service file as well as a `sysusers
### Does ACMEd uses any threading or parallelization? ### Does ACMEd uses any threading or parallelization?
ACMEd uses a dedicated thread for each endpoint. Certificates of different endpoint can therefore be renewed in parallel while inside each endpoints certificates are renewed sequentially.
Yes, ACMEd is [asynchronous](https://en.wikipedia.org/wiki/Asynchrony_(computer_programming)) and uses a multi-thread runtime. In order to check the number of threads, you may run the following command:
### Can I use the same account on different endpoints?
Short answer: Yes, that is possible. However, if you do so, you should be aware that this might eventually hurt the parallelization.
Long answer: Accounts requires to acquire a global lock, therefore an endpoint thread wanting to renew a certificate has to wait that the associated account lock has been released from any other endpoint thread. Since each endpoints renew certificates in a sequential order, they will block on the first certificates which associated account is already in use.
Example:
- 2 accounts: A1 and A2
- 2 endpoints: E1 and E2
- 3 certificates, all requiring to be renewed:
* C1 on E1 with A1
* C2 on E1 with A2
* C3 on E2 with A1
Let's suppose that E1 will renew C1 and C2 in that order. We just launched ACMEd and threads for E1 and E2 starts almost simultaneously. Two cases are possible:
```
ps -T -p "$(pgrep acmed)"
```
1. E1 acquires the lock for A1 first. E2 is therefore blocked. C1 will be renewed first and only after that C2 and C3 will be renewed in parallel.
2. E2 acquires the lock for A1 first. E1 is therefore blocked. All certificates will be renewed in this sequential order, without any benefit from parallelism: C3, C1 and C2.
### Can I use the same account on different endpoints?
There is no way to control neither the sequential certificate renew order inside each endpoint nor the order in which the account locks are acquired.
Yes, that is possible. However, you should aware that some certificate authorities (e.g. Let's Encrypt) have policies restricting the use multiple accounts. Please check your CA's documentation.
### Why is RSA 2048 the default certificate key type? ### Why is RSA 2048 the default certificate key type?

3
acmed/Cargo.toml

@ -21,9 +21,11 @@ openssl_vendored = ["crypto_openssl", "acme_common/openssl_vendored", "attohttpc
[dependencies] [dependencies]
acme_common = { path = "../acme_common" } acme_common = { path = "../acme_common" }
async-lock = "2.6"
attohttpc = { version = "0.24", default-features = false, features = ["charsets", "json"] } attohttpc = { version = "0.24", default-features = false, features = ["charsets", "json"] }
bincode = "1.3" bincode = "1.3"
clap = { version = "4.0", features = ["string"] } clap = { version = "4.0", features = ["string"] }
futures = "0.3"
glob = "0.3" glob = "0.3"
log = "0.4" log = "0.4"
nom = { version = "7.0", default-features = false, features = [] } nom = { version = "7.0", default-features = false, features = [] }
@ -31,6 +33,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
tinytemplate = "1.2" tinytemplate = "1.2"
toml = "0.7" toml = "0.7"
tokio = { version = "1", features = ["full"] }
[target.'cfg(unix)'.dependencies] [target.'cfg(unix)'.dependencies]
nix = "0.26" nix = "0.26"

65
acmed/src/certificate_manager.rs

@ -0,0 +1,65 @@
use crate::acme_proto::request_certificate;
use crate::certificate::Certificate;
use crate::logs::HasLogger;
use crate::{AccountSync, EndpointSync};
use std::time::Duration;
use tokio::time::sleep;
#[derive(Clone, Debug)]
pub struct CertificateManager {
cert: Certificate,
}
impl CertificateManager {
pub fn new(cert: Certificate) -> Self {
Self { cert }
}
pub fn get_id(&self) -> String {
self.cert.get_id()
}
pub fn get_account_name(&self) -> String {
self.cert.account_name.clone()
}
pub fn get_endpoint_name(&self) -> String {
self.cert.endpoint_name.clone()
}
pub async fn renew(
&mut self,
account_s: AccountSync,
endpoint_s: EndpointSync,
) -> (&mut Self, AccountSync, EndpointSync) {
loop {
match self.cert.should_renew() {
Ok(true) => break,
Ok(false) => {}
Err(e) => {
self.cert.warn(&e.message);
}
}
sleep(Duration::from_secs(crate::DEFAULT_SLEEP_TIME)).await;
}
let mut account = account_s.write().await;
let mut endpoint = endpoint_s.write().await;
let (status, is_success) =
match request_certificate(&self.cert, &mut endpoint, &mut account) {
Ok(_) => ("success".to_string(), true),
Err(e) => {
let e = e.prefix("unable to renew the certificate");
self.cert.warn(&e.message);
(e.message, false)
}
};
match self.cert.call_post_operation_hooks(&status, is_success) {
Ok(_) => {}
Err(e) => {
let e = e.prefix("post-operation hook error");
self.cert.warn(&e.message);
}
};
(self, account_s.clone(), endpoint_s.clone())
}
}

19
acmed/src/main.rs

@ -4,12 +4,16 @@ use acme_common::crypto::{
}; };
use acme_common::logs::{set_log_system, DEFAULT_LOG_LEVEL}; use acme_common::logs::{set_log_system, DEFAULT_LOG_LEVEL};
use acme_common::{clean_pid_file, init_server}; use acme_common::{clean_pid_file, init_server};
use async_lock::RwLock;
use clap::{Arg, ArgAction, Command}; use clap::{Arg, ArgAction, Command};
use log::error; use log::error;
use std::sync::Arc;
use tokio::runtime::Builder;
mod account; mod account;
mod acme_proto; mod acme_proto;
mod certificate; mod certificate;
mod certificate_manager;
mod config; mod config;
mod duration; mod duration;
mod endpoint; mod endpoint;
@ -23,6 +27,7 @@ mod storage;
mod template; mod template;
pub const APP_NAME: &str = "ACMEd"; pub const APP_NAME: &str = "ACMEd";
pub const APP_THREAD_NAME: &str = "acmed-runtime";
pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
pub const DEFAULT_ACCOUNTS_DIR: &str = env!("ACMED_DEFAULT_ACCOUNTS_DIR"); pub const DEFAULT_ACCOUNTS_DIR: &str = env!("ACMED_DEFAULT_ACCOUNTS_DIR");
pub const DEFAULT_CERT_DIR: &str = env!("ACMED_DEFAULT_CERT_DIR"); pub const DEFAULT_CERT_DIR: &str = env!("ACMED_DEFAULT_CERT_DIR");
@ -48,7 +53,19 @@ pub const DEFAULT_HOOK_ALLOW_FAILURE: bool = false;
pub const MAX_RATE_LIMIT_SLEEP_MILISEC: u64 = 3_600_000; pub const MAX_RATE_LIMIT_SLEEP_MILISEC: u64 = 3_600_000;
pub const MIN_RATE_LIMIT_SLEEP_MILISEC: u64 = 100; pub const MIN_RATE_LIMIT_SLEEP_MILISEC: u64 = 100;
type AccountSync = Arc<RwLock<account::Account>>;
type EndpointSync = Arc<RwLock<endpoint::Endpoint>>;
fn main() { fn main() {
Builder::new_multi_thread()
.enable_all()
.thread_name(APP_THREAD_NAME)
.build()
.unwrap()
.block_on(inner_main());
}
async fn inner_main() {
let full_version = format!( let full_version = format!(
"{APP_VERSION} built for {}\n\nCryptographic library:\n - {} {}\nHTTP client library:\n - {} {}", "{APP_VERSION} built for {}\n\nCryptographic library:\n - {} {}\nHTTP client library:\n - {} {}",
env!("ACMED_TARGET"), env!("ACMED_TARGET"),
@ -160,5 +177,5 @@ fn main() {
std::process::exit(1); std::process::exit(1);
} }
}; };
srv.run();
srv.run().await;
} }

111
acmed/src/main_event_loop.rs

@ -1,40 +1,20 @@
use crate::account::Account; use crate::account::Account;
use crate::acme_proto::request_certificate;
use crate::certificate::Certificate; use crate::certificate::Certificate;
use crate::certificate_manager::CertificateManager;
use crate::config; use crate::config;
use crate::endpoint::Endpoint; use crate::endpoint::Endpoint;
use crate::hooks::HookType; use crate::hooks::HookType;
use crate::logs::HasLogger;
use crate::storage::FileManager; use crate::storage::FileManager;
use crate::{AccountSync, EndpointSync};
use acme_common::error::Error; use acme_common::error::Error;
use async_lock::RwLock;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
type AccountSync = Arc<RwLock<Account>>;
type EndpointSync = Arc<RwLock<Endpoint>>;
fn renew_certificate(crt: &Certificate, endpoint: &mut Endpoint, account: &mut Account) {
let (status, is_success) = match request_certificate(crt, endpoint, account) {
Ok(_) => ("success".to_string(), true),
Err(e) => {
let e = e.prefix("unable to renew the certificate");
crt.warn(&e.message);
(e.message, false)
}
};
match crt.call_post_operation_hooks(&status, is_success) {
Ok(_) => {}
Err(e) => {
let e = e.prefix("post-operation hook error");
crt.warn(&e.message);
}
};
}
use std::sync::Arc;
pub struct MainEventLoop { pub struct MainEventLoop {
certs: Vec<Certificate>,
cert_managers: HashMap<String, CertificateManager>,
accounts: HashMap<String, AccountSync>, accounts: HashMap<String, AccountSync>,
endpoints: HashMap<String, EndpointSync>, endpoints: HashMap<String, EndpointSync>,
} }
@ -62,8 +42,8 @@ impl MainEventLoop {
.into_iter() .into_iter()
.collect(); .collect();
let mut accounts = HashMap::new();
for acc in cnf.account.iter() {
let mut accounts: HashMap<String, Account> = HashMap::new();
for acc in &cnf.account {
let fm = FileManager { let fm = FileManager {
account_directory: cnf.get_account_dir(), account_directory: cnf.get_account_dir(),
account_name: acc.name.clone(), account_name: acc.name.clone(),
@ -86,11 +66,14 @@ impl MainEventLoop {
env: acc.env.clone(), env: acc.env.clone(),
}; };
let account = acc.to_generic(&fm)?; let account = acc.to_generic(&fm)?;
accounts.insert(acc.name.clone(), account);
let name = acc.name.clone();
accounts.insert(name, account);
} }
let mut certs: Vec<Certificate> = Vec::new();
let mut endpoints = HashMap::new();
// TODO: Virer la création de l'endpoint de ce bloc
// TODO: Construire une liste de CertificateManager et non de Certificate
let mut endpoints: HashMap<String, Endpoint> = HashMap::new();
let mut cert_managers: HashMap<String, CertificateManager> = HashMap::new();
for crt in cnf.certificate.iter() { for crt in cnf.certificate.iter() {
let endpoint = crt.get_endpoint(&cnf, root_certs)?; let endpoint = crt.get_endpoint(&cnf, root_certs)?;
let endpoint_name = endpoint.name.clone(); let endpoint_name = endpoint.name.clone();
@ -136,7 +119,7 @@ impl MainEventLoop {
file_manager: fm, file_manager: fm,
}; };
let crt_id = cert.get_id(); let crt_id = cert.get_id();
if certs.iter().any(|c| c.get_id() == crt_id) {
if cert_managers.contains_key(&crt_id) {
let msg = format!("{crt_id}: duplicate certificate id"); let msg = format!("{crt_id}: duplicate certificate id");
return Err(msg.into()); return Err(msg.into());
} }
@ -147,12 +130,14 @@ impl MainEventLoop {
return Err(msg.into()); return Err(msg.into());
} }
}; };
endpoints.entry(endpoint_name).or_insert(endpoint);
certs.push(cert);
if !endpoints.contains_key(&endpoint.name) {
endpoints.insert(endpoint.name.clone(), endpoint);
}
cert_managers.insert(crt_id, CertificateManager::new(cert));
} }
Ok(MainEventLoop { Ok(MainEventLoop {
certs,
cert_managers,
accounts: accounts accounts: accounts
.iter() .iter()
.map(|(k, v)| (k.to_owned(), Arc::new(RwLock::new(v.to_owned())))) .map(|(k, v)| (k.to_owned(), Arc::new(RwLock::new(v.to_owned()))))
@ -164,46 +149,26 @@ impl MainEventLoop {
}) })
} }
pub fn run(&mut self) {
loop {
self.renew_certificates();
thread::sleep(Duration::from_secs(crate::DEFAULT_SLEEP_TIME));
}
}
fn renew_certificates(&mut self) {
let mut handles = vec![];
for (ep_name, endpoint_lock) in self.endpoints.iter_mut() {
let mut certs_to_renew = vec![];
for crt in self.certs.iter() {
if crt.endpoint_name == *ep_name {
match crt.should_renew() {
Ok(true) => {
let crt_arc = Arc::new(crt.clone());
certs_to_renew.push(crt_arc);
}
Ok(false) => {}
Err(e) => {
crt.warn(&e.message);
}
}
pub async fn run(&mut self) {
let mut renewals = FuturesUnordered::new();
for (_, crt) in self.cert_managers.iter_mut() {
log::trace!("Adding certificate: {}", crt.get_id());
if let Some(acc) = self.accounts.get(&crt.get_account_name()) {
if let Some(ept) = self.endpoints.get(&crt.get_endpoint_name()) {
renewals.push(crt.renew(acc.clone(), ept.clone()));
} else {
} }
} else {
} }
let mut accounts_lock = self.accounts.clone();
let ep_lock = endpoint_lock.clone();
let handle = thread::spawn(move || {
let mut endpoint = ep_lock.write().unwrap();
for crt in certs_to_renew {
if let Some(acc_lock) = accounts_lock.get_mut(&crt.account_name) {
let mut account = acc_lock.write().unwrap();
renew_certificate(&crt, &mut endpoint, &mut account);
};
}
});
handles.push(handle);
} }
for handle in handles {
let _ = handle.join();
loop {
if renewals.is_empty() {
log::error!("No certificate found.");
return;
}
if let Some((crt, acc, ept)) = renewals.next().await {
renewals.push(crt.renew(acc, ept));
}
} }
} }
} }
Loading…
Cancel
Save