|
|
|
@ -1,5 +1,5 @@ |
|
|
|
use std::env;
|
|
|
|
use std::io;
|
|
|
|
use std::io::{self, Read, Write};
|
|
|
|
use std::net::{Shutdown, TcpListener, TcpStream};
|
|
|
|
use std::path::PathBuf;
|
|
|
|
use std::process::{Child, Command, ExitCode, ExitStatus};
|
|
|
|
@ -51,6 +51,14 @@ struct BackendPorts { |
|
|
|
struct ProxySpec {
|
|
|
|
frontend_addr: String,
|
|
|
|
backend_addr: String,
|
|
|
|
role: ListenerRole,
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Clone, Copy, PartialEq, Eq)]
|
|
|
|
enum ListenerRole {
|
|
|
|
HttpAdmin,
|
|
|
|
Grpc,
|
|
|
|
HttpPublic,
|
|
|
|
}
|
|
|
|
|
|
|
|
fn main() -> ExitCode {
|
|
|
|
@ -132,6 +140,13 @@ fn run_exec_mode(forwarded: &[String]) -> Result<(), String> { |
|
|
|
}
|
|
|
|
|
|
|
|
fn run_proxy_mode(forwarded: &[String]) -> Result<(), String> {
|
|
|
|
run_supervised_mode(forwarded, false)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn run_supervised_mode(
|
|
|
|
forwarded: &[String],
|
|
|
|
enable_native_http_admin_control: bool,
|
|
|
|
) -> Result<(), String> {
|
|
|
|
install_signal_handlers();
|
|
|
|
|
|
|
|
let weed_binary = resolve_weed_binary()?;
|
|
|
|
@ -154,7 +169,9 @@ fn run_proxy_mode(forwarded: &[String]) -> Result<(), String> { |
|
|
|
|
|
|
|
let mut handles = Vec::new();
|
|
|
|
for spec in build_listener_specs(&frontend, &backend) {
|
|
|
|
handles.push(start_proxy_listener(spec));
|
|
|
|
let enable_native_http =
|
|
|
|
enable_native_http_admin_control && spec.role == ListenerRole::HttpAdmin;
|
|
|
|
handles.push(start_proxy_listener(spec, enable_native_http));
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut terminated_by_signal = false;
|
|
|
|
@ -192,9 +209,9 @@ fn run_proxy_mode(forwarded: &[String]) -> Result<(), String> { |
|
|
|
|
|
|
|
fn run_native_mode(forwarded: &[String]) -> Result<(), String> {
|
|
|
|
eprintln!(
|
|
|
|
"weed-volume-rs: native mode bootstrap active; delegating to Go backend while Rust handlers are implemented"
|
|
|
|
"weed-volume-rs: native mode bootstrap active; serving Rust /status and /healthz, delegating remaining handlers to Go backend"
|
|
|
|
);
|
|
|
|
run_proxy_mode(forwarded)
|
|
|
|
run_supervised_mode(forwarded, true)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn spawn_backend(weed_binary: &PathBuf, backend_args: &[String]) -> Result<Child, String> {
|
|
|
|
@ -214,7 +231,7 @@ fn terminate_child(child: &mut Child) { |
|
|
|
let _ = child.kill();
|
|
|
|
}
|
|
|
|
|
|
|
|
fn start_proxy_listener(spec: ProxySpec) -> thread::JoinHandle<()> {
|
|
|
|
fn start_proxy_listener(spec: ProxySpec, enable_native_http: bool) -> thread::JoinHandle<()> {
|
|
|
|
thread::spawn(move || {
|
|
|
|
let listener = match TcpListener::bind(&spec.frontend_addr) {
|
|
|
|
Ok(v) => v,
|
|
|
|
@ -240,7 +257,22 @@ fn start_proxy_listener(spec: ProxySpec) -> thread::JoinHandle<()> { |
|
|
|
match listener.accept() {
|
|
|
|
Ok((stream, _)) => {
|
|
|
|
let backend_addr = spec.backend_addr.clone();
|
|
|
|
let native_http_enabled = enable_native_http;
|
|
|
|
thread::spawn(move || {
|
|
|
|
let mut stream = stream;
|
|
|
|
if native_http_enabled {
|
|
|
|
match try_handle_native_admin_http(&mut stream) {
|
|
|
|
Ok(true) => return,
|
|
|
|
Ok(false) => {}
|
|
|
|
Err(err) => {
|
|
|
|
eprintln!(
|
|
|
|
"weed-volume-rs: native admin HTTP handler failed, falling back to backend proxy: {}",
|
|
|
|
err
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if let Err(err) = proxy_connection(stream, &backend_addr) {
|
|
|
|
eprintln!(
|
|
|
|
"weed-volume-rs: proxy connection to {} failed: {}",
|
|
|
|
@ -290,20 +322,197 @@ fn build_listener_specs(frontend: &FrontendPorts, backend: &BackendPorts) -> Vec |
|
|
|
specs.push(ProxySpec {
|
|
|
|
frontend_addr: format!("{}:{}", frontend.bind_ip, frontend.http_port),
|
|
|
|
backend_addr: format!("{}:{}", backend.bind_ip, backend.http_port),
|
|
|
|
role: ListenerRole::HttpAdmin,
|
|
|
|
});
|
|
|
|
specs.push(ProxySpec {
|
|
|
|
frontend_addr: format!("{}:{}", frontend.bind_ip, frontend.grpc_port),
|
|
|
|
backend_addr: format!("{}:{}", backend.bind_ip, backend.grpc_port),
|
|
|
|
role: ListenerRole::Grpc,
|
|
|
|
});
|
|
|
|
if frontend.public_port != frontend.http_port {
|
|
|
|
specs.push(ProxySpec {
|
|
|
|
frontend_addr: format!("{}:{}", frontend.bind_ip, frontend.public_port),
|
|
|
|
backend_addr: format!("{}:{}", backend.bind_ip, backend.public_port),
|
|
|
|
role: ListenerRole::HttpPublic,
|
|
|
|
});
|
|
|
|
}
|
|
|
|
specs
|
|
|
|
}
|
|
|
|
|
|
|
|
fn try_handle_native_admin_http(stream: &mut TcpStream) -> io::Result<bool> {
|
|
|
|
let parsed = match peek_http_request_headers(stream)? {
|
|
|
|
Some(v) => v,
|
|
|
|
None => return Ok(false),
|
|
|
|
};
|
|
|
|
|
|
|
|
let is_control = parsed.path == "/status" || parsed.path == "/healthz";
|
|
|
|
if !is_control {
|
|
|
|
return Ok(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
consume_bytes(stream, parsed.header_len)?;
|
|
|
|
|
|
|
|
if parsed.path == "/status" {
|
|
|
|
write_native_status_response(stream, &parsed.method, parsed.request_id.as_deref())?;
|
|
|
|
} else {
|
|
|
|
write_native_healthz_response(stream, &parsed.method, parsed.request_id.as_deref())?;
|
|
|
|
}
|
|
|
|
|
|
|
|
let _ = stream.shutdown(Shutdown::Both);
|
|
|
|
Ok(true)
|
|
|
|
}
|
|
|
|
|
|
|
|
struct ParsedHttpRequest {
|
|
|
|
method: String,
|
|
|
|
path: String,
|
|
|
|
request_id: Option<String>,
|
|
|
|
header_len: usize,
|
|
|
|
}
|
|
|
|
|
|
|
|
fn peek_http_request_headers(stream: &TcpStream) -> io::Result<Option<ParsedHttpRequest>> {
|
|
|
|
const MAX_HEADER_BYTES: usize = 8192;
|
|
|
|
const MAX_PEEK_ATTEMPTS: usize = 25;
|
|
|
|
|
|
|
|
let mut buf = [0u8; MAX_HEADER_BYTES];
|
|
|
|
for _ in 0..MAX_PEEK_ATTEMPTS {
|
|
|
|
let n = stream.peek(&mut buf)?;
|
|
|
|
if n == 0 {
|
|
|
|
return Ok(None);
|
|
|
|
}
|
|
|
|
let data = &buf[..n];
|
|
|
|
if let Some(end) = find_header_terminator(data) {
|
|
|
|
return Ok(parse_http_request_headers(data, end + 4));
|
|
|
|
}
|
|
|
|
if n == MAX_HEADER_BYTES {
|
|
|
|
return Ok(None);
|
|
|
|
}
|
|
|
|
thread::sleep(Duration::from_millis(4));
|
|
|
|
}
|
|
|
|
Ok(None)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn find_header_terminator(data: &[u8]) -> Option<usize> {
|
|
|
|
if data.len() < 4 {
|
|
|
|
return None;
|
|
|
|
}
|
|
|
|
for i in 0..=(data.len() - 4) {
|
|
|
|
if data[i..i + 4] == *b"\r\n\r\n" {
|
|
|
|
return Some(i);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
None
|
|
|
|
}
|
|
|
|
|
|
|
|
fn parse_http_request_headers(data: &[u8], header_len: usize) -> Option<ParsedHttpRequest> {
|
|
|
|
let header_text = String::from_utf8_lossy(&data[..header_len]);
|
|
|
|
let mut lines = header_text.split("\r\n");
|
|
|
|
let request_line = lines.next()?;
|
|
|
|
let mut request_parts = request_line.split_whitespace();
|
|
|
|
let method = request_parts.next()?.to_string();
|
|
|
|
let raw_path = request_parts.next()?.to_string();
|
|
|
|
|
|
|
|
if method != "GET" && method != "HEAD" {
|
|
|
|
return None;
|
|
|
|
}
|
|
|
|
let path = raw_path
|
|
|
|
.split_once('?')
|
|
|
|
.map(|(p, _)| p.to_string())
|
|
|
|
.unwrap_or(raw_path);
|
|
|
|
|
|
|
|
let mut request_id = None;
|
|
|
|
for line in lines {
|
|
|
|
if line.is_empty() {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
if let Some((name, value)) = line.split_once(':') {
|
|
|
|
if name.eq_ignore_ascii_case("x-amz-request-id") {
|
|
|
|
request_id = Some(value.trim().to_string());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Some(ParsedHttpRequest {
|
|
|
|
method,
|
|
|
|
path,
|
|
|
|
request_id,
|
|
|
|
header_len,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
fn consume_bytes(stream: &mut TcpStream, mut remaining: usize) -> io::Result<()> {
|
|
|
|
let mut discard = [0u8; 1024];
|
|
|
|
while remaining > 0 {
|
|
|
|
let to_read = remaining.min(discard.len());
|
|
|
|
let n = stream.read(&mut discard[..to_read])?;
|
|
|
|
if n == 0 {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
remaining -= n;
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn write_native_status_response(
|
|
|
|
stream: &mut TcpStream,
|
|
|
|
method: &str,
|
|
|
|
request_id: Option<&str>,
|
|
|
|
) -> io::Result<()> {
|
|
|
|
let body = br#"{"Version":"rust-native-bootstrap","DiskStatuses":[],"Volumes":[]}"#;
|
|
|
|
write_native_http_response(
|
|
|
|
stream,
|
|
|
|
"200 OK",
|
|
|
|
"application/json",
|
|
|
|
body,
|
|
|
|
method == "HEAD",
|
|
|
|
request_id,
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn write_native_healthz_response(
|
|
|
|
stream: &mut TcpStream,
|
|
|
|
method: &str,
|
|
|
|
request_id: Option<&str>,
|
|
|
|
) -> io::Result<()> {
|
|
|
|
let body = b"ok\n";
|
|
|
|
write_native_http_response(
|
|
|
|
stream,
|
|
|
|
"200 OK",
|
|
|
|
"text/plain; charset=utf-8",
|
|
|
|
body,
|
|
|
|
method == "HEAD",
|
|
|
|
request_id,
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn write_native_http_response(
|
|
|
|
stream: &mut TcpStream,
|
|
|
|
status: &str,
|
|
|
|
content_type: &str,
|
|
|
|
body: &[u8],
|
|
|
|
omit_body: bool,
|
|
|
|
request_id: Option<&str>,
|
|
|
|
) -> io::Result<()> {
|
|
|
|
let mut response = format!(
|
|
|
|
"HTTP/1.1 {}\r\nServer: SeaweedFS Volume (rust-native-bootstrap)\r\nConnection: close\r\nContent-Type: {}\r\nContent-Length: {}\r\n",
|
|
|
|
status,
|
|
|
|
content_type,
|
|
|
|
body.len()
|
|
|
|
);
|
|
|
|
if let Some(request_id_value) = request_id {
|
|
|
|
response.push_str("x-amz-request-id: ");
|
|
|
|
response.push_str(request_id_value);
|
|
|
|
response.push_str("\r\n");
|
|
|
|
}
|
|
|
|
response.push_str("\r\n");
|
|
|
|
|
|
|
|
stream.write_all(response.as_bytes())?;
|
|
|
|
if !omit_body {
|
|
|
|
stream.write_all(body)?;
|
|
|
|
}
|
|
|
|
stream.flush()?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn parse_frontend_ports(args: &[String]) -> Result<FrontendPorts, String> {
|
|
|
|
let bind_ip = extract_flag(args, "-ip").unwrap_or_else(|| "127.0.0.1".to_string());
|
|
|
|
let http_port = parse_port(
|
|
|
|
|