diff --git a/seaweed-volume/src/server/grpc_client.rs b/seaweed-volume/src/server/grpc_client.rs index bdc372b42..ebe49397a 100644 --- a/seaweed-volume/src/server/grpc_client.rs +++ b/seaweed-volume/src/server/grpc_client.rs @@ -1,11 +1,17 @@ use std::error::Error; use std::fmt; +use std::time::Duration; use hyper::http::Uri; use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Identity}; use crate::config::VolumeServerConfig; +pub const GRPC_MAX_MESSAGE_SIZE: usize = 1 << 30; +const GRPC_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(60); +const GRPC_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20); +const GRPC_INITIAL_WINDOW_SIZE: u32 = 16 * 1024 * 1024; + #[derive(Clone, Debug)] pub struct OutgoingGrpcTlsConfig { cert_pem: String, @@ -71,7 +77,13 @@ pub fn build_grpc_endpoint( ) -> Result { let uri = grpc_endpoint_uri(grpc_host_port, tls); let mut endpoint = Channel::from_shared(uri.clone()) - .map_err(|e| GrpcClientError(format!("invalid gRPC endpoint {}: {}", uri, e)))?; + .map_err(|e| GrpcClientError(format!("invalid gRPC endpoint {}: {}", uri, e)))? + .http2_keep_alive_interval(GRPC_KEEPALIVE_INTERVAL) + .keep_alive_timeout(GRPC_KEEPALIVE_TIMEOUT) + .keep_alive_while_idle(false) + .initial_stream_window_size(Some(GRPC_INITIAL_WINDOW_SIZE)) + .initial_connection_window_size(Some(GRPC_INITIAL_WINDOW_SIZE)) + .http2_adaptive_window(false); if let Some(tls) = tls { let parsed = uri diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 29b5111b2..adc3514bf 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -19,7 +19,7 @@ use crate::pb::volume_server_pb::volume_server_server::VolumeServer; use crate::storage::needle::needle::{self, Needle}; use crate::storage::types::*; -use super::grpc_client::build_grpc_endpoint; +use super::grpc_client::{build_grpc_endpoint, GRPC_MAX_MESSAGE_SIZE}; use super::volume_server::VolumeServerState; type BoxStream = Pin> + Send + 'static>>; @@ -59,7 +59,9 @@ impl VolumeGrpcService { .connect() .await .map_err(|e| Status::internal(format!("connect to master {}: {}", master_url, e)))?; - let mut client = SeaweedClient::new(channel); + let mut client = SeaweedClient::new(channel) + .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE) + .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE); client .volume_mark_readonly(master_pb::VolumeMarkReadonlyRequest { ip: info.ip.clone(), @@ -856,7 +858,9 @@ impl VolumeServer for VolumeGrpcService { )) })?; - let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel); + let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel) + .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE) + .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE); // Get file status from source let vol_info = client @@ -1577,7 +1581,9 @@ impl VolumeServer for VolumeGrpcService { .await .map_err(|e| Status::internal(format!("connect to {}: {}", grpc_addr, e)))?; - let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel); + let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel) + .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE) + .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE); // Call VolumeTailSender on source let mut stream = client @@ -1841,7 +1847,9 @@ impl VolumeServer for VolumeGrpcService { )) })?; - let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel); + let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel) + .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE) + .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE); // Copy each shard for &shard_id in &req.shard_ids { @@ -3258,7 +3266,9 @@ async fn ping_volume_server_target( .map_err(|_| "connection timeout".to_string())? .map_err(|e| e.to_string())?; - let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel); + let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel) + .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE) + .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE); let resp = client .ping(volume_server_pb::PingRequest { target: String::new(), @@ -3280,7 +3290,9 @@ async fn ping_master_target( .map_err(|_| "connection timeout".to_string())? .map_err(|e| e.to_string())?; - let mut client = master_pb::seaweed_client::SeaweedClient::new(channel); + let mut client = master_pb::seaweed_client::SeaweedClient::new(channel) + .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE) + .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE); let resp = client .ping(master_pb::PingRequest { target: String::new(), @@ -3302,7 +3314,9 @@ async fn ping_filer_target( .map_err(|_| "connection timeout".to_string())? .map_err(|e| e.to_string())?; - let mut client = filer_pb::seaweed_filer_client::SeaweedFilerClient::new(channel); + let mut client = filer_pb::seaweed_filer_client::SeaweedFilerClient::new(channel) + .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE) + .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE); let resp = client .ping(filer_pb::PingRequest::default()) .await diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index ed84d253f..dfe1477c1 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -11,7 +11,7 @@ use std::time::Duration; use tokio::sync::broadcast; use tracing::{error, info, warn}; -use super::grpc_client::build_grpc_endpoint; +use super::grpc_client::{build_grpc_endpoint, GRPC_MAX_MESSAGE_SIZE}; use super::volume_server::VolumeServerState; use crate::pb::master_pb; use crate::pb::master_pb::seaweed_client::SeaweedClient; @@ -222,7 +222,9 @@ async fn try_get_master_configuration( .timeout(Duration::from_secs(10)) .connect() .await?; - let mut client = SeaweedClient::new(channel); + let mut client = SeaweedClient::new(channel) + .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE) + .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE); let resp = client .get_master_configuration(master_pb::GetMasterConfigurationRequest {}) .await?; @@ -247,7 +249,9 @@ async fn do_heartbeat( .connect() .await?; - let mut client = SeaweedClient::new(channel); + let mut client = SeaweedClient::new(channel) + .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE) + .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE); let (tx, rx) = tokio::sync::mpsc::channel::(32);