Browse Source

JWT auth, chunk manifest expansion, and upload/download throttling

- Wire JWT security config from --securityFile TOML into Guard
- Fix cookie-based JWT extraction (AT cookie, not jwt cookie)
- Set JWT leeway to 0 to properly reject expired tokens
- Add file_id extraction and validation for per-file JWT scoping
- Implement chunk manifest expansion on read (X-File-Store: chunked)
- Implement chunk manifest delete (delete children, return manifest size)
- Add upload throttling with inflight byte tracking and RAII guard
- Add download throttling with TrackedBody that releases on Drop
- Pass --securityFile and throttling flags from test framework

HTTP tests: 53/55 (was 40/55). Remaining: ImageResize, CONNECT method.
rust-volume-server
Chris Lu 2 weeks ago
parent
commit
1b5a97660f
  1. 1
      seaweed-volume/Cargo.lock
  2. 1
      seaweed-volume/Cargo.toml
  3. 87
      seaweed-volume/src/config.rs
  4. 18
      seaweed-volume/src/main.rs
  5. 3
      seaweed-volume/src/security.rs
  6. 363
      seaweed-volume/src/server/handlers.rs
  7. 13
      seaweed-volume/src/server/volume_server.rs
  8. 9
      test/volume_server/framework/cluster_rust.go

1
seaweed-volume/Cargo.lock

@ -2473,6 +2473,7 @@ dependencies = [
"flate2",
"futures",
"hex",
"http-body",
"hyper",
"hyper-util",
"jsonwebtoken",

1
seaweed-volume/Cargo.toml

@ -16,6 +16,7 @@ prost-types = "0.13"
# HTTP server
axum = { version = "0.7", features = ["multipart"] }
http-body = "1"
hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["tokio"] }
tower = "0.4"

87
seaweed-volume/src/config.rs

