Browse Source

Serve HTTP ranges from backend source

rust-volume-server
Chris Lu 4 days ago
parent
commit
2c26c72706
  1. 276
      seaweed-volume/src/server/handlers.rs

276
seaweed-volume/src/server/handlers.rs

@ -75,6 +75,34 @@ impl Drop for TrackedBody {
}
}
/// Build the final HTTP response for a fully-buffered body.
///
/// When `state` is `Some` (download throttling is active), the global
/// in-flight download byte counter is incremented and the body is wrapped
/// in a `TrackedBody` so the same byte count is released once the response
/// has been fully sent (see `Drop for TrackedBody`). Otherwise a plain
/// `(status, headers, data)` response is produced.
fn finalize_bytes_response(
    status: StatusCode,
    headers: HeaderMap,
    data: Vec<u8>,
    state: Option<Arc<VolumeServerState>>,
) -> Response {
    // Untracked fast path: no throttling state supplied.
    let state = match state {
        Some(s) => s,
        None => return (status, headers, data).into_response(),
    };

    let byte_count = data.len() as i64;
    // fetch_add returns the previous value, so add byte_count to get the
    // new in-flight total for the gauge.
    let inflight_total = byte_count
        + state
            .inflight_download_bytes
            .fetch_add(byte_count, Ordering::Relaxed);
    metrics::INFLIGHT_DOWNLOAD_SIZE.set(inflight_total);

    let mut response = Response::new(Body::new(TrackedBody {
        data,
        state,
        bytes: byte_count,
    }));
    *response.status_mut() = status;
    *response.headers_mut() = headers;
    response
}
// ============================================================================
// Streaming Body for Large Files
// ============================================================================
@ -826,19 +854,25 @@ async fn get_or_head_handler_inner(
return StatusCode::NOT_FOUND.into_response();
}
// Determine if we can stream (large, uncompressed, not manifest, not image needing ops, no range)
let bypass_cm = query.cm.as_deref() == Some("false");
let can_stream = stream_info.is_some()
&& n.data_size > STREAMING_THRESHOLD
let track_download = download_guard.is_some();
let can_direct_source_read = stream_info.is_some()
&& !n.is_compressed()
&& !(n.is_chunk_manifest() && !bypass_cm)
&& !(is_image && has_image_ops)
&& !(is_image && has_image_ops);
// Determine if we can stream (large, direct-source eligible, no range)
let can_stream = can_direct_source_read
&& n.data_size > STREAMING_THRESHOLD
&& !has_range
&& method != Method::HEAD;
let can_handle_head_from_meta = can_direct_source_read && method == Method::HEAD;
let can_handle_range_from_source = can_direct_source_read && has_range;
// For chunk manifest or any non-streaming path, we need the full data.
// If we can't stream, do a full read now.
if !can_stream {
if !can_stream && !can_handle_head_from_meta && !can_handle_range_from_source {
// Re-read with full data
let mut n_full = Needle {
id: needle_id,
@ -1109,6 +1143,31 @@ async fn get_or_head_handler_inner(
}
}
if can_handle_head_from_meta {
if let Some(info) = stream_info {
response_headers.insert(
header::CONTENT_LENGTH,
info.data_size.to_string().parse().unwrap(),
);
return (StatusCode::OK, response_headers).into_response();
}
}
if can_handle_range_from_source {
if let (Some(range_header), Some(info)) =
(headers.get(header::RANGE), stream_info)
{
if let Ok(range_str) = range_header.to_str() {
return handle_range_request_from_source(
range_str,
info,
response_headers,
track_download.then(|| state.clone()),
);
}
}
}
// ---- Buffered path: small files, compressed, images, range requests ----
// Handle compressed data: if needle is compressed, either pass through or decompress
@ -1161,7 +1220,12 @@ async fn get_or_head_handler_inner(
// Check Range header
if let Some(range_header) = headers.get(header::RANGE) {
if let Ok(range_str) = range_header.to_str() {
return handle_range_request(range_str, &data, response_headers);
return handle_range_request(
range_str,
&data,
response_headers,
track_download.then(|| state.clone()),
);
}
}
@ -1173,31 +1237,21 @@ async fn get_or_head_handler_inner(
return (StatusCode::OK, response_headers).into_response();
}
// If download throttling is active, wrap the body so we track when it's fully sent
if download_guard.is_some() {
let data_len = data.len() as i64;
let new_val = state
.inflight_download_bytes
.fetch_add(data_len, Ordering::Relaxed)
+ data_len;
metrics::INFLIGHT_DOWNLOAD_SIZE.set(new_val);
let tracked_body = TrackedBody {
data,
state: state.clone(),
bytes: data_len,
};
let body = Body::new(tracked_body);
let mut resp = Response::new(body);
*resp.status_mut() = StatusCode::OK;
*resp.headers_mut() = response_headers;
return resp;
}
(StatusCode::OK, response_headers, data).into_response()
finalize_bytes_response(
StatusCode::OK,
response_headers,
data,
track_download.then(|| state.clone()),
)
}
/// Handle HTTP Range requests. Returns 206 Partial Content or 416 Range Not Satisfiable.
fn handle_range_request(range_str: &str, data: &[u8], mut headers: HeaderMap) -> Response {
fn handle_range_request(
range_str: &str,
data: &[u8],
mut headers: HeaderMap,
state: Option<Arc<VolumeServerState>>,
) -> Response {
let total = data.len();
// Parse "bytes=start-end"
@ -1281,7 +1335,7 @@ fn handle_range_request(range_str: &str, data: &[u8], mut headers: HeaderMap) ->
header::CONTENT_LENGTH,
slice.len().to_string().parse().unwrap(),
);
(StatusCode::PARTIAL_CONTENT, headers, slice.to_vec()).into_response()
finalize_bytes_response(StatusCode::PARTIAL_CONTENT, headers, slice.to_vec(), state)
} else {
// Multi-range: build multipart/byteranges response
let boundary = "SeaweedFSBoundary";
@ -1317,10 +1371,168 @@ fn handle_range_request(range_str: &str, data: &[u8], mut headers: HeaderMap) ->
header::CONTENT_LENGTH,
body.len().to_string().parse().unwrap(),
);
(StatusCode::PARTIAL_CONTENT, headers, body).into_response()
finalize_bytes_response(StatusCode::PARTIAL_CONTENT, headers, body, state)
}
}
/// Serve an HTTP Range request by reading only the requested byte spans
/// directly from the backing data file, without buffering the whole needle.
///
/// Behavior:
/// - Malformed `Range` header, or combined ranges exceeding the object
///   size: fall back to serving the entire object with 200 OK (and an
///   accurate `Content-Length` plus the actual bytes in the body).
/// - Any individually unsatisfiable range: 416 with `Content-Range: bytes */total`.
/// - One range: 206 with a `Content-Range` header.
/// - Multiple ranges: 206 `multipart/byteranges` body.
///
/// `state` is `Some` when download throttling is active; the response body
/// is then wrapped (via `finalize_bytes_response`) to track in-flight bytes.
fn handle_range_request_from_source(
    range_str: &str,
    info: crate::storage::volume::NeedleStreamInfo,
    mut headers: HeaderMap,
    state: Option<Arc<VolumeServerState>>,
) -> Response {
    let total = info.data_size as usize;

    // Read the inclusive byte span [start, end] from the data file at the
    // needle's payload offset.
    let read_slice = |start: usize, end: usize| -> Result<Vec<u8>, std::io::Error> {
        let mut buf = vec![0u8; end - start + 1];
        info.source
            .read_exact_at(&mut buf, info.data_file_offset + start as u64)?;
        Ok(buf)
    };

    // Fallback: serve the whole object with 200 OK. Previously this path
    // set Content-Length but sent an EMPTY body, so clients received a
    // truncated response with a lying header; now the bytes are read too.
    let serve_full = |mut headers: HeaderMap, state: Option<Arc<VolumeServerState>>| -> Response {
        headers.insert(
            header::CONTENT_LENGTH,
            total.to_string().parse().unwrap(),
        );
        if total == 0 {
            return (StatusCode::OK, headers).into_response();
        }
        match read_slice(0, total - 1) {
            Ok(data) => finalize_bytes_response(StatusCode::OK, headers, data, state),
            Err(err) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("range read error: {}", err),
            )
                .into_response(),
        }
    };

    let range_spec = match range_str.strip_prefix("bytes=") {
        Some(s) => s,
        None => return serve_full(headers, state),
    };

    // Parse each comma-separated spec: "start-end", "start-", or "-suffix".
    // `saturating_sub` keeps an empty object (total == 0) from underflowing;
    // the resulting degenerate pair is rejected by the satisfiability check
    // below instead of panicking.
    let parsed: Vec<(usize, usize)> = range_spec
        .split(',')
        .filter_map(|part| {
            let part = part.trim();
            let pos = part.find('-')?;
            let start_str = &part[..pos];
            let end_str = &part[pos + 1..];
            if start_str.is_empty() {
                // Suffix form "-N": the last N bytes.
                let suffix = end_str.parse::<usize>().ok()?.min(total);
                Some((total - suffix, total.saturating_sub(1)))
            } else {
                let start: usize = start_str.parse().ok()?;
                let end = if end_str.is_empty() {
                    total.saturating_sub(1) // open-ended "N-"
                } else {
                    end_str.parse().ok()?
                };
                Some((start, end))
            }
        })
        .collect();

    if parsed.is_empty() {
        return serve_full(headers, state);
    }

    // Clamp each end to the last valid byte index.
    let ranges: Vec<(usize, usize)> = parsed
        .into_iter()
        .map(|(start, end)| (start, end.min(total.saturating_sub(1))))
        .collect();

    // Any individually unsatisfiable range fails the whole request.
    for &(start, end) in &ranges {
        if start >= total || start > end {
            headers.insert(
                "Content-Range",
                format!("bytes */{}", total).parse().unwrap(),
            );
            return (StatusCode::RANGE_NOT_SATISFIABLE, headers).into_response();
        }
    }

    // Combined ranges larger than the object (overlapping ranges): serve
    // the whole object instead, avoiding response amplification.
    let combined_bytes: usize = ranges.iter().map(|&(s, e)| e - s + 1).sum();
    if combined_bytes > total {
        return serve_full(headers, state);
    }

    if ranges.len() == 1 {
        // Single range: 206 with Content-Range.
        let (start, end) = ranges[0];
        let slice = match read_slice(start, end) {
            Ok(slice) => slice,
            Err(err) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    format!("range read error: {}", err),
                )
                    .into_response()
            }
        };
        headers.insert(
            "Content-Range",
            format!("bytes {}-{}/{}", start, end, total)
                .parse()
                .unwrap(),
        );
        headers.insert(
            header::CONTENT_LENGTH,
            slice.len().to_string().parse().unwrap(),
        );
        return finalize_bytes_response(StatusCode::PARTIAL_CONTENT, headers, slice, state);
    }

    // Multi-range: build a multipart/byteranges body, one part per range.
    let boundary = "SeaweedFSBoundary";
    let content_type = headers
        .get(header::CONTENT_TYPE)
        .and_then(|v| v.to_str().ok())
        .unwrap_or("application/octet-stream")
        .to_string();
    let mut body = Vec::new();
    for (i, &(start, end)) in ranges.iter().enumerate() {
        let slice = match read_slice(start, end) {
            Ok(slice) => slice,
            Err(err) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    format!("range read error: {}", err),
                )
                    .into_response()
            }
        };
        if i == 0 {
            body.extend_from_slice(format!("--{}\r\n", boundary).as_bytes());
        } else {
            body.extend_from_slice(format!("\r\n--{}\r\n", boundary).as_bytes());
        }
        body.extend_from_slice(format!("Content-Type: {}\r\n", content_type).as_bytes());
        body.extend_from_slice(
            format!("Content-Range: bytes {}-{}/{}\r\n\r\n", start, end, total).as_bytes(),
        );
        body.extend_from_slice(&slice);
    }
    body.extend_from_slice(format!("\r\n--{}--\r\n", boundary).as_bytes());
    headers.insert(
        header::CONTENT_TYPE,
        format!("multipart/byteranges; boundary={}", boundary)
            .parse()
            .unwrap(),
    );
    headers.insert(
        header::CONTENT_LENGTH,
        body.len().to_string().parse().unwrap(),
    );
    finalize_bytes_response(StatusCode::PARTIAL_CONTENT, headers, body, state)
}
/// Extract filename from URL path like "/vid/fid/filename.ext"
fn extract_filename_from_path(path: &str) -> String {
let parts: Vec<&str> = path.trim_start_matches('/').split('/').collect();
@ -2854,7 +3066,7 @@ mod tests {
// Single satisfiable range over an in-memory buffer should yield 206.
fn test_handle_range_single() {
    let data = b"hello world";
    let headers = HeaderMap::new();
    // `None`: no download-throttle state, so the untracked response path is used.
    let resp = handle_range_request("bytes=0-4", data, headers, None);
    assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT);
}
@ -2862,7 +3074,7 @@ mod tests {
// A range starting past the end of the data is unsatisfiable -> 416.
fn test_handle_range_invalid() {
    let data = b"hello";
    let headers = HeaderMap::new();
    // `None`: no download-throttle state, so the untracked response path is used.
    let resp = handle_range_request("bytes=999-1000", data, headers, None);
    assert_eq!(resp.status(), StatusCode::RANGE_NOT_SATISFIABLE);
}

Loading…
Cancel
Save