Browse Source

Honor Go HTTP idle timeout

rust-volume-server
Chris Lu 5 days ago
parent
commit
7e496b2a86
  1. 11
      seaweed-volume/Cargo.lock
  2. 1
      seaweed-volume/Cargo.toml
  3. 103
      seaweed-volume/src/main.rs

11
seaweed-volume/Cargo.lock

@ -4120,6 +4120,16 @@ dependencies = [
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
[[package]]
name = "tokio-io-timeout"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bd86198d9ee903fedd2f9a2e72014287c0d9167e4ae43b5853007205dda1b76"
dependencies = [
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "2.6.1" version = "2.6.1"
@ -4752,6 +4762,7 @@ dependencies = [
"tempfile", "tempfile",
"thiserror 1.0.69", "thiserror 1.0.69",
"tokio", "tokio",
"tokio-io-timeout",
"tokio-rustls 0.26.4", "tokio-rustls 0.26.4",
"tokio-stream", "tokio-stream",
"toml", "toml",

1
seaweed-volume/Cargo.toml

@ -22,6 +22,7 @@ default = []
# Async runtime # Async runtime
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1" tokio-stream = "0.1"
tokio-io-timeout = "1"
# gRPC + protobuf # gRPC + protobuf
tonic = { version = "0.12", features = ["tls"] } tonic = { version = "0.12", features = ["tls"] }

103
seaweed-volume/src/main.rs

@ -230,6 +230,21 @@ fn build_volume_grpc_service(
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE) .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
} }
fn apply_idle_timeout<S>(
stream: S,
idle_timeout: std::time::Duration,
) -> std::pin::Pin<Box<tokio_io_timeout::TimeoutStream<S>>>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite,
{
let mut stream = tokio_io_timeout::TimeoutStream::new(stream);
if !idle_timeout.is_zero() {
stream.set_read_timeout(Some(idle_timeout));
stream.set_write_timeout(Some(idle_timeout));
}
Box::pin(stream)
}
async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error>> { async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error>> {
// Initialize the store // Initialize the store
let mut store = Store::new(config.index_type); let mut store = Store::new(config.index_type);
@ -374,6 +389,7 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
let public_port = config.public_port; let public_port = config.public_port;
let needs_public = public_port != config.port; let needs_public = public_port != config.port;
let http_idle_timeout = std::time::Duration::from_secs(config.idle_timeout as u64);
let grpc_addr = format!("{}:{}", config.bind_ip, config.grpc_port); let grpc_addr = format!("{}:{}", config.bind_ip, config.grpc_port);
let grpc_tls_acceptor = build_grpc_server_tls_acceptor( let grpc_tls_acceptor = build_grpc_server_tls_acceptor(
@ -489,22 +505,29 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
let http_handle = if let Some(tls_acceptor) = https_tls_acceptor.clone() { let http_handle = if let Some(tls_acceptor) = https_tls_acceptor.clone() {
let mut shutdown_rx = shutdown_tx.subscribe(); let mut shutdown_rx = shutdown_tx.subscribe();
tokio::spawn(async move { tokio::spawn(async move {
serve_https(admin_listener, admin_router, tls_acceptor, async move {
let _ = shutdown_rx.recv().await;
})
serve_https(
admin_listener,
admin_router,
tls_acceptor,
http_idle_timeout,
async move {
let _ = shutdown_rx.recv().await;
},
)
.await; .await;
}) })
} else { } else {
let mut shutdown_rx = shutdown_tx.subscribe(); let mut shutdown_rx = shutdown_tx.subscribe();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = axum::serve(admin_listener, admin_router)
.with_graceful_shutdown(async move {
serve_http(
admin_listener,
admin_router,
http_idle_timeout,
async move {
let _ = shutdown_rx.recv().await; let _ = shutdown_rx.recv().await;
})
.await
{
error!("HTTP server error: {}", e);
}
},
)
.await;
}) })
}; };
@ -608,14 +631,10 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
info!("Public HTTP server listening on {}", public_addr); info!("Public HTTP server listening on {}", public_addr);
let mut shutdown_rx = shutdown_tx.subscribe(); let mut shutdown_rx = shutdown_tx.subscribe();
Some(tokio::spawn(async move { Some(tokio::spawn(async move {
if let Err(e) = axum::serve(listener, public_router)
.with_graceful_shutdown(async move {
let _ = shutdown_rx.recv().await;
})
.await
{
error!("Public HTTP server error: {}", e);
}
serve_http(listener, public_router, http_idle_timeout, async move {
let _ = shutdown_rx.recv().await;
})
.await;
})) }))
} else { } else {
None None
@ -758,10 +777,56 @@ fn grpc_tls_incoming(
/// Serve an axum Router over TLS using tokio-rustls. /// Serve an axum Router over TLS using tokio-rustls.
/// Accepts TCP connections, performs TLS handshake, then serves HTTP over the encrypted stream. /// Accepts TCP connections, performs TLS handshake, then serves HTTP over the encrypted stream.
async fn serve_http<F>(
tcp_listener: tokio::net::TcpListener,
app: axum::Router,
idle_timeout: std::time::Duration,
shutdown_signal: F,
) where
F: std::future::Future<Output = ()> + Send + 'static,
{
use hyper_util::rt::{TokioExecutor, TokioIo};
use hyper_util::server::conn::auto::Builder as HttpBuilder;
use hyper_util::service::TowerToHyperService;
use tower::Service;
let mut make_svc = app.into_make_service();
tokio::pin!(shutdown_signal);
loop {
tokio::select! {
_ = &mut shutdown_signal => {
info!("HTTP server shutting down");
break;
}
result = tcp_listener.accept() => {
match result {
Ok((tcp_stream, remote_addr)) => {
let tower_svc = make_svc.call(remote_addr).await.expect("infallible");
let hyper_svc = TowerToHyperService::new(tower_svc);
tokio::spawn(async move {
let io = TokioIo::new(apply_idle_timeout(tcp_stream, idle_timeout));
let builder = HttpBuilder::new(TokioExecutor::new());
if let Err(e) = builder.serve_connection(io, hyper_svc).await {
tracing::debug!("HTTP connection error: {}", e);
}
});
}
Err(e) => {
error!("Failed to accept TCP connection: {}", e);
}
}
}
}
}
}
async fn serve_https<F>( async fn serve_https<F>(
tcp_listener: tokio::net::TcpListener, tcp_listener: tokio::net::TcpListener,
app: axum::Router, app: axum::Router,
tls_acceptor: TlsAcceptor, tls_acceptor: TlsAcceptor,
idle_timeout: std::time::Duration,
shutdown_signal: F, shutdown_signal: F,
) where ) where
F: std::future::Future<Output = ()> + Send + 'static, F: std::future::Future<Output = ()> + Send + 'static,
@ -790,7 +855,7 @@ async fn serve_https<F>(
tokio::spawn(async move { tokio::spawn(async move {
match tls_acceptor.accept(tcp_stream).await { match tls_acceptor.accept(tcp_stream).await {
Ok(tls_stream) => { Ok(tls_stream) => {
let io = TokioIo::new(tls_stream);
let io = TokioIo::new(apply_idle_timeout(tls_stream, idle_timeout));
let builder = HttpBuilder::new(TokioExecutor::new()); let builder = HttpBuilder::new(TokioExecutor::new());
if let Err(e) = builder.serve_connection(io, hyper_svc).await { if let Err(e) = builder.serve_connection(io, hyper_svc).await {
tracing::debug!("HTTPS connection error: {}", e); tracing::debug!("HTTPS connection error: {}", e);

Loading…
Cancel
Save