@ -177,6 +177,10 @@ pub struct Cli {
/// HTTP port for debugging.
#[arg(long = "debug.port", default_value_t = 6060)]
pub debug_port: u16,
/// Path to security.toml configuration file for JWT signing keys.
#[arg(long = "securityFile", default_value = "")]
pub security_file: String,
}
/// Resolved configuration after applying defaults and validation.
@ -220,6 +224,10 @@ pub struct VolumeServerConfig {
pub metrics_ip: String,
pub debug: bool,
pub debug_port: u16,
pub jwt_signing_key: Vec<u8>,
pub jwt_signing_expires_seconds: i64,
pub jwt_read_signing_key: Vec<u8>,
pub jwt_read_signing_expires_seconds: i64,
}
pub use crate::storage::needle_map::NeedleMapKind;
@ -536,6 +544,10 @@ fn resolve_config(cli: Cli) -> VolumeServerConfig {
let inflight_upload_data_timeout = parse_duration(&cli.inflight_upload_data_timeout);
let inflight_download_data_timeout = parse_duration(&cli.inflight_download_data_timeout);
// Parse security config from TOML file
let (jwt_signing_key, jwt_signing_expires, jwt_read_signing_key, jwt_read_signing_expires) =
parse_security_config(&cli.security_file);
VolumeServerConfig {
port: cli.port,
grpc_port,
@ -575,7 +587,82 @@ fn resolve_config(cli: Cli) -> VolumeServerConfig {
metrics_ip,
debug: cli.debug,
debug_port: cli.debug_port,
jwt_signing_key,
jwt_signing_expires_seconds: jwt_signing_expires,
jwt_read_signing_key: jwt_read_signing_key,
jwt_read_signing_expires_seconds: jwt_read_signing_expires,
}
}
/// Parse a security.toml file to extract JWT signing keys.
///
/// Returns `(write_key, write_expires, read_key, read_expires)`; an empty or
/// unreadable path yields empty keys (JWT auth disabled). Format:
/// ```toml
/// [jwt.signing]
/// key = "secret"
/// expires_after_seconds = 60
///
/// [jwt.signing.read]
/// key = "read-secret"
/// expires_after_seconds = 60
/// ```
fn parse_security_config(path: &str) -> (Vec<u8>, i64, Vec<u8>, i64) {
    if path.is_empty() {
        return (Vec::new(), 0, Vec::new(), 0);
    }
    let content = match std::fs::read_to_string(path) {
        Ok(text) => text,
        Err(_) => return (Vec::new(), 0, Vec::new(), 0),
    };

    /// Which TOML table the line cursor is currently inside.
    enum Section {
        Other,
        Signing,
        SigningRead,
    }

    let mut section = Section::Other;
    let mut write_key = Vec::new();
    let mut write_expires: i64 = 0;
    let mut read_key = Vec::new();
    let mut read_expires: i64 = 0;

    // Minimal hand-rolled parser: section headers switch state, `key = value`
    // lines are consumed only inside the two tables we care about.
    for raw_line in content.lines() {
        let line = raw_line.trim();
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        if line.starts_with('[') {
            section = match line {
                "[jwt.signing]" => Section::Signing,
                "[jwt.signing.read]" => Section::SigningRead,
                _ => Section::Other,
            };
            continue;
        }
        let (name, raw_value) = match line.split_once('=') {
            Some(pair) => pair,
            None => continue,
        };
        let name = name.trim();
        let value = raw_value.trim().trim_matches('"');
        match section {
            Section::Signing => match name {
                "key" => write_key = value.as_bytes().to_vec(),
                "expires_after_seconds" => write_expires = value.parse().unwrap_or(0),
                _ => {}
            },
            Section::SigningRead => match name {
                "key" => read_key = value.as_bytes().to_vec(),
                "expires_after_seconds" => read_expires = value.parse().unwrap_or(0),
                _ => {}
            },
            Section::Other => {}
        }
    }
    (write_key, write_expires, read_key, read_expires)
}
/// Detect the host's IP address.

18
seaweed-volume/src/main.rs

@ -67,14 +67,12 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
}
// Build shared state
// TODO: Wire up JWT signing keys from config. Empty keys are acceptable for now
// while the Rust volume server is still in development.
let guard = Guard::new(
&config.white_list,
SigningKey(vec![]),
0,
SigningKey(vec![]),
0,
SigningKey(config.jwt_signing_key.clone()),
config.jwt_signing_expires_seconds,
SigningKey(config.jwt_read_signing_key.clone()),
config.jwt_read_signing_expires_seconds,
);
let state = Arc::new(VolumeServerState {
store: RwLock::new(store),
@ -82,6 +80,14 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
is_stopping: RwLock::new(false),
maintenance: std::sync::atomic::AtomicBool::new(false),
state_version: std::sync::atomic::AtomicU32::new(0),
concurrent_upload_limit: config.concurrent_upload_limit,
concurrent_download_limit: config.concurrent_download_limit,
inflight_upload_data_timeout: config.inflight_upload_data_timeout,
inflight_download_data_timeout: config.inflight_download_data_timeout,
inflight_upload_bytes: std::sync::atomic::AtomicI64::new(0),
inflight_download_bytes: std::sync::atomic::AtomicI64::new(0),
upload_notify: tokio::sync::Notify::new(),
download_notify: tokio::sync::Notify::new(),
});
// Build HTTP routers

3
seaweed-volume/src/security.rs

