From 432003897275dbfc774cff208bf94e88cf1f1a60 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 8 Mar 2026 10:15:55 -0700 Subject: [PATCH] feat(server): implement synchronous replication in HTTP handlers --- seaweed-volume/src/server/handlers.rs | 130 +++++++++++++++++++++++++- 1 file changed, 127 insertions(+), 3 deletions(-) diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs index a666cd1df..2973cfa34 100644 --- a/seaweed-volume/src/server/handlers.rs +++ b/seaweed-volume/src/server/handlers.rs @@ -299,6 +299,84 @@ async fn lookup_volume( Ok(result.locations.unwrap_or_default()) } +/// Helper to synchronously replicate a request to peer volume servers. +async fn do_replicated_request( + state: &VolumeServerState, + vid: u32, + method: axum::http::Method, + path: &str, + query: &str, + headers: &axum::http::HeaderMap, + body: Option, +) -> Result<(), String> { + let locations = lookup_volume(&state.http_client, &state.master_url, vid) + .await + .map_err(|e| format!("lookup volume failed: {}", e))?; + + let remote_locations: Vec<_> = locations + .into_iter() + .filter(|loc| loc.url != state.self_url && loc.public_url != state.self_url) + .collect(); + + if remote_locations.is_empty() { + return Ok(()); + } + + let new_query = if query.is_empty() { + String::from("type=replicate") + } else { + format!("{}&type=replicate", query) + }; + + let mut futures = Vec::new(); + for loc in remote_locations { + let url = format!("http://{}{}?{}", loc.url, path, new_query); + let client = state.http_client.clone(); + + let mut req_builder = client.request(method.clone(), &url); + + // Forward relevant headers + if let Some(ct) = headers.get(axum::http::header::CONTENT_TYPE) { + req_builder = req_builder.header(axum::http::header::CONTENT_TYPE, ct); + } + if let Some(ce) = headers.get(axum::http::header::CONTENT_ENCODING) { + req_builder = req_builder.header(axum::http::header::CONTENT_ENCODING, ce); + } + if let Some(md5) = headers.get("Content-MD5") { + req_builder = req_builder.header("Content-MD5", md5); + } + if let Some(auth) = headers.get(axum::http::header::AUTHORIZATION) { + req_builder = req_builder.header(axum::http::header::AUTHORIZATION, auth); + } + + if let Some(ref b) = body { + req_builder = req_builder.body(b.clone()); + } + + futures.push(async move { + match req_builder.send().await { + Ok(r) if r.status().is_success() => Ok(()), + Ok(r) => Err(format!("{} returned status {}", url, r.status())), + Err(e) => Err(format!("{} failed: {}", url, e)), + } + }); + } + + let results = futures::future::join_all(futures).await; + let mut errors = Vec::new(); + for res in results { + if let Err(e) = res { + errors.push(e); + } + } + + if !errors.is_empty() { + return Err(errors.join(", ")); + } + + Ok(()) +} + /// Extracted request info needed for proxy/redirect (avoids borrowing Request across await). struct ProxyRequestInfo { original_headers: HeaderMap, @@ -1426,7 +1504,6 @@ pub async fn post_handler( n.set_has_name(); } - // Use the write queue if enabled, otherwise write directly. let write_result = if let Some(wq) = state.write_queue.get() { wq.submit(vid, n.clone()).await } else { @@ -1434,6 +1511,27 @@ pub async fn post_handler( store.write_volume_needle(vid, &mut n) }; + if !is_replicate && write_result.is_ok() && !state.master_url.is_empty() { + if let Err(e) = do_replicated_request( + &state, + vid.0, + Method::POST, + &path, + &query, + &headers, + Some(body.clone()), + ) + .await + { + tracing::error!("replicated write failed: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("replication failed: {}", e), + ) + .into_response(); + } + } + let resp = match write_result { Ok((_offset, _size, is_unchanged)) => { if is_unchanged { @@ -1623,8 +1721,34 @@ pub async fn delete_handler( } } - let mut store = state.store.write().unwrap(); - match store.delete_volume_needle(vid, &mut n) { + let delete_result = { + let mut store = state.store.write().unwrap(); + store.delete_volume_needle(vid, &mut n) + }; + + let is_replicate = request.uri().query().unwrap_or("").split('&').any(|p| p == "type=replicate"); + if !is_replicate && delete_result.is_ok() && !state.master_url.is_empty() { + if let Err(e) = do_replicated_request( + &state, + vid.0, + Method::DELETE, + &path, + request.uri().query().unwrap_or(""), + &headers, + None, + ) + .await + { + tracing::error!("replicated delete failed: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("replication failed: {}", e), + ) + .into_response(); + } + } + + match delete_result { Ok(size) => { if size.0 == 0 { let result = DeleteResult { size: 0 };