|
|
|
@ -4,6 +4,7 @@ |
|
|
|
//! Matches Go's volume_server_handlers_read.go, volume_server_handlers_write.go,
|
|
|
|
//! volume_server_handlers_admin.go.
|
|
|
|
|
|
|
|
use std::collections::HashMap;
|
|
|
|
use std::future::Future;
|
|
|
|
use std::sync::atomic::Ordering;
|
|
|
|
use std::sync::Arc;
|
|
|
|
@ -14,9 +15,11 @@ use axum::http::{header, HeaderMap, Method, Request, StatusCode}; |
|
|
|
use axum::response::{IntoResponse, Response};
|
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
|
|
|
|
use super::grpc_client::{build_grpc_endpoint, GRPC_MAX_MESSAGE_SIZE};
|
|
|
|
use super::volume_server::{normalize_outgoing_http_url, VolumeServerState};
|
|
|
|
use crate::config::ReadMode;
|
|
|
|
use crate::metrics;
|
|
|
|
use crate::pb::volume_server_pb;
|
|
|
|
use crate::storage::needle::needle::Needle;
|
|
|
|
use crate::storage::types::*;
|
|
|
|
|
|
|
|
@ -329,11 +332,13 @@ fn parse_url_path(path: &str) -> Option<(VolumeId, NeedleId, Cookie)> { |
|
|
|
// ============================================================================
|
|
|
|
|
|
|
|
/// A volume location returned by master lookup.
|
|
|
|
#[derive(Debug, Deserialize)]
|
|
|
|
#[derive(Clone, Debug, Deserialize)]
|
|
|
|
struct VolumeLocation {
|
|
|
|
url: String,
|
|
|
|
#[serde(rename = "publicUrl")]
|
|
|
|
public_url: String,
|
|
|
|
#[serde(rename = "grpcPort", default)]
|
|
|
|
grpc_port: u32,
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Master /dir/lookup response.
|
|
|
|
@ -373,6 +378,120 @@ async fn lookup_volume( |
|
|
|
Ok(result.locations.unwrap_or_default())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn grpc_address_for_location(location: &VolumeLocation) -> Result<String, String> {
|
|
|
|
let raw = location
|
|
|
|
.url
|
|
|
|
.trim_start_matches("http://")
|
|
|
|
.trim_start_matches("https://");
|
|
|
|
|
|
|
|
if location.grpc_port > 0 {
|
|
|
|
let (host, _) = raw
|
|
|
|
.rsplit_once(':')
|
|
|
|
.ok_or_else(|| format!("cannot parse address: {}", location.url))?;
|
|
|
|
return Ok(format!("{}:{}", host, location.grpc_port));
|
|
|
|
}
|
|
|
|
|
|
|
|
if let Some(colon_idx) = raw.rfind(':') {
|
|
|
|
let port_part = &raw[colon_idx + 1..];
|
|
|
|
if let Some(dot_idx) = port_part.rfind('.') {
|
|
|
|
let host = &raw[..colon_idx];
|
|
|
|
let grpc_port = &port_part[dot_idx + 1..];
|
|
|
|
grpc_port
|
|
|
|
.parse::<u16>()
|
|
|
|
.map_err(|e| format!("invalid grpc port: {}", e))?;
|
|
|
|
return Ok(format!("{}:{}", host, grpc_port));
|
|
|
|
}
|
|
|
|
|
|
|
|
let port: u16 = port_part
|
|
|
|
.parse()
|
|
|
|
.map_err(|e| format!("invalid port: {}", e))?;
|
|
|
|
let host = &raw[..colon_idx];
|
|
|
|
return Ok(format!("{}:{}", host, port as u32 + 10000));
|
|
|
|
}
|
|
|
|
|
|
|
|
Err(format!("cannot parse address: {}", location.url))
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn batch_delete_file_ids(
|
|
|
|
state: &VolumeServerState,
|
|
|
|
file_ids: &[String],
|
|
|
|
) -> Result<(), String> {
|
|
|
|
let mut lookup_cache: HashMap<u32, Vec<VolumeLocation>> = HashMap::new();
|
|
|
|
let mut server_to_file_ids: HashMap<String, Vec<String>> = HashMap::new();
|
|
|
|
|
|
|
|
for file_id in file_ids {
|
|
|
|
let parsed = crate::storage::needle::needle::FileId::parse(file_id)
|
|
|
|
.map_err(|e| format!("chunk delete {}: {}", file_id, e))?;
|
|
|
|
let volume_id = parsed.volume_id.0;
|
|
|
|
|
|
|
|
let locations = if let Some(locations) = lookup_cache.get(&volume_id) {
|
|
|
|
locations.clone()
|
|
|
|
} else {
|
|
|
|
let locations = lookup_volume(
|
|
|
|
&state.http_client,
|
|
|
|
&state.outgoing_http_scheme,
|
|
|
|
&state.master_url,
|
|
|
|
volume_id,
|
|
|
|
)
|
|
|
|
.await
|
|
|
|
.map_err(|e| format!("chunk delete {}: {}", file_id, e))?;
|
|
|
|
if locations.is_empty() {
|
|
|
|
return Err(format!("chunk delete {}: file not found", file_id));
|
|
|
|
}
|
|
|
|
lookup_cache.insert(volume_id, locations.clone());
|
|
|
|
locations
|
|
|
|
};
|
|
|
|
|
|
|
|
for location in locations {
|
|
|
|
let grpc_addr = grpc_address_for_location(&location)
|
|
|
|
.map_err(|e| format!("chunk delete {}: {}", file_id, e))?;
|
|
|
|
server_to_file_ids
|
|
|
|
.entry(grpc_addr)
|
|
|
|
.or_default()
|
|
|
|
.push(file_id.clone());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for (grpc_addr, batch) in server_to_file_ids {
|
|
|
|
let endpoint = build_grpc_endpoint(&grpc_addr, state.outgoing_grpc_tls.as_ref())
|
|
|
|
.map_err(|e| format!("batch delete {}: {}", grpc_addr, e))?;
|
|
|
|
let channel = endpoint
|
|
|
|
.connect()
|
|
|
|
.await
|
|
|
|
.map_err(|e| format!("batch delete {}: {}", grpc_addr, e))?;
|
|
|
|
let mut client =
|
|
|
|
volume_server_pb::volume_server_client::VolumeServerClient::with_interceptor(
|
|
|
|
channel,
|
|
|
|
super::request_id::outgoing_request_id_interceptor,
|
|
|
|
)
|
|
|
|
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
|
|
|
|
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE);
|
|
|
|
|
|
|
|
let response = client
|
|
|
|
.batch_delete(volume_server_pb::BatchDeleteRequest {
|
|
|
|
file_ids: batch.clone(),
|
|
|
|
skip_cookie_check: true,
|
|
|
|
})
|
|
|
|
.await
|
|
|
|
.map_err(|e| format!("batch delete {}: {}", grpc_addr, e))?
|
|
|
|
.into_inner();
|
|
|
|
|
|
|
|
for result in response.results {
|
|
|
|
if !result.error.is_empty() {
|
|
|
|
return Err(format!("chunk delete {}: {}", result.file_id, result.error));
|
|
|
|
}
|
|
|
|
if result.status >= 400 {
|
|
|
|
return Err(format!(
|
|
|
|
"chunk delete {}: status {}",
|
|
|
|
result.file_id, result.status
|
|
|
|
));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Helper to synchronously replicate a request to peer volume servers.
|
|
|
|
async fn do_replicated_request(
|
|
|
|
state: &VolumeServerState,
|
|
|
|
@ -2589,6 +2708,8 @@ pub async fn delete_handler( |
|
|
|
n.last_modified = del_last_modified;
|
|
|
|
n.set_has_last_modified_date();
|
|
|
|
|
|
|
|
let mut delete_size_override = None;
|
|
|
|
|
|
|
|
// If this is a chunk manifest, delete child chunks first
|
|
|
|
if n.is_chunk_manifest() {
|
|
|
|
let manifest_data = if n.is_compressed() {
|
|
|
|
@ -2605,60 +2726,30 @@ pub async fn delete_handler( |
|
|
|
n.data.clone()
|
|
|
|
};
|
|
|
|
|
|
|
|
if let Ok(manifest) = serde_json::from_slice::<ChunkManifest>(&manifest_data) {
|
|
|
|
// Delete all child chunks first
|
|
|
|
for chunk in &manifest.chunks {
|
|
|
|
let (chunk_vid, chunk_nid, chunk_cookie) = match parse_url_path(&chunk.fid) {
|
|
|
|
Some(p) => p,
|
|
|
|
None => {
|
|
|
|
return json_error_with_query(
|
|
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
|
|
format!("invalid chunk fid: {}", chunk.fid),
|
|
|
|
Some(&del_query),
|
|
|
|
);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
let mut chunk_needle = Needle {
|
|
|
|
id: chunk_nid,
|
|
|
|
cookie: chunk_cookie,
|
|
|
|
..Needle::default()
|
|
|
|
};
|
|
|
|
// Read the chunk to validate it exists
|
|
|
|
{
|
|
|
|
let store = state.store.read().unwrap();
|
|
|
|
if let Err(e) = store.read_volume_needle(chunk_vid, &mut chunk_needle) {
|
|
|
|
return json_error_with_query(
|
|
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
|
|
format!("read chunk {}: {}", chunk.fid, e),
|
|
|
|
Some(&del_query),
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Delete the chunk
|
|
|
|
let mut store = state.store.write().unwrap();
|
|
|
|
if let Err(e) = store.delete_volume_needle(chunk_vid, &mut chunk_needle) {
|
|
|
|
return json_error_with_query(
|
|
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
|
|
format!("delete chunk {}: {}", chunk.fid, e),
|
|
|
|
Some(&del_query),
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Delete the manifest itself
|
|
|
|
let mut store = state.store.write().unwrap();
|
|
|
|
if let Err(e) = store.delete_volume_needle(vid, &mut n) {
|
|
|
|
let manifest = match serde_json::from_slice::<ChunkManifest>(&manifest_data) {
|
|
|
|
Ok(manifest) => manifest,
|
|
|
|
Err(e) => {
|
|
|
|
return json_error_with_query(
|
|
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
|
|
format!("delete manifest: {}", e),
|
|
|
|
format!("Load chunks manifest error: {}", e),
|
|
|
|
Some(&del_query),
|
|
|
|
);
|
|
|
|
}
|
|
|
|
// Return the manifest's declared size (matches Go behavior)
|
|
|
|
let result = DeleteResult {
|
|
|
|
size: manifest.size as i64,
|
|
|
|
};
|
|
|
|
return json_response_with_params(StatusCode::ACCEPTED, &result, Some(&del_params));
|
|
|
|
};
|
|
|
|
|
|
|
|
let child_fids: Vec<String> = manifest
|
|
|
|
.chunks
|
|
|
|
.iter()
|
|
|
|
.map(|chunk| chunk.fid.clone())
|
|
|
|
.collect();
|
|
|
|
if let Err(e) = batch_delete_file_ids(&state, &child_fids).await {
|
|
|
|
return json_error_with_query(
|
|
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
|
|
format!("Delete chunks error: {}", e),
|
|
|
|
Some(&del_query),
|
|
|
|
);
|
|
|
|
}
|
|
|
|
delete_size_override = Some(manifest.size as i64);
|
|
|
|
}
|
|
|
|
|
|
|
|
let delete_result = {
|
|
|
|
@ -2699,7 +2790,7 @@ pub async fn delete_handler( |
|
|
|
match delete_result {
|
|
|
|
Ok(size) => {
|
|
|
|
let result = DeleteResult {
|
|
|
|
size: size.0 as i64,
|
|
|
|
size: delete_size_override.unwrap_or(size.0 as i64),
|
|
|
|
};
|
|
|
|
json_response_with_params(StatusCode::ACCEPTED, &result, Some(&del_params))
|
|
|
|
}
|
|
|
|
@ -3761,6 +3852,7 @@ mod tests { |
|
|
|
let target = VolumeLocation {
|
|
|
|
url: "volume.internal:8080".to_string(),
|
|
|
|
public_url: "volume.public:8080".to_string(),
|
|
|
|
grpc_port: 18080,
|
|
|
|
};
|
|
|
|
|
|
|
|
let response = redirect_request(&info, &target, "https");
|
|
|
|
|