@ -91,7 +91,10 @@ pub fn decode_jwt(
let mut validation = Validation::new(Algorithm::HS256);
// Match Go behavior: tokens without exp are accepted (Go's jwt-go does not require exp)
// But if exp IS present, it must be valid (not expired).
validation.required_spec_claims.clear();
validation.validate_exp = true;
validation.leeway = 0;
let data = decode::<FileIdClaims>(
token,

363
seaweed-volume/src/server/handlers.rs

@ -5,6 +5,7 @@
//! volume_server_handlers_admin.go.
use std::sync::Arc;
use std::sync::atomic::Ordering;
use axum::body::Body;
use axum::extract::{Query, State};
@ -18,12 +19,85 @@ use crate::storage::needle::needle::Needle;
use crate::storage::types::*;
use super::volume_server::VolumeServerState;
// ============================================================================
// Inflight Throttle Guard
// ============================================================================
/// RAII guard that subtracts `bytes` from an inflight byte counter and wakes
/// throttled waiters on drop.
///
/// The matching increment happens at the call site *before* the guard is
/// constructed; dropping the guard on any exit path (success, error return,
/// or panic unwind) releases the reservation and calls `notify_waiters` so
/// requests parked in the throttle loop can re-check the limit.
struct InflightGuard<'a> {
    /// Shared inflight byte counter to decrement on drop.
    counter: &'a std::sync::atomic::AtomicI64,
    /// Number of bytes this request reserved against the counter.
    bytes: i64,
    /// Waiters parked in the throttle loop; notified after the decrement.
    notify: &'a tokio::sync::Notify,
}

impl<'a> Drop for InflightGuard<'a> {
    fn drop(&mut self) {
        // Relaxed ordering: the counter is only an admission-control
        // statistic (loaded with Relaxed in the throttle loops); it does not
        // order any other memory.
        self.counter.fetch_sub(self.bytes, Ordering::Relaxed);
        self.notify.notify_waiters();
    }
}
/// Response body wrapper that accounts its payload against the server-wide
/// inflight download byte counter and releases it when the body is dropped —
/// i.e. once the HTTP stack has finished sending (or abandoned) the response.
struct TrackedBody {
    /// Remaining payload; emitted as a single frame, then left empty.
    data: Vec<u8>,
    /// Server state holding the counter and the waiter notification handle.
    state: Arc<VolumeServerState>,
    /// Bytes reserved against `inflight_download_bytes` when the response was built.
    bytes: i64,
}

impl http_body::Body for TrackedBody {
    type Data = bytes::Bytes;
    type Error = std::convert::Infallible;

    /// Yields the entire buffered payload as one data frame on the first
    /// poll; subsequent polls see an empty buffer and signal end-of-stream.
    fn poll_frame(
        mut self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
        if self.data.is_empty() {
            return std::task::Poll::Ready(None);
        }
        // take() empties `data`, so the next poll reports end-of-stream.
        let data = std::mem::take(&mut self.data);
        std::task::Poll::Ready(Some(Ok(http_body::Frame::data(bytes::Bytes::from(data)))))
    }
}

impl Drop for TrackedBody {
    fn drop(&mut self) {
        // Release the reservation made when the response was built and wake
        // any requests blocked in the download throttle loop.
        self.state.inflight_download_bytes.fetch_sub(self.bytes, Ordering::Relaxed);
        self.state.download_notify.notify_waiters();
    }
}
// ============================================================================
// URL Parsing
// ============================================================================
/// Parse volume ID and file ID from URL path.
/// Supports: "vid,fid", "vid/fid", "vid,fid.ext", "vid/fid/filename.ext"
/// Extract the file_id string (e.g., "3,01637037d6") from a URL path for JWT validation.
fn extract_file_id(path: &str) -> String {
    let path = path.trim_start_matches('/');
    // No comma means there is no "vid,fid" pair; return the path untouched.
    let comma = match path.find(',') {
        Some(i) => i,
        None => return path.to_string(),
    };
    let vid = &path[..comma];
    let rest = &path[comma + 1..];
    // The fid ends at the first '/' (a trailing "/filename.ext" segment) or,
    // failing that, at the last '.' (a bare extension).
    let fid = match rest.find('/') {
        Some(slash) => &rest[..slash],
        None => match rest.rfind('.') {
            Some(dot) => &rest[..dot],
            None => rest,
        },
    };
    // Drop a trailing "_suffix" from the fid (Go does this for filenames
    // appended with an underscore).
    let fid = match fid.rfind('_') {
        Some(us) => &fid[..us],
        None => fid,
    };
    format!("{},{}", vid, fid)
}
fn parse_url_path(path: &str) -> Option<(VolumeId, NeedleId, Cookie)> {
let path = path.trim_start_matches('/');
@ -68,6 +142,8 @@ pub struct ReadQueryParams {
pub dl: Option<String>,
#[serde(rename = "readDeleted")]
pub read_deleted: Option<String>,
/// cm=false disables chunk manifest expansion (returns raw manifest JSON).
pub cm: Option<String>,
}
// ============================================================================
@ -117,11 +193,36 @@ async fn get_or_head_handler_inner(
};
// JWT check for reads
let file_id = extract_file_id(&path);
let token = extract_jwt(&headers, request.uri());
if let Err(e) = state.guard.check_jwt(token.as_deref(), false) {
if let Err(e) = state.guard.check_jwt_for_file(token.as_deref(), &file_id, false) {
return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
}
// Download throttling
let download_guard = if state.concurrent_download_limit > 0 {
let timeout = if state.inflight_download_data_timeout.is_zero() {
std::time::Duration::from_secs(2)
} else {
state.inflight_download_data_timeout
};
let deadline = tokio::time::Instant::now() + timeout;
loop {
let current = state.inflight_download_bytes.load(Ordering::Relaxed);
if current < state.concurrent_download_limit {
break;
}
if tokio::time::timeout_at(deadline, state.download_notify.notified()).await.is_err() {
return (StatusCode::TOO_MANY_REQUESTS, "download limit exceeded").into_response();
}
}
// We'll set the actual bytes after reading the needle (once we know the size)
Some(state.clone())
} else {
None
};
// Read needle
let mut n = Needle {
id: needle_id,
@ -154,6 +255,12 @@ async fn get_or_head_handler_inner(
return StatusCode::NOT_FOUND.into_response();
}
// Chunk manifest expansion
let bypass_cm = query.cm.as_deref() == Some("false");
if n.is_chunk_manifest() && !bypass_cm {
return expand_chunk_manifest(&state, &n, &headers, &method);
}
// Build ETag
let etag = format!("\"{}\"", n.etag());
@ -267,6 +374,22 @@ async fn get_or_head_handler_inner(
.with_label_values(&["read"])
.observe(start.elapsed().as_secs_f64());
// If download throttling is active, wrap the body so we track when it's fully sent
if download_guard.is_some() {
let data_len = data.len() as i64;
state.inflight_download_bytes.fetch_add(data_len, Ordering::Relaxed);
let tracked_body = TrackedBody {
data,
state: state.clone(),
bytes: data_len,
};
let body = Body::new(tracked_body);
let mut resp = Response::new(body);
*resp.status_mut() = StatusCode::OK;
*resp.headers_mut() = response_headers;
return resp;
}
(StatusCode::OK, response_headers, data).into_response()
}
@ -413,11 +536,52 @@ pub async fn post_handler(
};
// JWT check for writes
let file_id = extract_file_id(&path);
let token = extract_jwt(&headers, request.uri());
if let Err(e) = state.guard.check_jwt(token.as_deref(), true) {
if let Err(e) = state.guard.check_jwt_for_file(token.as_deref(), &file_id, true) {
return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
}
// Upload throttling: check inflight bytes against limit
let is_replicate = query.split('&').any(|p| p == "type=replicate");
let content_length = headers.get(header::CONTENT_LENGTH)
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(0);
if !is_replicate && state.concurrent_upload_limit > 0 {
// Wait for inflight bytes to drop below limit, or timeout
let timeout = if state.inflight_upload_data_timeout.is_zero() {
std::time::Duration::from_secs(2)
} else {
state.inflight_upload_data_timeout
};
let deadline = tokio::time::Instant::now() + timeout;
loop {
let current = state.inflight_upload_bytes.load(Ordering::Relaxed);
if current < state.concurrent_upload_limit {
break;
}
// Wait for notification or timeout
if tokio::time::timeout_at(deadline, state.upload_notify.notified()).await.is_err() {
return (StatusCode::TOO_MANY_REQUESTS, "upload limit exceeded").into_response();
}
}
state.inflight_upload_bytes.fetch_add(content_length, Ordering::Relaxed);
}
// RAII guard to release upload throttle on any exit path
let _upload_guard = if !is_replicate && state.concurrent_upload_limit > 0 {
Some(InflightGuard {
counter: &state.inflight_upload_bytes,
bytes: content_length,
notify: &state.upload_notify,
})
} else {
None
};
// Check for chunk manifest flag
let is_chunk_manifest = query.split('&')
.any(|p| p == "cm=true" || p == "cm=1");
@ -479,22 +643,22 @@ pub async fn post_handler(
}
let mut store = state.store.write().unwrap();
match store.write_volume_needle(vid, &mut n) {
let resp = match store.write_volume_needle(vid, &mut n) {
Ok((_offset, _size, is_unchanged)) => {
if is_unchanged {
let etag = format!("\"{}\"", n.etag());
return (StatusCode::NO_CONTENT, [(header::ETAG, etag)]).into_response();
(StatusCode::NO_CONTENT, [(header::ETAG, etag)]).into_response()
} else {
let result = UploadResult {
name: String::new(),
size: n.data_size,
etag: n.etag(),
};
metrics::REQUEST_DURATION
.with_label_values(&["write"])
.observe(start.elapsed().as_secs_f64());
(StatusCode::CREATED, axum::Json(result)).into_response()
}
let result = UploadResult {
name: String::new(),
size: n.data_size,
etag: n.etag(),
};
metrics::REQUEST_DURATION
.with_label_values(&["write"])
.observe(start.elapsed().as_secs_f64());
(StatusCode::CREATED, axum::Json(result)).into_response()
}
Err(crate::storage::volume::VolumeError::NotFound) => {
(StatusCode::NOT_FOUND, "volume not found").into_response()
@ -505,7 +669,10 @@ pub async fn post_handler(
Err(e) => {
(StatusCode::INTERNAL_SERVER_ERROR, format!("write error: {}", e)).into_response()
}
}
};
// _upload_guard drops here, releasing inflight bytes
resp
}
// ============================================================================
@ -533,8 +700,9 @@ pub async fn delete_handler(
};
// JWT check for writes (deletes use write key)
let file_id = extract_file_id(&path);
let token = extract_jwt(&headers, request.uri());
if let Err(e) = state.guard.check_jwt(token.as_deref(), true) {
if let Err(e) = state.guard.check_jwt_for_file(token.as_deref(), &file_id, true) {
return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
}
@ -560,6 +728,63 @@ pub async fn delete_handler(
return (StatusCode::BAD_REQUEST, "File Random Cookie does not match.").into_response();
}
// If this is a chunk manifest, delete child chunks first
if n.is_chunk_manifest() {
let manifest_data = if n.is_compressed() {
use flate2::read::GzDecoder;
use std::io::Read as _;
let mut decoder = GzDecoder::new(&n.data[..]);
let mut decompressed = Vec::new();
if decoder.read_to_end(&mut decompressed).is_ok() {
decompressed
} else {
n.data.clone()
}
} else {
n.data.clone()
};
if let Ok(manifest) = serde_json::from_slice::<ChunkManifest>(&manifest_data) {
// Delete all child chunks first
for chunk in &manifest.chunks {
let (chunk_vid, chunk_nid, chunk_cookie) = match parse_url_path(&chunk.fid) {
Some(p) => p,
None => {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("invalid chunk fid: {}", chunk.fid)).into_response();
}
};
let mut chunk_needle = Needle {
id: chunk_nid,
cookie: chunk_cookie,
..Needle::default()
};
// Read the chunk to validate it exists
{
let store = state.store.read().unwrap();
if let Err(e) = store.read_volume_needle(chunk_vid, &mut chunk_needle) {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("read chunk {}: {}", chunk.fid, e)).into_response();
}
}
// Delete the chunk
let mut store = state.store.write().unwrap();
if let Err(e) = store.delete_volume_needle(chunk_vid, &mut chunk_needle) {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("delete chunk {}: {}", chunk.fid, e)).into_response();
}
}
// Delete the manifest itself
let mut store = state.store.write().unwrap();
if let Err(e) = store.delete_volume_needle(vid, &mut n) {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("delete manifest: {}", e)).into_response();
}
metrics::REQUEST_DURATION
.with_label_values(&["delete"])
.observe(start.elapsed().as_secs_f64());
// Return the manifest's declared size (matches Go behavior)
let result = DeleteResult { size: manifest.size as i32 };
return (StatusCode::ACCEPTED, axum::Json(result)).into_response();
}
}
let mut store = state.store.write().unwrap();
match store.delete_volume_needle(vid, &mut n) {
Ok(size) => {
@ -711,6 +936,110 @@ pub async fn ui_handler(
.into_response()
}
// ============================================================================
// Chunk Manifest
// ============================================================================
/// JSON manifest stored in a needle when a file was uploaded in chunks
/// (flagged with `cm=true` on write). Decoded by the read path for expansion
/// and by the delete path to remove child chunks.
// NOTE(review): field names presumably mirror the Go chunk-manifest JSON
// schema — confirm against the Go writer before renaming anything.
#[derive(Deserialize)]
struct ChunkManifest {
    /// Original file name (informational; not read by the visible code paths).
    #[serde(default)]
    name: String,
    /// Declared MIME type; used as the response Content-Type when non-empty.
    #[serde(default)]
    mime: String,
    /// Declared total size in bytes of the assembled file.
    #[serde(default)]
    size: i64,
    /// Child chunks, each naming a needle by fid.
    #[serde(default)]
    chunks: Vec<ChunkInfo>,
}

/// One chunk entry inside a [`ChunkManifest`].
#[derive(Deserialize)]
struct ChunkInfo {
    /// File id ("vid,fid") of the child needle holding this chunk's bytes.
    fid: String,
    /// Byte offset of this chunk within the assembled file.
    offset: i64,
    /// Declared chunk size in bytes (not read by the visible code paths).
    size: i64,
}
/// Expand a chunk manifest needle: read each chunk and concatenate.
///
/// The manifest payload (gzip-decompressed when the needle is compressed) is
/// JSON-decoded into a [`ChunkManifest`]; each referenced chunk is then read
/// from the local store and copied into an output buffer at its declared
/// offset. For HEAD requests only the headers (with Content-Length) are
/// returned; otherwise the assembled bytes are the response body.
fn expand_chunk_manifest(
    state: &Arc<VolumeServerState>,
    n: &Needle,
    _headers: &HeaderMap,
    method: &Method,
) -> Response {
    // Decompress the manifest payload if the needle stores it gzipped.
    let data = if n.is_compressed() {
        use flate2::read::GzDecoder;
        use std::io::Read as _;
        let mut decoder = GzDecoder::new(&n.data[..]);
        let mut decompressed = Vec::new();
        if decoder.read_to_end(&mut decompressed).is_err() {
            return (StatusCode::INTERNAL_SERVER_ERROR, "failed to decompress manifest").into_response();
        }
        decompressed
    } else {
        n.data.clone()
    };

    let manifest: ChunkManifest = match serde_json::from_slice(&data) {
        Ok(m) => m,
        Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("invalid chunk manifest: {}", e)).into_response(),
    };

    // Reject a corrupt manifest before allocating: a negative size cast
    // through `as usize` would wrap to a huge value in the vec! below.
    if manifest.size < 0 {
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("invalid chunk manifest size: {}", manifest.size),
        )
            .into_response();
    }

    // Read and concatenate all chunks into a zero-filled buffer of the
    // declared total size.
    let mut result = vec![0u8; manifest.size as usize];
    let store = state.store.read().unwrap();
    for chunk in &manifest.chunks {
        let (chunk_vid, chunk_nid, chunk_cookie) = match parse_url_path(&chunk.fid) {
            Some(p) => p,
            None => return (StatusCode::INTERNAL_SERVER_ERROR, format!("invalid chunk fid: {}", chunk.fid)).into_response(),
        };
        let mut chunk_needle = Needle {
            id: chunk_nid,
            cookie: chunk_cookie,
            ..Needle::default()
        };
        if let Err(e) = store.read_volume_needle(chunk_vid, &mut chunk_needle) {
            return (StatusCode::INTERNAL_SERVER_ERROR, format!("read chunk {}: {}", chunk.fid, e)).into_response();
        }
        // Chunks may be compressed independently of the manifest; on decode
        // failure fall back to the raw stored bytes (best effort).
        let chunk_data = if chunk_needle.is_compressed() {
            use flate2::read::GzDecoder;
            use std::io::Read as _;
            let mut decoder = GzDecoder::new(&chunk_needle.data[..]);
            let mut decompressed = Vec::new();
            if decoder.read_to_end(&mut decompressed).is_ok() {
                decompressed
            } else {
                chunk_needle.data.clone()
            }
        } else {
            chunk_needle.data.clone()
        };
        // Skip chunks whose offset lies outside the declared file size.
        // Previously `end - offset` underflowed (usize subtraction panic)
        // whenever offset >= result.len(), e.g. for a corrupt manifest whose
        // size is smaller than its chunk offsets, or a negative offset
        // wrapping through `as usize`.
        if chunk.offset < 0 {
            continue;
        }
        let offset = chunk.offset as usize;
        if offset >= result.len() {
            continue;
        }
        // Clamp the copy to the declared total size.
        let copy_len = std::cmp::min(chunk_data.len(), result.len() - offset);
        result[offset..offset + copy_len].copy_from_slice(&chunk_data[..copy_len]);
    }
    // Release the store read lock before building the response.
    drop(store);

    let content_type = if !manifest.mime.is_empty() {
        manifest.mime.clone()
    } else {
        "application/octet-stream".to_string()
    };
    let mut response_headers = HeaderMap::new();
    // A manifest-supplied mime string may not be a valid header value; skip
    // the header rather than panic on unwrap().
    if let Ok(ct) = content_type.parse() {
        response_headers.insert(header::CONTENT_TYPE, ct);
    }
    response_headers.insert("X-File-Store", "chunked".parse().unwrap());
    if *method == Method::HEAD {
        response_headers.insert(header::CONTENT_LENGTH, result.len().to_string().parse().unwrap());
        return (StatusCode::OK, response_headers).into_response();
    }
    (StatusCode::OK, response_headers, result).into_response()
}
// ============================================================================
// Helpers
// ============================================================================
@ -743,7 +1072,7 @@ fn extract_jwt(headers: &HeaderMap, uri: &axum::http::Uri) -> Option<String> {
if let Ok(cookie_str) = cookie_header.to_str() {
for cookie in cookie_str.split(';') {
let cookie = cookie.trim();
if let Some(value) = cookie.strip_prefix("jwt=") {
if let Some(value) = cookie.strip_prefix("AT=") {
if !value.is_empty() {
return Some(value.to_string());
}

13
seaweed-volume/src/server/volume_server.rs

@ -10,7 +10,7 @@
//! Matches Go's server/volume_server.go.
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering};
use axum::{
Router,
@ -35,6 +35,17 @@ pub struct VolumeServerState {
pub maintenance: AtomicBool,
/// State version — incremented on each SetState call.
pub state_version: AtomicU32,
/// Throttling: concurrent upload/download limits (in bytes, 0 = disabled).
pub concurrent_upload_limit: i64,
pub concurrent_download_limit: i64,
pub inflight_upload_data_timeout: std::time::Duration,
pub inflight_download_data_timeout: std::time::Duration,
/// Current in-flight upload/download bytes.
pub inflight_upload_bytes: AtomicI64,
pub inflight_download_bytes: AtomicI64,
/// Notify waiters when inflight bytes decrease.
pub upload_notify: tokio::sync::Notify,
pub download_notify: tokio::sync::Notify,
}
impl VolumeServerState {

9
test/volume_server/framework/cluster_rust.go

@ -193,6 +193,15 @@ func (rc *RustCluster) startRustVolume(dataDir string) error {
"--dir", dataDir,
"--max", "16",
"--master", "127.0.0.1:" + strconv.Itoa(rc.masterPort),
"--securityFile", filepath.Join(rc.configDir, "security.toml"),
"--concurrentUploadLimitMB", strconv.Itoa(rc.profile.ConcurrentUploadLimitMB),
"--concurrentDownloadLimitMB", strconv.Itoa(rc.profile.ConcurrentDownloadLimitMB),
}
if rc.profile.InflightUploadTimeout > 0 {
args = append(args, "--inflightUploadDataTimeout", rc.profile.InflightUploadTimeout.String())
}
if rc.profile.InflightDownloadTimeout > 0 {
args = append(args, "--inflightDownloadDataTimeout", rc.profile.InflightDownloadTimeout.String())
}
rc.volumeCmd = exec.Command(rc.rustVolumeBinary, args...)

Loading…
Cancel
Save