Browse Source

Honor https.client for outgoing volume HTTP

rust-volume-server
Chris Lu 4 days ago
parent
commit
e5204880f7
  1. 71
      seaweed-volume/src/config.rs
  2. 98
      seaweed-volume/src/main.rs
  3. 27
      seaweed-volume/src/server/grpc_server.rs
  4. 279
      seaweed-volume/src/server/handlers.rs
  5. 19
      seaweed-volume/src/server/heartbeat.rs
  6. 13
      seaweed-volume/src/server/volume_server.rs
  7. 1
      seaweed-volume/src/server/write_queue.rs
  8. 1
      seaweed-volume/tests/http_integration.rs

71
seaweed-volume/src/config.rs

@ -238,6 +238,10 @@ pub struct VolumeServerConfig {
pub https_cert_file: String,
pub https_key_file: String,
pub https_ca_file: String,
pub https_client_enabled: bool,
pub https_client_cert_file: String,
pub https_client_key_file: String,
pub https_client_ca_file: String,
pub grpc_cert_file: String,
pub grpc_key_file: String,
pub grpc_ca_file: String,
@ -773,6 +777,10 @@ fn resolve_config(cli: Cli) -> VolumeServerConfig {
https_cert_file: sec.https_cert_file,
https_key_file: sec.https_key_file,
https_ca_file: sec.https_ca_file,
https_client_enabled: sec.https_client_enabled,
https_client_cert_file: sec.https_client_cert_file,
https_client_key_file: sec.https_client_key_file,
https_client_ca_file: sec.https_client_ca_file,
grpc_cert_file: sec.grpc_cert_file,
grpc_key_file: sec.grpc_key_file,
grpc_ca_file: sec.grpc_ca_file,
@ -797,6 +805,10 @@ pub struct SecurityConfig {
pub https_cert_file: String,
pub https_key_file: String,
pub https_ca_file: String,
pub https_client_enabled: bool,
pub https_client_cert_file: String,
pub https_client_key_file: String,
pub https_client_ca_file: String,
pub grpc_cert_file: String,
pub grpc_key_file: String,
pub grpc_ca_file: String,
@ -822,6 +834,12 @@ const SECURITY_CONFIG_FILE_NAME: &str = "security.toml";
/// cert = "/path/to/cert.pem"
/// key = "/path/to/key.pem"
///
/// [https.client]
/// enabled = true
/// cert = "/path/to/cert.pem"
/// key = "/path/to/key.pem"
/// ca = "/path/to/ca.pem"
///
/// [grpc]
/// ca = "/path/to/ca.pem"
///
@ -852,6 +870,7 @@ pub fn parse_security_config(path: &str) -> SecurityConfig {
None,
JwtSigning,
JwtSigningRead,
HttpsClient,
Grpc,
HttpsVolume,
GrpcVolume,
@ -874,6 +893,10 @@ pub fn parse_security_config(path: &str) -> SecurityConfig {
section = Section::JwtSigning;
continue;
}
if trimmed == "[https.client]" {
section = Section::HttpsClient;
continue;
}
if trimmed == "[grpc]" {
section = Section::Grpc;
continue;
@ -915,6 +938,13 @@ pub fn parse_security_config(path: &str) -> SecurityConfig {
"expires_after_seconds" => cfg.jwt_signing_expires = value.parse().unwrap_or(0),
_ => {}
},
Section::HttpsClient => match key {
"enabled" => cfg.https_client_enabled = value.parse().unwrap_or(false),
"cert" => cfg.https_client_cert_file = value.to_string(),
"key" => cfg.https_client_key_file = value.to_string(),
"ca" => cfg.https_client_ca_file = value.to_string(),
_ => {}
},
Section::Grpc => match key {
"ca" => cfg.grpc_ca_file = value.to_string(),
_ => {}
@ -1022,6 +1052,18 @@ fn apply_env_overrides(cfg: &mut SecurityConfig) {
if let Ok(v) = std::env::var("WEED_HTTPS_VOLUME_CA") {
cfg.https_ca_file = v;
}
if let Ok(v) = std::env::var("WEED_HTTPS_CLIENT_ENABLED") {
cfg.https_client_enabled = v == "true" || v == "1";
}
if let Ok(v) = std::env::var("WEED_HTTPS_CLIENT_CERT") {
cfg.https_client_cert_file = v;
}
if let Ok(v) = std::env::var("WEED_HTTPS_CLIENT_KEY") {
cfg.https_client_key_file = v;
}
if let Ok(v) = std::env::var("WEED_HTTPS_CLIENT_CA") {
cfg.https_client_ca_file = v;
}
if let Ok(v) = std::env::var("WEED_GRPC_VOLUME_CERT") {
cfg.grpc_cert_file = v;
}
@ -1104,6 +1146,10 @@ mod tests {
"WEED_HTTPS_VOLUME_CERT",
"WEED_HTTPS_VOLUME_KEY",
"WEED_HTTPS_VOLUME_CA",
"WEED_HTTPS_CLIENT_ENABLED",
"WEED_HTTPS_CLIENT_CERT",
"WEED_HTTPS_CLIENT_KEY",
"WEED_HTTPS_CLIENT_CA",
"WEED_GRPC_VOLUME_CERT",
"WEED_GRPC_VOLUME_KEY",
"WEED_GRPC_CA",
@ -1353,6 +1399,31 @@ key = "/etc/seaweedfs/volume-key.pem"
});
}
#[test]
fn test_parse_security_config_uses_https_client_settings() {
let _guard = process_state_lock();
let tmp = tempfile::NamedTempFile::new().unwrap();
std::fs::write(
tmp.path(),
r#"
[https.client]
enabled = true
cert = "/etc/seaweedfs/client-cert.pem"
key = "/etc/seaweedfs/client-key.pem"
ca = "/etc/seaweedfs/client-ca.pem"
"#,
)
.unwrap();
with_cleared_security_env(|| {
let cfg = parse_security_config(tmp.path().to_str().unwrap());
assert!(cfg.https_client_enabled);
assert_eq!(cfg.https_client_cert_file, "/etc/seaweedfs/client-cert.pem");
assert_eq!(cfg.https_client_key_file, "/etc/seaweedfs/client-key.pem");
assert_eq!(cfg.https_client_ca_file, "/etc/seaweedfs/client-ca.pem");
});
}
#[test]
fn test_merge_options_file_basic() {
let tmp = tempfile::NamedTempFile::new().unwrap();

98
seaweed-volume/src/main.rs

@ -72,7 +72,9 @@ fn load_rustls_config(cert_path: &str, key_path: &str, ca_path: &str) -> rustls:
.expect("Failed to parse CA certificate PEM");
let mut root_store = rustls::RootCertStore::empty();
for cert in ca_certs {
root_store.add(cert).expect("Failed to add CA certificate to root store");
root_store
.add(cert)
.expect("Failed to add CA certificate to root store");
}
let verifier = rustls::server::WebPkiClientVerifier::builder(Arc::new(root_store))
.build()
@ -89,6 +91,78 @@ fn load_rustls_config(cert_path: &str, key_path: &str, ca_path: &str) -> rustls:
}
}
fn build_outgoing_http_client(
config: &VolumeServerConfig,
) -> Result<(reqwest::Client, String), Box<dyn std::error::Error>> {
let scheme = if config.https_client_enabled {
"https"
} else {
"http"
};
if !config.https_client_enabled {
return Ok((reqwest::Client::new(), scheme.to_string()));
}
let mut builder = reqwest::Client::builder();
if !config.https_client_ca_file.is_empty() {
let ca_pem = std::fs::read(&config.https_client_ca_file).map_err(|e| {
format!(
"Failed to read HTTPS client CA file '{}': {}",
config.https_client_ca_file, e
)
})?;
let cert = reqwest::Certificate::from_pem(&ca_pem).map_err(|e| {
format!(
"Failed to parse HTTPS client CA PEM '{}': {}",
config.https_client_ca_file, e
)
})?;
builder = builder.add_root_certificate(cert);
}
match (
config.https_client_cert_file.is_empty(),
config.https_client_key_file.is_empty(),
) {
(true, true) => {}
(false, false) => {
let cert_pem = std::fs::read(&config.https_client_cert_file).map_err(|e| {
format!(
"Failed to read HTTPS client cert file '{}': {}",
config.https_client_cert_file, e
)
})?;
let key_pem = std::fs::read(&config.https_client_key_file).map_err(|e| {
format!(
"Failed to read HTTPS client key file '{}': {}",
config.https_client_key_file, e
)
})?;
let mut identity_pem = cert_pem;
if !identity_pem.ends_with(b"\n") {
identity_pem.push(b'\n');
}
identity_pem.extend_from_slice(&key_pem);
let identity = reqwest::Identity::from_pem(&identity_pem).map_err(|e| {
format!(
"Failed to parse HTTPS client identity '{}'+ '{}': {}",
config.https_client_cert_file, config.https_client_key_file, e
)
})?;
builder = builder.identity(identity);
}
_ => {
return Err(format!(
"HTTPS client requires both cert and key, got cert='{}' key='{}'",
config.https_client_cert_file, config.https_client_key_file
)
.into());
}
}
Ok((builder.build()?, scheme.to_string()))
}
async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error>> {
// Initialize the store
let mut store = Store::new(config.index_type);
@ -131,6 +205,7 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
);
let master_url = config.masters.first().cloned().unwrap_or_default();
let self_url = format!("{}:{}", config.ip, config.port);
let (http_client, outgoing_http_scheme) = build_outgoing_http_client(&config)?;
let security_file = config.security_file.clone();
let cli_white_list = config.white_list.clone();
@ -163,7 +238,8 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
read_mode: config.read_mode,
master_url,
self_url,
http_client: reqwest::Client::new(),
http_client,
outgoing_http_scheme,
metrics_runtime: std::sync::RwLock::new(RuntimeMetricsConfig::default()),
metrics_notify: tokio::sync::Notify::new(),
has_slow_read: config.has_slow_read,
@ -186,7 +262,9 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
let store = state.store.read().unwrap();
let mut max_vols: i64 = 0;
for loc in &store.locations {
max_vols += loc.max_volume_count.load(std::sync::atomic::Ordering::Relaxed) as i64;
max_vols += loc
.max_volume_count
.load(std::sync::atomic::Ordering::Relaxed) as i64;
}
metrics::MAX_VOLUMES.set(max_vols);
}
@ -282,9 +360,8 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
{
let state_reload = state.clone();
tokio::spawn(async move {
let mut sighup =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
.expect("Failed to install SIGHUP handler");
let mut sighup = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
.expect("Failed to install SIGHUP handler");
loop {
sighup.recv().await;
info!("Received SIGHUP, reloading...");
@ -320,7 +397,11 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
"TLS enabled for HTTP server (cert={}, key={})",
config.https_cert_file, config.https_key_file
);
let tls_config = load_rustls_config(&config.https_cert_file, &config.https_key_file, &config.https_ca_file);
let tls_config = load_rustls_config(
&config.https_cert_file,
&config.https_key_file,
&config.https_ca_file,
);
Some(TlsAcceptor::from(Arc::new(tls_config)))
} else {
None
@ -381,7 +462,8 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
let ca_cert = std::fs::read_to_string(&grpc_ca_file).unwrap_or_else(|e| {
panic!("Failed to read gRPC CA cert '{}': {}", grpc_ca_file, e)
});
tls_config = tls_config.client_ca_root(tonic::transport::Certificate::from_pem(ca_cert));
tls_config =
tls_config.client_ca_root(tonic::transport::Certificate::from_pem(ca_cert));
}
if let Err(e) = tonic::transport::Server::builder()
.tls_config(tls_config)

27
seaweed-volume/src/server/grpc_server.rs

@ -2726,10 +2726,14 @@ impl VolumeServer for VolumeGrpcService {
// Replicate to peers concurrently (matches Go's sync.WaitGroup + goroutines)
if !req.replicas.is_empty() {
let file_id = format!("{},{:x}{:08x}", vid, req.needle_id, req.cookie);
let http_client = reqwest::Client::new();
let http_client = self.state.http_client.clone();
let scheme = self.state.outgoing_http_scheme.clone();
let mut handles = Vec::new();
for replica in &req.replicas {
let url = format!("http://{}/{}?type=replicate", replica.url, file_id);
let raw_target = format!("{}/{}?type=replicate", replica.url, file_id);
let url =
crate::server::volume_server::normalize_outgoing_http_url(&scheme, &raw_target)
.map_err(Status::internal)?;
let data_clone = data.clone();
let client_clone = http_client.clone();
let needle_id = req.needle_id;
@ -2844,7 +2848,11 @@ impl VolumeServer for VolumeGrpcService {
let store = self.state.store.read().unwrap();
let vids: Vec<VolumeId> = if req.volume_ids.is_empty() {
store.locations.iter().flat_map(|loc| loc.ec_volumes().map(|(vid, _)| *vid)).collect()
store
.locations
.iter()
.flat_map(|loc| loc.ec_volumes().map(|(vid, _)| *vid))
.collect()
} else {
req.volume_ids.iter().map(|&id| VolumeId(id)).collect()
};
@ -3094,7 +3102,7 @@ impl VolumeServer for VolumeGrpcService {
crc: n.checksum.0,
ttl: ttl_str,
},
))
));
}
Err(_) => {
return Err(Status::not_found(format!(
@ -3236,15 +3244,14 @@ fn to_grpc_endpoint(target: &str) -> Result<String, String> {
/// Ping a remote volume server target by actually calling its Ping RPC (matches Go behavior).
async fn ping_volume_server_target(target: &str) -> Result<i64, String> {
let addr = to_grpc_endpoint(target)?;
let channel = tonic::transport::Channel::from_shared(addr.clone())
.map_err(|e| e.to_string())?;
let channel =
tonic::transport::Channel::from_shared(addr.clone()).map_err(|e| e.to_string())?;
let channel = tokio::time::timeout(std::time::Duration::from_secs(5), channel.connect())
.await
.map_err(|_| "connection timeout".to_string())?
.map_err(|e| e.to_string())?;
let mut client =
volume_server_pb::volume_server_client::VolumeServerClient::new(channel);
let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel);
let resp = client
.ping(volume_server_pb::PingRequest {
target: String::new(),
@ -3258,8 +3265,8 @@ async fn ping_volume_server_target(target: &str) -> Result<i64, String> {
/// Ping a remote master target by actually calling its Ping RPC (matches Go behavior).
async fn ping_master_target(target: &str) -> Result<i64, String> {
let addr = to_grpc_endpoint(target)?;
let channel = tonic::transport::Channel::from_shared(addr.clone())
.map_err(|e| e.to_string())?;
let channel =
tonic::transport::Channel::from_shared(addr.clone()).map_err(|e| e.to_string())?;
let channel = tokio::time::timeout(std::time::Duration::from_secs(5), channel.connect())
.await
.map_err(|_| "connection timeout".to_string())?

279
seaweed-volume/src/server/handlers.rs

@ -14,7 +14,7 @@ use axum::http::{header, HeaderMap, Method, Request, StatusCode};
use axum::response::{IntoResponse, Response};
use serde::{Deserialize, Serialize};
use super::volume_server::VolumeServerState;
use super::volume_server::{normalize_outgoing_http_url, VolumeServerState};
use crate::config::ReadMode;
use crate::metrics;
use crate::storage::needle::needle::Needle;
@ -325,10 +325,14 @@ struct LookupResult {
/// Look up volume locations from the master via HTTP /dir/lookup.
async fn lookup_volume(
client: &reqwest::Client,
scheme: &str,
master_url: &str,
volume_id: u32,
) -> Result<Vec<VolumeLocation>, String> {
let url = format!("http://{}/dir/lookup?volumeId={}", master_url, volume_id);
let url = normalize_outgoing_http_url(
scheme,
&format!("{}/dir/lookup?volumeId={}", master_url, volume_id),
)?;
let resp = client
.get(&url)
.send()
@ -356,9 +360,14 @@ async fn do_replicated_request(
headers: &axum::http::HeaderMap,
body: Option<bytes::Bytes>,
) -> Result<(), String> {
let locations = lookup_volume(&state.http_client, &state.master_url, vid)
.await
.map_err(|e| format!("lookup volume failed: {}", e))?;
let locations = lookup_volume(
&state.http_client,
&state.outgoing_http_scheme,
&state.master_url,
vid,
)
.await
.map_err(|e| format!("lookup volume failed: {}", e))?;
let remote_locations: Vec<_> = locations
.into_iter()
@ -377,7 +386,10 @@ async fn do_replicated_request(
let mut futures = Vec::new();
for loc in remote_locations {
let url = format!("http://{}{}?{}", loc.url, path, new_query);
let url = normalize_outgoing_http_url(
&state.outgoing_http_scheme,
&format!("{}{}?{}", loc.url, path, new_query),
)?;
let client = state.http_client.clone();
let mut req_builder = client.request(method.clone(), &url);
@ -440,7 +452,14 @@ async fn proxy_or_redirect_to_target(
vid: VolumeId,
) -> Response {
// Look up volume locations from master
let locations = match lookup_volume(&state.http_client, &state.master_url, vid.0).await {
let locations = match lookup_volume(
&state.http_client,
&state.outgoing_http_scheme,
&state.master_url,
vid.0,
)
.await
{
Ok(locs) => locs,
Err(e) => {
tracing::warn!("volume lookup failed for {}: {}", vid.0, e);
@ -473,7 +492,7 @@ async fn proxy_or_redirect_to_target(
match state.read_mode {
ReadMode::Proxy => proxy_request(state, &info, target).await,
ReadMode::Redirect => redirect_request(&info, target),
ReadMode::Redirect => redirect_request(&info, target, &state.outgoing_http_scheme),
ReadMode::Local => unreachable!(),
}
}
@ -485,18 +504,23 @@ async fn proxy_request(
target: &VolumeLocation,
) -> Response {
// Build target URL, adding proxied=true query param
let scheme = "http";
let target_host = &target.url;
let path = info.path.trim_start_matches('/');
let target_url = if info.original_query.is_empty() {
format!("{}://{}/{}?proxied=true", scheme, target_host, path)
let raw_target = if info.original_query.is_empty() {
format!("{}/{}?proxied=true", target.url, path)
} else {
format!(
"{}://{}/{}?{}&proxied=true",
scheme, target_host, path, info.original_query
"{}/{}?{}&proxied=true",
target.url, path, info.original_query
)
};
let target_url = match normalize_outgoing_http_url(&state.outgoing_http_scheme, &raw_target) {
Ok(url) => url,
Err(e) => {
tracing::warn!("proxy target url {} invalid: {}", raw_target, e);
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
}
};
// Build the proxy request
let mut req_builder = state.http_client.get(&target_url);
@ -542,10 +566,7 @@ async fn proxy_request(
}
/// Return a redirect response to the target volume server.
fn redirect_request(info: &ProxyRequestInfo, target: &VolumeLocation) -> Response {
let scheme = "http";
let target_host = &target.public_url;
fn redirect_request(info: &ProxyRequestInfo, target: &VolumeLocation, scheme: &str) -> Response {
// Build query string: preserve collection, add proxied=true, drop readDeleted (Go parity)
let mut query_params = Vec::new();
if !info.original_query.is_empty() {
@ -561,10 +582,14 @@ fn redirect_request(info: &ProxyRequestInfo, target: &VolumeLocation) -> Respons
query_params.push("proxied=true".to_string());
let query = query_params.join("&");
let location = format!(
"{}://{}/{},{}?{}",
scheme, target_host, &info.vid_str, &info.fid_str, query
let raw_target = format!(
"{}/{},{}?{}",
target.public_url, &info.vid_str, &info.fid_str, query
);
let location = match normalize_outgoing_http_url(scheme, &raw_target) {
Ok(url) => url,
Err(_) => return StatusCode::INTERNAL_SERVER_ERROR.into_response(),
};
Response::builder()
.status(StatusCode::MOVED_PERMANENTLY)
@ -661,11 +686,12 @@ async fn get_or_head_handler_inner(
// so invalid paths with JWT enabled return 401, not 400.
let file_id = extract_file_id(&path);
let token = extract_jwt(&headers, request.uri());
if let Err(e) = state
.guard
.read()
.unwrap()
.check_jwt_for_file(token.as_deref(), &file_id, false)
if let Err(e) =
state
.guard
.read()
.unwrap()
.check_jwt_for_file(token.as_deref(), &file_id, false)
{
return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
}
@ -881,8 +907,7 @@ async fn get_or_head_handler_inner(
let mut not_modified_headers = HeaderMap::new();
not_modified_headers.insert(header::ETAG, etag.parse().unwrap());
if let Some(ref lm) = last_modified_str {
not_modified_headers
.insert(header::LAST_MODIFIED, lm.parse().unwrap());
not_modified_headers.insert(header::LAST_MODIFIED, lm.parse().unwrap());
}
return (StatusCode::NOT_MODIFIED, not_modified_headers).into_response();
}
@ -1099,8 +1124,8 @@ async fn get_or_head_handler_inner(
let mut data = n.data;
// Check if image operations are needed — must decompress first regardless of Accept-Encoding
let needs_image_ops = is_image
&& (query.width.is_some() || query.height.is_some() || query.mode.is_some());
let needs_image_ops =
is_image && (query.width.is_some() || query.height.is_some() || query.mode.is_some());
if is_compressed {
if needs_image_ops {
@ -1432,7 +1457,8 @@ pub async fn post_handler(
let path = request.uri().path().to_string();
let query = request.uri().query().unwrap_or("").to_string();
let headers = request.headers().clone();
let query_fields: Vec<(String, String)> = serde_urlencoded::from_str(&query).unwrap_or_default();
let query_fields: Vec<(String, String)> =
serde_urlencoded::from_str(&query).unwrap_or_default();
let (vid, needle_id, cookie) = match parse_url_path(&path) {
Some(parsed) => parsed,
@ -1557,90 +1583,89 @@ pub async fn post_handler(
parsed_content_encoding,
parsed_content_md5,
multipart_form_fields,
) =
if content_type_str.starts_with("multipart/form-data") {
// Extract boundary from Content-Type
let boundary = content_type_str
.split(';')
.find_map(|part| {
let part = part.trim();
if let Some(val) = part.strip_prefix("boundary=") {
Some(val.trim_matches('"').to_string())
} else {
None
}
})
.unwrap_or_default();
) = if content_type_str.starts_with("multipart/form-data") {
// Extract boundary from Content-Type
let boundary = content_type_str
.split(';')
.find_map(|part| {
let part = part.trim();
if let Some(val) = part.strip_prefix("boundary=") {
Some(val.trim_matches('"').to_string())
} else {
None
}
})
.unwrap_or_default();
let mut multipart = multer::Multipart::new(
futures::stream::once(async { Ok::<_, std::io::Error>(body.clone()) }),
boundary,
);
let mut multipart = multer::Multipart::new(
futures::stream::once(async { Ok::<_, std::io::Error>(body.clone()) }),
boundary,
);
let mut file_data: Option<Vec<u8>> = None;
let mut file_name: Option<String> = None;
let mut file_content_type: Option<String> = None;
let mut file_content_encoding: Option<String> = None;
let mut file_content_md5: Option<String> = None;
let mut form_fields = std::collections::HashMap::new();
while let Ok(Some(field)) = multipart.next_field().await {
let field_name = field.name().map(|s| s.to_string());
let fname = field.file_name().map(|s| {
// Clean Windows backslashes
let cleaned = s.replace('\\', "/");
cleaned.rsplit('/').next().unwrap_or(&cleaned).to_string()
});
let fct = field.content_type().map(|m| m.to_string());
let field_headers = field.headers().clone();
let fce = field_headers
.get(header::CONTENT_ENCODING)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
let fmd5 = field_headers
.get("Content-MD5")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
if let Ok(data) = field.bytes().await {
if file_data.is_none() && fname.is_some() {
// First file field
file_data = Some(data.to_vec());
file_name = fname;
file_content_type = fct;
file_content_encoding = fce;
file_content_md5 = fmd5;
} else if let Some(name) = field_name {
form_fields
.entry(name)
.or_insert_with(|| String::from_utf8_lossy(&data).to_string());
}
let mut file_data: Option<Vec<u8>> = None;
let mut file_name: Option<String> = None;
let mut file_content_type: Option<String> = None;
let mut file_content_encoding: Option<String> = None;
let mut file_content_md5: Option<String> = None;
let mut form_fields = std::collections::HashMap::new();
while let Ok(Some(field)) = multipart.next_field().await {
let field_name = field.name().map(|s| s.to_string());
let fname = field.file_name().map(|s| {
// Clean Windows backslashes
let cleaned = s.replace('\\', "/");
cleaned.rsplit('/').next().unwrap_or(&cleaned).to_string()
});
let fct = field.content_type().map(|m| m.to_string());
let field_headers = field.headers().clone();
let fce = field_headers
.get(header::CONTENT_ENCODING)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
let fmd5 = field_headers
.get("Content-MD5")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
if let Ok(data) = field.bytes().await {
if file_data.is_none() && fname.is_some() {
// First file field
file_data = Some(data.to_vec());
file_name = fname;
file_content_type = fct;
file_content_encoding = fce;
file_content_md5 = fmd5;
} else if let Some(name) = field_name {
form_fields
.entry(name)
.or_insert_with(|| String::from_utf8_lossy(&data).to_string());
}
}
}
if let Some(data) = file_data {
(
data,
file_name.unwrap_or_default(),
file_content_type,
file_content_encoding,
file_content_md5,
form_fields,
)
} else {
// No file field found, use raw body
(body.to_vec(), String::new(), None, None, None, form_fields)
}
} else {
if let Some(data) = file_data {
(
body.to_vec(),
String::new(),
None,
None,
None,
std::collections::HashMap::new(),
data,
file_name.unwrap_or_default(),
file_content_type,
file_content_encoding,
file_content_md5,
form_fields,
)
};
} else {
// No file field found, use raw body
(body.to_vec(), String::new(), None, None, None, form_fields)
}
} else {
(
body.to_vec(),
String::new(),
None,
None,
None,
std::collections::HashMap::new(),
)
};
let form_value = |name: &str| {
query_fields
@ -2639,9 +2664,9 @@ fn json_error_with_query(
let body = serde_json::json!({"error": msg.into()});
let (is_pretty, callback) = if let Some(q) = query {
let pretty = q.split('&').any(|p| {
p.starts_with("pretty=") && p.len() > "pretty=".len()
});
let pretty = q
.split('&')
.any(|p| p.starts_with("pretty=") && p.len() > "pretty=".len());
let cb = q
.split('&')
.find_map(|p| p.strip_prefix("callback="))
@ -2941,4 +2966,36 @@ mod tests {
128 * 1024
);
}
#[test]
fn test_normalize_outgoing_http_url_rewrites_scheme() {
let url = normalize_outgoing_http_url(
"https",
"http://master.example.com:9333/dir/lookup?volumeId=7",
)
.unwrap();
assert_eq!(url, "https://master.example.com:9333/dir/lookup?volumeId=7");
}
#[test]
fn test_redirect_request_uses_outgoing_http_scheme() {
let info = ProxyRequestInfo {
original_headers: HeaderMap::new(),
original_query: "collection=photos&readDeleted=true".to_string(),
path: "/3,01637037d6".to_string(),
vid_str: "3".to_string(),
fid_str: "01637037d6".to_string(),
};
let target = VolumeLocation {
url: "volume.internal:8080".to_string(),
public_url: "volume.public:8080".to_string(),
};
let response = redirect_request(&info, &target, "https");
assert_eq!(response.status(), StatusCode::MOVED_PERMANENTLY);
assert_eq!(
response.headers().get(header::LOCATION).unwrap(),
"https://volume.public:8080/3,01637037d6?collection=photos&proxied=true"
);
}
}

19
seaweed-volume/src/server/heartbeat.rs

@ -632,7 +632,13 @@ mod tests {
fn test_state_with_store(store: Store) -> Arc<VolumeServerState> {
Arc::new(VolumeServerState {
store: RwLock::new(store),
guard: RwLock::new(Guard::new(&[], SigningKey(vec![]), 0, SigningKey(vec![]), 0)),
guard: RwLock::new(Guard::new(
&[],
SigningKey(vec![]),
0,
SigningKey(vec![]),
0,
)),
is_stopping: RwLock::new(false),
maintenance: std::sync::atomic::AtomicBool::new(false),
state_version: std::sync::atomic::AtomicU32::new(0),
@ -657,6 +663,7 @@ mod tests {
master_url: String::new(),
self_url: String::new(),
http_client: reqwest::Client::new(),
outgoing_http_scheme: "http".to_string(),
metrics_runtime: std::sync::RwLock::new(Default::default()),
metrics_notify: tokio::sync::Notify::new(),
has_slow_read: true,
@ -684,7 +691,15 @@ mod tests {
)
.unwrap();
store
.add_volume(VolumeId(7), "pics", None, None, 0, DiskType::HardDrive, Version::current())
.add_volume(
VolumeId(7),
"pics",
None,
None,
0,
DiskType::HardDrive,
Version::current(),
)
.unwrap();
let heartbeat = build_heartbeat(&test_config(), &store);

13
seaweed-volume/src/server/volume_server.rs

@ -79,6 +79,8 @@ pub struct VolumeServerState {
pub self_url: String,
/// HTTP client for proxy requests and master lookups.
pub http_client: reqwest::Client,
/// Scheme used for outgoing master and peer HTTP requests ("http" or "https").
pub outgoing_http_scheme: String,
/// Metrics push settings learned from master heartbeat responses.
pub metrics_runtime: std::sync::RwLock<RuntimeMetricsConfig>,
pub metrics_notify: tokio::sync::Notify,
@ -105,6 +107,17 @@ pub fn build_metrics_router() -> Router {
Router::new().route("/metrics", get(handlers::metrics_handler))
}
pub fn normalize_outgoing_http_url(scheme: &str, raw_target: &str) -> Result<String, String> {
if raw_target.starts_with("http://") || raw_target.starts_with("https://") {
let mut url = reqwest::Url::parse(raw_target)
.map_err(|e| format!("invalid url {}: {}", raw_target, e))?;
url.set_scheme(scheme)
.map_err(|_| format!("invalid scheme {}", scheme))?;
return Ok(url.to_string());
}
Ok(format!("{}://{}", scheme, raw_target))
}
/// Middleware: set Server header, echo x-amz-request-id, set CORS if Origin present.
async fn common_headers_middleware(request: Request, next: Next) -> Response {
let origin = request.headers().get("origin").cloned();

1
seaweed-volume/src/server/write_queue.rs

@ -198,6 +198,7 @@ mod tests {
master_url: String::new(),
self_url: String::new(),
http_client: reqwest::Client::new(),
outgoing_http_scheme: "http".to_string(),
metrics_runtime: std::sync::RwLock::new(RuntimeMetricsConfig::default()),
metrics_notify: tokio::sync::Notify::new(),
has_slow_read: true,

1
seaweed-volume/tests/http_integration.rs

@ -83,6 +83,7 @@ fn test_state_with_signing_key(signing_key: Vec<u8>) -> (Arc<VolumeServerState>,
master_url: String::new(),
self_url: String::new(),
http_client: reqwest::Client::new(),
outgoing_http_scheme: "http".to_string(),
metrics_runtime: std::sync::RwLock::new(
seaweed_volume::server::volume_server::RuntimeMetricsConfig::default(),
),

Loading…
Cancel
Save