From 4fc1f0cb8ba2a1bce747764e4af6222a2b83c0cc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rodolphe=20Br=C3=A9ard?= <rodolphe@what.tf>
Date: Tue, 14 Feb 2023 18:32:30 +0100
Subject: [PATCH] Rewrite the main event loop using async

Manual threads have some huge drawbacks and are therefore not well
suited for this task. Using async with a multi-threaded runtime,
however, does not have those drawbacks and keep the advantage of a
multi-threaded environment.

This is only the first part of the switch to async, the next step being
to use it in file operation, HTTP requests and sleeps.
---
 README.md                        |  26 ++------
 acmed/Cargo.toml                 |   3 +
 acmed/src/certificate_manager.rs |  65 ++++++++++++++++++
 acmed/src/main.rs                |  19 +++++-
 acmed/src/main_event_loop.rs     | 111 +++++++++++--------------------
 5 files changed, 130 insertions(+), 94 deletions(-)
 create mode 100644 acmed/src/certificate_manager.rs

diff --git a/README.md b/README.md
index 957e896..7158109 100644
--- a/README.md
+++ b/README.md
@@ -197,29 +197,15 @@ The `contrib/systemd` contains examples of a service file as well as a `sysusers
 
 ### Does ACMEd uses any threading or parallelization?
 
-ACMEd uses a dedicated thread for each endpoint. Certificates of different endpoint can therefore be renewed in parallel while inside each endpoints certificates are renewed sequentially.
+Yes, ACMEd is [asynchronous](https://en.wikipedia.org/wiki/Asynchrony_(computer_programming)) and uses a multi-thread runtime. In order to check the number of threads, you may run the following command:
 
-### Can I use the same account on different endpoints?
-
-Short answer: Yes, that is possible. However, if you do so, you should be aware that this might eventually hurt the parallelization.
-
-Long answer: Accounts requires to acquire a global lock, therefore an endpoint thread wanting to renew a certificate has to wait that the associated account lock has been released from any other endpoint thread. Since each endpoints renew certificates in a sequential order, they will block on the first certificates which associated account is already in use.
-
-Example:
-
-- 2 accounts: A1 and A2
-- 2 endpoints: E1 and E2
-- 3 certificates, all requiring to be renewed:
-  * C1 on E1 with A1
-  * C2 on E1 with A2
-  * C3 on E2 with A1
-
-Let's suppose that E1 will renew C1 and C2 in that order. We just launched ACMEd and threads for E1 and E2 starts almost simultaneously. Two cases are possible:
+```
+ps -T -p "$(pgrep acmed)"
+```
 
-1. E1 acquires the lock for A1 first. E2 is therefore blocked. C1 will be renewed first and only after that C2 and C3 will be renewed in parallel.
-2. E2 acquires the lock for A1 first. E1 is therefore blocked. All certificates will be renewed in this sequential order, without any benefit from parallelism: C3, C1 and C2.
+### Can I use the same account on different endpoints?
 
-There is no way to control neither the sequential certificate renew order inside each endpoint nor the order in which the account locks are acquired.
+Yes, that is possible. However, you should aware that some certificate authorities (e.g. Let's Encrypt) have policies restricting the use multiple accounts. Please check your CA's documentation.
 
 ### Why is RSA 2048 the default certificate key type?
 
diff --git a/acmed/Cargo.toml b/acmed/Cargo.toml
index 77975a4..b6e72c8 100644
--- a/acmed/Cargo.toml
+++ b/acmed/Cargo.toml
@@ -21,9 +21,11 @@ openssl_vendored = ["crypto_openssl", "acme_common/openssl_vendored", "attohttpc
 
 [dependencies]
 acme_common = { path = "../acme_common" }
+async-lock = "2.6"
 attohttpc = { version = "0.24", default-features = false, features = ["charsets", "json"] }
 bincode = "1.3"
 clap = { version = "4.0", features = ["string"] }
+futures = "0.3"
 glob = "0.3"
 log = "0.4"
 nom = { version = "7.0", default-features = false, features = [] }
@@ -31,6 +33,7 @@ serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 tinytemplate = "1.2"
 toml = "0.7"
+tokio = { version = "1", features = ["full"] }
 
 [target.'cfg(unix)'.dependencies]
 nix = "0.26"
diff --git a/acmed/src/certificate_manager.rs b/acmed/src/certificate_manager.rs
new file mode 100644
index 0000000..c095c75
--- /dev/null
+++ b/acmed/src/certificate_manager.rs
@@ -0,0 +1,65 @@
+use crate::acme_proto::request_certificate;
+use crate::certificate::Certificate;
+use crate::logs::HasLogger;
+use crate::{AccountSync, EndpointSync};
+use std::time::Duration;
+use tokio::time::sleep;
+
+#[derive(Clone, Debug)]
+pub struct CertificateManager {
+	cert: Certificate,
+}
+
+impl CertificateManager {
+	pub fn new(cert: Certificate) -> Self {
+		Self { cert }
+	}
+
+	pub fn get_id(&self) -> String {
+		self.cert.get_id()
+	}
+
+	pub fn get_account_name(&self) -> String {
+		self.cert.account_name.clone()
+	}
+
+	pub fn get_endpoint_name(&self) -> String {
+		self.cert.endpoint_name.clone()
+	}
+
+	pub async fn renew(
+		&mut self,
+		account_s: AccountSync,
+		endpoint_s: EndpointSync,
+	) -> (&mut Self, AccountSync, EndpointSync) {
+		loop {
+			match self.cert.should_renew() {
+				Ok(true) => break,
+				Ok(false) => {}
+				Err(e) => {
+					self.cert.warn(&e.message);
+				}
+			}
+			sleep(Duration::from_secs(crate::DEFAULT_SLEEP_TIME)).await;
+		}
+		let mut account = account_s.write().await;
+		let mut endpoint = endpoint_s.write().await;
+		let (status, is_success) =
+			match request_certificate(&self.cert, &mut endpoint, &mut account) {
+				Ok(_) => ("success".to_string(), true),
+				Err(e) => {
+					let e = e.prefix("unable to renew the certificate");
+					self.cert.warn(&e.message);
+					(e.message, false)
+				}
+			};
+		match self.cert.call_post_operation_hooks(&status, is_success) {
+			Ok(_) => {}
+			Err(e) => {
+				let e = e.prefix("post-operation hook error");
+				self.cert.warn(&e.message);
+			}
+		};
+		(self, account_s.clone(), endpoint_s.clone())
+	}
+}
diff --git a/acmed/src/main.rs b/acmed/src/main.rs
index 7776ab6..d8dbd37 100644
--- a/acmed/src/main.rs
+++ b/acmed/src/main.rs
@@ -4,12 +4,16 @@ use acme_common::crypto::{
 };
 use acme_common::logs::{set_log_system, DEFAULT_LOG_LEVEL};
 use acme_common::{clean_pid_file, init_server};
+use async_lock::RwLock;
 use clap::{Arg, ArgAction, Command};
 use log::error;
+use std::sync::Arc;
+use tokio::runtime::Builder;
 
 mod account;
 mod acme_proto;
 mod certificate;
+mod certificate_manager;
 mod config;
 mod duration;
 mod endpoint;
@@ -23,6 +27,7 @@ mod storage;
 mod template;
 
 pub const APP_NAME: &str = "ACMEd";
+pub const APP_THREAD_NAME: &str = "acmed-runtime";
 pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
 pub const DEFAULT_ACCOUNTS_DIR: &str = env!("ACMED_DEFAULT_ACCOUNTS_DIR");
 pub const DEFAULT_CERT_DIR: &str = env!("ACMED_DEFAULT_CERT_DIR");
@@ -48,7 +53,19 @@ pub const DEFAULT_HOOK_ALLOW_FAILURE: bool = false;
 pub const MAX_RATE_LIMIT_SLEEP_MILISEC: u64 = 3_600_000;
 pub const MIN_RATE_LIMIT_SLEEP_MILISEC: u64 = 100;
 
+type AccountSync = Arc<RwLock<account::Account>>;
+type EndpointSync = Arc<RwLock<endpoint::Endpoint>>;
+
 fn main() {
+	Builder::new_multi_thread()
+		.enable_all()
+		.thread_name(APP_THREAD_NAME)
+		.build()
+		.unwrap()
+		.block_on(inner_main());
+}
+
+async fn inner_main() {
 	let full_version = format!(
 		"{APP_VERSION} built for {}\n\nCryptographic library:\n - {} {}\nHTTP client library:\n - {} {}",
 		env!("ACMED_TARGET"),
@@ -160,5 +177,5 @@ fn main() {
 			std::process::exit(1);
 		}
 	};
-	srv.run();
+	srv.run().await;
 }
diff --git a/acmed/src/main_event_loop.rs b/acmed/src/main_event_loop.rs
index b554c91..bea398e 100644
--- a/acmed/src/main_event_loop.rs
+++ b/acmed/src/main_event_loop.rs
@@ -1,40 +1,20 @@
 use crate::account::Account;
-use crate::acme_proto::request_certificate;
 use crate::certificate::Certificate;
+use crate::certificate_manager::CertificateManager;
 use crate::config;
 use crate::endpoint::Endpoint;
 use crate::hooks::HookType;
-use crate::logs::HasLogger;
 use crate::storage::FileManager;
+use crate::{AccountSync, EndpointSync};
 use acme_common::error::Error;
+use async_lock::RwLock;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
 use std::collections::HashMap;
-use std::sync::{Arc, RwLock};
-use std::thread;
-use std::time::Duration;
-
-type AccountSync = Arc<RwLock<Account>>;
-type EndpointSync = Arc<RwLock<Endpoint>>;
-
-fn renew_certificate(crt: &Certificate, endpoint: &mut Endpoint, account: &mut Account) {
-	let (status, is_success) = match request_certificate(crt, endpoint, account) {
-		Ok(_) => ("success".to_string(), true),
-		Err(e) => {
-			let e = e.prefix("unable to renew the certificate");
-			crt.warn(&e.message);
-			(e.message, false)
-		}
-	};
-	match crt.call_post_operation_hooks(&status, is_success) {
-		Ok(_) => {}
-		Err(e) => {
-			let e = e.prefix("post-operation hook error");
-			crt.warn(&e.message);
-		}
-	};
-}
+use std::sync::Arc;
 
 pub struct MainEventLoop {
-	certs: Vec<Certificate>,
+	cert_managers: HashMap<String, CertificateManager>,
 	accounts: HashMap<String, AccountSync>,
 	endpoints: HashMap<String, EndpointSync>,
 }
@@ -62,8 +42,8 @@ impl MainEventLoop {
 		.into_iter()
 		.collect();
 
-		let mut accounts = HashMap::new();
-		for acc in cnf.account.iter() {
+		let mut accounts: HashMap<String, Account> = HashMap::new();
+		for acc in &cnf.account {
 			let fm = FileManager {
 				account_directory: cnf.get_account_dir(),
 				account_name: acc.name.clone(),
@@ -86,11 +66,14 @@ impl MainEventLoop {
 				env: acc.env.clone(),
 			};
 			let account = acc.to_generic(&fm)?;
-			accounts.insert(acc.name.clone(), account);
+			let name = acc.name.clone();
+			accounts.insert(name, account);
 		}
 
-		let mut certs: Vec<Certificate> = Vec::new();
-		let mut endpoints = HashMap::new();
+		// TODO: Virer la création de l'endpoint de ce bloc
+		// TODO: Construire une liste de CertificateManager et non de Certificate
+		let mut endpoints: HashMap<String, Endpoint> = HashMap::new();
+		let mut cert_managers: HashMap<String, CertificateManager> = HashMap::new();
 		for crt in cnf.certificate.iter() {
 			let endpoint = crt.get_endpoint(&cnf, root_certs)?;
 			let endpoint_name = endpoint.name.clone();
@@ -136,7 +119,7 @@ impl MainEventLoop {
 				file_manager: fm,
 			};
 			let crt_id = cert.get_id();
-			if certs.iter().any(|c| c.get_id() == crt_id) {
+			if cert_managers.contains_key(&crt_id) {
 				let msg = format!("{crt_id}: duplicate certificate id");
 				return Err(msg.into());
 			}
@@ -147,12 +130,14 @@ impl MainEventLoop {
 					return Err(msg.into());
 				}
 			};
-			endpoints.entry(endpoint_name).or_insert(endpoint);
-			certs.push(cert);
+			if !endpoints.contains_key(&endpoint.name) {
+				endpoints.insert(endpoint.name.clone(), endpoint);
+			}
+			cert_managers.insert(crt_id, CertificateManager::new(cert));
 		}
 
 		Ok(MainEventLoop {
-			certs,
+			cert_managers,
 			accounts: accounts
 				.iter()
 				.map(|(k, v)| (k.to_owned(), Arc::new(RwLock::new(v.to_owned()))))
@@ -164,46 +149,26 @@ impl MainEventLoop {
 		})
 	}
 
-	pub fn run(&mut self) {
-		loop {
-			self.renew_certificates();
-			thread::sleep(Duration::from_secs(crate::DEFAULT_SLEEP_TIME));
-		}
-	}
-
-	fn renew_certificates(&mut self) {
-		let mut handles = vec![];
-		for (ep_name, endpoint_lock) in self.endpoints.iter_mut() {
-			let mut certs_to_renew = vec![];
-			for crt in self.certs.iter() {
-				if crt.endpoint_name == *ep_name {
-					match crt.should_renew() {
-						Ok(true) => {
-							let crt_arc = Arc::new(crt.clone());
-							certs_to_renew.push(crt_arc);
-						}
-						Ok(false) => {}
-						Err(e) => {
-							crt.warn(&e.message);
-						}
-					}
+	pub async fn run(&mut self) {
+		let mut renewals = FuturesUnordered::new();
+		for (_, crt) in self.cert_managers.iter_mut() {
+			log::trace!("Adding certificate: {}", crt.get_id());
+			if let Some(acc) = self.accounts.get(&crt.get_account_name()) {
+				if let Some(ept) = self.endpoints.get(&crt.get_endpoint_name()) {
+					renewals.push(crt.renew(acc.clone(), ept.clone()));
+				} else {
 				}
+			} else {
 			}
-			let mut accounts_lock = self.accounts.clone();
-			let ep_lock = endpoint_lock.clone();
-			let handle = thread::spawn(move || {
-				let mut endpoint = ep_lock.write().unwrap();
-				for crt in certs_to_renew {
-					if let Some(acc_lock) = accounts_lock.get_mut(&crt.account_name) {
-						let mut account = acc_lock.write().unwrap();
-						renew_certificate(&crt, &mut endpoint, &mut account);
-					};
-				}
-			});
-			handles.push(handle);
 		}
-		for handle in handles {
-			let _ = handle.join();
+		loop {
+			if renewals.is_empty() {
+				log::error!("No certificate found.");
+				return;
+			}
+			if let Some((crt, acc, ept)) = renewals.next().await {
+				renewals.push(crt.renew(acc, ept));
+			}
 		}
 	}
 }