diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs index 95a88b994..8891161fc 100644 --- a/seaweed-volume/src/server/handlers.rs +++ b/seaweed-volume/src/server/handlers.rs @@ -570,15 +570,11 @@ async fn proxy_request( response_headers.insert(name.clone(), value.clone()); } - let body_bytes = match resp.bytes().await { - Ok(b) => b, - Err(e) => { - tracing::warn!("proxy response read failed: {}", e); - return StatusCode::INTERNAL_SERVER_ERROR.into_response(); - } - }; + // Stream the proxy response body instead of buffering it entirely + let byte_stream = resp.bytes_stream(); + let body = Body::from_stream(byte_stream); - let mut response = Response::new(Body::from(body_bytes)); + let mut response = Response::new(body); *response.status_mut() = status; *response.headers_mut() = response_headers; response @@ -613,6 +609,7 @@ fn redirect_request(info: &ProxyRequestInfo, target: &VolumeLocation, scheme: &s Response::builder() .status(StatusCode::MOVED_PERMANENTLY) .header("Location", &location) + .header("Content-Type", "text/html; charset=utf-8") .body(Body::from(format!( "Moved Permanently.\n\n", location @@ -726,11 +723,12 @@ async fn get_or_head_handler_inner( }; // Check if volume exists locally; if not, proxy/redirect based on read_mode. - // This mirrors Go's hasVolume check in GetOrHeadHandler. + // This mirrors Go's hasVolume + hasEcVolume check in GetOrHeadHandler. // NOTE: The RwLockReadGuard must be dropped before any .await to keep the future Send. let has_volume = state.store.read().unwrap().has_volume(vid); + let has_ec_volume = state.store.read().unwrap().has_ec_volume(vid); - if !has_volume { + if !has_volume && !has_ec_volume { // Check if already proxied (loop prevention) let query_string = request.uri().query().unwrap_or("").to_string(); let is_proxied = query_string.contains("proxied=true"); @@ -803,7 +801,8 @@ async fn get_or_head_handler_inner( None }; - // Read needle — first do a meta-only read to check if streaming is appropriate + // Read needle — branching between regular volume and EC volume paths. + // EC volumes always do a full read (no streaming/meta-only). let mut n = Needle { id: needle_id, cookie, @@ -820,91 +819,155 @@ async fn get_or_head_handler_inner( && (query.crop_x1.is_some() || query.crop_y1.is_some()); let has_image_ops = has_resize_ops || has_crop_ops; - // Try meta-only read first for potential streaming - let store = state.store.read().unwrap(); - let stream_info = store.read_volume_needle_stream_info(vid, &mut n, read_deleted); - let stream_info = match stream_info { - Ok(info) => Some(info), - Err(crate::storage::volume::VolumeError::StreamingUnsupported) => None, - Err(crate::storage::volume::VolumeError::NotFound) => { - metrics::HANDLER_COUNTER - .with_label_values(&[metrics::ERROR_GET_NOT_FOUND]) - .inc(); - return StatusCode::NOT_FOUND.into_response(); + // Stream info is only available for regular volumes, not EC volumes. + let stream_info; + let bypass_cm; + let track_download; + let can_stream; + let can_handle_head_from_meta; + let can_handle_range_from_source; + + if has_ec_volume && !has_volume { + // ---- EC volume read path (always full read, no streaming) ---- + let store = state.store.read().unwrap(); + match store.find_ec_volume(vid) { + Some(ecv) => match ecv.read_ec_shard_needle(needle_id) { + Ok(Some(ec_needle)) => { + n = ec_needle; + } + Ok(None) => { + metrics::HANDLER_COUNTER + .with_label_values(&[metrics::ERROR_GET_NOT_FOUND]) + .inc(); + return StatusCode::NOT_FOUND.into_response(); + } + Err(e) => { + if e.kind() == std::io::ErrorKind::NotFound { + metrics::HANDLER_COUNTER + .with_label_values(&[metrics::ERROR_GET_NOT_FOUND]) + .inc(); + return StatusCode::NOT_FOUND.into_response(); + } + metrics::HANDLER_COUNTER + .with_label_values(&[metrics::ERROR_GET_INTERNAL]) + .inc(); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("read ec error: {}", e), + ) + .into_response(); + } + }, + None => { + metrics::HANDLER_COUNTER + .with_label_values(&[metrics::ERROR_GET_NOT_FOUND]) + .inc(); + return StatusCode::NOT_FOUND.into_response(); + } } - Err(crate::storage::volume::VolumeError::Deleted) => { - metrics::HANDLER_COUNTER - .with_label_values(&[metrics::ERROR_GET_NOT_FOUND]) - .inc(); + drop(store); + + // Validate cookie (matches Go behavior after ReadEcShardNeedle) + if n.cookie != cookie { return StatusCode::NOT_FOUND.into_response(); } - Err(e) => { - metrics::HANDLER_COUNTER - .with_label_values(&[metrics::ERROR_GET_INTERNAL]) - .inc(); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - format!("read error: {}", e), - ) - .into_response(); - } - }; - drop(store); - // Validate cookie - if n.cookie != cookie { - return StatusCode::NOT_FOUND.into_response(); - } + // EC volumes: no streaming support + stream_info = None; + bypass_cm = query.cm.as_deref() == Some("false"); + track_download = download_guard.is_some(); + can_stream = false; + can_handle_head_from_meta = false; + can_handle_range_from_source = false; + } else { + // ---- Regular volume read path (with streaming support) ---- - let bypass_cm = query.cm.as_deref() == Some("false"); - let track_download = download_guard.is_some(); - let can_direct_source_read = stream_info.is_some() - && !n.is_compressed() - && !(n.is_chunk_manifest() && !bypass_cm) - && !has_image_ops; - - // Determine if we can stream (large, direct-source eligible, no range) - let can_stream = can_direct_source_read - && n.data_size > STREAMING_THRESHOLD - && !has_range - && method != Method::HEAD; - - let can_handle_head_from_meta = can_direct_source_read && method == Method::HEAD; - let can_handle_range_from_source = can_direct_source_read && has_range; - - // For chunk manifest or any non-streaming path, we need the full data. - // If we can't stream, do a full read now. - if !can_stream && !can_handle_head_from_meta && !can_handle_range_from_source { - // Re-read with full data - let mut n_full = Needle { - id: needle_id, - cookie, - ..Needle::default() - }; + // Try meta-only read first for potential streaming let store = state.store.read().unwrap(); - match store.read_volume_needle_opt(vid, &mut n_full, read_deleted) { - Ok(count) => { - if count < 0 { - return StatusCode::NOT_FOUND.into_response(); - } - } + let si_result = store.read_volume_needle_stream_info(vid, &mut n, read_deleted); + stream_info = match si_result { + Ok(info) => Some(info), + Err(crate::storage::volume::VolumeError::StreamingUnsupported) => None, Err(crate::storage::volume::VolumeError::NotFound) => { + metrics::HANDLER_COUNTER + .with_label_values(&[metrics::ERROR_GET_NOT_FOUND]) + .inc(); return StatusCode::NOT_FOUND.into_response(); } Err(crate::storage::volume::VolumeError::Deleted) => { + metrics::HANDLER_COUNTER + .with_label_values(&[metrics::ERROR_GET_NOT_FOUND]) + .inc(); return StatusCode::NOT_FOUND.into_response(); } Err(e) => { + metrics::HANDLER_COUNTER + .with_label_values(&[metrics::ERROR_GET_INTERNAL]) + .inc(); return ( StatusCode::INTERNAL_SERVER_ERROR, format!("read error: {}", e), ) .into_response(); } - } + }; drop(store); - // Use the full needle from here (it has the same metadata + data) - n = n_full; + + // Validate cookie + if n.cookie != cookie { + return StatusCode::NOT_FOUND.into_response(); + } + + bypass_cm = query.cm.as_deref() == Some("false"); + track_download = download_guard.is_some(); + let can_direct_source_read = stream_info.is_some() + && !n.is_compressed() + && !(n.is_chunk_manifest() && !bypass_cm) + && !has_image_ops; + + // Determine if we can stream (large, direct-source eligible, no range) + can_stream = can_direct_source_read + && n.data_size > STREAMING_THRESHOLD + && !has_range + && method != Method::HEAD; + + can_handle_head_from_meta = can_direct_source_read && method == Method::HEAD; + can_handle_range_from_source = can_direct_source_read && has_range; + + // For chunk manifest or any non-streaming path, we need the full data. + // If we can't stream, do a full read now. + if !can_stream && !can_handle_head_from_meta && !can_handle_range_from_source { + // Re-read with full data + let mut n_full = Needle { + id: needle_id, + cookie, + ..Needle::default() + }; + let store = state.store.read().unwrap(); + match store.read_volume_needle_opt(vid, &mut n_full, read_deleted) { + Ok(count) => { + if count < 0 { + return StatusCode::NOT_FOUND.into_response(); + } + } + Err(crate::storage::volume::VolumeError::NotFound) => { + return StatusCode::NOT_FOUND.into_response(); + } + Err(crate::storage::volume::VolumeError::Deleted) => { + return StatusCode::NOT_FOUND.into_response(); + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("read error: {}", e), + ) + .into_response(); + } + } + drop(store); + // Use the full needle from here (it has the same metadata + data) + n = n_full; + } } // Build ETag and Last-Modified BEFORE conditional checks and chunk manifest expansion @@ -2318,6 +2381,77 @@ pub async fn delete_handler( ); } + // Check for EC volume first (Go checks hasEcVolume before regular volume in DeleteHandler). + // Go's flow: FindEcVolume -> DeleteEcShardNeedle(ecVolume, n, cookie) -> writeDeleteResult + // DeleteEcShardNeedle: reads needle (for size + cookie validation), validates cookie, journals delete. + { + let has_ec = state.store.read().unwrap().has_ec_volume(vid); + if has_ec { + // Step 1: Read the EC needle to get its size and validate cookie + let ec_read_result = { + let store = state.store.read().unwrap(); + store + .find_ec_volume(vid) + .map(|ecv| ecv.read_ec_shard_needle(needle_id)) + }; + match ec_read_result { + Some(Ok(Some(ec_needle))) => { + // Step 2: Validate cookie (Go: cookie != 0 && cookie != n.Cookie) + if cookie.0 != 0 && ec_needle.cookie != cookie { + return json_error_with_query( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Deletion Failed: unexpected cookie {:x}", cookie.0), + Some(&del_query), + ); + } + let count = ec_needle.data_size as i64; + // Step 3: Journal the delete + let mut store = state.store.write().unwrap(); + if let Some(ecv) = store.find_ec_volume_mut(vid) { + if let Err(e) = ecv.journal_delete(needle_id) { + return json_error_with_query( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Deletion Failed: {}", e), + Some(&del_query), + ); + } + } + let result = DeleteResult { size: count }; + return json_response_with_params( + StatusCode::ACCEPTED, + &result, + Some(&del_params), + ); + } + Some(Ok(None)) => { + // Needle not found in EC volume + let result = DeleteResult { size: 0 }; + return json_response_with_params( + StatusCode::NOT_FOUND, + &result, + Some(&del_params), + ); + } + Some(Err(e)) => { + return json_error_with_query( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Deletion Failed: {}", e), + Some(&del_query), + ); + } + None => { + // EC volume disappeared between has_ec check and find + let result = DeleteResult { size: 0 }; + return json_response_with_params( + StatusCode::NOT_FOUND, + &result, + Some(&del_params), + ); + } + } + } + } + // H9: Parse custom timestamp from query param; default to now (not 0) let del_ts_str = del_query .split('&')