From e5204880f7974635b1ec294b02f5dad52e9b0313 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 16 Mar 2026 12:20:39 -0700 Subject: [PATCH] Honor https.client for outgoing volume HTTP --- seaweed-volume/src/config.rs | 71 ++++++ seaweed-volume/src/main.rs | 98 +++++++- seaweed-volume/src/server/grpc_server.rs | 27 +- seaweed-volume/src/server/handlers.rs | 279 +++++++++++++-------- seaweed-volume/src/server/heartbeat.rs | 19 +- seaweed-volume/src/server/volume_server.rs | 13 + seaweed-volume/src/server/write_queue.rs | 1 + seaweed-volume/tests/http_integration.rs | 1 + 8 files changed, 378 insertions(+), 131 deletions(-) diff --git a/seaweed-volume/src/config.rs b/seaweed-volume/src/config.rs index d2c6c6ba5..31aacac11 100644 --- a/seaweed-volume/src/config.rs +++ b/seaweed-volume/src/config.rs @@ -238,6 +238,10 @@ pub struct VolumeServerConfig { pub https_cert_file: String, pub https_key_file: String, pub https_ca_file: String, + pub https_client_enabled: bool, + pub https_client_cert_file: String, + pub https_client_key_file: String, + pub https_client_ca_file: String, pub grpc_cert_file: String, pub grpc_key_file: String, pub grpc_ca_file: String, @@ -773,6 +777,10 @@ fn resolve_config(cli: Cli) -> VolumeServerConfig { https_cert_file: sec.https_cert_file, https_key_file: sec.https_key_file, https_ca_file: sec.https_ca_file, + https_client_enabled: sec.https_client_enabled, + https_client_cert_file: sec.https_client_cert_file, + https_client_key_file: sec.https_client_key_file, + https_client_ca_file: sec.https_client_ca_file, grpc_cert_file: sec.grpc_cert_file, grpc_key_file: sec.grpc_key_file, grpc_ca_file: sec.grpc_ca_file, @@ -797,6 +805,10 @@ pub struct SecurityConfig { pub https_cert_file: String, pub https_key_file: String, pub https_ca_file: String, + pub https_client_enabled: bool, + pub https_client_cert_file: String, + pub https_client_key_file: String, + pub https_client_ca_file: String, pub grpc_cert_file: String, pub grpc_key_file: String, pub grpc_ca_file: String, @@ -822,6 +834,12 @@ const SECURITY_CONFIG_FILE_NAME: &str = "security.toml"; /// cert = "/path/to/cert.pem" /// key = "/path/to/key.pem" /// +/// [https.client] +/// enabled = true +/// cert = "/path/to/cert.pem" +/// key = "/path/to/key.pem" +/// ca = "/path/to/ca.pem" +/// /// [grpc] /// ca = "/path/to/ca.pem" /// @@ -852,6 +870,7 @@ pub fn parse_security_config(path: &str) -> SecurityConfig { None, JwtSigning, JwtSigningRead, + HttpsClient, Grpc, HttpsVolume, GrpcVolume, @@ -874,6 +893,10 @@ pub fn parse_security_config(path: &str) -> SecurityConfig { section = Section::JwtSigning; continue; } + if trimmed == "[https.client]" { + section = Section::HttpsClient; + continue; + } if trimmed == "[grpc]" { section = Section::Grpc; continue; @@ -915,6 +938,13 @@ pub fn parse_security_config(path: &str) -> SecurityConfig { "expires_after_seconds" => cfg.jwt_signing_expires = value.parse().unwrap_or(0), _ => {} }, + Section::HttpsClient => match key { + "enabled" => cfg.https_client_enabled = value.parse().unwrap_or(false), + "cert" => cfg.https_client_cert_file = value.to_string(), + "key" => cfg.https_client_key_file = value.to_string(), + "ca" => cfg.https_client_ca_file = value.to_string(), + _ => {} + }, Section::Grpc => match key { "ca" => cfg.grpc_ca_file = value.to_string(), _ => {} @@ -1022,6 +1052,18 @@ fn apply_env_overrides(cfg: &mut SecurityConfig) { if let Ok(v) = std::env::var("WEED_HTTPS_VOLUME_CA") { cfg.https_ca_file = v; } + if let Ok(v) = std::env::var("WEED_HTTPS_CLIENT_ENABLED") { + cfg.https_client_enabled = v == "true" || v == "1"; + } + if let Ok(v) = std::env::var("WEED_HTTPS_CLIENT_CERT") { + cfg.https_client_cert_file = v; + } + if let Ok(v) = std::env::var("WEED_HTTPS_CLIENT_KEY") { + cfg.https_client_key_file = v; + } + if let Ok(v) = std::env::var("WEED_HTTPS_CLIENT_CA") { + cfg.https_client_ca_file = v; + } if let Ok(v) = std::env::var("WEED_GRPC_VOLUME_CERT") { cfg.grpc_cert_file = v; } @@ -1104,6 +1146,10 @@ mod tests { "WEED_HTTPS_VOLUME_CERT", "WEED_HTTPS_VOLUME_KEY", "WEED_HTTPS_VOLUME_CA", + "WEED_HTTPS_CLIENT_ENABLED", + "WEED_HTTPS_CLIENT_CERT", + "WEED_HTTPS_CLIENT_KEY", + "WEED_HTTPS_CLIENT_CA", "WEED_GRPC_VOLUME_CERT", "WEED_GRPC_VOLUME_KEY", "WEED_GRPC_CA", @@ -1353,6 +1399,31 @@ key = "/etc/seaweedfs/volume-key.pem" }); } + #[test] + fn test_parse_security_config_uses_https_client_settings() { + let _guard = process_state_lock(); + let tmp = tempfile::NamedTempFile::new().unwrap(); + std::fs::write( + tmp.path(), + r#" +[https.client] +enabled = true +cert = "/etc/seaweedfs/client-cert.pem" +key = "/etc/seaweedfs/client-key.pem" +ca = "/etc/seaweedfs/client-ca.pem" +"#, + ) + .unwrap(); + + with_cleared_security_env(|| { + let cfg = parse_security_config(tmp.path().to_str().unwrap()); + assert!(cfg.https_client_enabled); + assert_eq!(cfg.https_client_cert_file, "/etc/seaweedfs/client-cert.pem"); + assert_eq!(cfg.https_client_key_file, "/etc/seaweedfs/client-key.pem"); + assert_eq!(cfg.https_client_ca_file, "/etc/seaweedfs/client-ca.pem"); + }); + } + #[test] fn test_merge_options_file_basic() { let tmp = tempfile::NamedTempFile::new().unwrap(); diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs index 44810e509..1ebeb9a3c 100644 --- a/seaweed-volume/src/main.rs +++ b/seaweed-volume/src/main.rs @@ -72,7 +72,9 @@ fn load_rustls_config(cert_path: &str, key_path: &str, ca_path: &str) -> rustls: .expect("Failed to parse CA certificate PEM"); let mut root_store = rustls::RootCertStore::empty(); for cert in ca_certs { - root_store.add(cert).expect("Failed to add CA certificate to root store"); + root_store + .add(cert) + .expect("Failed to add CA certificate to root store"); } let verifier = rustls::server::WebPkiClientVerifier::builder(Arc::new(root_store)) .build() @@ -89,6 +91,78 @@ fn load_rustls_config(cert_path: &str, key_path: &str, ca_path: &str) -> rustls: } } +fn build_outgoing_http_client( + config: &VolumeServerConfig, +) -> Result<(reqwest::Client, String), Box> { + let scheme = if config.https_client_enabled { + "https" + } else { + "http" + }; + if !config.https_client_enabled { + return Ok((reqwest::Client::new(), scheme.to_string())); + } + + let mut builder = reqwest::Client::builder(); + if !config.https_client_ca_file.is_empty() { + let ca_pem = std::fs::read(&config.https_client_ca_file).map_err(|e| { + format!( + "Failed to read HTTPS client CA file '{}': {}", + config.https_client_ca_file, e + ) + })?; + let cert = reqwest::Certificate::from_pem(&ca_pem).map_err(|e| { + format!( + "Failed to parse HTTPS client CA PEM '{}': {}", + config.https_client_ca_file, e + ) + })?; + builder = builder.add_root_certificate(cert); + } + + match ( + config.https_client_cert_file.is_empty(), + config.https_client_key_file.is_empty(), + ) { + (true, true) => {} + (false, false) => { + let cert_pem = std::fs::read(&config.https_client_cert_file).map_err(|e| { + format!( + "Failed to read HTTPS client cert file '{}': {}", + config.https_client_cert_file, e + ) + })?; + let key_pem = std::fs::read(&config.https_client_key_file).map_err(|e| { + format!( + "Failed to read HTTPS client key file '{}': {}", + config.https_client_key_file, e + ) + })?; + let mut identity_pem = cert_pem; + if !identity_pem.ends_with(b"\n") { + identity_pem.push(b'\n'); + } + identity_pem.extend_from_slice(&key_pem); + let identity = reqwest::Identity::from_pem(&identity_pem).map_err(|e| { + format!( + "Failed to parse HTTPS client identity '{}'+ '{}': {}", + config.https_client_cert_file, config.https_client_key_file, e + ) + })?; + builder = builder.identity(identity); + } + _ => { + return Err(format!( + "HTTPS client requires both cert and key, got cert='{}' key='{}'", + config.https_client_cert_file, config.https_client_key_file + ) + .into()); + } + } + + Ok((builder.build()?, scheme.to_string())) +} + async fn run(config: VolumeServerConfig) -> Result<(), Box> { // Initialize the store let mut store = Store::new(config.index_type); @@ -131,6 +205,7 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box = if req.volume_ids.is_empty() { - store.locations.iter().flat_map(|loc| loc.ec_volumes().map(|(vid, _)| *vid)).collect() + store + .locations + .iter() + .flat_map(|loc| loc.ec_volumes().map(|(vid, _)| *vid)) + .collect() } else { req.volume_ids.iter().map(|&id| VolumeId(id)).collect() }; @@ -3094,7 +3102,7 @@ impl VolumeServer for VolumeGrpcService { crc: n.checksum.0, ttl: ttl_str, }, - )) + )); } Err(_) => { return Err(Status::not_found(format!( @@ -3236,15 +3244,14 @@ fn to_grpc_endpoint(target: &str) -> Result { /// Ping a remote volume server target by actually calling its Ping RPC (matches Go behavior). async fn ping_volume_server_target(target: &str) -> Result { let addr = to_grpc_endpoint(target)?; - let channel = tonic::transport::Channel::from_shared(addr.clone()) - .map_err(|e| e.to_string())?; + let channel = + tonic::transport::Channel::from_shared(addr.clone()).map_err(|e| e.to_string())?; let channel = tokio::time::timeout(std::time::Duration::from_secs(5), channel.connect()) .await .map_err(|_| "connection timeout".to_string())? .map_err(|e| e.to_string())?; - let mut client = - volume_server_pb::volume_server_client::VolumeServerClient::new(channel); + let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel); let resp = client .ping(volume_server_pb::PingRequest { target: String::new(), @@ -3258,8 +3265,8 @@ async fn ping_volume_server_target(target: &str) -> Result { /// Ping a remote master target by actually calling its Ping RPC (matches Go behavior). async fn ping_master_target(target: &str) -> Result { let addr = to_grpc_endpoint(target)?; - let channel = tonic::transport::Channel::from_shared(addr.clone()) - .map_err(|e| e.to_string())?; + let channel = + tonic::transport::Channel::from_shared(addr.clone()).map_err(|e| e.to_string())?; let channel = tokio::time::timeout(std::time::Duration::from_secs(5), channel.connect()) .await .map_err(|_| "connection timeout".to_string())? diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs index 0ebb2409b..deb531551 100644 --- a/seaweed-volume/src/server/handlers.rs +++ b/seaweed-volume/src/server/handlers.rs @@ -14,7 +14,7 @@ use axum::http::{header, HeaderMap, Method, Request, StatusCode}; use axum::response::{IntoResponse, Response}; use serde::{Deserialize, Serialize}; -use super::volume_server::VolumeServerState; +use super::volume_server::{normalize_outgoing_http_url, VolumeServerState}; use crate::config::ReadMode; use crate::metrics; use crate::storage::needle::needle::Needle; @@ -325,10 +325,14 @@ struct LookupResult { /// Look up volume locations from the master via HTTP /dir/lookup. async fn lookup_volume( client: &reqwest::Client, + scheme: &str, master_url: &str, volume_id: u32, ) -> Result, String> { - let url = format!("http://{}/dir/lookup?volumeId={}", master_url, volume_id); + let url = normalize_outgoing_http_url( + scheme, + &format!("{}/dir/lookup?volumeId={}", master_url, volume_id), + )?; let resp = client .get(&url) .send() @@ -356,9 +360,14 @@ async fn do_replicated_request( headers: &axum::http::HeaderMap, body: Option, ) -> Result<(), String> { - let locations = lookup_volume(&state.http_client, &state.master_url, vid) - .await - .map_err(|e| format!("lookup volume failed: {}", e))?; + let locations = lookup_volume( + &state.http_client, + &state.outgoing_http_scheme, + &state.master_url, + vid, + ) + .await + .map_err(|e| format!("lookup volume failed: {}", e))?; let remote_locations: Vec<_> = locations .into_iter() @@ -377,7 +386,10 @@ async fn do_replicated_request( let mut futures = Vec::new(); for loc in remote_locations { - let url = format!("http://{}{}?{}", loc.url, path, new_query); + let url = normalize_outgoing_http_url( + &state.outgoing_http_scheme, + &format!("{}{}?{}", loc.url, path, new_query), + )?; let client = state.http_client.clone(); let mut req_builder = client.request(method.clone(), &url); @@ -440,7 +452,14 @@ async fn proxy_or_redirect_to_target( vid: VolumeId, ) -> Response { // Look up volume locations from master - let locations = match lookup_volume(&state.http_client, &state.master_url, vid.0).await { + let locations = match lookup_volume( + &state.http_client, + &state.outgoing_http_scheme, + &state.master_url, + vid.0, + ) + .await + { Ok(locs) => locs, Err(e) => { tracing::warn!("volume lookup failed for {}: {}", vid.0, e); @@ -473,7 +492,7 @@ async fn proxy_or_redirect_to_target( match state.read_mode { ReadMode::Proxy => proxy_request(state, &info, target).await, - ReadMode::Redirect => redirect_request(&info, target), + ReadMode::Redirect => redirect_request(&info, target, &state.outgoing_http_scheme), ReadMode::Local => unreachable!(), } } @@ -485,18 +504,23 @@ async fn proxy_request( target: &VolumeLocation, ) -> Response { // Build target URL, adding proxied=true query param - let scheme = "http"; - let target_host = &target.url; let path = info.path.trim_start_matches('/'); - let target_url = if info.original_query.is_empty() { - format!("{}://{}/{}?proxied=true", scheme, target_host, path) + let raw_target = if info.original_query.is_empty() { + format!("{}/{}?proxied=true", target.url, path) } else { format!( - "{}://{}/{}?{}&proxied=true", - scheme, target_host, path, info.original_query + "{}/{}?{}&proxied=true", + target.url, path, info.original_query ) }; + let target_url = match normalize_outgoing_http_url(&state.outgoing_http_scheme, &raw_target) { + Ok(url) => url, + Err(e) => { + tracing::warn!("proxy target url {} invalid: {}", raw_target, e); + return StatusCode::INTERNAL_SERVER_ERROR.into_response(); + } + }; // Build the proxy request let mut req_builder = state.http_client.get(&target_url); @@ -542,10 +566,7 @@ async fn proxy_request( } /// Return a redirect response to the target volume server. -fn redirect_request(info: &ProxyRequestInfo, target: &VolumeLocation) -> Response { - let scheme = "http"; - let target_host = &target.public_url; - +fn redirect_request(info: &ProxyRequestInfo, target: &VolumeLocation, scheme: &str) -> Response { // Build query string: preserve collection, add proxied=true, drop readDeleted (Go parity) let mut query_params = Vec::new(); if !info.original_query.is_empty() { @@ -561,10 +582,14 @@ fn redirect_request(info: &ProxyRequestInfo, target: &VolumeLocation) -> Respons query_params.push("proxied=true".to_string()); let query = query_params.join("&"); - let location = format!( - "{}://{}/{},{}?{}", - scheme, target_host, &info.vid_str, &info.fid_str, query + let raw_target = format!( + "{}/{},{}?{}", + target.public_url, &info.vid_str, &info.fid_str, query ); + let location = match normalize_outgoing_http_url(scheme, &raw_target) { + Ok(url) => url, + Err(_) => return StatusCode::INTERNAL_SERVER_ERROR.into_response(), + }; Response::builder() .status(StatusCode::MOVED_PERMANENTLY) @@ -661,11 +686,12 @@ async fn get_or_head_handler_inner( // so invalid paths with JWT enabled return 401, not 400. let file_id = extract_file_id(&path); let token = extract_jwt(&headers, request.uri()); - if let Err(e) = state - .guard - .read() - .unwrap() - .check_jwt_for_file(token.as_deref(), &file_id, false) + if let Err(e) = + state + .guard + .read() + .unwrap() + .check_jwt_for_file(token.as_deref(), &file_id, false) { return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response(); } @@ -881,8 +907,7 @@ async fn get_or_head_handler_inner( let mut not_modified_headers = HeaderMap::new(); not_modified_headers.insert(header::ETAG, etag.parse().unwrap()); if let Some(ref lm) = last_modified_str { - not_modified_headers - .insert(header::LAST_MODIFIED, lm.parse().unwrap()); + not_modified_headers.insert(header::LAST_MODIFIED, lm.parse().unwrap()); } return (StatusCode::NOT_MODIFIED, not_modified_headers).into_response(); } @@ -1099,8 +1124,8 @@ async fn get_or_head_handler_inner( let mut data = n.data; // Check if image operations are needed — must decompress first regardless of Accept-Encoding - let needs_image_ops = is_image - && (query.width.is_some() || query.height.is_some() || query.mode.is_some()); + let needs_image_ops = + is_image && (query.width.is_some() || query.height.is_some() || query.mode.is_some()); if is_compressed { if needs_image_ops { @@ -1432,7 +1457,8 @@ pub async fn post_handler( let path = request.uri().path().to_string(); let query = request.uri().query().unwrap_or("").to_string(); let headers = request.headers().clone(); - let query_fields: Vec<(String, String)> = serde_urlencoded::from_str(&query).unwrap_or_default(); + let query_fields: Vec<(String, String)> = + serde_urlencoded::from_str(&query).unwrap_or_default(); let (vid, needle_id, cookie) = match parse_url_path(&path) { Some(parsed) => parsed, @@ -1557,90 +1583,89 @@ pub async fn post_handler( parsed_content_encoding, parsed_content_md5, multipart_form_fields, - ) = - if content_type_str.starts_with("multipart/form-data") { - // Extract boundary from Content-Type - let boundary = content_type_str - .split(';') - .find_map(|part| { - let part = part.trim(); - if let Some(val) = part.strip_prefix("boundary=") { - Some(val.trim_matches('"').to_string()) - } else { - None - } - }) - .unwrap_or_default(); + ) = if content_type_str.starts_with("multipart/form-data") { + // Extract boundary from Content-Type + let boundary = content_type_str + .split(';') + .find_map(|part| { + let part = part.trim(); + if let Some(val) = part.strip_prefix("boundary=") { + Some(val.trim_matches('"').to_string()) + } else { + None + } + }) + .unwrap_or_default(); - let mut multipart = multer::Multipart::new( - futures::stream::once(async { Ok::<_, std::io::Error>(body.clone()) }), - boundary, - ); + let mut multipart = multer::Multipart::new( + futures::stream::once(async { Ok::<_, std::io::Error>(body.clone()) }), + boundary, + ); - let mut file_data: Option> = None; - let mut file_name: Option = None; - let mut file_content_type: Option = None; - let mut file_content_encoding: Option = None; - let mut file_content_md5: Option = None; - let mut form_fields = std::collections::HashMap::new(); - - while let Ok(Some(field)) = multipart.next_field().await { - let field_name = field.name().map(|s| s.to_string()); - let fname = field.file_name().map(|s| { - // Clean Windows backslashes - let cleaned = s.replace('\\', "/"); - cleaned.rsplit('/').next().unwrap_or(&cleaned).to_string() - }); - let fct = field.content_type().map(|m| m.to_string()); - let field_headers = field.headers().clone(); - let fce = field_headers - .get(header::CONTENT_ENCODING) - .and_then(|v| v.to_str().ok()) - .map(|s| s.to_string()); - let fmd5 = field_headers - .get("Content-MD5") - .and_then(|v| v.to_str().ok()) - .map(|s| s.to_string()); - - if let Ok(data) = field.bytes().await { - if file_data.is_none() && fname.is_some() { - // First file field - file_data = Some(data.to_vec()); - file_name = fname; - file_content_type = fct; - file_content_encoding = fce; - file_content_md5 = fmd5; - } else if let Some(name) = field_name { - form_fields - .entry(name) - .or_insert_with(|| String::from_utf8_lossy(&data).to_string()); - } + let mut file_data: Option> = None; + let mut file_name: Option = None; + let mut file_content_type: Option = None; + let mut file_content_encoding: Option = None; + let mut file_content_md5: Option = None; + let mut form_fields = std::collections::HashMap::new(); + + while let Ok(Some(field)) = multipart.next_field().await { + let field_name = field.name().map(|s| s.to_string()); + let fname = field.file_name().map(|s| { + // Clean Windows backslashes + let cleaned = s.replace('\\', "/"); + cleaned.rsplit('/').next().unwrap_or(&cleaned).to_string() + }); + let fct = field.content_type().map(|m| m.to_string()); + let field_headers = field.headers().clone(); + let fce = field_headers + .get(header::CONTENT_ENCODING) + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + let fmd5 = field_headers + .get("Content-MD5") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + + if let Ok(data) = field.bytes().await { + if file_data.is_none() && fname.is_some() { + // First file field + file_data = Some(data.to_vec()); + file_name = fname; + file_content_type = fct; + file_content_encoding = fce; + file_content_md5 = fmd5; + } else if let Some(name) = field_name { + form_fields + .entry(name) + .or_insert_with(|| String::from_utf8_lossy(&data).to_string()); } } + } - if let Some(data) = file_data { - ( - data, - file_name.unwrap_or_default(), - file_content_type, - file_content_encoding, - file_content_md5, - form_fields, - ) - } else { - // No file field found, use raw body - (body.to_vec(), String::new(), None, None, None, form_fields) - } - } else { + if let Some(data) = file_data { ( - body.to_vec(), - String::new(), - None, - None, - None, - std::collections::HashMap::new(), + data, + file_name.unwrap_or_default(), + file_content_type, + file_content_encoding, + file_content_md5, + form_fields, ) - }; + } else { + // No file field found, use raw body + (body.to_vec(), String::new(), None, None, None, form_fields) + } + } else { + ( + body.to_vec(), + String::new(), + None, + None, + None, + std::collections::HashMap::new(), + ) + }; let form_value = |name: &str| { query_fields @@ -2639,9 +2664,9 @@ fn json_error_with_query( let body = serde_json::json!({"error": msg.into()}); let (is_pretty, callback) = if let Some(q) = query { - let pretty = q.split('&').any(|p| { - p.starts_with("pretty=") && p.len() > "pretty=".len() - }); + let pretty = q + .split('&') + .any(|p| p.starts_with("pretty=") && p.len() > "pretty=".len()); let cb = q .split('&') .find_map(|p| p.strip_prefix("callback=")) @@ -2941,4 +2966,36 @@ mod tests { 128 * 1024 ); } + + #[test] + fn test_normalize_outgoing_http_url_rewrites_scheme() { + let url = normalize_outgoing_http_url( + "https", + "http://master.example.com:9333/dir/lookup?volumeId=7", + ) + .unwrap(); + assert_eq!(url, "https://master.example.com:9333/dir/lookup?volumeId=7"); + } + + #[test] + fn test_redirect_request_uses_outgoing_http_scheme() { + let info = ProxyRequestInfo { + original_headers: HeaderMap::new(), + original_query: "collection=photos&readDeleted=true".to_string(), + path: "/3,01637037d6".to_string(), + vid_str: "3".to_string(), + fid_str: "01637037d6".to_string(), + }; + let target = VolumeLocation { + url: "volume.internal:8080".to_string(), + public_url: "volume.public:8080".to_string(), + }; + + let response = redirect_request(&info, &target, "https"); + assert_eq!(response.status(), StatusCode::MOVED_PERMANENTLY); + assert_eq!( + response.headers().get(header::LOCATION).unwrap(), + "https://volume.public:8080/3,01637037d6?collection=photos&proxied=true" + ); + } } diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index 154a23b3f..40e219e97 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -632,7 +632,13 @@ mod tests { fn test_state_with_store(store: Store) -> Arc { Arc::new(VolumeServerState { store: RwLock::new(store), - guard: RwLock::new(Guard::new(&[], SigningKey(vec![]), 0, SigningKey(vec![]), 0)), + guard: RwLock::new(Guard::new( + &[], + SigningKey(vec![]), + 0, + SigningKey(vec![]), + 0, + )), is_stopping: RwLock::new(false), maintenance: std::sync::atomic::AtomicBool::new(false), state_version: std::sync::atomic::AtomicU32::new(0), @@ -657,6 +663,7 @@ mod tests { master_url: String::new(), self_url: String::new(), http_client: reqwest::Client::new(), + outgoing_http_scheme: "http".to_string(), metrics_runtime: std::sync::RwLock::new(Default::default()), metrics_notify: tokio::sync::Notify::new(), has_slow_read: true, @@ -684,7 +691,15 @@ mod tests { ) .unwrap(); store - .add_volume(VolumeId(7), "pics", None, None, 0, DiskType::HardDrive, Version::current()) + .add_volume( + VolumeId(7), + "pics", + None, + None, + 0, + DiskType::HardDrive, + Version::current(), + ) .unwrap(); let heartbeat = build_heartbeat(&test_config(), &store); diff --git a/seaweed-volume/src/server/volume_server.rs b/seaweed-volume/src/server/volume_server.rs index 41ef95553..767905b51 100644 --- a/seaweed-volume/src/server/volume_server.rs +++ b/seaweed-volume/src/server/volume_server.rs @@ -79,6 +79,8 @@ pub struct VolumeServerState { pub self_url: String, /// HTTP client for proxy requests and master lookups. pub http_client: reqwest::Client, + /// Scheme used for outgoing master and peer HTTP requests ("http" or "https"). + pub outgoing_http_scheme: String, /// Metrics push settings learned from master heartbeat responses. pub metrics_runtime: std::sync::RwLock, pub metrics_notify: tokio::sync::Notify, @@ -105,6 +107,17 @@ pub fn build_metrics_router() -> Router { Router::new().route("/metrics", get(handlers::metrics_handler)) } +pub fn normalize_outgoing_http_url(scheme: &str, raw_target: &str) -> Result { + if raw_target.starts_with("http://") || raw_target.starts_with("https://") { + let mut url = reqwest::Url::parse(raw_target) + .map_err(|e| format!("invalid url {}: {}", raw_target, e))?; + url.set_scheme(scheme) + .map_err(|_| format!("invalid scheme {}", scheme))?; + return Ok(url.to_string()); + } + Ok(format!("{}://{}", scheme, raw_target)) +} + /// Middleware: set Server header, echo x-amz-request-id, set CORS if Origin present. async fn common_headers_middleware(request: Request, next: Next) -> Response { let origin = request.headers().get("origin").cloned(); diff --git a/seaweed-volume/src/server/write_queue.rs b/seaweed-volume/src/server/write_queue.rs index 40435f171..6c2386486 100644 --- a/seaweed-volume/src/server/write_queue.rs +++ b/seaweed-volume/src/server/write_queue.rs @@ -198,6 +198,7 @@ mod tests { master_url: String::new(), self_url: String::new(), http_client: reqwest::Client::new(), + outgoing_http_scheme: "http".to_string(), metrics_runtime: std::sync::RwLock::new(RuntimeMetricsConfig::default()), metrics_notify: tokio::sync::Notify::new(), has_slow_read: true, diff --git a/seaweed-volume/tests/http_integration.rs b/seaweed-volume/tests/http_integration.rs index 68f453a70..db1f5b89d 100644 --- a/seaweed-volume/tests/http_integration.rs +++ b/seaweed-volume/tests/http_integration.rs @@ -83,6 +83,7 @@ fn test_state_with_signing_key(signing_key: Vec) -> (Arc, master_url: String::new(), self_url: String::new(), http_client: reqwest::Client::new(), + outgoing_http_scheme: "http".to_string(), metrics_runtime: std::sync::RwLock::new( seaweed_volume::server::volume_server::RuntimeMetricsConfig::default(), ),