|
|
|
@ -299,6 +299,84 @@ async fn lookup_volume( |
|
|
|
Ok(result.locations.unwrap_or_default())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Helper to synchronously replicate a request to peer volume servers.
|
|
|
|
async fn do_replicated_request(
|
|
|
|
state: &VolumeServerState,
|
|
|
|
vid: u32,
|
|
|
|
method: axum::http::Method,
|
|
|
|
path: &str,
|
|
|
|
query: &str,
|
|
|
|
headers: &axum::http::HeaderMap,
|
|
|
|
body: Option<bytes::Bytes>,
|
|
|
|
) -> Result<(), String> {
|
|
|
|
let locations = lookup_volume(&state.http_client, &state.master_url, vid)
|
|
|
|
.await
|
|
|
|
.map_err(|e| format!("lookup volume failed: {}", e))?;
|
|
|
|
|
|
|
|
let remote_locations: Vec<_> = locations
|
|
|
|
.into_iter()
|
|
|
|
.filter(|loc| loc.url != state.self_url && loc.public_url != state.self_url)
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
if remote_locations.is_empty() {
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
|
|
|
|
let new_query = if query.is_empty() {
|
|
|
|
String::from("type=replicate")
|
|
|
|
} else {
|
|
|
|
format!("{}&type=replicate", query)
|
|
|
|
};
|
|
|
|
|
|
|
|
let mut futures = Vec::new();
|
|
|
|
for loc in remote_locations {
|
|
|
|
let url = format!("http://{}{}?{}", loc.url, path, new_query);
|
|
|
|
let client = state.http_client.clone();
|
|
|
|
|
|
|
|
let mut req_builder = client.request(method.clone(), &url);
|
|
|
|
|
|
|
|
// Forward relevant headers
|
|
|
|
if let Some(ct) = headers.get(axum::http::header::CONTENT_TYPE) {
|
|
|
|
req_builder = req_builder.header(axum::http::header::CONTENT_TYPE, ct);
|
|
|
|
}
|
|
|
|
if let Some(ce) = headers.get(axum::http::header::CONTENT_ENCODING) {
|
|
|
|
req_builder = req_builder.header(axum::http::header::CONTENT_ENCODING, ce);
|
|
|
|
}
|
|
|
|
if let Some(md5) = headers.get("Content-MD5") {
|
|
|
|
req_builder = req_builder.header("Content-MD5", md5);
|
|
|
|
}
|
|
|
|
if let Some(auth) = headers.get(axum::http::header::AUTHORIZATION) {
|
|
|
|
req_builder = req_builder.header(axum::http::header::AUTHORIZATION, auth);
|
|
|
|
}
|
|
|
|
|
|
|
|
if let Some(ref b) = body {
|
|
|
|
req_builder = req_builder.body(b.clone());
|
|
|
|
}
|
|
|
|
|
|
|
|
futures.push(async move {
|
|
|
|
match req_builder.send().await {
|
|
|
|
Ok(r) if r.status().is_success() => Ok(()),
|
|
|
|
Ok(r) => Err(format!("{} returned status {}", url, r.status())),
|
|
|
|
Err(e) => Err(format!("{} failed: {}", url, e)),
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
let results = futures::future::join_all(futures).await;
|
|
|
|
let mut errors = Vec::new();
|
|
|
|
for res in results {
|
|
|
|
if let Err(e) = res {
|
|
|
|
errors.push(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if !errors.is_empty() {
|
|
|
|
return Err(errors.join(", "));
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Extracted request info needed for proxy/redirect (avoids borrowing Request across await).
|
|
|
|
struct ProxyRequestInfo {
|
|
|
|
original_headers: HeaderMap,
|
|
|
|
@ -1426,7 +1504,6 @@ pub async fn post_handler( |
|
|
|
n.set_has_name();
|
|
|
|
}
|
|
|
|
|
|
|
|
// Use the write queue if enabled, otherwise write directly.
|
|
|
|
let write_result = if let Some(wq) = state.write_queue.get() {
|
|
|
|
wq.submit(vid, n.clone()).await
|
|
|
|
} else {
|
|
|
|
@ -1434,6 +1511,27 @@ pub async fn post_handler( |
|
|
|
store.write_volume_needle(vid, &mut n)
|
|
|
|
};
|
|
|
|
|
|
|
|
if !is_replicate && write_result.is_ok() && !state.master_url.is_empty() {
|
|
|
|
if let Err(e) = do_replicated_request(
|
|
|
|
&state,
|
|
|
|
vid.0,
|
|
|
|
Method::POST,
|
|
|
|
&path,
|
|
|
|
&query,
|
|
|
|
&headers,
|
|
|
|
Some(body.clone()),
|
|
|
|
)
|
|
|
|
.await
|
|
|
|
{
|
|
|
|
tracing::error!("replicated write failed: {}", e);
|
|
|
|
return (
|
|
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
|
|
format!("replication failed: {}", e),
|
|
|
|
)
|
|
|
|
.into_response();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
let resp = match write_result {
|
|
|
|
Ok((_offset, _size, is_unchanged)) => {
|
|
|
|
if is_unchanged {
|
|
|
|
@ -1623,8 +1721,34 @@ pub async fn delete_handler( |
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut store = state.store.write().unwrap();
|
|
|
|
match store.delete_volume_needle(vid, &mut n) {
|
|
|
|
let delete_result = {
|
|
|
|
let mut store = state.store.write().unwrap();
|
|
|
|
store.delete_volume_needle(vid, &mut n)
|
|
|
|
};
|
|
|
|
|
|
|
|
let is_replicate = request.uri().query().unwrap_or("").split('&').any(|p| p == "type=replicate");
|
|
|
|
if !is_replicate && delete_result.is_ok() && !state.master_url.is_empty() {
|
|
|
|
if let Err(e) = do_replicated_request(
|
|
|
|
&state,
|
|
|
|
vid.0,
|
|
|
|
Method::DELETE,
|
|
|
|
&path,
|
|
|
|
request.uri().query().unwrap_or(""),
|
|
|
|
&headers,
|
|
|
|
None,
|
|
|
|
)
|
|
|
|
.await
|
|
|
|
{
|
|
|
|
tracing::error!("replicated delete failed: {}", e);
|
|
|
|
return (
|
|
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
|
|
format!("replication failed: {}", e),
|
|
|
|
)
|
|
|
|
.into_response();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
match delete_result {
|
|
|
|
Ok(size) => {
|
|
|
|
if size.0 == 0 {
|
|
|
|
let result = DeleteResult { size: 0 };
|
|
|
|
|