Browse Source

Format pending Rust server updates

rust-volume-server
Chris Lu 1 week ago
parent
commit
b7a8c3fddb
  1. 43
      seaweed-volume/src/server/grpc_server.rs
  2. 21
      seaweed-volume/src/server/handlers.rs
  3. 8
      seaweed-volume/src/server/heartbeat.rs
  4. 11
      seaweed-volume/src/server/request_id.rs
  5. 1
      seaweed-volume/src/server/server_stats.rs
  6. 16
      seaweed-volume/src/server/ui.rs

43
seaweed-volume/src/server/grpc_server.rs

@ -103,8 +103,8 @@ impl VolumeGrpcService {
channel,
super::request_id::outgoing_request_id_interceptor,
)
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
client
.volume_mark_readonly(master_pb::VolumeMarkReadonlyRequest {
ip: info.ip.clone(),
@ -901,12 +901,13 @@ impl VolumeServer for VolumeGrpcService {
))
})?;
let mut client = volume_server_pb::volume_server_client::VolumeServerClient::with_interceptor(
channel,
super::request_id::outgoing_request_id_interceptor,
)
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
let mut client =
volume_server_pb::volume_server_client::VolumeServerClient::with_interceptor(
channel,
super::request_id::outgoing_request_id_interceptor,
)
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
// Get file status from source
let vol_info = client
@ -1636,12 +1637,13 @@ impl VolumeServer for VolumeGrpcService {
.await
.map_err(|e| Status::internal(format!("connect to {}: {}", grpc_addr, e)))?;
let mut client = volume_server_pb::volume_server_client::VolumeServerClient::with_interceptor(
channel,
super::request_id::outgoing_request_id_interceptor,
)
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
let mut client =
volume_server_pb::volume_server_client::VolumeServerClient::with_interceptor(
channel,
super::request_id::outgoing_request_id_interceptor,
)
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
// Call VolumeTailSender on source
let mut stream = client
@ -1905,12 +1907,13 @@ impl VolumeServer for VolumeGrpcService {
))
})?;
let mut client = volume_server_pb::volume_server_client::VolumeServerClient::with_interceptor(
channel,
super::request_id::outgoing_request_id_interceptor,
)
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
let mut client =
volume_server_pb::volume_server_client::VolumeServerClient::with_interceptor(
channel,
super::request_id::outgoing_request_id_interceptor,
)
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
// Copy each shard
for &shard_id in &req.shard_ids {

21
seaweed-volume/src/server/handlers.rs

@ -1797,14 +1797,12 @@ pub async fn post_handler(
// Fix JPEG orientation from EXIF data before storing (matches Go behavior).
// Only for non-compressed uploads that are JPEG files.
let body_data = if state.fix_jpg_orientation
&& !is_gzipped
&& crate::images::is_jpeg(&mime_type, &path)
{
crate::images::fix_jpg_orientation(&body_data_raw)
} else {
body_data_raw
};
let body_data =
if state.fix_jpg_orientation && !is_gzipped && crate::images::is_jpeg(&mime_type, &path) {
crate::images::fix_jpg_orientation(&body_data_raw)
} else {
body_data_raw
};
// H3: Capture original data size BEFORE auto-compression
let original_data_size = body_data.len() as u32;
@ -2405,7 +2403,12 @@ pub async fn stats_disk_handler(
pub async fn favicon_handler() -> Response {
let asset = super::ui::favicon_asset();
(StatusCode::OK, [(header::CONTENT_TYPE, asset.content_type)], asset.bytes).into_response()
(
StatusCode::OK,
[(header::CONTENT_TYPE, asset.content_type)],
asset.bytes,
)
.into_response()
}
pub async fn static_asset_handler(Path(path): Path<String>) -> Response {

8
seaweed-volume/src/server/heartbeat.rs

@ -226,8 +226,8 @@ async fn try_get_master_configuration(
channel,
super::request_id::outgoing_request_id_interceptor,
)
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
let resp = client
.get_master_configuration(master_pb::GetMasterConfigurationRequest {})
.await?;
@ -256,8 +256,8 @@ async fn do_heartbeat(
channel,
super::request_id::outgoing_request_id_interceptor,
)
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
let (tx, rx) = tokio::sync::mpsc::channel::<master_pb::Heartbeat>(32);

11
seaweed-volume/src/server/request_id.rs

@ -78,9 +78,7 @@ pub fn current_request_id() -> Option<String> {
CURRENT_REQUEST_ID.try_with(Clone::clone).ok()
}
pub fn outgoing_request_id_interceptor(
mut request: Request<()>,
) -> Result<Request<()>, Status> {
pub fn outgoing_request_id_interceptor(mut request: Request<()>) -> Result<Request<()>, Status> {
if let Some(request_id) = current_request_id() {
let value = MetadataValue::try_from(request_id.as_str())
.map_err(|_| Status::internal("invalid scoped request id"))?;
@ -112,9 +110,10 @@ mod tests {
#[tokio::test]
async fn test_scope_request_id_exposes_current_value() {
let request_id = "req-123".to_string();
let current = scope_request_id(request_id.clone(), async move {
current_request_id().unwrap()
})
let current = scope_request_id(
request_id.clone(),
async move { current_request_id().unwrap() },
)
.await;
assert_eq!(current, request_id);
}

1
seaweed-volume/src/server/server_stats.rs

@ -238,4 +238,3 @@ pub fn reset_for_tests() {
let mut inner = SERVER_STATS.inner.lock().unwrap();
*inner = ServerStatsInner::default();
}

16
seaweed-volume/src/server/ui.rs

@ -365,7 +365,14 @@ pub fn render_volume_server_html(state: &VolumeServerState) -> String {
)
}
fn collect_ui_data(store: &Store) -> (Vec<UiDiskRow>, Vec<UiVolumeRow>, Vec<UiVolumeRow>, Vec<UiEcVolumeRow>) {
fn collect_ui_data(
store: &Store,
) -> (
Vec<UiDiskRow>,
Vec<UiVolumeRow>,
Vec<UiVolumeRow>,
Vec<UiEcVolumeRow>,
) {
let mut disk_rows = Vec::new();
let mut volumes = Vec::new();
let mut remote_volumes = Vec::new();
@ -484,7 +491,11 @@ fn bytes_to_human_readable(bytes: u64) -> String {
exp += 1;
}
format!("{:.2} {}iB", bytes as f64 / div as f64, ["K", "M", "G", "T", "P", "E"][exp])
format!(
"{:.2} {}iB",
bytes as f64 / div as f64,
["K", "M", "G", "T", "P", "E"][exp]
)
}
fn escape_html(input: &str) -> String {
@ -494,4 +505,3 @@ fn escape_html(input: &str) -> String {
.replace('>', "&gt;")
.replace('"', "&quot;")
}
Loading…
Cancel
Save