From 7e6e0261abeacf652fde922215557cecdcab0772 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 16 Feb 2026 01:22:06 -0800 Subject: [PATCH] feat(rust-volume-server): serve native /status and /healthz in native mode --- rust/volume_server/src/main.rs | 219 ++++++++++++++++++++++++++++++++- 1 file changed, 214 insertions(+), 5 deletions(-) diff --git a/rust/volume_server/src/main.rs b/rust/volume_server/src/main.rs index 981d04c68..6db60ddc8 100644 --- a/rust/volume_server/src/main.rs +++ b/rust/volume_server/src/main.rs @@ -1,5 +1,5 @@ use std::env; -use std::io; +use std::io::{self, Read, Write}; use std::net::{Shutdown, TcpListener, TcpStream}; use std::path::PathBuf; use std::process::{Child, Command, ExitCode, ExitStatus}; @@ -51,6 +51,14 @@ struct BackendPorts { struct ProxySpec { frontend_addr: String, backend_addr: String, + role: ListenerRole, +} + +#[derive(Clone, Copy, PartialEq, Eq)] +enum ListenerRole { + HttpAdmin, + Grpc, + HttpPublic, } fn main() -> ExitCode { @@ -132,6 +140,13 @@ fn run_exec_mode(forwarded: &[String]) -> Result<(), String> { } fn run_proxy_mode(forwarded: &[String]) -> Result<(), String> { + run_supervised_mode(forwarded, false) +} + +fn run_supervised_mode( + forwarded: &[String], + enable_native_http_admin_control: bool, +) -> Result<(), String> { install_signal_handlers(); let weed_binary = resolve_weed_binary()?; @@ -154,7 +169,9 @@ fn run_proxy_mode(forwarded: &[String]) -> Result<(), String> { let mut handles = Vec::new(); for spec in build_listener_specs(&frontend, &backend) { - handles.push(start_proxy_listener(spec)); + let enable_native_http = + enable_native_http_admin_control && spec.role == ListenerRole::HttpAdmin; + handles.push(start_proxy_listener(spec, enable_native_http)); } let mut terminated_by_signal = false; @@ -192,9 +209,9 @@ fn run_proxy_mode(forwarded: &[String]) -> Result<(), String> { fn run_native_mode(forwarded: &[String]) -> Result<(), String> { eprintln!( - "weed-volume-rs: native mode bootstrap active; delegating to Go backend while Rust handlers are implemented" + "weed-volume-rs: native mode bootstrap active; serving Rust /status and /healthz, delegating remaining handlers to Go backend" ); - run_proxy_mode(forwarded) + run_supervised_mode(forwarded, true) } fn spawn_backend(weed_binary: &PathBuf, backend_args: &[String]) -> Result { @@ -214,7 +231,7 @@ fn terminate_child(child: &mut Child) { let _ = child.kill(); } -fn start_proxy_listener(spec: ProxySpec) -> thread::JoinHandle<()> { +fn start_proxy_listener(spec: ProxySpec, enable_native_http: bool) -> thread::JoinHandle<()> { thread::spawn(move || { let listener = match TcpListener::bind(&spec.frontend_addr) { Ok(v) => v, @@ -240,7 +257,22 @@ fn start_proxy_listener(spec: ProxySpec) -> thread::JoinHandle<()> { match listener.accept() { Ok((stream, _)) => { let backend_addr = spec.backend_addr.clone(); + let native_http_enabled = enable_native_http; thread::spawn(move || { + let mut stream = stream; + if native_http_enabled { + match try_handle_native_admin_http(&mut stream) { + Ok(true) => return, + Ok(false) => {} + Err(err) => { + eprintln!( + "weed-volume-rs: native admin HTTP handler failed, falling back to backend proxy: {}", + err + ); + } + } + } + if let Err(err) = proxy_connection(stream, &backend_addr) { eprintln!( "weed-volume-rs: proxy connection to {} failed: {}", @@ -290,20 +322,197 @@ fn build_listener_specs(frontend: &FrontendPorts, backend: &BackendPorts) -> Vec specs.push(ProxySpec { frontend_addr: format!("{}:{}", frontend.bind_ip, frontend.http_port), backend_addr: format!("{}:{}", backend.bind_ip, backend.http_port), + role: ListenerRole::HttpAdmin, }); specs.push(ProxySpec { frontend_addr: format!("{}:{}", frontend.bind_ip, frontend.grpc_port), backend_addr: format!("{}:{}", backend.bind_ip, backend.grpc_port), + role: ListenerRole::Grpc, }); if frontend.public_port != frontend.http_port { specs.push(ProxySpec { frontend_addr: format!("{}:{}", frontend.bind_ip, frontend.public_port), backend_addr: format!("{}:{}", backend.bind_ip, backend.public_port), + role: ListenerRole::HttpPublic, }); } specs } +fn try_handle_native_admin_http(stream: &mut TcpStream) -> io::Result { + let parsed = match peek_http_request_headers(stream)? { + Some(v) => v, + None => return Ok(false), + }; + + let is_control = parsed.path == "/status" || parsed.path == "/healthz"; + if !is_control { + return Ok(false); + } + + consume_bytes(stream, parsed.header_len)?; + + if parsed.path == "/status" { + write_native_status_response(stream, &parsed.method, parsed.request_id.as_deref())?; + } else { + write_native_healthz_response(stream, &parsed.method, parsed.request_id.as_deref())?; + } + + let _ = stream.shutdown(Shutdown::Both); + Ok(true) +} + +struct ParsedHttpRequest { + method: String, + path: String, + request_id: Option, + header_len: usize, +} + +fn peek_http_request_headers(stream: &TcpStream) -> io::Result> { + const MAX_HEADER_BYTES: usize = 8192; + const MAX_PEEK_ATTEMPTS: usize = 25; + + let mut buf = [0u8; MAX_HEADER_BYTES]; + for _ in 0..MAX_PEEK_ATTEMPTS { + let n = stream.peek(&mut buf)?; + if n == 0 { + return Ok(None); + } + let data = &buf[..n]; + if let Some(end) = find_header_terminator(data) { + return Ok(parse_http_request_headers(data, end + 4)); + } + if n == MAX_HEADER_BYTES { + return Ok(None); + } + thread::sleep(Duration::from_millis(4)); + } + Ok(None) +} + +fn find_header_terminator(data: &[u8]) -> Option { + if data.len() < 4 { + return None; + } + for i in 0..=(data.len() - 4) { + if data[i..i + 4] == *b"\r\n\r\n" { + return Some(i); + } + } + None +} + +fn parse_http_request_headers(data: &[u8], header_len: usize) -> Option { + let header_text = String::from_utf8_lossy(&data[..header_len]); + let mut lines = header_text.split("\r\n"); + let request_line = lines.next()?; + let mut request_parts = request_line.split_whitespace(); + let method = request_parts.next()?.to_string(); + let raw_path = request_parts.next()?.to_string(); + + if method != "GET" && method != "HEAD" { + return None; + } + let path = raw_path + .split_once('?') + .map(|(p, _)| p.to_string()) + .unwrap_or(raw_path); + + let mut request_id = None; + for line in lines { + if line.is_empty() { + break; + } + if let Some((name, value)) = line.split_once(':') { + if name.eq_ignore_ascii_case("x-amz-request-id") { + request_id = Some(value.trim().to_string()); + } + } + } + + Some(ParsedHttpRequest { + method, + path, + request_id, + header_len, + }) +} + +fn consume_bytes(stream: &mut TcpStream, mut remaining: usize) -> io::Result<()> { + let mut discard = [0u8; 1024]; + while remaining > 0 { + let to_read = remaining.min(discard.len()); + let n = stream.read(&mut discard[..to_read])?; + if n == 0 { + break; + } + remaining -= n; + } + Ok(()) +} + +fn write_native_status_response( + stream: &mut TcpStream, + method: &str, + request_id: Option<&str>, +) -> io::Result<()> { + let body = br#"{"Version":"rust-native-bootstrap","DiskStatuses":[],"Volumes":[]}"#; + write_native_http_response( + stream, + "200 OK", + "application/json", + body, + method == "HEAD", + request_id, + ) +} + +fn write_native_healthz_response( + stream: &mut TcpStream, + method: &str, + request_id: Option<&str>, +) -> io::Result<()> { + let body = b"ok\n"; + write_native_http_response( + stream, + "200 OK", + "text/plain; charset=utf-8", + body, + method == "HEAD", + request_id, + ) +} + +fn write_native_http_response( + stream: &mut TcpStream, + status: &str, + content_type: &str, + body: &[u8], + omit_body: bool, + request_id: Option<&str>, +) -> io::Result<()> { + let mut response = format!( + "HTTP/1.1 {}\r\nServer: SeaweedFS Volume (rust-native-bootstrap)\r\nConnection: close\r\nContent-Type: {}\r\nContent-Length: {}\r\n", + status, + content_type, + body.len() + ); + if let Some(request_id_value) = request_id { + response.push_str("x-amz-request-id: "); + response.push_str(request_id_value); + response.push_str("\r\n"); + } + response.push_str("\r\n"); + + stream.write_all(response.as_bytes())?; + if !omit_body { + stream.write_all(body)?; + } + stream.flush()?; + Ok(()) +} + fn parse_frontend_ports(args: &[String]) -> Result { let bind_ip = extract_flag(args, "-ip").unwrap_or_else(|| "127.0.0.1".to_string()); let http_port = parse_port(