diff --git a/rust/volume_server/src/main.rs b/rust/volume_server/src/main.rs index 39a80d173..ed95976e4 100644 --- a/rust/volume_server/src/main.rs +++ b/rust/volume_server/src/main.rs @@ -294,6 +294,7 @@ fn start_proxy_listener( &mut stream, role, native_http_config.as_deref(), + &backend_addr, ) { Ok(true) => return, Ok(false) => {} @@ -376,6 +377,7 @@ fn try_handle_native_http( stream: &mut TcpStream, role: ListenerRole, config: Option<&NativeHttpConfig>, + backend_http_addr: &str, ) -> io::Result { let parsed = match peek_http_request_headers(stream)? { Some(v) => v, @@ -480,7 +482,12 @@ fn try_handle_native_http( if parsed.path == "/status" { write_native_status_response(stream, &parsed.method, parsed.request_id.as_deref())?; } else { - write_native_healthz_response(stream, &parsed.method, parsed.request_id.as_deref())?; + write_native_healthz_response( + stream, + &parsed.method, + parsed.request_id.as_deref(), + backend_http_addr, + )?; } let _ = stream.shutdown(Shutdown::Both); @@ -537,11 +544,7 @@ fn parse_http_request_headers(data: &[u8], header_len: usize) -> Option bool { ) } +fn normalize_request_path(target: &str) -> String { + let raw = if let Some(rest) = target.strip_prefix("http://") { + rest.split_once('/') + .map(|(_, path)| format!("/{}", path)) + .unwrap_or_else(|| "/".to_string()) + } else if let Some(rest) = target.strip_prefix("https://") { + rest.split_once('/') + .map(|(_, path)| format!("/{}", path)) + .unwrap_or_else(|| "/".to_string()) + } else if target.is_empty() { + "/".to_string() + } else { + target.to_string() + }; + raw.split_once('?') + .map(|(p, _)| p.to_string()) + .unwrap_or(raw) +} + fn is_public_read_method(method: &str) -> bool { matches!(method, "GET" | "HEAD" | "OPTIONS") } @@ -615,20 +637,60 @@ fn write_native_status_response( fn write_native_healthz_response( stream: &mut TcpStream, - method: &str, + _method: &str, request_id: Option<&str>, + backend_http_addr: &str, ) -> io::Result<()> { - let body = b"ok\n"; + let status = match query_backend_healthz_status(backend_http_addr) { + Some(200) => "200 OK", + Some(503) => "503 Service Unavailable", + Some(code) if (100..=599).contains(&code) => { + return write_native_http_response( + stream, + &format!("{} Unknown", code), + "text/plain; charset=utf-8", + b"", + true, + request_id, + ); + } + _ => "503 Service Unavailable", + }; write_native_http_response( stream, - "200 OK", + status, "text/plain; charset=utf-8", - body, - method == "HEAD", + b"", + true, request_id, ) } +fn query_backend_healthz_status(backend_http_addr: &str) -> Option { + let mut upstream = TcpStream::connect(backend_http_addr).ok()?; + let _ = upstream.set_read_timeout(Some(Duration::from_millis(500))); + let _ = upstream.set_write_timeout(Some(Duration::from_millis(500))); + + let request = format!( + "GET /healthz HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n", + backend_http_addr + ); + upstream.write_all(request.as_bytes()).ok()?; + upstream.flush().ok()?; + + let mut buf = [0u8; 1024]; + let n = upstream.read(&mut buf).ok()?; + if n == 0 { + return None; + } + let line = String::from_utf8_lossy(&buf[..n]); + let status_line = line.lines().next()?; + let mut parts = status_line.split_whitespace(); + let _http = parts.next()?; + let code = parts.next()?.parse::().ok()?; + Some(code) +} + fn write_native_options_response( stream: &mut TcpStream, role: ListenerRole,