diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs index da94d464d..f552a8d67 100644 --- a/seaweed-volume/src/server/handlers.rs +++ b/seaweed-volume/src/server/handlers.rs @@ -75,6 +75,34 @@ impl Drop for TrackedBody { } } +fn finalize_bytes_response( + status: StatusCode, + headers: HeaderMap, + data: Vec<u8>, + state: Option<Arc<ServerState>>, +) -> Response { + if let Some(state) = state { + let data_len = data.len() as i64; + let new_val = state + .inflight_download_bytes + .fetch_add(data_len, Ordering::Relaxed) + + data_len; + metrics::INFLIGHT_DOWNLOAD_SIZE.set(new_val); + let tracked_body = TrackedBody { + data, + state, + bytes: data_len, + }; + let body = Body::new(tracked_body); + let mut resp = Response::new(body); + *resp.status_mut() = status; + *resp.headers_mut() = headers; + resp + } else { + (status, headers, data).into_response() + } +} + // ============================================================================ // Streaming Body for Large Files // ============================================================================ @@ -826,19 +854,25 @@ async fn get_or_head_handler_inner( return StatusCode::NOT_FOUND.into_response(); } - // Determine if we can stream (large, uncompressed, not manifest, not image needing ops, no range) let bypass_cm = query.cm.as_deref() == Some("false"); - let can_stream = stream_info.is_some() - && n.data_size > STREAMING_THRESHOLD + let track_download = download_guard.is_some(); + let can_direct_source_read = stream_info.is_some() && !n.is_compressed() && !(n.is_chunk_manifest() && !bypass_cm) - && !(is_image && has_image_ops) + && !(is_image && has_image_ops); + + // Determine if we can stream (large, direct-source eligible, no range) + let can_stream = can_direct_source_read + && n.data_size > STREAMING_THRESHOLD && !has_range && method != Method::HEAD; + let can_handle_head_from_meta = can_direct_source_read && method == Method::HEAD; + let can_handle_range_from_source = 
can_direct_source_read && has_range; + // For chunk manifest or any non-streaming path, we need the full data. // If we can't stream, do a full read now. - if !can_stream { + if !can_stream && !can_handle_head_from_meta && !can_handle_range_from_source { // Re-read with full data let mut n_full = Needle { id: needle_id, @@ -1109,6 +1143,31 @@ async fn get_or_head_handler_inner( } } + if can_handle_head_from_meta { + if let Some(info) = stream_info { + response_headers.insert( + header::CONTENT_LENGTH, + info.data_size.to_string().parse().unwrap(), + ); + return (StatusCode::OK, response_headers).into_response(); + } + } + + if can_handle_range_from_source { + if let (Some(range_header), Some(info)) = + (headers.get(header::RANGE), stream_info) + { + if let Ok(range_str) = range_header.to_str() { + return handle_range_request_from_source( + range_str, + info, + response_headers, + track_download.then(|| state.clone()), + ); + } + } + } + // ---- Buffered path: small files, compressed, images, range requests ---- // Handle compressed data: if needle is compressed, either pass through or decompress @@ -1161,7 +1220,12 @@ async fn get_or_head_handler_inner( // Check Range header if let Some(range_header) = headers.get(header::RANGE) { if let Ok(range_str) = range_header.to_str() { - return handle_range_request(range_str, &data, response_headers); + return handle_range_request( + range_str, + &data, + response_headers, + track_download.then(|| state.clone()), + ); } } @@ -1173,31 +1237,21 @@ async fn get_or_head_handler_inner( return (StatusCode::OK, response_headers).into_response(); } - // If download throttling is active, wrap the body so we track when it's fully sent - if download_guard.is_some() { - let data_len = data.len() as i64; - let new_val = state - .inflight_download_bytes - .fetch_add(data_len, Ordering::Relaxed) - + data_len; - metrics::INFLIGHT_DOWNLOAD_SIZE.set(new_val); - let tracked_body = TrackedBody { - data, - state: state.clone(), - bytes: 
data_len, - }; - let body = Body::new(tracked_body); - let mut resp = Response::new(body); - *resp.status_mut() = StatusCode::OK; - *resp.headers_mut() = response_headers; - return resp; - } - - (StatusCode::OK, response_headers, data).into_response() + finalize_bytes_response( + StatusCode::OK, + response_headers, + data, + track_download.then(|| state.clone()), + ) } /// Handle HTTP Range requests. Returns 206 Partial Content or 416 Range Not Satisfiable. -fn handle_range_request(range_str: &str, data: &[u8], mut headers: HeaderMap) -> Response { +fn handle_range_request( + range_str: &str, + data: &[u8], + mut headers: HeaderMap, + state: Option<Arc<ServerState>>, +) -> Response { let total = data.len(); // Parse "bytes=start-end" @@ -1281,7 +1335,7 @@ fn handle_range_request(range_str: &str, data: &[u8], mut headers: HeaderMap) -> header::CONTENT_LENGTH, slice.len().to_string().parse().unwrap(), ); - (StatusCode::PARTIAL_CONTENT, headers, slice.to_vec()).into_response() + finalize_bytes_response(StatusCode::PARTIAL_CONTENT, headers, slice.to_vec(), state) } else { // Multi-range: build multipart/byteranges response let boundary = "SeaweedFSBoundary"; @@ -1317,10 +1371,168 @@ fn handle_range_request(range_str: &str, data: &[u8], mut headers: HeaderMap) -> header::CONTENT_LENGTH, body.len().to_string().parse().unwrap(), ); - (StatusCode::PARTIAL_CONTENT, headers, body).into_response() + finalize_bytes_response(StatusCode::PARTIAL_CONTENT, headers, body, state) } } +fn handle_range_request_from_source( + range_str: &str, + info: crate::storage::volume::NeedleStreamInfo, + mut headers: HeaderMap, + state: Option<Arc<ServerState>>, +) -> Response { + let total = info.data_size as usize; + + let range_spec = match range_str.strip_prefix("bytes=") { + Some(s) => s, + None => { + headers.insert( + header::CONTENT_LENGTH, + info.data_size.to_string().parse().unwrap(), + ); + return (StatusCode::OK, headers).into_response(); + } + }; + + let ranges: Vec<(usize, usize)> = range_spec + .split(',') + 
.filter_map(|part| { + let part = part.trim(); + if let Some(pos) = part.find('-') { + let start_str = &part[..pos]; + let end_str = &part[pos + 1..]; + + if start_str.is_empty() { + let mut suffix: usize = end_str.parse().ok()?; + if suffix > total { + suffix = total; + } + Some((total - suffix, total - 1)) + } else { + let start: usize = start_str.parse().ok()?; + let end = if end_str.is_empty() { + total - 1 + } else { + end_str.parse().ok()? + }; + Some((start, end)) + } + } else { + None + } + }) + .collect(); + + if ranges.is_empty() { + headers.insert( + header::CONTENT_LENGTH, + info.data_size.to_string().parse().unwrap(), + ); + return (StatusCode::OK, headers).into_response(); + } + + let ranges: Vec<(usize, usize)> = ranges + .into_iter() + .map(|(start, mut end)| { + if end >= total { + end = total - 1; + } + (start, end) + }) + .collect(); + + for &(start, end) in &ranges { + if start >= total || start > end { + headers.insert( + "Content-Range", + format!("bytes */{}", total).parse().unwrap(), + ); + return (StatusCode::RANGE_NOT_SATISFIABLE, headers).into_response(); + } + } + + let combined_bytes: usize = ranges.iter().map(|&(s, e)| e - s + 1).sum(); + if combined_bytes > total { + return (StatusCode::OK, headers).into_response(); + } + + let read_slice = |start: usize, end: usize| -> Result<Vec<u8>, std::io::Error> { + let mut buf = vec![0u8; end - start + 1]; + info.source + .read_exact_at(&mut buf, info.data_file_offset + start as u64)?; + Ok(buf) + }; + + if ranges.len() == 1 { + let (start, end) = ranges[0]; + let slice = match read_slice(start, end) { + Ok(slice) => slice, + Err(err) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("range read error: {}", err), + ) + .into_response() + } + }; + headers.insert( + "Content-Range", + format!("bytes {}-{}/{}", start, end, total) + .parse() + .unwrap(), + ); + headers.insert( + header::CONTENT_LENGTH, + slice.len().to_string().parse().unwrap(), + ); + return 
finalize_bytes_response(StatusCode::PARTIAL_CONTENT, headers, slice, state); + } + + let boundary = "SeaweedFSBoundary"; + let content_type = headers + .get(header::CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .unwrap_or("application/octet-stream") + .to_string(); + + let mut body = Vec::new(); + for (i, &(start, end)) in ranges.iter().enumerate() { + let slice = match read_slice(start, end) { + Ok(slice) => slice, + Err(err) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("range read error: {}", err), + ) + .into_response() + } + }; + if i == 0 { + body.extend_from_slice(format!("--{}\r\n", boundary).as_bytes()); + } else { + body.extend_from_slice(format!("\r\n--{}\r\n", boundary).as_bytes()); + } + body.extend_from_slice(format!("Content-Type: {}\r\n", content_type).as_bytes()); + body.extend_from_slice( + format!("Content-Range: bytes {}-{}/{}\r\n\r\n", start, end, total).as_bytes(), + ); + body.extend_from_slice(&slice); + } + body.extend_from_slice(format!("\r\n--{}--\r\n", boundary).as_bytes()); + + headers.insert( + header::CONTENT_TYPE, + format!("multipart/byteranges; boundary={}", boundary) + .parse() + .unwrap(), + ); + headers.insert( + header::CONTENT_LENGTH, + body.len().to_string().parse().unwrap(), + ); + finalize_bytes_response(StatusCode::PARTIAL_CONTENT, headers, body, state) +} + /// Extract filename from URL path like "/vid/fid/filename.ext" fn extract_filename_from_path(path: &str) -> String { let parts: Vec<&str> = path.trim_start_matches('/').split('/').collect(); @@ -2854,7 +3066,7 @@ mod tests { fn test_handle_range_single() { let data = b"hello world"; let headers = HeaderMap::new(); - let resp = handle_range_request("bytes=0-4", data, headers); + let resp = handle_range_request("bytes=0-4", data, headers, None); assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT); } @@ -2862,7 +3074,7 @@ mod tests { fn test_handle_range_invalid() { let data = b"hello"; let headers = HeaderMap::new(); - let resp = 
handle_range_request("bytes=999-1000", data, headers); + let resp = handle_range_request("bytes=999-1000", data, headers, None); assert_eq!(resp.status(), StatusCode::RANGE_NOT_SATISFIABLE); }