From 4e4e4c453ded21e518af90a886651c9910a7173b Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 9 Mar 2026 02:10:12 -0700
Subject: [PATCH] feat: S3 tier concurrent multipart upload/download with 64MB parts

Matches Go's 5 concurrent goroutines and 64MB part size for S3 tier data
movement. Uses tokio tasks with semaphore-based concurrency control.
---
 seaweed-volume/src/remote_storage/s3_tier.rs | 299 ++++++++++++-------
 1 file changed, 196 insertions(+), 103 deletions(-)

diff --git a/seaweed-volume/src/remote_storage/s3_tier.rs b/seaweed-volume/src/remote_storage/s3_tier.rs
index fb82991a9..97444826c 100644
--- a/seaweed-volume/src/remote_storage/s3_tier.rs
+++ b/seaweed-volume/src/remote_storage/s3_tier.rs
@@ -9,7 +9,11 @@ use std::sync::Arc;
 use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
 use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
 use aws_sdk_s3::Client;
-use tokio::io::AsyncReadExt;
+use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
+use tokio::sync::Semaphore;
+
+/// Concurrency limit for multipart upload/download (matches Go's s3manager).
+const CONCURRENCY: usize = 5;
 
 /// Configuration for an S3 tier backend.
 #[derive(Debug, Clone)]
@@ -70,14 +74,16 @@ impl S3TierBackend {
         }
     }
 
-    /// Upload a local file to S3 using multipart upload with progress reporting.
+    /// Upload a local file to S3 using multipart upload with concurrent parts
+    /// and progress reporting.
     ///
     /// Returns (s3_key, file_size) on success.
     /// The progress callback receives (bytes_uploaded, percentage).
+    /// Uses 64MB part size and 5 concurrent uploads (matches Go s3manager).
     pub async fn upload_file<F>(
         &self,
         file_path: &str,
-        mut progress_fn: F,
+        progress_fn: F,
     ) -> Result<(String, u64), String>
     where
         F: FnMut(i64, f32) + Send + Sync + 'static,
@@ -89,8 +95,8 @@
             .map_err(|e| format!("failed to stat file {}: {}", file_path, e))?;
         let file_size = metadata.len();
 
-        // Calculate part size: start at 8MB, scale up for very large files (matches Go)
-        let mut part_size: u64 = 8 * 1024 * 1024;
+        // Calculate part size: start at 64MB, scale up for very large files (matches Go)
+        let mut part_size: u64 = 64 * 1024 * 1024;
         while part_size * 1000 < file_size {
             part_size *= 4;
         }
@@ -115,61 +121,97 @@
             .ok_or_else(|| "no upload_id in multipart upload response".to_string())?
             .to_string();
 
-        let mut file = tokio::fs::File::open(file_path)
-            .await
-            .map_err(|e| format!("failed to open file {}: {}", file_path, e))?;
-
-        let mut completed_parts = Vec::new();
+        // Build list of (part_number, offset, size) for all parts
+        let mut parts_plan: Vec<(i32, u64, usize)> = Vec::new();
         let mut offset: u64 = 0;
         let mut part_number: i32 = 1;
-
-        loop {
+        while offset < file_size {
             let remaining = file_size - offset;
-            if remaining == 0 {
-                break;
-            }
             let this_part_size = std::cmp::min(part_size, remaining) as usize;
-            let mut buf = vec![0u8; this_part_size];
-            file.read_exact(&mut buf)
-                .await
-                .map_err(|e| format!("failed to read file at offset {}: {}", offset, e))?;
-
-            let upload_part_resp = self
-                .client
-                .upload_part()
-                .bucket(&self.bucket)
-                .key(&key)
-                .upload_id(&upload_id)
-                .part_number(part_number)
-                .body(buf.into())
-                .send()
-                .await
-                .map_err(|e| {
-                    format!(
-                        "failed to upload part {} at offset {}: {}",
-                        part_number, offset, e
-                    )
-                })?;
-
-            let e_tag = upload_part_resp.e_tag().unwrap_or_default().to_string();
-            completed_parts.push(
-                CompletedPart::builder()
-                    .e_tag(e_tag)
-                    .part_number(part_number)
-                    .build(),
-            );
-
+            parts_plan.push((part_number, offset, this_part_size));
             offset += this_part_size as u64;
+            part_number += 1;
+        }
 
-            // Report progress
-            let pct = if file_size > 0 {
-                (offset as f32 * 100.0) / file_size as f32
-            } else {
-                100.0
-            };
-            progress_fn(offset as i64, pct);
+        // Upload parts concurrently with a semaphore limiting to CONCURRENCY
+        let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
+        let client = &self.client;
+        let bucket = &self.bucket;
+        let file_path_owned = file_path.to_string();
+        let progress = Arc::new(std::sync::Mutex::new((0u64, progress_fn)));
+
+        let mut handles = Vec::with_capacity(parts_plan.len());
+        for (pn, off, size) in parts_plan {
+            let sem = semaphore.clone();
+            let client = client.clone();
+            let bucket = bucket.clone();
+            let key = key.clone();
+            let upload_id = upload_id.clone();
+            let fp = file_path_owned.clone();
+            let progress = progress.clone();
+
+            handles.push(tokio::spawn(async move {
+                let _permit = sem
+                    .acquire()
+                    .await
+                    .map_err(|e| format!("semaphore error: {}", e))?;
+
+                // Read this part's data from the file at the correct offset
+                let mut file = tokio::fs::File::open(&fp)
+                    .await
+                    .map_err(|e| format!("failed to open file {}: {}", fp, e))?;
+                file.seek(std::io::SeekFrom::Start(off))
+                    .await
+                    .map_err(|e| format!("failed to seek to offset {}: {}", off, e))?;
+                let mut buf = vec![0u8; size];
+                file.read_exact(&mut buf)
+                    .await
+                    .map_err(|e| format!("failed to read file at offset {}: {}", off, e))?;
+
+                let upload_part_resp = client
+                    .upload_part()
+                    .bucket(&bucket)
+                    .key(&key)
+                    .upload_id(&upload_id)
+                    .part_number(pn)
+                    .body(buf.into())
+                    .send()
+                    .await
+                    .map_err(|e| {
+                        format!("failed to upload part {} at offset {}: {}", pn, off, e)
+                    })?;
+
+                let e_tag = upload_part_resp.e_tag().unwrap_or_default().to_string();
+
+                // Report progress
+                {
+                    let mut guard = progress.lock().unwrap();
+                    guard.0 += size as u64;
+                    let uploaded = guard.0;
+                    let pct = if file_size > 0 {
+                        (uploaded as f32 * 100.0) / file_size as f32
+                    } else {
+                        100.0
+                    };
+                    (guard.1)(uploaded as i64, pct);
+                }
+
+                Ok::<_, String>(
+                    CompletedPart::builder()
+                        .e_tag(e_tag)
+                        .part_number(pn)
+                        .build(),
+                )
+            }));
+        }
 
-            part_number += 1;
+        // Collect results, preserving part order
+        let mut completed_parts = Vec::with_capacity(handles.len());
+        for handle in handles {
+            let part = handle
+                .await
+                .map_err(|e| format!("upload task panicked: {}", e))??;
+            completed_parts.push(part);
         }
 
         // Complete multipart upload
@@ -190,14 +232,16 @@
         Ok((key, file_size))
     }
 
-    /// Download a file from S3 to a local path with progress reporting.
+    /// Download a file from S3 to a local path with concurrent range requests
+    /// and progress reporting.
     ///
     /// Returns the file size on success.
+    /// Uses 64MB part size and 5 concurrent downloads (matches Go s3manager).
    pub async fn download_file<F>(
         &self,
         dest_path: &str,
         key: &str,
-        mut progress_fn: F,
+        progress_fn: F,
     ) -> Result<u64, String>
     where
         F: FnMut(i64, f32) + Send + Sync + 'static,
@@ -214,61 +258,110 @@
 
         let file_size = head_resp.content_length().unwrap_or(0) as u64;
 
-        // Download in chunks (4MB to match Go)
-        let part_size: u64 = 4 * 1024 * 1024;
-        let mut file = tokio::fs::OpenOptions::new()
-            .write(true)
-            .create(true)
-            .truncate(true)
-            .open(dest_path)
-            .await
-            .map_err(|e| format!("failed to open dest file {}: {}", dest_path, e))?;
-
-        let mut offset: u64 = 0;
-
-        loop {
-            if offset >= file_size {
-                break;
-            }
-            let end = std::cmp::min(offset + part_size - 1, file_size - 1);
-            let range = format!("bytes={}-{}", offset, end);
-
-            let get_resp = self
-                .client
-                .get_object()
-                .bucket(&self.bucket)
-                .key(key)
-                .range(&range)
-                .send()
+        // Pre-allocate file to full size so concurrent WriteAt-style writes work
+        {
+            let file = tokio::fs::OpenOptions::new()
+                .write(true)
+                .create(true)
+                .truncate(true)
+                .open(dest_path)
                 .await
-                .map_err(|e| format!("failed to get object {} range {}: {}", key, range, e))?;
-
-            let body = get_resp
-                .body
-                .collect()
+                .map_err(|e| format!("failed to open dest file {}: {}", dest_path, e))?;
+            file.set_len(file_size)
                 .await
-                .map_err(|e| format!("failed to read body: {}", e))?;
-            let bytes = body.into_bytes();
+                .map_err(|e| format!("failed to set file length: {}", e))?;
+        }
 
-            use tokio::io::AsyncWriteExt;
-            file.write_all(&bytes)
-                .await
-                .map_err(|e| format!("failed to write to {}: {}", dest_path, e))?;
+        let part_size: u64 = 64 * 1024 * 1024;
 
-            offset += bytes.len() as u64;
+        // Build list of (offset, size) for all parts
+        let mut parts_plan: Vec<(u64, u64)> = Vec::new();
+        let mut offset: u64 = 0;
+        while offset < file_size {
+            let remaining = file_size - offset;
+            let this_part_size = std::cmp::min(part_size, remaining);
+            parts_plan.push((offset, this_part_size));
+            offset += this_part_size;
+        }
 
-            let pct = if file_size > 0 {
-                (offset as f32 * 100.0) / file_size as f32
-            } else {
-                100.0
-            };
-            progress_fn(offset as i64, pct);
+        // Download parts concurrently with a semaphore limiting to CONCURRENCY
+        let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
+        let client = &self.client;
+        let bucket = &self.bucket;
+        let dest_path_owned = dest_path.to_string();
+        let key_owned = key.to_string();
+        let progress = Arc::new(std::sync::Mutex::new((0u64, progress_fn)));
+
+        let mut handles = Vec::with_capacity(parts_plan.len());
+        for (off, size) in parts_plan {
+            let sem = semaphore.clone();
+            let client = client.clone();
+            let bucket = bucket.clone();
+            let key = key_owned.clone();
+            let dp = dest_path_owned.clone();
+            let progress = progress.clone();
+
+            handles.push(tokio::spawn(async move {
+                let _permit = sem
+                    .acquire()
+                    .await
+                    .map_err(|e| format!("semaphore error: {}", e))?;
+
+                let end = off + size - 1;
+                let range = format!("bytes={}-{}", off, end);
+
+                let get_resp = client
+                    .get_object()
+                    .bucket(&bucket)
+                    .key(&key)
+                    .range(&range)
+                    .send()
+                    .await
+                    .map_err(|e| format!("failed to get object {} range {}: {}", key, range, e))?;
+
+                let body = get_resp
+                    .body
+                    .collect()
+                    .await
+                    .map_err(|e| format!("failed to read body: {}", e))?;
+                let bytes = body.into_bytes();
+
+                // Write at the correct offset (like Go's WriteAt)
+                let mut file = tokio::fs::OpenOptions::new()
+                    .write(true)
+                    .open(&dp)
+                    .await
+                    .map_err(|e| format!("failed to open dest file {}: {}", dp, e))?;
+                file.seek(std::io::SeekFrom::Start(off))
+                    .await
+                    .map_err(|e| format!("failed to seek to offset {}: {}", off, e))?;
+                file.write_all(&bytes)
+                    .await
+                    .map_err(|e| format!("failed to write to {}: {}", dp, e))?;
+
+                // Report progress
+                {
+                    let mut guard = progress.lock().unwrap();
+                    guard.0 += bytes.len() as u64;
+                    let downloaded = guard.0;
+                    let pct = if file_size > 0 {
+                        (downloaded as f32 * 100.0) / file_size as f32
+                    } else {
+                        100.0
+                    };
+                    (guard.1)(downloaded as i64, pct);
+                }
+
+                Ok::<_, String>(())
+            }));
         }
 
-        use tokio::io::AsyncWriteExt;
-        file.flush()
-            .await
-            .map_err(|e| format!("failed to flush {}: {}", dest_path, e))?;
+        // Wait for all download tasks
+        for handle in handles {
+            handle
+                .await
+                .map_err(|e| format!("download task panicked: {}", e))??;
+        }
 
         Ok(file_size)
     }
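
Below is a minimal usage sketch of the new API for reviewers; it is not part of the patch. The module path, the helper name copy_and_restore, and the `backend`/`dat_path` parameters are assumptions made for illustration; only the upload_file/download_file signatures come from the diff above. The progress callback may be invoked from any of the 5 concurrent part tasks, which is why both methods require FnMut(i64, f32) + Send + Sync + 'static.

// Illustrative caller only: module path and helper name are assumed, not part of this patch.
use seaweed_volume::remote_storage::s3_tier::S3TierBackend;

async fn copy_and_restore(backend: &S3TierBackend, dat_path: &str) -> Result<(), String> {
    // upload_file splits the file into 64MB parts and uploads up to 5 at once;
    // the callback receives (cumulative_bytes_uploaded, percentage) as parts finish.
    let (key, size) = backend
        .upload_file(dat_path, |bytes, pct| {
            println!("uploaded {} bytes ({:.1}%)", bytes, pct);
        })
        .await?;
    println!("{} ({} bytes) stored under S3 key {}", dat_path, size, key);

    // download_file mirrors the upload: 64MB ranged GETs, 5 in flight, each part
    // written at its own offset into the pre-allocated local file.
    let restored = format!("{}.restored", dat_path);
    backend
        .download_file(&restored, &key, |bytes, pct| {
            println!("downloaded {} bytes ({:.1}%)", bytes, pct);
        })
        .await?;
    Ok(())
}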