Browse Source

Add EC volume read and delete support in HTTP handlers

rust-volume-server
Chris Lu 3 days ago
parent
commit
11b9ac8efc
  1. 286
      seaweed-volume/src/server/handlers.rs

286
seaweed-volume/src/server/handlers.rs

@ -570,15 +570,11 @@ async fn proxy_request(
response_headers.insert(name.clone(), value.clone());
}
let body_bytes = match resp.bytes().await {
Ok(b) => b,
Err(e) => {
tracing::warn!("proxy response read failed: {}", e);
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
}
};
// Stream the proxy response body instead of buffering it entirely
let byte_stream = resp.bytes_stream();
let body = Body::from_stream(byte_stream);
let mut response = Response::new(Body::from(body_bytes));
let mut response = Response::new(body);
*response.status_mut() = status;
*response.headers_mut() = response_headers;
response
@ -613,6 +609,7 @@ fn redirect_request(info: &ProxyRequestInfo, target: &VolumeLocation, scheme: &s
Response::builder()
.status(StatusCode::MOVED_PERMANENTLY)
.header("Location", &location)
.header("Content-Type", "text/html; charset=utf-8")
.body(Body::from(format!(
"<a href=\"{}\">Moved Permanently</a>.\n\n",
location
@ -726,11 +723,12 @@ async fn get_or_head_handler_inner(
};
// Check if volume exists locally; if not, proxy/redirect based on read_mode.
// This mirrors Go's hasVolume check in GetOrHeadHandler.
// This mirrors Go's hasVolume + hasEcVolume check in GetOrHeadHandler.
// NOTE: The RwLockReadGuard must be dropped before any .await to keep the future Send.
let has_volume = state.store.read().unwrap().has_volume(vid);
let has_ec_volume = state.store.read().unwrap().has_ec_volume(vid);
if !has_volume {
if !has_volume && !has_ec_volume {
// Check if already proxied (loop prevention)
let query_string = request.uri().query().unwrap_or("").to_string();
let is_proxied = query_string.contains("proxied=true");
@ -803,7 +801,8 @@ async fn get_or_head_handler_inner(
None
};
// Read needle — first do a meta-only read to check if streaming is appropriate
// Read needle — branching between regular volume and EC volume paths.
// EC volumes always do a full read (no streaming/meta-only).
let mut n = Needle {
id: needle_id,
cookie,
@ -820,91 +819,155 @@ async fn get_or_head_handler_inner(
&& (query.crop_x1.is_some() || query.crop_y1.is_some());
let has_image_ops = has_resize_ops || has_crop_ops;
// Try meta-only read first for potential streaming
let store = state.store.read().unwrap();
let stream_info = store.read_volume_needle_stream_info(vid, &mut n, read_deleted);
let stream_info = match stream_info {
Ok(info) => Some(info),
Err(crate::storage::volume::VolumeError::StreamingUnsupported) => None,
Err(crate::storage::volume::VolumeError::NotFound) => {
metrics::HANDLER_COUNTER
.with_label_values(&[metrics::ERROR_GET_NOT_FOUND])
.inc();
return StatusCode::NOT_FOUND.into_response();
// Stream info is only available for regular volumes, not EC volumes.
let stream_info;
let bypass_cm;
let track_download;
let can_stream;
let can_handle_head_from_meta;
let can_handle_range_from_source;
if has_ec_volume && !has_volume {
// ---- EC volume read path (always full read, no streaming) ----
let store = state.store.read().unwrap();
match store.find_ec_volume(vid) {
Some(ecv) => match ecv.read_ec_shard_needle(needle_id) {
Ok(Some(ec_needle)) => {
n = ec_needle;
}
Ok(None) => {
metrics::HANDLER_COUNTER
.with_label_values(&[metrics::ERROR_GET_NOT_FOUND])
.inc();
return StatusCode::NOT_FOUND.into_response();
}
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
metrics::HANDLER_COUNTER
.with_label_values(&[metrics::ERROR_GET_NOT_FOUND])
.inc();
return StatusCode::NOT_FOUND.into_response();
}
metrics::HANDLER_COUNTER
.with_label_values(&[metrics::ERROR_GET_INTERNAL])
.inc();
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("read ec error: {}", e),
)
.into_response();
}
},
None => {
metrics::HANDLER_COUNTER
.with_label_values(&[metrics::ERROR_GET_NOT_FOUND])
.inc();
return StatusCode::NOT_FOUND.into_response();
}
}
Err(crate::storage::volume::VolumeError::Deleted) => {
metrics::HANDLER_COUNTER
.with_label_values(&[metrics::ERROR_GET_NOT_FOUND])
.inc();
drop(store);
// Validate cookie (matches Go behavior after ReadEcShardNeedle)
if n.cookie != cookie {
return StatusCode::NOT_FOUND.into_response();
}
Err(e) => {
metrics::HANDLER_COUNTER
.with_label_values(&[metrics::ERROR_GET_INTERNAL])
.inc();
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("read error: {}", e),
)
.into_response();
}
};
drop(store);
// Validate cookie
if n.cookie != cookie {
return StatusCode::NOT_FOUND.into_response();
}
// EC volumes: no streaming support
stream_info = None;
bypass_cm = query.cm.as_deref() == Some("false");
track_download = download_guard.is_some();
can_stream = false;
can_handle_head_from_meta = false;
can_handle_range_from_source = false;
} else {
// ---- Regular volume read path (with streaming support) ----
let bypass_cm = query.cm.as_deref() == Some("false");
let track_download = download_guard.is_some();
let can_direct_source_read = stream_info.is_some()
&& !n.is_compressed()
&& !(n.is_chunk_manifest() && !bypass_cm)
&& !has_image_ops;
// Determine if we can stream (large, direct-source eligible, no range)
let can_stream = can_direct_source_read
&& n.data_size > STREAMING_THRESHOLD
&& !has_range
&& method != Method::HEAD;
let can_handle_head_from_meta = can_direct_source_read && method == Method::HEAD;
let can_handle_range_from_source = can_direct_source_read && has_range;
// For chunk manifest or any non-streaming path, we need the full data.
// If we can't stream, do a full read now.
if !can_stream && !can_handle_head_from_meta && !can_handle_range_from_source {
// Re-read with full data
let mut n_full = Needle {
id: needle_id,
cookie,
..Needle::default()
};
// Try meta-only read first for potential streaming
let store = state.store.read().unwrap();
match store.read_volume_needle_opt(vid, &mut n_full, read_deleted) {
Ok(count) => {
if count < 0 {
return StatusCode::NOT_FOUND.into_response();
}
}
let si_result = store.read_volume_needle_stream_info(vid, &mut n, read_deleted);
stream_info = match si_result {
Ok(info) => Some(info),
Err(crate::storage::volume::VolumeError::StreamingUnsupported) => None,
Err(crate::storage::volume::VolumeError::NotFound) => {
metrics::HANDLER_COUNTER
.with_label_values(&[metrics::ERROR_GET_NOT_FOUND])
.inc();
return StatusCode::NOT_FOUND.into_response();
}
Err(crate::storage::volume::VolumeError::Deleted) => {
metrics::HANDLER_COUNTER
.with_label_values(&[metrics::ERROR_GET_NOT_FOUND])
.inc();
return StatusCode::NOT_FOUND.into_response();
}
Err(e) => {
metrics::HANDLER_COUNTER
.with_label_values(&[metrics::ERROR_GET_INTERNAL])
.inc();
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("read error: {}", e),
)
.into_response();
}
}
};
drop(store);
// Use the full needle from here (it has the same metadata + data)
n = n_full;
// Validate cookie
if n.cookie != cookie {
return StatusCode::NOT_FOUND.into_response();
}
bypass_cm = query.cm.as_deref() == Some("false");
track_download = download_guard.is_some();
let can_direct_source_read = stream_info.is_some()
&& !n.is_compressed()
&& !(n.is_chunk_manifest() && !bypass_cm)
&& !has_image_ops;
// Determine if we can stream (large, direct-source eligible, no range)
can_stream = can_direct_source_read
&& n.data_size > STREAMING_THRESHOLD
&& !has_range
&& method != Method::HEAD;
can_handle_head_from_meta = can_direct_source_read && method == Method::HEAD;
can_handle_range_from_source = can_direct_source_read && has_range;
// For chunk manifest or any non-streaming path, we need the full data.
// If we can't stream, do a full read now.
if !can_stream && !can_handle_head_from_meta && !can_handle_range_from_source {
// Re-read with full data
let mut n_full = Needle {
id: needle_id,
cookie,
..Needle::default()
};
let store = state.store.read().unwrap();
match store.read_volume_needle_opt(vid, &mut n_full, read_deleted) {
Ok(count) => {
if count < 0 {
return StatusCode::NOT_FOUND.into_response();
}
}
Err(crate::storage::volume::VolumeError::NotFound) => {
return StatusCode::NOT_FOUND.into_response();
}
Err(crate::storage::volume::VolumeError::Deleted) => {
return StatusCode::NOT_FOUND.into_response();
}
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("read error: {}", e),
)
.into_response();
}
}
drop(store);
// Use the full needle from here (it has the same metadata + data)
n = n_full;
}
}
// Build ETag and Last-Modified BEFORE conditional checks and chunk manifest expansion
@ -2318,6 +2381,77 @@ pub async fn delete_handler(
);
}
// Check for EC volume first (Go checks hasEcVolume before regular volume in DeleteHandler).
// Go's flow: FindEcVolume -> DeleteEcShardNeedle(ecVolume, n, cookie) -> writeDeleteResult
// DeleteEcShardNeedle: reads needle (for size + cookie validation), validates cookie, journals delete.
{
let has_ec = state.store.read().unwrap().has_ec_volume(vid);
if has_ec {
// Step 1: Read the EC needle to get its size and validate cookie
let ec_read_result = {
let store = state.store.read().unwrap();
store
.find_ec_volume(vid)
.map(|ecv| ecv.read_ec_shard_needle(needle_id))
};
match ec_read_result {
Some(Ok(Some(ec_needle))) => {
// Step 2: Validate cookie (Go: cookie != 0 && cookie != n.Cookie)
if cookie.0 != 0 && ec_needle.cookie != cookie {
return json_error_with_query(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Deletion Failed: unexpected cookie {:x}", cookie.0),
Some(&del_query),
);
}
let count = ec_needle.data_size as i64;
// Step 3: Journal the delete
let mut store = state.store.write().unwrap();
if let Some(ecv) = store.find_ec_volume_mut(vid) {
if let Err(e) = ecv.journal_delete(needle_id) {
return json_error_with_query(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Deletion Failed: {}", e),
Some(&del_query),
);
}
}
let result = DeleteResult { size: count };
return json_response_with_params(
StatusCode::ACCEPTED,
&result,
Some(&del_params),
);
}
Some(Ok(None)) => {
// Needle not found in EC volume
let result = DeleteResult { size: 0 };
return json_response_with_params(
StatusCode::NOT_FOUND,
&result,
Some(&del_params),
);
}
Some(Err(e)) => {
return json_error_with_query(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Deletion Failed: {}", e),
Some(&del_query),
);
}
None => {
// EC volume disappeared between has_ec check and find
let result = DeleteResult { size: 0 };
return json_response_with_params(
StatusCode::NOT_FOUND,
&result,
Some(&del_params),
);
}
}
}
}
// H9: Parse custom timestamp from query param; default to now (not 0)
let del_ts_str = del_query
.split('&')

Loading…
Cancel
Save