diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs index 963a03fa5..4a10f9fa7 100644 --- a/seaweed-volume/src/server/handlers.rs +++ b/seaweed-volume/src/server/handlers.rs @@ -4,6 +4,7 @@ //! Matches Go's volume_server_handlers_read.go, volume_server_handlers_write.go, //! volume_server_handlers_admin.go. +use std::collections::HashMap; use std::future::Future; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -14,9 +15,11 @@ use axum::http::{header, HeaderMap, Method, Request, StatusCode}; use axum::response::{IntoResponse, Response}; use serde::{Deserialize, Serialize}; +use super::grpc_client::{build_grpc_endpoint, GRPC_MAX_MESSAGE_SIZE}; use super::volume_server::{normalize_outgoing_http_url, VolumeServerState}; use crate::config::ReadMode; use crate::metrics; +use crate::pb::volume_server_pb; use crate::storage::needle::needle::Needle; use crate::storage::types::*; @@ -329,11 +332,13 @@ fn parse_url_path(path: &str) -> Option<(VolumeId, NeedleId, Cookie)> { // ============================================================================ /// A volume location returned by master lookup. -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] struct VolumeLocation { url: String, #[serde(rename = "publicUrl")] public_url: String, + #[serde(rename = "grpcPort", default)] + grpc_port: u32, } /// Master /dir/lookup response. 
@@ -373,6 +378,120 @@ async fn lookup_volume( Ok(result.locations.unwrap_or_default()) } +fn grpc_address_for_location(location: &VolumeLocation) -> Result<String, String> { + let raw = location + .url + .trim_start_matches("http://") + .trim_start_matches("https://"); + + if location.grpc_port > 0 { + let (host, _) = raw + .rsplit_once(':') + .ok_or_else(|| format!("cannot parse address: {}", location.url))?; + return Ok(format!("{}:{}", host, location.grpc_port)); + } + + if let Some(colon_idx) = raw.rfind(':') { + let port_part = &raw[colon_idx + 1..]; + if let Some(dot_idx) = port_part.rfind('.') { + let host = &raw[..colon_idx]; + let grpc_port = &port_part[dot_idx + 1..]; + grpc_port + .parse::<u16>() + .map_err(|e| format!("invalid grpc port: {}", e))?; + return Ok(format!("{}:{}", host, grpc_port)); + } + + let port: u16 = port_part + .parse() + .map_err(|e| format!("invalid port: {}", e))?; + let host = &raw[..colon_idx]; + return Ok(format!("{}:{}", host, port as u32 + 10000)); + } + + Err(format!("cannot parse address: {}", location.url)) +} + +async fn batch_delete_file_ids( + state: &VolumeServerState, + file_ids: &[String], +) -> Result<(), String> { + let mut lookup_cache: HashMap<u32, Vec<VolumeLocation>> = HashMap::new(); + let mut server_to_file_ids: HashMap<String, Vec<String>> = HashMap::new(); + + for file_id in file_ids { + let parsed = crate::storage::needle::needle::FileId::parse(file_id) + .map_err(|e| format!("chunk delete {}: {}", file_id, e))?; + let volume_id = parsed.volume_id.0; + + let locations = if let Some(locations) = lookup_cache.get(&volume_id) { + locations.clone() + } else { + let locations = lookup_volume( + &state.http_client, + &state.outgoing_http_scheme, + &state.master_url, + volume_id, + ) + .await + .map_err(|e| format!("chunk delete {}: {}", file_id, e))?; + if locations.is_empty() { + return Err(format!("chunk delete {}: file not found", file_id)); + } + lookup_cache.insert(volume_id, locations.clone()); + locations + }; + + for location in locations { + let grpc_addr = 
grpc_address_for_location(&location) + .map_err(|e| format!("chunk delete {}: {}", file_id, e))?; + server_to_file_ids + .entry(grpc_addr) + .or_default() + .push(file_id.clone()); + } + } + + for (grpc_addr, batch) in server_to_file_ids { + let endpoint = build_grpc_endpoint(&grpc_addr, state.outgoing_grpc_tls.as_ref()) + .map_err(|e| format!("batch delete {}: {}", grpc_addr, e))?; + let channel = endpoint + .connect() + .await + .map_err(|e| format!("batch delete {}: {}", grpc_addr, e))?; + let mut client = + volume_server_pb::volume_server_client::VolumeServerClient::with_interceptor( + channel, + super::request_id::outgoing_request_id_interceptor, + ) + .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE) + .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE); + + let response = client + .batch_delete(volume_server_pb::BatchDeleteRequest { + file_ids: batch.clone(), + skip_cookie_check: true, + }) + .await + .map_err(|e| format!("batch delete {}: {}", grpc_addr, e))? + .into_inner(); + + for result in response.results { + if !result.error.is_empty() { + return Err(format!("chunk delete {}: {}", result.file_id, result.error)); + } + if result.status >= 400 { + return Err(format!( + "chunk delete {}: status {}", + result.file_id, result.status + )); + } + } + } + + Ok(()) +} + /// Helper to synchronously replicate a request to peer volume servers. 
async fn do_replicated_request( state: &VolumeServerState, @@ -2589,6 +2708,8 @@ pub async fn delete_handler( n.last_modified = del_last_modified; n.set_has_last_modified_date(); + let mut delete_size_override = None; + // If this is a chunk manifest, delete child chunks first if n.is_chunk_manifest() { let manifest_data = if n.is_compressed() { @@ -2605,60 +2726,30 @@ pub async fn delete_handler( n.data.clone() }; - if let Ok(manifest) = serde_json::from_slice::<ChunkManifest>(&manifest_data) { - // Delete all child chunks first - for chunk in &manifest.chunks { - let (chunk_vid, chunk_nid, chunk_cookie) = match parse_url_path(&chunk.fid) { - Some(p) => p, - None => { - return json_error_with_query( - StatusCode::INTERNAL_SERVER_ERROR, - format!("invalid chunk fid: {}", chunk.fid), - Some(&del_query), - ); - } - }; - let mut chunk_needle = Needle { - id: chunk_nid, - cookie: chunk_cookie, - ..Needle::default() - }; - // Read the chunk to validate it exists - { - let store = state.store.read().unwrap(); - if let Err(e) = store.read_volume_needle(chunk_vid, &mut chunk_needle) { - return json_error_with_query( - StatusCode::INTERNAL_SERVER_ERROR, - format!("read chunk {}: {}", chunk.fid, e), - Some(&del_query), - ); - } - } - // Delete the chunk - let mut store = state.store.write().unwrap(); - if let Err(e) = store.delete_volume_needle(chunk_vid, &mut chunk_needle) { - return json_error_with_query( - StatusCode::INTERNAL_SERVER_ERROR, - format!("delete chunk {}: {}", chunk.fid, e), - Some(&del_query), - ); - } - } - // Delete the manifest itself - let mut store = state.store.write().unwrap(); - if let Err(e) = store.delete_volume_needle(vid, &mut n) { + let manifest = match serde_json::from_slice::<ChunkManifest>(&manifest_data) { + Ok(manifest) => manifest, + Err(e) => { return json_error_with_query( StatusCode::INTERNAL_SERVER_ERROR, - format!("delete manifest: {}", e), + format!("Load chunks manifest error: {}", e), Some(&del_query), ); } - // Return the manifest's declared size (matches Go 
behavior) - let result = DeleteResult { - size: manifest.size as i64, - }; - return json_response_with_params(StatusCode::ACCEPTED, &result, Some(&del_params)); + }; + + let child_fids: Vec<String> = manifest + .chunks + .iter() + .map(|chunk| chunk.fid.clone()) + .collect(); + if let Err(e) = batch_delete_file_ids(&state, &child_fids).await { + return json_error_with_query( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Delete chunks error: {}", e), + Some(&del_query), + ); } + delete_size_override = Some(manifest.size as i64); } let delete_result = { @@ -2699,7 +2790,7 @@ pub async fn delete_handler( match delete_result { Ok(size) => { let result = DeleteResult { - size: size.0 as i64, + size: delete_size_override.unwrap_or(size.0 as i64), }; json_response_with_params(StatusCode::ACCEPTED, &result, Some(&del_params)) } @@ -3761,6 +3852,7 @@ mod tests { let target = VolumeLocation { url: "volume.internal:8080".to_string(), public_url: "volume.public:8080".to_string(), + grpc_port: 18080, }; let response = redirect_request(&info, &target, "https");