Browse Source

Match Go gRPC client transport defaults

rust-volume-server
Chris Lu 4 days ago
parent
commit
e9b91aab61
  1. 14
      seaweed-volume/src/server/grpc_client.rs
  2. 30
      seaweed-volume/src/server/grpc_server.rs
  3. 10
      seaweed-volume/src/server/heartbeat.rs

14
seaweed-volume/src/server/grpc_client.rs

@ -1,11 +1,17 @@
use std::error::Error;
use std::fmt;
use std::time::Duration;
use hyper::http::Uri;
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Identity};
use crate::config::VolumeServerConfig;
pub const GRPC_MAX_MESSAGE_SIZE: usize = 1 << 30;
const GRPC_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(60);
const GRPC_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20);
const GRPC_INITIAL_WINDOW_SIZE: u32 = 16 * 1024 * 1024;
#[derive(Clone, Debug)]
pub struct OutgoingGrpcTlsConfig {
cert_pem: String,
@ -71,7 +77,13 @@ pub fn build_grpc_endpoint(
) -> Result<Endpoint, GrpcClientError> {
let uri = grpc_endpoint_uri(grpc_host_port, tls);
let mut endpoint = Channel::from_shared(uri.clone())
.map_err(|e| GrpcClientError(format!("invalid gRPC endpoint {}: {}", uri, e)))?;
.map_err(|e| GrpcClientError(format!("invalid gRPC endpoint {}: {}", uri, e)))?
.http2_keep_alive_interval(GRPC_KEEPALIVE_INTERVAL)
.keep_alive_timeout(GRPC_KEEPALIVE_TIMEOUT)
.keep_alive_while_idle(false)
.initial_stream_window_size(Some(GRPC_INITIAL_WINDOW_SIZE))
.initial_connection_window_size(Some(GRPC_INITIAL_WINDOW_SIZE))
.http2_adaptive_window(false);
if let Some(tls) = tls {
let parsed = uri

30
seaweed-volume/src/server/grpc_server.rs

@ -19,7 +19,7 @@ use crate::pb::volume_server_pb::volume_server_server::VolumeServer;
use crate::storage::needle::needle::{self, Needle};
use crate::storage::types::*;
use super::grpc_client::build_grpc_endpoint;
use super::grpc_client::{build_grpc_endpoint, GRPC_MAX_MESSAGE_SIZE};
use super::volume_server::VolumeServerState;
type BoxStream<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send + 'static>>;
@ -59,7 +59,9 @@ impl VolumeGrpcService {
.connect()
.await
.map_err(|e| Status::internal(format!("connect to master {}: {}", master_url, e)))?;
let mut client = SeaweedClient::new(channel);
let mut client = SeaweedClient::new(channel)
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
client
.volume_mark_readonly(master_pb::VolumeMarkReadonlyRequest {
ip: info.ip.clone(),
@ -856,7 +858,9 @@ impl VolumeServer for VolumeGrpcService {
))
})?;
let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel);
let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel)
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
// Get file status from source
let vol_info = client
@ -1577,7 +1581,9 @@ impl VolumeServer for VolumeGrpcService {
.await
.map_err(|e| Status::internal(format!("connect to {}: {}", grpc_addr, e)))?;
let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel);
let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel)
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
// Call VolumeTailSender on source
let mut stream = client
@ -1841,7 +1847,9 @@ impl VolumeServer for VolumeGrpcService {
))
})?;
let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel);
let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel)
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
// Copy each shard
for &shard_id in &req.shard_ids {
@ -3258,7 +3266,9 @@ async fn ping_volume_server_target(
.map_err(|_| "connection timeout".to_string())?
.map_err(|e| e.to_string())?;
let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel);
let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel)
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
let resp = client
.ping(volume_server_pb::PingRequest {
target: String::new(),
@ -3280,7 +3290,9 @@ async fn ping_master_target(
.map_err(|_| "connection timeout".to_string())?
.map_err(|e| e.to_string())?;
let mut client = master_pb::seaweed_client::SeaweedClient::new(channel);
let mut client = master_pb::seaweed_client::SeaweedClient::new(channel)
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
let resp = client
.ping(master_pb::PingRequest {
target: String::new(),
@ -3302,7 +3314,9 @@ async fn ping_filer_target(
.map_err(|_| "connection timeout".to_string())?
.map_err(|e| e.to_string())?;
let mut client = filer_pb::seaweed_filer_client::SeaweedFilerClient::new(channel);
let mut client = filer_pb::seaweed_filer_client::SeaweedFilerClient::new(channel)
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
let resp = client
.ping(filer_pb::PingRequest::default())
.await

10
seaweed-volume/src/server/heartbeat.rs

@ -11,7 +11,7 @@ use std::time::Duration;
use tokio::sync::broadcast;
use tracing::{error, info, warn};
use super::grpc_client::build_grpc_endpoint;
use super::grpc_client::{build_grpc_endpoint, GRPC_MAX_MESSAGE_SIZE};
use super::volume_server::VolumeServerState;
use crate::pb::master_pb;
use crate::pb::master_pb::seaweed_client::SeaweedClient;
@ -222,7 +222,9 @@ async fn try_get_master_configuration(
.timeout(Duration::from_secs(10))
.connect()
.await?;
let mut client = SeaweedClient::new(channel);
let mut client = SeaweedClient::new(channel)
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
let resp = client
.get_master_configuration(master_pb::GetMasterConfigurationRequest {})
.await?;
@ -247,7 +249,9 @@ async fn do_heartbeat(
.connect()
.await?;
let mut client = SeaweedClient::new(channel);
let mut client = SeaweedClient::new(channel)
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
let (tx, rx) = tokio::sync::mpsc::channel::<master_pb::Heartbeat>(32);

Loading…
Cancel
Save