Browse Source

http: proxy to replicas before download-limit timeout

rust-volume-server
Chris Lu 3 days ago
parent
commit
33f2062e8d
  1. 188
      seaweed-volume/src/server/handlers.rs

188
seaweed-volume/src/server/handlers.rs

@ -468,11 +468,49 @@ struct ProxyRequestInfo {
fid_str: String,
}
/// Parse `path` into the volume id and bare file id needed to proxy or
/// redirect a read to another replica, capturing the original headers and
/// query string so they can be forwarded verbatim.
///
/// Accepted shapes (leading slashes ignored):
/// * `vid,fid`, `vid,fid.ext`, `vid,fid/filename` — fid is stripped of a
///   trailing `/filename` or, failing that, a trailing `.ext` (Go parity:
///   parseURLPath returns the raw fid).
/// * `vid/fid` or `vid/fid/filename` — fid is the first segment after vid.
///
/// Returns `None` when the path contains neither a comma nor a slash and
/// therefore cannot name a file.
fn build_proxy_request_info(
    path: &str,
    headers: &HeaderMap,
    query_string: &str,
) -> Option<ProxyRequestInfo> {
    let rest = path.trim_start_matches('/');
    let (vid_str, fid_str) = if let Some((vid, raw_fid)) = rest.split_once(',') {
        // "vid,fid…": a "/filename" suffix wins over an ".ext" suffix.
        let bare_fid = match raw_fid.split_once('/') {
            Some((before_slash, _)) => before_slash,
            None => match raw_fid.rfind('.') {
                Some(dot) => &raw_fid[..dot],
                None => raw_fid,
            },
        };
        (vid.to_string(), bare_fid.to_string())
    } else if let Some((vid, after)) = rest.split_once('/') {
        // "vid/fid[/filename]": keep only the segment directly after the vid.
        let fid_part = after.split_once('/').map_or(after, |(seg, _)| seg);
        (vid.to_string(), fid_part.to_string())
    } else {
        // No separator at all — not a recognizable file path.
        return None;
    };
    Some(ProxyRequestInfo {
        original_headers: headers.clone(),
        original_query: query_string.to_string(),
        path: path.to_string(),
        vid_str,
        fid_str,
    })
}
/// Handle proxy or redirect for a non-local volume read.
async fn proxy_or_redirect_to_target(
state: &VolumeServerState,
info: ProxyRequestInfo,
vid: VolumeId,
allow_local_redirect: bool,
) -> Response {
// Look up volume locations from master
let locations = match lookup_volume(
@ -516,6 +554,9 @@ async fn proxy_or_redirect_to_target(
match state.read_mode {
ReadMode::Proxy => proxy_request(state, &info, target).await,
ReadMode::Redirect => redirect_request(&info, target, &state.outgoing_http_scheme),
ReadMode::Local if allow_local_redirect => {
redirect_request(&info, target, &state.outgoing_http_scheme)
}
ReadMode::Local => unreachable!(),
}
}
@ -742,61 +783,56 @@ async fn get_or_head_handler_inner(
return StatusCode::NOT_FOUND.into_response();
}
// Extract vid_str and fid_str from path for redirect URL construction.
// For redirect, fid must be stripped of extension (Go parity: parseURLPath returns raw fid).
let trimmed = path.trim_start_matches('/');
let (vid_str, fid_str) = if let Some(pos) = trimmed.find(',') {
let raw_fid = &trimmed[pos + 1..];
// Strip filename after slash: "fid/filename.ext" -> "fid"
let fid = if let Some(slash) = raw_fid.find('/') {
&raw_fid[..slash]
} else if let Some(dot) = raw_fid.rfind('.') {
// Strip extension: "fid.ext" -> "fid"
&raw_fid[..dot]
} else {
raw_fid
};
(trimmed[..pos].to_string(), fid.to_string())
} else if let Some(pos) = trimmed.find('/') {
let after = &trimmed[pos + 1..];
let fid_part = if let Some(slash) = after.find('/') {
&after[..slash]
} else {
after
};
(trimmed[..pos].to_string(), fid_part.to_string())
} else {
return StatusCode::NOT_FOUND.into_response();
};
let info = ProxyRequestInfo {
original_headers: request.headers().clone(),
original_query: query_string,
path: path.clone(),
vid_str,
fid_str,
let info = match build_proxy_request_info(&path, request.headers(), &query_string) {
Some(info) => info,
None => return StatusCode::NOT_FOUND.into_response(),
};
return proxy_or_redirect_to_target(&state, info, vid).await;
return proxy_or_redirect_to_target(&state, info, vid, false).await;
}
// Download throttling
let download_guard = if state.concurrent_download_limit > 0 {
let timeout = state.inflight_download_data_timeout;
let deadline = tokio::time::Instant::now() + timeout;
let query_string = request.uri().query().unwrap_or("").to_string();
let mut noted_over_limit = false;
loop {
let current = state.inflight_download_bytes.load(Ordering::Relaxed);
if current <= state.concurrent_download_limit {
break;
}
if !noted_over_limit {
metrics::HANDLER_COUNTER
.with_label_values(&[metrics::DOWNLOAD_LIMIT_COND])
.inc();
noted_over_limit = true;
}
// Match Go's checkDownloadLimit: when this request has not already been
// proxied and the local volume has replicas, try another replica first.
let should_try_replica =
!query_string.contains("proxied=true") && !state.master_url.is_empty() && {
let store = state.store.read().unwrap();
store.find_volume(vid).map_or(false, |(_, vol)| {
vol.super_block.replica_placement.get_copy_count() > 1
})
};
if should_try_replica {
if let Some(info) =
build_proxy_request_info(&path, request.headers(), &query_string)
{
return proxy_or_redirect_to_target(&state, info, vid, true).await;
}
}
if tokio::time::timeout_at(deadline, state.download_notify.notified())
.await
.is_err()
{
metrics::HANDLER_COUNTER
.with_label_values(&[metrics::DOWNLOAD_LIMIT_COND])
.inc();
return json_error_with_query(
StatusCode::TOO_MANY_REQUESTS,
"download limit exceeded",
@ -822,10 +858,10 @@ async fn get_or_head_handler_inner(
let has_range = headers.contains_key(header::RANGE);
let ext = extract_extension_from_path(&path);
// Go checks resize and crop extensions separately: resize supports .webp, crop does not.
let has_resize_ops = is_image_resize_ext(&ext)
&& (query.width.is_some() || query.height.is_some());
let has_crop_ops = is_image_crop_ext(&ext)
&& (query.crop_x1.is_some() || query.crop_y1.is_some());
let has_resize_ops =
is_image_resize_ext(&ext) && (query.width.is_some() || query.height.is_some());
let has_crop_ops =
is_image_crop_ext(&ext) && (query.crop_x1.is_some() || query.crop_y1.is_some());
let has_image_ops = has_resize_ops || has_crop_ops;
// Stream info is only available for regular volumes, not EC volumes.
@ -1037,7 +1073,16 @@ async fn get_or_head_handler_inner(
// Pass ETag so chunk manifest responses include it (matches Go: ETag is set on the
// response writer before tryHandleChunkedFile runs).
if n.is_chunk_manifest() && !bypass_cm {
if let Some(resp) = try_expand_chunk_manifest(&state, &n, &headers, &method, &path, &query, &etag, &last_modified_str) {
if let Some(resp) = try_expand_chunk_manifest(
&state,
&n,
&headers,
&method,
&path,
&query,
&etag,
&last_modified_str,
) {
return resp;
}
// If manifest expansion fails (invalid JSON etc.), fall through to raw data
@ -1233,9 +1278,7 @@ async fn get_or_head_handler_inner(
}
if can_handle_range_from_source {
if let (Some(range_header), Some(info)) =
(headers.get(header::RANGE), stream_info)
{
if let (Some(range_header), Some(info)) = (headers.get(header::RANGE), stream_info) {
if let Ok(range_str) = range_header.to_str() {
return handle_range_request_from_source(
range_str,
@ -1359,7 +1402,10 @@ fn parse_range_header(s: &str, size: i64) -> Result<Vec<HttpRange>, &'static str
};
let start_str = part[..pos].trim();
let end_str = part[pos + 1..].trim();
let mut r = HttpRange { start: 0, length: 0 };
let mut r = HttpRange {
start: 0,
length: 0,
};
if start_str.is_empty() {
let mut i = end_str.parse::<i64>().map_err(|_| "invalid range")?;
if i > size {
@ -1667,10 +1713,7 @@ fn format_content_disposition(disposition_type: &str, filename: &str) -> String
} else {
// Non-ASCII: use RFC 2231 encoding with filename* parameter
let encoded = percent_encode_rfc2231(filename);
format!(
"{}; filename*=utf-8''{}",
disposition_type, encoded
)
format!("{}; filename*=utf-8''{}", disposition_type, encoded)
}
}
@ -1822,7 +1865,6 @@ fn is_zero_u32(v: &u32) -> bool {
*v == 0
}
pub async fn post_handler(
State(state): State<Arc<VolumeServerState>>,
request: Request<Body>,
@ -1839,7 +1881,7 @@ pub async fn post_handler(
StatusCode::BAD_REQUEST,
&format!("form parse error: {}", e),
Some(&query),
)
);
}
};
@ -1859,11 +1901,7 @@ pub async fn post_handler(
.unwrap()
.check_jwt_for_file(token.as_deref(), &file_id, true)
{
return json_error_with_query(
StatusCode::UNAUTHORIZED,
"wrong jwt",
Some(&query),
);
return json_error_with_query(StatusCode::UNAUTHORIZED, "wrong jwt", Some(&query));
}
// Upload throttling: check inflight bytes against limit
@ -1929,8 +1967,7 @@ pub async fn post_handler(
.to_string();
// Go only parses multipart form-data for POST requests with form-data content type.
let should_parse_multipart =
method == Method::POST && content_type_str.contains("form-data");
let should_parse_multipart = method == Method::POST && content_type_str.contains("form-data");
// Validate multipart/form-data has a boundary
if should_parse_multipart && !content_type_str.contains("boundary=") {
@ -2146,17 +2183,15 @@ pub async fn post_handler(
let mime_type = if let Some(ref pct) = parsed_content_type {
pct.clone()
} else {
let multipart_fallback = if should_parse_multipart
&& !filename.is_empty()
&& !is_chunk_manifest
{
mime_guess::from_path(&filename)
.first()
.map(|m| m.to_string())
.unwrap_or_default()
} else {
String::new()
};
let multipart_fallback =
if should_parse_multipart && !filename.is_empty() && !is_chunk_manifest {
mime_guess::from_path(&filename)
.first()
.map(|m| m.to_string())
.unwrap_or_default()
} else {
String::new()
};
headers
.get(header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
@ -2404,8 +2439,7 @@ pub async fn delete_handler(
) -> Response {
let path = request.uri().path().to_string();
let del_query = request.uri().query().unwrap_or("").to_string();
let del_params: ReadQueryParams =
serde_urlencoded::from_str(&del_query).unwrap_or_default();
let del_params: ReadQueryParams = serde_urlencoded::from_str(&del_query).unwrap_or_default();
let headers = request.headers().clone();
let (vid, needle_id, cookie) = match parse_url_path(&path) {
@ -2428,11 +2462,7 @@ pub async fn delete_handler(
.unwrap()
.check_jwt_for_file(token.as_deref(), &file_id, true)
{
return json_error_with_query(
StatusCode::UNAUTHORIZED,
"wrong jwt",
Some(&del_query),
);
return json_error_with_query(StatusCode::UNAUTHORIZED, "wrong jwt", Some(&del_query));
}
// Check for EC volume first (Go checks hasEcVolume before regular volume in DeleteHandler).
@ -2539,7 +2569,11 @@ pub async fn delete_handler(
Ok(_) => {}
Err(_) => {
let result = DeleteResult { size: 0 };
return json_response_with_params(StatusCode::NOT_FOUND, &result, Some(&del_params));
return json_response_with_params(
StatusCode::NOT_FOUND,
&result,
Some(&del_params),
);
}
}
}

Loading…
Cancel
Save