diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs index 0a5440f48..963a03fa5 100644 --- a/seaweed-volume/src/server/handlers.rs +++ b/seaweed-volume/src/server/handlers.rs @@ -468,11 +468,49 @@ struct ProxyRequestInfo { fid_str: String, } +fn build_proxy_request_info( + path: &str, + headers: &HeaderMap, + query_string: &str, +) -> Option { + let trimmed = path.trim_start_matches('/'); + let (vid_str, fid_str) = if let Some(pos) = trimmed.find(',') { + let raw_fid = &trimmed[pos + 1..]; + let fid = if let Some(slash) = raw_fid.find('/') { + &raw_fid[..slash] + } else if let Some(dot) = raw_fid.rfind('.') { + &raw_fid[..dot] + } else { + raw_fid + }; + (trimmed[..pos].to_string(), fid.to_string()) + } else if let Some(pos) = trimmed.find('/') { + let after = &trimmed[pos + 1..]; + let fid_part = if let Some(slash) = after.find('/') { + &after[..slash] + } else { + after + }; + (trimmed[..pos].to_string(), fid_part.to_string()) + } else { + return None; + }; + + Some(ProxyRequestInfo { + original_headers: headers.clone(), + original_query: query_string.to_string(), + path: path.to_string(), + vid_str, + fid_str, + }) +} + /// Handle proxy or redirect for a non-local volume read. async fn proxy_or_redirect_to_target( state: &VolumeServerState, info: ProxyRequestInfo, vid: VolumeId, + allow_local_redirect: bool, ) -> Response { // Look up volume locations from master let locations = match lookup_volume( @@ -516,6 +554,9 @@ async fn proxy_or_redirect_to_target( match state.read_mode { ReadMode::Proxy => proxy_request(state, &info, target).await, ReadMode::Redirect => redirect_request(&info, target, &state.outgoing_http_scheme), + ReadMode::Local if allow_local_redirect => { + redirect_request(&info, target, &state.outgoing_http_scheme) + } ReadMode::Local => unreachable!(), } } @@ -742,61 +783,56 @@ async fn get_or_head_handler_inner( return StatusCode::NOT_FOUND.into_response(); } - // Extract vid_str and fid_str from path for redirect URL construction. // For redirect, fid must be stripped of extension (Go parity: parseURLPath returns raw fid). - let trimmed = path.trim_start_matches('/'); - let (vid_str, fid_str) = if let Some(pos) = trimmed.find(',') { - let raw_fid = &trimmed[pos + 1..]; - // Strip filename after slash: "fid/filename.ext" -> "fid" - let fid = if let Some(slash) = raw_fid.find('/') { - &raw_fid[..slash] - } else if let Some(dot) = raw_fid.rfind('.') { - // Strip extension: "fid.ext" -> "fid" - &raw_fid[..dot] - } else { - raw_fid - }; - (trimmed[..pos].to_string(), fid.to_string()) - } else if let Some(pos) = trimmed.find('/') { - let after = &trimmed[pos + 1..]; - let fid_part = if let Some(slash) = after.find('/') { - &after[..slash] - } else { - after - }; - (trimmed[..pos].to_string(), fid_part.to_string()) - } else { - return StatusCode::NOT_FOUND.into_response(); - }; - - let info = ProxyRequestInfo { - original_headers: request.headers().clone(), - original_query: query_string, - path: path.clone(), - vid_str, - fid_str, + let info = match build_proxy_request_info(&path, request.headers(), &query_string) { + Some(info) => info, + None => return StatusCode::NOT_FOUND.into_response(), }; - return proxy_or_redirect_to_target(&state, info, vid).await; + return proxy_or_redirect_to_target(&state, info, vid, false).await; } // Download throttling let download_guard = if state.concurrent_download_limit > 0 { let timeout = state.inflight_download_data_timeout; let deadline = tokio::time::Instant::now() + timeout; + let query_string = request.uri().query().unwrap_or("").to_string(); + let mut noted_over_limit = false; loop { let current = state.inflight_download_bytes.load(Ordering::Relaxed); if current <= state.concurrent_download_limit { break; } + + if !noted_over_limit { + metrics::HANDLER_COUNTER + .with_label_values(&[metrics::DOWNLOAD_LIMIT_COND]) + .inc(); + noted_over_limit = true; + } + + // Match Go's checkDownloadLimit: when this request has not already been + // proxied and the local volume has replicas, try another replica first. + let should_try_replica = + !query_string.contains("proxied=true") && !state.master_url.is_empty() && { + let store = state.store.read().unwrap(); + store.find_volume(vid).map_or(false, |(_, vol)| { + vol.super_block.replica_placement.get_copy_count() > 1 + }) + }; + if should_try_replica { + if let Some(info) = + build_proxy_request_info(&path, request.headers(), &query_string) + { + return proxy_or_redirect_to_target(&state, info, vid, true).await; + } + } + if tokio::time::timeout_at(deadline, state.download_notify.notified()) .await .is_err() { - metrics::HANDLER_COUNTER - .with_label_values(&[metrics::DOWNLOAD_LIMIT_COND]) - .inc(); return json_error_with_query( StatusCode::TOO_MANY_REQUESTS, "download limit exceeded", @@ -822,10 +858,10 @@ async fn get_or_head_handler_inner( let has_range = headers.contains_key(header::RANGE); let ext = extract_extension_from_path(&path); // Go checks resize and crop extensions separately: resize supports .webp, crop does not. - let has_resize_ops = is_image_resize_ext(&ext) - && (query.width.is_some() || query.height.is_some()); - let has_crop_ops = is_image_crop_ext(&ext) - && (query.crop_x1.is_some() || query.crop_y1.is_some()); + let has_resize_ops = + is_image_resize_ext(&ext) && (query.width.is_some() || query.height.is_some()); + let has_crop_ops = + is_image_crop_ext(&ext) && (query.crop_x1.is_some() || query.crop_y1.is_some()); let has_image_ops = has_resize_ops || has_crop_ops; // Stream info is only available for regular volumes, not EC volumes. @@ -1037,7 +1073,16 @@ async fn get_or_head_handler_inner( // Pass ETag so chunk manifest responses include it (matches Go: ETag is set on the // response writer before tryHandleChunkedFile runs). if n.is_chunk_manifest() && !bypass_cm { - if let Some(resp) = try_expand_chunk_manifest(&state, &n, &headers, &method, &path, &query, &etag, &last_modified_str) { + if let Some(resp) = try_expand_chunk_manifest( + &state, + &n, + &headers, + &method, + &path, + &query, + &etag, + &last_modified_str, + ) { return resp; } // If manifest expansion fails (invalid JSON etc.), fall through to raw data @@ -1233,9 +1278,7 @@ async fn get_or_head_handler_inner( } if can_handle_range_from_source { - if let (Some(range_header), Some(info)) = - (headers.get(header::RANGE), stream_info) - { + if let (Some(range_header), Some(info)) = (headers.get(header::RANGE), stream_info) { if let Ok(range_str) = range_header.to_str() { return handle_range_request_from_source( range_str, @@ -1359,7 +1402,10 @@ fn parse_range_header(s: &str, size: i64) -> Result, &'static str }; let start_str = part[..pos].trim(); let end_str = part[pos + 1..].trim(); - let mut r = HttpRange { start: 0, length: 0 }; + let mut r = HttpRange { + start: 0, + length: 0, + }; if start_str.is_empty() { let mut i = end_str.parse::().map_err(|_| "invalid range")?; if i > size { @@ -1667,10 +1713,7 @@ fn format_content_disposition(disposition_type: &str, filename: &str) -> String } else { // Non-ASCII: use RFC 2231 encoding with filename* parameter let encoded = percent_encode_rfc2231(filename); - format!( - "{}; filename*=utf-8''{}", - disposition_type, encoded - ) + format!("{}; filename*=utf-8''{}", disposition_type, encoded) } } @@ -1822,7 +1865,6 @@ fn is_zero_u32(v: &u32) -> bool { *v == 0 } - pub async fn post_handler( State(state): State>, request: Request, @@ -1839,7 +1881,7 @@ pub async fn post_handler( StatusCode::BAD_REQUEST, &format!("form parse error: {}", e), Some(&query), - ) + ); } }; @@ -1859,11 +1901,7 @@ pub async fn post_handler( .unwrap() .check_jwt_for_file(token.as_deref(), &file_id, true) { - return json_error_with_query( - StatusCode::UNAUTHORIZED, - "wrong jwt", - Some(&query), - ); + return json_error_with_query(StatusCode::UNAUTHORIZED, "wrong jwt", Some(&query)); } // Upload throttling: check inflight bytes against limit @@ -1929,8 +1967,7 @@ pub async fn post_handler( .to_string(); // Go only parses multipart form-data for POST requests with form-data content type. - let should_parse_multipart = - method == Method::POST && content_type_str.contains("form-data"); + let should_parse_multipart = method == Method::POST && content_type_str.contains("form-data"); // Validate multipart/form-data has a boundary if should_parse_multipart && !content_type_str.contains("boundary=") { @@ -2146,17 +2183,15 @@ pub async fn post_handler( let mime_type = if let Some(ref pct) = parsed_content_type { pct.clone() } else { - let multipart_fallback = if should_parse_multipart - && !filename.is_empty() - && !is_chunk_manifest - { - mime_guess::from_path(&filename) - .first() - .map(|m| m.to_string()) - .unwrap_or_default() - } else { - String::new() - }; + let multipart_fallback = + if should_parse_multipart && !filename.is_empty() && !is_chunk_manifest { + mime_guess::from_path(&filename) + .first() + .map(|m| m.to_string()) + .unwrap_or_default() + } else { + String::new() + }; headers .get(header::CONTENT_TYPE) .and_then(|v| v.to_str().ok()) @@ -2404,8 +2439,7 @@ pub async fn delete_handler( ) -> Response { let path = request.uri().path().to_string(); let del_query = request.uri().query().unwrap_or("").to_string(); - let del_params: ReadQueryParams = - serde_urlencoded::from_str(&del_query).unwrap_or_default(); + let del_params: ReadQueryParams = serde_urlencoded::from_str(&del_query).unwrap_or_default(); let headers = request.headers().clone(); let (vid, needle_id, cookie) = match parse_url_path(&path) { @@ -2428,11 +2462,7 @@ pub async fn delete_handler( .unwrap() .check_jwt_for_file(token.as_deref(), &file_id, true) { - return json_error_with_query( - StatusCode::UNAUTHORIZED, - "wrong jwt", - Some(&del_query), - ); + return json_error_with_query(StatusCode::UNAUTHORIZED, "wrong jwt", Some(&del_query)); } // Check for EC volume first (Go checks hasEcVolume before regular volume in DeleteHandler). @@ -2539,7 +2569,11 @@ pub async fn delete_handler( Ok(_) => {} Err(_) => { let result = DeleteResult { size: 0 }; - return json_response_with_params(StatusCode::NOT_FOUND, &result, Some(&del_params)); + return json_response_with_params( + StatusCode::NOT_FOUND, + &result, + Some(&del_params), + ); } } }