Browse Source

feat: S3 tier concurrent multipart upload/download with 64MB parts

Matches Go's 5 concurrent goroutines and 64MB part size for S3 tier
data movement. Uses tokio tasks with semaphore-based concurrency control.
rust-volume-server
Chris Lu 1 day ago
parent
commit
4e4e4c453d
  1. 299
      seaweed-volume/src/remote_storage/s3_tier.rs

299
seaweed-volume/src/remote_storage/s3_tier.rs

@@ -9,7 +9,11 @@ use std::sync::Arc;
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region}; use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart}; use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
use aws_sdk_s3::Client; use aws_sdk_s3::Client;
use tokio::io::AsyncReadExt;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::sync::Semaphore;
/// Concurrency limit for multipart upload/download (matches Go's s3manager).
const CONCURRENCY: usize = 5;
/// Configuration for an S3 tier backend. /// Configuration for an S3 tier backend.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -70,14 +74,16 @@ impl S3TierBackend {
} }
} }
/// Upload a local file to S3 using multipart upload with progress reporting.
/// Upload a local file to S3 using multipart upload with concurrent parts
/// and progress reporting.
/// ///
/// Returns (s3_key, file_size) on success. /// Returns (s3_key, file_size) on success.
/// The progress callback receives (bytes_uploaded, percentage). /// The progress callback receives (bytes_uploaded, percentage).
/// Uses 64MB part size and 5 concurrent uploads (matches Go s3manager).
pub async fn upload_file<F>( pub async fn upload_file<F>(
&self, &self,
file_path: &str, file_path: &str,
mut progress_fn: F,
progress_fn: F,
) -> Result<(String, u64), String> ) -> Result<(String, u64), String>
where where
F: FnMut(i64, f32) + Send + Sync + 'static, F: FnMut(i64, f32) + Send + Sync + 'static,
@@ -89,8 +95,8 @@ impl S3TierBackend {
.map_err(|e| format!("failed to stat file {}: {}", file_path, e))?; .map_err(|e| format!("failed to stat file {}: {}", file_path, e))?;
let file_size = metadata.len(); let file_size = metadata.len();
// Calculate part size: start at 8MB, scale up for very large files (matches Go)
let mut part_size: u64 = 8 * 1024 * 1024;
// Calculate part size: start at 64MB, scale up for very large files (matches Go)
let mut part_size: u64 = 64 * 1024 * 1024;
while part_size * 1000 < file_size { while part_size * 1000 < file_size {
part_size *= 4; part_size *= 4;
} }
@@ -115,61 +121,97 @@ impl S3TierBackend {
.ok_or_else(|| "no upload_id in multipart upload response".to_string())? .ok_or_else(|| "no upload_id in multipart upload response".to_string())?
.to_string(); .to_string();
let mut file = tokio::fs::File::open(file_path)
.await
.map_err(|e| format!("failed to open file {}: {}", file_path, e))?;
let mut completed_parts = Vec::new();
// Build list of (part_number, offset, size) for all parts
let mut parts_plan: Vec<(i32, u64, usize)> = Vec::new();
let mut offset: u64 = 0; let mut offset: u64 = 0;
let mut part_number: i32 = 1; let mut part_number: i32 = 1;
loop {
while offset < file_size {
let remaining = file_size - offset; let remaining = file_size - offset;
if remaining == 0 {
break;
}
let this_part_size = std::cmp::min(part_size, remaining) as usize; let this_part_size = std::cmp::min(part_size, remaining) as usize;
let mut buf = vec![0u8; this_part_size];
file.read_exact(&mut buf)
.await
.map_err(|e| format!("failed to read file at offset {}: {}", offset, e))?;
let upload_part_resp = self
.client
.upload_part()
.bucket(&self.bucket)
.key(&key)
.upload_id(&upload_id)
.part_number(part_number)
.body(buf.into())
.send()
.await
.map_err(|e| {
format!(
"failed to upload part {} at offset {}: {}",
part_number, offset, e
)
})?;
let e_tag = upload_part_resp.e_tag().unwrap_or_default().to_string();
completed_parts.push(
CompletedPart::builder()
.e_tag(e_tag)
.part_number(part_number)
.build(),
);
parts_plan.push((part_number, offset, this_part_size));
offset += this_part_size as u64; offset += this_part_size as u64;
part_number += 1;
}
// Report progress
let pct = if file_size > 0 {
(offset as f32 * 100.0) / file_size as f32
} else {
100.0
};
progress_fn(offset as i64, pct);
// Upload parts concurrently with a semaphore limiting to CONCURRENCY
let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
let client = &self.client;
let bucket = &self.bucket;
let file_path_owned = file_path.to_string();
let progress = Arc::new(std::sync::Mutex::new((0u64, progress_fn)));
let mut handles = Vec::with_capacity(parts_plan.len());
for (pn, off, size) in parts_plan {
let sem = semaphore.clone();
let client = client.clone();
let bucket = bucket.clone();
let key = key.clone();
let upload_id = upload_id.clone();
let fp = file_path_owned.clone();
let progress = progress.clone();
handles.push(tokio::spawn(async move {
let _permit = sem
.acquire()
.await
.map_err(|e| format!("semaphore error: {}", e))?;
// Read this part's data from the file at the correct offset
let mut file = tokio::fs::File::open(&fp)
.await
.map_err(|e| format!("failed to open file {}: {}", fp, e))?;
file.seek(std::io::SeekFrom::Start(off))
.await
.map_err(|e| format!("failed to seek to offset {}: {}", off, e))?;
let mut buf = vec![0u8; size];
file.read_exact(&mut buf)
.await
.map_err(|e| format!("failed to read file at offset {}: {}", off, e))?;
let upload_part_resp = client
.upload_part()
.bucket(&bucket)
.key(&key)
.upload_id(&upload_id)
.part_number(pn)
.body(buf.into())
.send()
.await
.map_err(|e| {
format!("failed to upload part {} at offset {}: {}", pn, off, e)
})?;
let e_tag = upload_part_resp.e_tag().unwrap_or_default().to_string();
// Report progress
{
let mut guard = progress.lock().unwrap();
guard.0 += size as u64;
let uploaded = guard.0;
let pct = if file_size > 0 {
(uploaded as f32 * 100.0) / file_size as f32
} else {
100.0
};
(guard.1)(uploaded as i64, pct);
}
Ok::<_, String>(
CompletedPart::builder()
.e_tag(e_tag)
.part_number(pn)
.build(),
)
}));
}
part_number += 1;
// Collect results, preserving part order
let mut completed_parts = Vec::with_capacity(handles.len());
for handle in handles {
let part = handle
.await
.map_err(|e| format!("upload task panicked: {}", e))??;
completed_parts.push(part);
} }
// Complete multipart upload // Complete multipart upload
@@ -190,14 +232,16 @@ impl S3TierBackend {
Ok((key, file_size)) Ok((key, file_size))
} }
/// Download a file from S3 to a local path with progress reporting.
/// Download a file from S3 to a local path with concurrent range requests
/// and progress reporting.
/// ///
/// Returns the file size on success. /// Returns the file size on success.
/// Uses 64MB part size and 5 concurrent downloads (matches Go s3manager).
pub async fn download_file<F>( pub async fn download_file<F>(
&self, &self,
dest_path: &str, dest_path: &str,
key: &str, key: &str,
mut progress_fn: F,
progress_fn: F,
) -> Result<u64, String> ) -> Result<u64, String>
where where
F: FnMut(i64, f32) + Send + Sync + 'static, F: FnMut(i64, f32) + Send + Sync + 'static,
@@ -214,61 +258,110 @@ impl S3TierBackend {
let file_size = head_resp.content_length().unwrap_or(0) as u64; let file_size = head_resp.content_length().unwrap_or(0) as u64;
// Download in chunks (4MB to match Go)
let part_size: u64 = 4 * 1024 * 1024;
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(dest_path)
.await
.map_err(|e| format!("failed to open dest file {}: {}", dest_path, e))?;
let mut offset: u64 = 0;
loop {
if offset >= file_size {
break;
}
let end = std::cmp::min(offset + part_size - 1, file_size - 1);
let range = format!("bytes={}-{}", offset, end);
let get_resp = self
.client
.get_object()
.bucket(&self.bucket)
.key(key)
.range(&range)
.send()
// Pre-allocate file to full size so concurrent WriteAt-style writes work
{
let file = tokio::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(dest_path)
.await .await
.map_err(|e| format!("failed to get object {} range {}: {}", key, range, e))?;
let body = get_resp
.body
.collect()
.map_err(|e| format!("failed to open dest file {}: {}", dest_path, e))?;
file.set_len(file_size)
.await .await
.map_err(|e| format!("failed to read body: {}", e))?;
let bytes = body.into_bytes();
.map_err(|e| format!("failed to set file length: {}", e))?;
}
use tokio::io::AsyncWriteExt;
file.write_all(&bytes)
.await
.map_err(|e| format!("failed to write to {}: {}", dest_path, e))?;
let part_size: u64 = 64 * 1024 * 1024;
offset += bytes.len() as u64;
// Build list of (offset, size) for all parts
let mut parts_plan: Vec<(u64, u64)> = Vec::new();
let mut offset: u64 = 0;
while offset < file_size {
let remaining = file_size - offset;
let this_part_size = std::cmp::min(part_size, remaining);
parts_plan.push((offset, this_part_size));
offset += this_part_size;
}
let pct = if file_size > 0 {
(offset as f32 * 100.0) / file_size as f32
} else {
100.0
};
progress_fn(offset as i64, pct);
// Download parts concurrently with a semaphore limiting to CONCURRENCY
let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
let client = &self.client;
let bucket = &self.bucket;
let dest_path_owned = dest_path.to_string();
let key_owned = key.to_string();
let progress = Arc::new(std::sync::Mutex::new((0u64, progress_fn)));
let mut handles = Vec::with_capacity(parts_plan.len());
for (off, size) in parts_plan {
let sem = semaphore.clone();
let client = client.clone();
let bucket = bucket.clone();
let key = key_owned.clone();
let dp = dest_path_owned.clone();
let progress = progress.clone();
handles.push(tokio::spawn(async move {
let _permit = sem
.acquire()
.await
.map_err(|e| format!("semaphore error: {}", e))?;
let end = off + size - 1;
let range = format!("bytes={}-{}", off, end);
let get_resp = client
.get_object()
.bucket(&bucket)
.key(&key)
.range(&range)
.send()
.await
.map_err(|e| format!("failed to get object {} range {}: {}", key, range, e))?;
let body = get_resp
.body
.collect()
.await
.map_err(|e| format!("failed to read body: {}", e))?;
let bytes = body.into_bytes();
// Write at the correct offset (like Go's WriteAt)
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.open(&dp)
.await
.map_err(|e| format!("failed to open dest file {}: {}", dp, e))?;
file.seek(std::io::SeekFrom::Start(off))
.await
.map_err(|e| format!("failed to seek to offset {}: {}", off, e))?;
file.write_all(&bytes)
.await
.map_err(|e| format!("failed to write to {}: {}", dp, e))?;
// Report progress
{
let mut guard = progress.lock().unwrap();
guard.0 += bytes.len() as u64;
let downloaded = guard.0;
let pct = if file_size > 0 {
(downloaded as f32 * 100.0) / file_size as f32
} else {
100.0
};
(guard.1)(downloaded as i64, pct);
}
Ok::<_, String>(())
}));
} }
use tokio::io::AsyncWriteExt;
file.flush()
.await
.map_err(|e| format!("failed to flush {}: {}", dest_path, e))?;
// Wait for all download tasks
for handle in handles {
handle
.await
.map_err(|e| format!("download task panicked: {}", e))??;
}
Ok(file_size) Ok(file_size)
} }

Loading…
Cancel
Save