Browse Source

feat(rust/volume_server): add proxy supervision mode for integration parity

codex-rust-volume-server-bootstrap
Chris Lu 4 weeks ago
parent
commit
a7f50d23b5
  1. 380
      rust/volume_server/src/main.rs

380
rust/volume_server/src/main.rs

@ -1,13 +1,58 @@
use std::env;
use std::ffi::OsString;
use std::io;
use std::net::{Shutdown, TcpListener, TcpStream};
use std::path::PathBuf;
use std::process::{Command, ExitCode};
use std::process::{Child, Command, ExitCode, ExitStatus};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
#[cfg(unix)]
use std::os::unix::process::CommandExt;
static TERMINATE: AtomicBool = AtomicBool::new(false);
#[cfg(unix)]
type SigHandler = extern "C" fn(i32);
#[cfg(unix)]
const SIGINT: i32 = 2;
#[cfg(unix)]
const SIGTERM: i32 = 15;
#[cfg(unix)]
extern "C" {
fn signal(sig: i32, handler: SigHandler) -> SigHandler;
}
#[cfg(unix)]
extern "C" fn handle_termination_signal(_sig: i32) {
TERMINATE.store(true, Ordering::SeqCst);
}
#[derive(Clone)]
struct FrontendPorts {
bind_ip: String,
http_port: u16,
grpc_port: u16,
public_port: u16,
}
#[derive(Clone)]
struct BackendPorts {
bind_ip: String,
http_port: u16,
grpc_port: u16,
public_port: u16,
}
#[derive(Clone)]
struct ProxySpec {
frontend_addr: String,
backend_addr: String,
}
fn main() -> ExitCode {
match run() {
Ok(()) => ExitCode::SUCCESS,
@ -30,20 +75,37 @@ fn run() -> Result<(), String> {
return Ok(());
}
let mut forwarded = Vec::<String>::new();
let has_volume_subcommand = args.iter().any(|a| a == "volume");
if has_volume_subcommand {
forwarded.extend(args);
} else {
forwarded.push("volume".to_string());
forwarded.extend(args);
let forwarded = normalize_volume_args(args);
let mode = env::var("VOLUME_SERVER_RUST_MODE")
.unwrap_or_else(|_| "exec".to_string())
.to_lowercase();
match mode.as_str() {
"exec" => run_exec_mode(&forwarded),
"proxy" => run_proxy_mode(&forwarded),
other => Err(format!(
"unsupported VOLUME_SERVER_RUST_MODE {:?} (supported: exec, proxy)",
other
)),
}
}
fn normalize_volume_args(args: Vec<String>) -> Vec<String> {
if args.iter().any(|a| a == "volume") {
return args;
}
let mut forwarded = Vec::with_capacity(args.len() + 1);
forwarded.push("volume".to_string());
forwarded.extend(args);
forwarded
}
fn run_exec_mode(forwarded: &[String]) -> Result<(), String> {
let weed_binary = resolve_weed_binary()?;
#[cfg(unix)]
{
let exec_err = Command::new(&weed_binary).args(&forwarded).exec();
let exec_err = Command::new(&weed_binary).args(forwarded).exec();
return Err(format!(
"exec {} failed: {}",
weed_binary.display(),
@ -54,7 +116,7 @@ fn run() -> Result<(), String> {
#[cfg(not(unix))]
{
let status = Command::new(&weed_binary)
.args(&forwarded)
.args(forwarded)
.status()
.map_err(|e| format!("spawn {} failed: {}", weed_binary.display(), e))?;
if status.success() {
@ -68,15 +130,300 @@ fn run() -> Result<(), String> {
}
}
fn run_proxy_mode(forwarded: &[String]) -> Result<(), String> {
install_signal_handlers();
let weed_binary = resolve_weed_binary()?;
let frontend = parse_frontend_ports(forwarded)?;
let backend_bind_ip = "127.0.0.1";
let backend =
allocate_backend_ports(backend_bind_ip, frontend.public_port == frontend.http_port)?;
let mut backend_args = replace_flag_value(forwarded, "-ip", &backend.bind_ip);
backend_args = replace_flag_value(&backend_args, "-port", &backend.http_port.to_string());
backend_args = replace_flag_value(&backend_args, "-port.grpc", &backend.grpc_port.to_string());
backend_args = replace_flag_value(
&backend_args,
"-port.public",
&backend.public_port.to_string(),
);
let mut child = spawn_backend(&weed_binary, &backend_args)?;
let mut handles = Vec::new();
for spec in build_listener_specs(&frontend, &backend) {
handles.push(start_proxy_listener(spec));
}
let mut terminated_by_signal = false;
let child_status: ExitStatus = loop {
if TERMINATE.load(Ordering::SeqCst) {
terminated_by_signal = true;
terminate_child(&mut child);
break wait_child(&mut child)?;
}
match child.try_wait() {
Ok(Some(status)) => break status,
Ok(None) => thread::sleep(Duration::from_millis(100)),
Err(err) => return Err(format!("failed to poll backend process: {}", err)),
}
};
TERMINATE.store(true, Ordering::SeqCst);
for handle in handles {
let _ = handle.join();
}
if terminated_by_signal {
return Ok(());
}
if child_status.success() {
Ok(())
} else {
Err(format!(
"backend volume process exited with status {}",
child_status
))
}
}
fn spawn_backend(weed_binary: &PathBuf, backend_args: &[String]) -> Result<Child, String> {
Command::new(weed_binary)
.args(backend_args)
.spawn()
.map_err(|e| format!("spawn backend {} failed: {}", weed_binary.display(), e))
}
fn wait_child(child: &mut Child) -> Result<ExitStatus, String> {
child
.wait()
.map_err(|e| format!("wait backend process failed: {}", e))
}
fn terminate_child(child: &mut Child) {
let _ = child.kill();
}
fn start_proxy_listener(spec: ProxySpec) -> thread::JoinHandle<()> {
thread::spawn(move || {
let listener = match TcpListener::bind(&spec.frontend_addr) {
Ok(v) => v,
Err(err) => {
eprintln!(
"weed-volume-rs: failed to bind proxy listener {}: {}",
spec.frontend_addr, err
);
TERMINATE.store(true, Ordering::SeqCst);
return;
}
};
if let Err(err) = listener.set_nonblocking(true) {
eprintln!(
"weed-volume-rs: failed to set nonblocking listener {}: {}",
spec.frontend_addr, err
);
TERMINATE.store(true, Ordering::SeqCst);
return;
}
while !TERMINATE.load(Ordering::SeqCst) {
match listener.accept() {
Ok((stream, _)) => {
let backend_addr = spec.backend_addr.clone();
thread::spawn(move || {
if let Err(err) = proxy_connection(stream, &backend_addr) {
eprintln!(
"weed-volume-rs: proxy connection to {} failed: {}",
backend_addr, err
);
}
});
}
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(20));
}
Err(err) => {
eprintln!(
"weed-volume-rs: accept failed on {}: {}",
spec.frontend_addr, err
);
thread::sleep(Duration::from_millis(100));
}
}
}
})
}
fn proxy_connection(mut downstream: TcpStream, backend_addr: &str) -> io::Result<()> {
let mut upstream = TcpStream::connect(backend_addr)?;
let _ = downstream.set_nodelay(true);
let _ = upstream.set_nodelay(true);
let mut downstream_read = downstream.try_clone()?;
let mut upstream_write = upstream.try_clone()?;
let upload = thread::spawn(move || {
let _ = io::copy(&mut downstream_read, &mut upstream_write);
let _ = upstream_write.shutdown(Shutdown::Write);
});
let _ = io::copy(&mut upstream, &mut downstream);
let _ = downstream.shutdown(Shutdown::Write);
let _ = upstream.shutdown(Shutdown::Read);
let _ = upload.join();
Ok(())
}
fn build_listener_specs(frontend: &FrontendPorts, backend: &BackendPorts) -> Vec<ProxySpec> {
let mut specs = Vec::new();
specs.push(ProxySpec {
frontend_addr: format!("{}:{}", frontend.bind_ip, frontend.http_port),
backend_addr: format!("{}:{}", backend.bind_ip, backend.http_port),
});
specs.push(ProxySpec {
frontend_addr: format!("{}:{}", frontend.bind_ip, frontend.grpc_port),
backend_addr: format!("{}:{}", backend.bind_ip, backend.grpc_port),
});
if frontend.public_port != frontend.http_port {
specs.push(ProxySpec {
frontend_addr: format!("{}:{}", frontend.bind_ip, frontend.public_port),
backend_addr: format!("{}:{}", backend.bind_ip, backend.public_port),
});
}
specs
}
fn parse_frontend_ports(args: &[String]) -> Result<FrontendPorts, String> {
let bind_ip = extract_flag(args, "-ip").unwrap_or_else(|| "127.0.0.1".to_string());
let http_port = parse_port(
&extract_flag(args, "-port").ok_or_else(|| "missing -port value".to_string())?,
"-port",
)?;
let grpc_port = parse_port(
&extract_flag(args, "-port.grpc").ok_or_else(|| "missing -port.grpc value".to_string())?,
"-port.grpc",
)?;
let public_port = parse_port(
&extract_flag(args, "-port.public").unwrap_or_else(|| http_port.to_string()),
"-port.public",
)?;
Ok(FrontendPorts {
bind_ip,
http_port,
grpc_port,
public_port,
})
}
fn allocate_backend_ports(bind_ip: &str, share_public_http: bool) -> Result<BackendPorts, String> {
let http_port = allocate_free_port(bind_ip)?;
let grpc_port = allocate_free_port(bind_ip)?;
let public_port = if share_public_http {
http_port
} else {
allocate_free_port(bind_ip)?
};
Ok(BackendPorts {
bind_ip: bind_ip.to_string(),
http_port,
grpc_port,
public_port,
})
}
fn allocate_free_port(bind_ip: &str) -> Result<u16, String> {
let listener = TcpListener::bind((bind_ip, 0))
.or_else(|_| TcpListener::bind(("127.0.0.1", 0)))
.map_err(|e| format!("allocate free port failed: {}", e))?;
let port = listener
.local_addr()
.map_err(|e| format!("read allocated port failed: {}", e))?
.port();
drop(listener);
Ok(port)
}
fn extract_flag(args: &[String], key: &str) -> Option<String> {
let with_equals = format!("{}=", key);
let mut i = 0;
while i < args.len() {
let arg = &args[i];
if let Some(v) = arg.strip_prefix(&with_equals) {
return Some(v.to_string());
}
if arg == key {
if i + 1 < args.len() {
return Some(args[i + 1].clone());
}
return None;
}
i += 1;
}
None
}
fn replace_flag_value(args: &[String], key: &str, value: &str) -> Vec<String> {
let with_equals = format!("{}=", key);
let mut out = Vec::with_capacity(args.len() + 2);
let mut i = 0;
let mut replaced = false;
while i < args.len() {
let arg = &args[i];
if arg.starts_with(&with_equals) {
out.push(format!("{}={}", key, value));
replaced = true;
i += 1;
continue;
}
if arg == key {
out.push(arg.clone());
out.push(value.to_string());
replaced = true;
i += 2;
continue;
}
out.push(arg.clone());
i += 1;
}
if !replaced {
out.push(format!("{}={}", key, value));
}
out
}
fn parse_port(value: &str, flag: &str) -> Result<u16, String> {
value
.parse::<u16>()
.map_err(|e| format!("invalid {} value {:?}: {}", flag, value, e))
}
fn install_signal_handlers() {
#[cfg(unix)]
unsafe {
let _ = signal(SIGINT, handle_termination_signal);
let _ = signal(SIGTERM, handle_termination_signal);
}
}
fn print_help() {
println!("weed-volume-rs");
println!();
println!("Rust compatibility launcher for SeaweedFS volume server.");
println!("It forwards all volume-server flags to the Go weed binary.");
println!("Modes:");
println!(" - exec (default): exec Go weed volume process directly");
println!(" - proxy: run Rust TCP proxy front-end and supervise Go weed backend");
println!();
println!("Environment:");
println!(" VOLUME_SERVER_RUST_MODE=exec|proxy");
println!(" WEED_BINARY=/path/to/weed");
println!();
println!("Examples:");
println!(" weed-volume-rs -ip=127.0.0.1 -port=8080 -master=127.0.0.1:9333 ...");
println!(" weed-volume-rs volume -ip=127.0.0.1 -port=8080 -master=127.0.0.1:9333 ...");
println!(" weed-volume-rs -config_dir=/tmp/cfg volume -ip=127.0.0.1 -port=8080 ...");
println!(" VOLUME_SERVER_RUST_MODE=proxy weed-volume-rs -config_dir=/tmp/cfg volume -ip=127.0.0.1 -port=8080 ...");
}
fn resolve_weed_binary() -> Result<PathBuf, String> {
@ -170,8 +517,3 @@ fn is_executable_file(path: &PathBuf) -> bool {
true
}
}
#[allow(dead_code)]
fn _collect_os_args() -> Vec<OsString> {
env::args_os().collect()
}
Loading…
Cancel
Save