From 1b5a97660fc1e7e1b114c46b75f88d3fb1f96e12 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 6 Mar 2026 23:51:20 -0800 Subject: [PATCH] JWT auth, chunk manifest expansion, and upload/download throttling - Wire JWT security config from --securityFile TOML into Guard - Fix cookie-based JWT extraction (AT cookie, not jwt cookie) - Set JWT leeway to 0 to properly reject expired tokens - Add file_id extraction and validation for per-file JWT scoping - Implement chunk manifest expansion on read (X-File-Store: chunked) - Implement chunk manifest delete (delete children, return manifest size) - Add upload throttling with inflight byte tracking and RAII guard - Add download throttling with TrackedBody that releases on Drop - Pass --securityFile and throttling flags from test framework HTTP tests: 53/55 (was 40/55). Remaining: ImageResize, CONNECT method. --- seaweed-volume/Cargo.lock | 1 + seaweed-volume/Cargo.toml | 1 + seaweed-volume/src/config.rs | 87 +++++ seaweed-volume/src/main.rs | 18 +- seaweed-volume/src/security.rs | 3 + seaweed-volume/src/server/handlers.rs | 363 ++++++++++++++++++- seaweed-volume/src/server/volume_server.rs | 13 +- test/volume_server/framework/cluster_rust.go | 9 + 8 files changed, 471 insertions(+), 24 deletions(-) diff --git a/seaweed-volume/Cargo.lock b/seaweed-volume/Cargo.lock index 8474e3505..0426bf47b 100644 --- a/seaweed-volume/Cargo.lock +++ b/seaweed-volume/Cargo.lock @@ -2473,6 +2473,7 @@ dependencies = [ "flate2", "futures", "hex", + "http-body", "hyper", "hyper-util", "jsonwebtoken", diff --git a/seaweed-volume/Cargo.toml b/seaweed-volume/Cargo.toml index 30a63c902..a61f5c639 100644 --- a/seaweed-volume/Cargo.toml +++ b/seaweed-volume/Cargo.toml @@ -16,6 +16,7 @@ prost-types = "0.13" # HTTP server axum = { version = "0.7", features = ["multipart"] } +http-body = "1" hyper = { version = "1", features = ["full"] } hyper-util = { version = "0.1", features = ["tokio"] } tower = "0.4" diff --git a/seaweed-volume/src/config.rs b/seaweed-volume/src/config.rs index 31162086a..58dbc797f 100644 --- a/seaweed-volume/src/config.rs +++ b/seaweed-volume/src/config.rs @@ -177,6 +177,10 @@ pub struct Cli { /// HTTP port for debugging. #[arg(long = "debug.port", default_value_t = 6060)] pub debug_port: u16, + + /// Path to security.toml configuration file for JWT signing keys. + #[arg(long = "securityFile", default_value = "")] + pub security_file: String, } /// Resolved configuration after applying defaults and validation. @@ -220,6 +224,10 @@ pub struct VolumeServerConfig { pub metrics_ip: String, pub debug: bool, pub debug_port: u16, + pub jwt_signing_key: Vec, + pub jwt_signing_expires_seconds: i64, + pub jwt_read_signing_key: Vec, + pub jwt_read_signing_expires_seconds: i64, } pub use crate::storage::needle_map::NeedleMapKind; @@ -536,6 +544,10 @@ fn resolve_config(cli: Cli) -> VolumeServerConfig { let inflight_upload_data_timeout = parse_duration(&cli.inflight_upload_data_timeout); let inflight_download_data_timeout = parse_duration(&cli.inflight_download_data_timeout); + // Parse security config from TOML file + let (jwt_signing_key, jwt_signing_expires, jwt_read_signing_key, jwt_read_signing_expires) = + parse_security_config(&cli.security_file); + VolumeServerConfig { port: cli.port, grpc_port, @@ -575,7 +587,82 @@ fn resolve_config(cli: Cli) -> VolumeServerConfig { metrics_ip, debug: cli.debug, debug_port: cli.debug_port, + jwt_signing_key, + jwt_signing_expires_seconds: jwt_signing_expires, + jwt_read_signing_key: jwt_read_signing_key, + jwt_read_signing_expires_seconds: jwt_read_signing_expires, + } +} + +/// Parse a security.toml file to extract JWT signing keys. +/// Format: +/// ```toml +/// [jwt.signing] +/// key = "secret" +/// expires_after_seconds = 60 +/// +/// [jwt.signing.read] +/// key = "read-secret" +/// expires_after_seconds = 60 +/// ``` +fn parse_security_config(path: &str) -> (Vec, i64, Vec, i64) { + if path.is_empty() { + return (vec![], 0, vec![], 0); } + let content = match std::fs::read_to_string(path) { + Ok(c) => c, + Err(_) => return (vec![], 0, vec![], 0), + }; + + let mut signing_key = Vec::new(); + let mut signing_expires: i64 = 0; + let mut read_key = Vec::new(); + let mut read_expires: i64 = 0; + + // Simple TOML parser for the specific security config format + let mut in_jwt_signing = false; + let mut in_jwt_signing_read = false; + for line in content.lines() { + let trimmed = line.trim(); + if trimmed.starts_with('#') || trimmed.is_empty() { + continue; + } + if trimmed == "[jwt.signing.read]" { + in_jwt_signing = false; + in_jwt_signing_read = true; + continue; + } + if trimmed == "[jwt.signing]" { + in_jwt_signing = true; + in_jwt_signing_read = false; + continue; + } + if trimmed.starts_with('[') { + in_jwt_signing = false; + in_jwt_signing_read = false; + continue; + } + + if let Some((key, value)) = trimmed.split_once('=') { + let key = key.trim(); + let value = value.trim().trim_matches('"'); + if in_jwt_signing_read { + match key { + "key" => read_key = value.as_bytes().to_vec(), + "expires_after_seconds" => read_expires = value.parse().unwrap_or(0), + _ => {} + } + } else if in_jwt_signing { + match key { + "key" => signing_key = value.as_bytes().to_vec(), + "expires_after_seconds" => signing_expires = value.parse().unwrap_or(0), + _ => {} + } + } + } + } + + (signing_key, signing_expires, read_key, read_expires) } /// Detect the host's IP address. diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs index ce813d9db..0a0fd8aac 100644 --- a/seaweed-volume/src/main.rs +++ b/seaweed-volume/src/main.rs @@ -67,14 +67,12 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box Result<(), Box( token, diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs index dd906e30c..689796096 100644 --- a/seaweed-volume/src/server/handlers.rs +++ b/seaweed-volume/src/server/handlers.rs @@ -5,6 +5,7 @@ //! volume_server_handlers_admin.go. use std::sync::Arc; +use std::sync::atomic::Ordering; use axum::body::Body; use axum::extract::{Query, State}; @@ -18,12 +19,85 @@ use crate::storage::needle::needle::Needle; use crate::storage::types::*; use super::volume_server::VolumeServerState; +// ============================================================================ +// Inflight Throttle Guard +// ============================================================================ + +/// RAII guard that subtracts bytes from an atomic counter and notifies waiters on drop. +struct InflightGuard<'a> { + counter: &'a std::sync::atomic::AtomicI64, + bytes: i64, + notify: &'a tokio::sync::Notify, +} + +impl<'a> Drop for InflightGuard<'a> { + fn drop(&mut self) { + self.counter.fetch_sub(self.bytes, Ordering::Relaxed); + self.notify.notify_waiters(); + } +} + +/// Body wrapper that tracks download inflight bytes and releases them when dropped. +struct TrackedBody { + data: Vec, + state: Arc, + bytes: i64, +} + +impl http_body::Body for TrackedBody { + type Data = bytes::Bytes; + type Error = std::convert::Infallible; + + fn poll_frame( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll, Self::Error>>> { + if self.data.is_empty() { + return std::task::Poll::Ready(None); + } + let data = std::mem::take(&mut self.data); + std::task::Poll::Ready(Some(Ok(http_body::Frame::data(bytes::Bytes::from(data))))) + } +} + +impl Drop for TrackedBody { + fn drop(&mut self) { + self.state.inflight_download_bytes.fetch_sub(self.bytes, Ordering::Relaxed); + self.state.download_notify.notify_waiters(); + } +} + // ============================================================================ // URL Parsing // ============================================================================ /// Parse volume ID and file ID from URL path. /// Supports: "vid,fid", "vid/fid", "vid,fid.ext", "vid/fid/filename.ext" +/// Extract the file_id string (e.g., "3,01637037d6") from a URL path for JWT validation. +fn extract_file_id(path: &str) -> String { + let path = path.trim_start_matches('/'); + // Strip extension and filename after second slash + if let Some(comma) = path.find(',') { + let after_comma = &path[comma + 1..]; + let fid_part = if let Some(slash) = after_comma.find('/') { + &after_comma[..slash] + } else if let Some(dot) = after_comma.rfind('.') { + &after_comma[..dot] + } else { + after_comma + }; + // Strip "_suffix" from fid (Go does this for filenames appended with underscore) + let fid_part = if let Some(underscore) = fid_part.rfind('_') { + &fid_part[..underscore] + } else { + fid_part + }; + format!("{},{}", &path[..comma], fid_part) + } else { + path.to_string() + } +} + fn parse_url_path(path: &str) -> Option<(VolumeId, NeedleId, Cookie)> { let path = path.trim_start_matches('/'); @@ -68,6 +142,8 @@ pub struct ReadQueryParams { pub dl: Option, #[serde(rename = "readDeleted")] pub read_deleted: Option, + /// cm=false disables chunk manifest expansion (returns raw manifest JSON). + pub cm: Option, } // ============================================================================ @@ -117,11 +193,36 @@ async fn get_or_head_handler_inner( }; // JWT check for reads + let file_id = extract_file_id(&path); let token = extract_jwt(&headers, request.uri()); - if let Err(e) = state.guard.check_jwt(token.as_deref(), false) { + if let Err(e) = state.guard.check_jwt_for_file(token.as_deref(), &file_id, false) { return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response(); } + // Download throttling + let download_guard = if state.concurrent_download_limit > 0 { + let timeout = if state.inflight_download_data_timeout.is_zero() { + std::time::Duration::from_secs(2) + } else { + state.inflight_download_data_timeout + }; + let deadline = tokio::time::Instant::now() + timeout; + + loop { + let current = state.inflight_download_bytes.load(Ordering::Relaxed); + if current < state.concurrent_download_limit { + break; + } + if tokio::time::timeout_at(deadline, state.download_notify.notified()).await.is_err() { + return (StatusCode::TOO_MANY_REQUESTS, "download limit exceeded").into_response(); + } + } + // We'll set the actual bytes after reading the needle (once we know the size) + Some(state.clone()) + } else { + None + }; + // Read needle let mut n = Needle { id: needle_id, @@ -154,6 +255,12 @@ async fn get_or_head_handler_inner( return StatusCode::NOT_FOUND.into_response(); } + // Chunk manifest expansion + let bypass_cm = query.cm.as_deref() == Some("false"); + if n.is_chunk_manifest() && !bypass_cm { + return expand_chunk_manifest(&state, &n, &headers, &method); + } + // Build ETag let etag = format!("\"{}\"", n.etag()); @@ -267,6 +374,22 @@ async fn get_or_head_handler_inner( .with_label_values(&["read"]) .observe(start.elapsed().as_secs_f64()); + // If download throttling is active, wrap the body so we track when it's fully sent + if download_guard.is_some() { + let data_len = data.len() as i64; + state.inflight_download_bytes.fetch_add(data_len, Ordering::Relaxed); + let tracked_body = TrackedBody { + data, + state: state.clone(), + bytes: data_len, + }; + let body = Body::new(tracked_body); + let mut resp = Response::new(body); + *resp.status_mut() = StatusCode::OK; + *resp.headers_mut() = response_headers; + return resp; + } + (StatusCode::OK, response_headers, data).into_response() } @@ -413,11 +536,52 @@ pub async fn post_handler( }; // JWT check for writes + let file_id = extract_file_id(&path); let token = extract_jwt(&headers, request.uri()); - if let Err(e) = state.guard.check_jwt(token.as_deref(), true) { + if let Err(e) = state.guard.check_jwt_for_file(token.as_deref(), &file_id, true) { return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response(); } + // Upload throttling: check inflight bytes against limit + let is_replicate = query.split('&').any(|p| p == "type=replicate"); + let content_length = headers.get(header::CONTENT_LENGTH) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) + .unwrap_or(0); + + if !is_replicate && state.concurrent_upload_limit > 0 { + // Wait for inflight bytes to drop below limit, or timeout + let timeout = if state.inflight_upload_data_timeout.is_zero() { + std::time::Duration::from_secs(2) + } else { + state.inflight_upload_data_timeout + }; + let deadline = tokio::time::Instant::now() + timeout; + + loop { + let current = state.inflight_upload_bytes.load(Ordering::Relaxed); + if current < state.concurrent_upload_limit { + break; + } + // Wait for notification or timeout + if tokio::time::timeout_at(deadline, state.upload_notify.notified()).await.is_err() { + return (StatusCode::TOO_MANY_REQUESTS, "upload limit exceeded").into_response(); + } + } + state.inflight_upload_bytes.fetch_add(content_length, Ordering::Relaxed); + } + + // RAII guard to release upload throttle on any exit path + let _upload_guard = if !is_replicate && state.concurrent_upload_limit > 0 { + Some(InflightGuard { + counter: &state.inflight_upload_bytes, + bytes: content_length, + notify: &state.upload_notify, + }) + } else { + None + }; + // Check for chunk manifest flag let is_chunk_manifest = query.split('&') .any(|p| p == "cm=true" || p == "cm=1"); @@ -479,22 +643,22 @@ pub async fn post_handler( } let mut store = state.store.write().unwrap(); - match store.write_volume_needle(vid, &mut n) { + let resp = match store.write_volume_needle(vid, &mut n) { Ok((_offset, _size, is_unchanged)) => { if is_unchanged { let etag = format!("\"{}\"", n.etag()); - return (StatusCode::NO_CONTENT, [(header::ETAG, etag)]).into_response(); + (StatusCode::NO_CONTENT, [(header::ETAG, etag)]).into_response() + } else { + let result = UploadResult { + name: String::new(), + size: n.data_size, + etag: n.etag(), + }; + metrics::REQUEST_DURATION + .with_label_values(&["write"]) + .observe(start.elapsed().as_secs_f64()); + (StatusCode::CREATED, axum::Json(result)).into_response() } - - let result = UploadResult { - name: String::new(), - size: n.data_size, - etag: n.etag(), - }; - metrics::REQUEST_DURATION - .with_label_values(&["write"]) - .observe(start.elapsed().as_secs_f64()); - (StatusCode::CREATED, axum::Json(result)).into_response() } Err(crate::storage::volume::VolumeError::NotFound) => { (StatusCode::NOT_FOUND, "volume not found").into_response() @@ -505,7 +669,10 @@ pub async fn post_handler( Err(e) => { (StatusCode::INTERNAL_SERVER_ERROR, format!("write error: {}", e)).into_response() } - } + }; + + // _upload_guard drops here, releasing inflight bytes + resp } // ============================================================================ @@ -533,8 +700,9 @@ pub async fn delete_handler( }; // JWT check for writes (deletes use write key) + let file_id = extract_file_id(&path); let token = extract_jwt(&headers, request.uri()); - if let Err(e) = state.guard.check_jwt(token.as_deref(), true) { + if let Err(e) = state.guard.check_jwt_for_file(token.as_deref(), &file_id, true) { return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response(); } @@ -560,6 +728,63 @@ pub async fn delete_handler( return (StatusCode::BAD_REQUEST, "File Random Cookie does not match.").into_response(); } + // If this is a chunk manifest, delete child chunks first + if n.is_chunk_manifest() { + let manifest_data = if n.is_compressed() { + use flate2::read::GzDecoder; + use std::io::Read as _; + let mut decoder = GzDecoder::new(&n.data[..]); + let mut decompressed = Vec::new(); + if decoder.read_to_end(&mut decompressed).is_ok() { + decompressed + } else { + n.data.clone() + } + } else { + n.data.clone() + }; + + if let Ok(manifest) = serde_json::from_slice::(&manifest_data) { + // Delete all child chunks first + for chunk in &manifest.chunks { + let (chunk_vid, chunk_nid, chunk_cookie) = match parse_url_path(&chunk.fid) { + Some(p) => p, + None => { + return (StatusCode::INTERNAL_SERVER_ERROR, format!("invalid chunk fid: {}", chunk.fid)).into_response(); + } + }; + let mut chunk_needle = Needle { + id: chunk_nid, + cookie: chunk_cookie, + ..Needle::default() + }; + // Read the chunk to validate it exists + { + let store = state.store.read().unwrap(); + if let Err(e) = store.read_volume_needle(chunk_vid, &mut chunk_needle) { + return (StatusCode::INTERNAL_SERVER_ERROR, format!("read chunk {}: {}", chunk.fid, e)).into_response(); + } + } + // Delete the chunk + let mut store = state.store.write().unwrap(); + if let Err(e) = store.delete_volume_needle(chunk_vid, &mut chunk_needle) { + return (StatusCode::INTERNAL_SERVER_ERROR, format!("delete chunk {}: {}", chunk.fid, e)).into_response(); + } + } + // Delete the manifest itself + let mut store = state.store.write().unwrap(); + if let Err(e) = store.delete_volume_needle(vid, &mut n) { + return (StatusCode::INTERNAL_SERVER_ERROR, format!("delete manifest: {}", e)).into_response(); + } + metrics::REQUEST_DURATION + .with_label_values(&["delete"]) + .observe(start.elapsed().as_secs_f64()); + // Return the manifest's declared size (matches Go behavior) + let result = DeleteResult { size: manifest.size as i32 }; + return (StatusCode::ACCEPTED, axum::Json(result)).into_response(); + } + } + let mut store = state.store.write().unwrap(); match store.delete_volume_needle(vid, &mut n) { Ok(size) => { @@ -711,6 +936,110 @@ pub async fn ui_handler( .into_response() } +// ============================================================================ +// Chunk Manifest +// ============================================================================ + +#[derive(Deserialize)] +struct ChunkManifest { + #[serde(default)] + name: String, + #[serde(default)] + mime: String, + #[serde(default)] + size: i64, + #[serde(default)] + chunks: Vec, +} + +#[derive(Deserialize)] +struct ChunkInfo { + fid: String, + offset: i64, + size: i64, +} + +/// Expand a chunk manifest needle: read each chunk and concatenate. +fn expand_chunk_manifest( + state: &Arc, + n: &Needle, + _headers: &HeaderMap, + method: &Method, +) -> Response { + let data = if n.is_compressed() { + use flate2::read::GzDecoder; + use std::io::Read as _; + let mut decoder = GzDecoder::new(&n.data[..]); + let mut decompressed = Vec::new(); + if decoder.read_to_end(&mut decompressed).is_err() { + return (StatusCode::INTERNAL_SERVER_ERROR, "failed to decompress manifest").into_response(); + } + decompressed + } else { + n.data.clone() + }; + + let manifest: ChunkManifest = match serde_json::from_slice(&data) { + Ok(m) => m, + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("invalid chunk manifest: {}", e)).into_response(), + }; + + // Read and concatenate all chunks + let mut result = vec![0u8; manifest.size as usize]; + let store = state.store.read().unwrap(); + for chunk in &manifest.chunks { + let (chunk_vid, chunk_nid, chunk_cookie) = match parse_url_path(&chunk.fid) { + Some(p) => p, + None => return (StatusCode::INTERNAL_SERVER_ERROR, format!("invalid chunk fid: {}", chunk.fid)).into_response(), + }; + let mut chunk_needle = Needle { + id: chunk_nid, + cookie: chunk_cookie, + ..Needle::default() + }; + match store.read_volume_needle(chunk_vid, &mut chunk_needle) { + Ok(_) => {} + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("read chunk {}: {}", chunk.fid, e)).into_response(), + } + let chunk_data = if chunk_needle.is_compressed() { + use flate2::read::GzDecoder; + use std::io::Read as _; + let mut decoder = GzDecoder::new(&chunk_needle.data[..]); + let mut decompressed = Vec::new(); + if decoder.read_to_end(&mut decompressed).is_ok() { + decompressed + } else { + chunk_needle.data.clone() + } + } else { + chunk_needle.data.clone() + }; + let offset = chunk.offset as usize; + let end = std::cmp::min(offset + chunk_data.len(), result.len()); + let copy_len = end - offset; + if copy_len > 0 { + result[offset..offset + copy_len].copy_from_slice(&chunk_data[..copy_len]); + } + } + + let content_type = if !manifest.mime.is_empty() { + manifest.mime.clone() + } else { + "application/octet-stream".to_string() + }; + + let mut response_headers = HeaderMap::new(); + response_headers.insert(header::CONTENT_TYPE, content_type.parse().unwrap()); + response_headers.insert("X-File-Store", "chunked".parse().unwrap()); + + if *method == Method::HEAD { + response_headers.insert(header::CONTENT_LENGTH, result.len().to_string().parse().unwrap()); + return (StatusCode::OK, response_headers).into_response(); + } + + (StatusCode::OK, response_headers, result).into_response() +} + // ============================================================================ // Helpers // ============================================================================ @@ -743,7 +1072,7 @@ fn extract_jwt(headers: &HeaderMap, uri: &axum::http::Uri) -> Option { if let Ok(cookie_str) = cookie_header.to_str() { for cookie in cookie_str.split(';') { let cookie = cookie.trim(); - if let Some(value) = cookie.strip_prefix("jwt=") { + if let Some(value) = cookie.strip_prefix("AT=") { if !value.is_empty() { return Some(value.to_string()); } diff --git a/seaweed-volume/src/server/volume_server.rs b/seaweed-volume/src/server/volume_server.rs index 97f0cfcb7..62c86a1b5 100644 --- a/seaweed-volume/src/server/volume_server.rs +++ b/seaweed-volume/src/server/volume_server.rs @@ -10,7 +10,7 @@ //! Matches Go's server/volume_server.go. use std::sync::{Arc, RwLock}; -use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering}; use axum::{ Router, @@ -35,6 +35,17 @@ pub struct VolumeServerState { pub maintenance: AtomicBool, /// State version — incremented on each SetState call. pub state_version: AtomicU32, + /// Throttling: concurrent upload/download limits (in bytes, 0 = disabled). + pub concurrent_upload_limit: i64, + pub concurrent_download_limit: i64, + pub inflight_upload_data_timeout: std::time::Duration, + pub inflight_download_data_timeout: std::time::Duration, + /// Current in-flight upload/download bytes. + pub inflight_upload_bytes: AtomicI64, + pub inflight_download_bytes: AtomicI64, + /// Notify waiters when inflight bytes decrease. + pub upload_notify: tokio::sync::Notify, + pub download_notify: tokio::sync::Notify, } impl VolumeServerState { diff --git a/test/volume_server/framework/cluster_rust.go b/test/volume_server/framework/cluster_rust.go index 6d53e2525..e29c128bd 100644 --- a/test/volume_server/framework/cluster_rust.go +++ b/test/volume_server/framework/cluster_rust.go @@ -193,6 +193,15 @@ func (rc *RustCluster) startRustVolume(dataDir string) error { "--dir", dataDir, "--max", "16", "--master", "127.0.0.1:" + strconv.Itoa(rc.masterPort), + "--securityFile", filepath.Join(rc.configDir, "security.toml"), + "--concurrentUploadLimitMB", strconv.Itoa(rc.profile.ConcurrentUploadLimitMB), + "--concurrentDownloadLimitMB", strconv.Itoa(rc.profile.ConcurrentDownloadLimitMB), + } + if rc.profile.InflightUploadTimeout > 0 { + args = append(args, "--inflightUploadDataTimeout", rc.profile.InflightUploadTimeout.String()) + } + if rc.profile.InflightDownloadTimeout > 0 { + args = append(args, "--inflightDownloadDataTimeout", rc.profile.InflightDownloadTimeout.String()) } rc.volumeCmd = exec.Command(rc.rustVolumeBinary, args...)