diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs index 1ebeb9a3c..8d3240ed2 100644 --- a/seaweed-volume/src/main.rs +++ b/seaweed-volume/src/main.rs @@ -7,6 +7,7 @@ use seaweed_volume::metrics; use seaweed_volume::pb::volume_server_pb::volume_server_server::VolumeServerServer; use seaweed_volume::security::{Guard, SigningKey}; use seaweed_volume::server::debug::build_debug_router; +use seaweed_volume::server::grpc_client::load_outgoing_grpc_tls; use seaweed_volume::server::grpc_server::VolumeGrpcService; use seaweed_volume::server::volume_server::{ build_metrics_router, RuntimeMetricsConfig, VolumeServerState, @@ -206,6 +207,7 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box Result<(), Box) -> fmt::Result { + f.write_str(&self.0) + } +} + +impl Error for GrpcClientError {} + +pub fn load_outgoing_grpc_tls( + config: &VolumeServerConfig, +) -> Result, GrpcClientError> { + if config.grpc_cert_file.is_empty() + || config.grpc_key_file.is_empty() + || config.grpc_ca_file.is_empty() + { + return Ok(None); + } + + let cert_pem = std::fs::read_to_string(&config.grpc_cert_file).map_err(|e| { + GrpcClientError(format!( + "Failed to read outgoing gRPC cert '{}': {}", + config.grpc_cert_file, e + )) + })?; + let key_pem = std::fs::read_to_string(&config.grpc_key_file).map_err(|e| { + GrpcClientError(format!( + "Failed to read outgoing gRPC key '{}': {}", + config.grpc_key_file, e + )) + })?; + let ca_pem = std::fs::read_to_string(&config.grpc_ca_file).map_err(|e| { + GrpcClientError(format!( + "Failed to read outgoing gRPC CA '{}': {}", + config.grpc_ca_file, e + )) + })?; + + Ok(Some(OutgoingGrpcTlsConfig { + cert_pem, + key_pem, + ca_pem, + })) +} + +pub fn grpc_endpoint_uri(grpc_host_port: &str, tls: Option<&OutgoingGrpcTlsConfig>) -> String { + let scheme = if tls.is_some() { "https" } else { "http" }; + format!("{}://{}", scheme, grpc_host_port) +} + +pub fn build_grpc_endpoint( + grpc_host_port: &str, + tls: Option<&OutgoingGrpcTlsConfig>, +) -> Result { + let uri = grpc_endpoint_uri(grpc_host_port, tls); + let mut endpoint = Channel::from_shared(uri.clone()) + .map_err(|e| GrpcClientError(format!("invalid gRPC endpoint {}: {}", uri, e)))?; + + if let Some(tls) = tls { + let parsed = uri + .parse::() + .map_err(|e| GrpcClientError(format!("invalid gRPC endpoint {}: {}", uri, e)))?; + let host = parsed + .host() + .ok_or_else(|| GrpcClientError(format!("missing host in gRPC endpoint {}", uri)))?; + let tls_config = ClientTlsConfig::new() + .identity(Identity::from_pem( + tls.cert_pem.clone(), + tls.key_pem.clone(), + )) + .ca_certificate(Certificate::from_pem(tls.ca_pem.clone())) + .domain_name(host.to_string()); + endpoint = endpoint.tls_config(tls_config).map_err(|e| { + GrpcClientError(format!("configure gRPC TLS for {} failed: {}", uri, e)) + })?; + } + + Ok(endpoint) +} + +#[cfg(test)] +mod tests { + use super::{build_grpc_endpoint, grpc_endpoint_uri, load_outgoing_grpc_tls}; + use crate::config::{NeedleMapKind, ReadMode, VolumeServerConfig}; + + fn sample_config() -> VolumeServerConfig { + VolumeServerConfig { + port: 8080, + grpc_port: 18080, + public_port: 8080, + ip: "127.0.0.1".to_string(), + bind_ip: String::new(), + public_url: "127.0.0.1:8080".to_string(), + id: String::new(), + masters: vec![], + pre_stop_seconds: 0, + idle_timeout: 0, + data_center: String::new(), + rack: String::new(), + index_type: NeedleMapKind::InMemory, + disk_type: String::new(), + folders: vec![], + folder_max_limits: vec![], + folder_tags: vec![], + min_free_spaces: vec![], + disk_types: vec![], + idx_folder: String::new(), + white_list: vec![], + fix_jpg_orientation: false, + read_mode: ReadMode::Local, + compaction_byte_per_second: 0, + maintenance_byte_per_second: 0, + file_size_limit_bytes: 0, + concurrent_upload_limit: 0, + concurrent_download_limit: 0, + inflight_upload_data_timeout: std::time::Duration::from_secs(0), + inflight_download_data_timeout: std::time::Duration::from_secs(0), + has_slow_read: false, + read_buffer_size_mb: 0, + ldb_timeout: 0, + pprof: false, + metrics_port: 0, + metrics_ip: String::new(), + debug: false, + debug_port: 0, + ui_enabled: false, + jwt_signing_key: vec![], + jwt_signing_expires_seconds: 0, + jwt_read_signing_key: vec![], + jwt_read_signing_expires_seconds: 0, + https_cert_file: String::new(), + https_key_file: String::new(), + https_ca_file: String::new(), + https_client_enabled: false, + https_client_cert_file: String::new(), + https_client_key_file: String::new(), + https_client_ca_file: String::new(), + grpc_cert_file: String::new(), + grpc_key_file: String::new(), + grpc_ca_file: String::new(), + enable_write_queue: false, + security_file: String::new(), + } + } + + #[test] + fn test_grpc_endpoint_uri_uses_https_when_tls_enabled() { + let tls = super::OutgoingGrpcTlsConfig { + cert_pem: "cert".to_string(), + key_pem: "key".to_string(), + ca_pem: "ca".to_string(), + }; + assert_eq!( + grpc_endpoint_uri("master.example.com:19333", Some(&tls)), + "https://master.example.com:19333" + ); + } + + #[test] + fn test_load_outgoing_grpc_tls_requires_cert_key_and_ca() { + let mut config = sample_config(); + config.grpc_cert_file = "/tmp/client.pem".to_string(); + assert!(load_outgoing_grpc_tls(&config).unwrap().is_none()); + } + + #[test] + fn test_build_grpc_endpoint_without_tls_uses_http_scheme() { + let endpoint = build_grpc_endpoint("127.0.0.1:19333", None).unwrap(); + assert_eq!(endpoint.uri().scheme_str(), Some("http")); + } +} diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 3a9d552d3..29b5111b2 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -19,6 +19,7 @@ use crate::pb::volume_server_pb::volume_server_server::VolumeServer; use crate::storage::needle::needle::{self, Needle}; use crate::storage::types::*; +use super::grpc_client::build_grpc_endpoint; use super::volume_server::VolumeServerState; type BoxStream = Pin> + Send + 'static>>; @@ -50,7 +51,7 @@ impl VolumeGrpcService { let grpc_addr = parse_grpc_address(&master_url).map_err(|e| { Status::internal(format!("invalid master address {}: {}", master_url, e)) })?; - let endpoint = tonic::transport::Channel::from_shared(format!("http://{}", grpc_addr)) + let endpoint = build_grpc_endpoint(&grpc_addr, self.state.outgoing_grpc_tls.as_ref()) .map_err(|e| Status::internal(format!("master address {}: {}", master_url, e)))? .connect_timeout(std::time::Duration::from_secs(5)) .timeout(std::time::Duration::from_secs(30)); @@ -842,7 +843,7 @@ impl VolumeServer for VolumeGrpcService { )) })?; - let channel = tonic::transport::Channel::from_shared(format!("http://{}", grpc_addr)) + let channel = build_grpc_endpoint(&grpc_addr, self.state.outgoing_grpc_tls.as_ref()) .map_err(|e| { Status::internal(format!("VolumeCopy volume {} parse source: {}", vid, e)) })? @@ -1570,7 +1571,7 @@ impl VolumeServer for VolumeGrpcService { let grpc_addr = parse_grpc_address(source) .map_err(|e| Status::internal(format!("invalid source address {}: {}", source, e)))?; - let channel = tonic::transport::Channel::from_shared(format!("http://{}", grpc_addr)) + let channel = build_grpc_endpoint(&grpc_addr, self.state.outgoing_grpc_tls.as_ref()) .map_err(|e| Status::internal(format!("parse source: {}", e)))? .connect() .await @@ -1824,7 +1825,7 @@ impl VolumeServer for VolumeGrpcService { )) })?; - let channel = tonic::transport::Channel::from_shared(format!("http://{}", grpc_addr)) + let channel = build_grpc_endpoint(&grpc_addr, self.state.outgoing_grpc_tls.as_ref()) .map_err(|e| { Status::internal(format!( "VolumeEcShardsCopy volume {} parse source: {}", @@ -3191,7 +3192,9 @@ impl VolumeServer for VolumeGrpcService { // Route ping based on target type (matches Go's volume_grpc_admin.go Ping) let remote_time_ns = if req.target_type == "volumeServer" { - match ping_volume_server_target(&req.target).await { + match ping_volume_server_target(&req.target, self.state.outgoing_grpc_tls.as_ref()) + .await + { Ok(t) => t, Err(e) => { return Err(Status::internal(format!( @@ -3202,7 +3205,7 @@ impl VolumeServer for VolumeGrpcService { } } else if req.target_type == "master" { // Connect to target master and call its Ping RPC - match ping_master_target(&req.target).await { + match ping_master_target(&req.target, self.state.outgoing_grpc_tls.as_ref()).await { Ok(t) => t, Err(e) => { return Err(Status::internal(format!( @@ -3212,7 +3215,7 @@ impl VolumeServer for VolumeGrpcService { } } } else if req.target_type == "filer" { - match ping_filer_target(&req.target).await { + match ping_filer_target(&req.target, self.state.outgoing_grpc_tls.as_ref()).await { Ok(t) => t, Err(e) => { return Err(Status::internal(format!( @@ -3235,18 +3238,22 @@ impl VolumeServer for VolumeGrpcService { } } -/// Build a gRPC endpoint URL from a SeaweedFS server address. -fn to_grpc_endpoint(target: &str) -> Result { +/// Build a gRPC endpoint from a SeaweedFS server address. +fn to_grpc_endpoint( + target: &str, + tls: Option<&super::grpc_client::OutgoingGrpcTlsConfig>, +) -> Result { let grpc_host_port = parse_grpc_address(target)?; - Ok(format!("http://{}", grpc_host_port)) + build_grpc_endpoint(&grpc_host_port, tls).map_err(|e| e.to_string()) } /// Ping a remote volume server target by actually calling its Ping RPC (matches Go behavior). -async fn ping_volume_server_target(target: &str) -> Result { - let addr = to_grpc_endpoint(target)?; - let channel = - tonic::transport::Channel::from_shared(addr.clone()).map_err(|e| e.to_string())?; - let channel = tokio::time::timeout(std::time::Duration::from_secs(5), channel.connect()) +async fn ping_volume_server_target( + target: &str, + tls: Option<&super::grpc_client::OutgoingGrpcTlsConfig>, +) -> Result { + let endpoint = to_grpc_endpoint(target, tls)?; + let channel = tokio::time::timeout(std::time::Duration::from_secs(5), endpoint.connect()) .await .map_err(|_| "connection timeout".to_string())? .map_err(|e| e.to_string())?; @@ -3263,11 +3270,12 @@ async fn ping_volume_server_target(target: &str) -> Result { } /// Ping a remote master target by actually calling its Ping RPC (matches Go behavior). -async fn ping_master_target(target: &str) -> Result { - let addr = to_grpc_endpoint(target)?; - let channel = - tonic::transport::Channel::from_shared(addr.clone()).map_err(|e| e.to_string())?; - let channel = tokio::time::timeout(std::time::Duration::from_secs(5), channel.connect()) +async fn ping_master_target( + target: &str, + tls: Option<&super::grpc_client::OutgoingGrpcTlsConfig>, +) -> Result { + let endpoint = to_grpc_endpoint(target, tls)?; + let channel = tokio::time::timeout(std::time::Duration::from_secs(5), endpoint.connect()) .await .map_err(|_| "connection timeout".to_string())? .map_err(|e| e.to_string())?; @@ -3284,10 +3292,12 @@ async fn ping_master_target(target: &str) -> Result { } /// Ping a remote filer target by calling its Ping RPC (matches Go behavior). -async fn ping_filer_target(target: &str) -> Result { - let addr = to_grpc_endpoint(target)?; - let channel = tonic::transport::Channel::from_shared(addr).map_err(|e| e.to_string())?; - let channel = tokio::time::timeout(std::time::Duration::from_secs(5), channel.connect()) +async fn ping_filer_target( + target: &str, + tls: Option<&super::grpc_client::OutgoingGrpcTlsConfig>, +) -> Result { + let endpoint = to_grpc_endpoint(target, tls)?; + let channel = tokio::time::timeout(std::time::Duration::from_secs(5), endpoint.connect()) .await .map_err(|_| "connection timeout".to_string())? .map_err(|e| e.to_string())?; diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index 40e219e97..ed84d253f 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -9,9 +9,9 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::broadcast; -use tonic::transport::Channel; use tracing::{error, info, warn}; +use super::grpc_client::build_grpc_endpoint; use super::volume_server::VolumeServerState; use crate::pb::master_pb; use crate::pb::master_pb::seaweed_client::SeaweedClient; @@ -168,16 +168,16 @@ pub async fn run_heartbeat_with_state( } } -/// Convert a master address "host:port" to a gRPC endpoint URL. +/// Convert a master address "host:port" to a gRPC host:port target. /// The Go master uses port + 10000 for gRPC by default. fn to_grpc_address(master_addr: &str) -> String { if let Some((host, port_str)) = master_addr.rsplit_once(':') { if let Ok(port) = port_str.parse::() { let grpc_port = port + 10000; - return format!("http://{}:{}", host, grpc_port); + return format!("{}:{}", host, grpc_port); } } - format!("http://{}", master_addr) + master_addr.to_string() } /// Call GetMasterConfiguration on seed masters before starting the heartbeat loop. @@ -188,7 +188,7 @@ async fn check_with_master(config: &HeartbeatConfig, state: &Arc { let changed = apply_metrics_push_settings( state, @@ -215,8 +215,9 @@ async fn check_with_master(config: &HeartbeatConfig, state: &Arc, ) -> Result> { - let channel = Channel::from_shared(grpc_addr.to_string())? + let channel = build_grpc_endpoint(grpc_addr, tls)? .connect_timeout(Duration::from_secs(5)) .timeout(Duration::from_secs(10)) .connect() @@ -240,7 +241,7 @@ async fn do_heartbeat( pulse: Duration, shutdown_rx: &mut broadcast::Receiver<()>, ) -> Result, Box> { - let channel = Channel::from_shared(grpc_addr.to_string())? + let channel = build_grpc_endpoint(grpc_addr, state.outgoing_grpc_tls.as_ref())? .connect_timeout(Duration::from_secs(5)) .timeout(Duration::from_secs(30)) .connect() @@ -664,6 +665,7 @@ mod tests { self_url: String::new(), http_client: reqwest::Client::new(), outgoing_http_scheme: "http".to_string(), + outgoing_grpc_tls: None, metrics_runtime: std::sync::RwLock::new(Default::default()), metrics_notify: tokio::sync::Notify::new(), has_slow_read: true, diff --git a/seaweed-volume/src/server/mod.rs b/seaweed-volume/src/server/mod.rs index 6c15dd112..6f2768f16 100644 --- a/seaweed-volume/src/server/mod.rs +++ b/seaweed-volume/src/server/mod.rs @@ -1,4 +1,5 @@ pub mod debug; +pub mod grpc_client; pub mod grpc_server; pub mod handlers; pub mod heartbeat; diff --git a/seaweed-volume/src/server/volume_server.rs b/seaweed-volume/src/server/volume_server.rs index 767905b51..64a481ddd 100644 --- a/seaweed-volume/src/server/volume_server.rs +++ b/seaweed-volume/src/server/volume_server.rs @@ -25,6 +25,7 @@ use crate::config::ReadMode; use crate::security::Guard; use crate::storage::store::Store; +use super::grpc_client::OutgoingGrpcTlsConfig; use super::handlers; use super::write_queue::WriteQueue; @@ -81,6 +82,8 @@ pub struct VolumeServerState { pub http_client: reqwest::Client, /// Scheme used for outgoing master and peer HTTP requests ("http" or "https"). pub outgoing_http_scheme: String, + /// Optional client TLS material for outgoing gRPC connections. + pub outgoing_grpc_tls: Option, /// Metrics push settings learned from master heartbeat responses. pub metrics_runtime: std::sync::RwLock, pub metrics_notify: tokio::sync::Notify, diff --git a/seaweed-volume/src/server/write_queue.rs b/seaweed-volume/src/server/write_queue.rs index 6c2386486..3a2aadf58 100644 --- a/seaweed-volume/src/server/write_queue.rs +++ b/seaweed-volume/src/server/write_queue.rs @@ -199,6 +199,7 @@ mod tests { self_url: String::new(), http_client: reqwest::Client::new(), outgoing_http_scheme: "http".to_string(), + outgoing_grpc_tls: None, metrics_runtime: std::sync::RwLock::new(RuntimeMetricsConfig::default()), metrics_notify: tokio::sync::Notify::new(), has_slow_read: true, diff --git a/seaweed-volume/tests/http_integration.rs b/seaweed-volume/tests/http_integration.rs index db1f5b89d..82fd8aa6e 100644 --- a/seaweed-volume/tests/http_integration.rs +++ b/seaweed-volume/tests/http_integration.rs @@ -84,6 +84,7 @@ fn test_state_with_signing_key(signing_key: Vec) -> (Arc, self_url: String::new(), http_client: reqwest::Client::new(), outgoing_http_scheme: "http".to_string(), + outgoing_grpc_tls: None, metrics_runtime: std::sync::RwLock::new( seaweed_volume::server::volume_server::RuntimeMetricsConfig::default(), ),