diff --git a/seaweed-volume/Cargo.lock b/seaweed-volume/Cargo.lock index 1f45e758e..6bd1e6e96 100644 --- a/seaweed-volume/Cargo.lock +++ b/seaweed-volume/Cargo.lock @@ -2099,6 +2099,15 @@ dependencies = [ "simple_asn1", ] +[[package]] +name = "kamadak-exif" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef4fc70d0ab7e5b6bafa30216a6b48705ea964cdfc29c050f2412295eba58077" +dependencies = [ + "mutate_once", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -2293,6 +2302,12 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" +[[package]] +name = "mutate_once" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13d2233c9842d08cfe13f9eac96e207ca6a2ea10b80259ebe8ad0268be27d2af" + [[package]] name = "native-tls" version = "0.2.18" @@ -3324,6 +3339,7 @@ dependencies = [ "hyper-util", "image", "jsonwebtoken", + "kamadak-exif", "lazy_static", "libc", "md-5", diff --git a/seaweed-volume/Cargo.toml b/seaweed-volume/Cargo.toml index 2aa74b092..94dd917f2 100644 --- a/seaweed-volume/Cargo.toml +++ b/seaweed-volume/Cargo.toml @@ -76,6 +76,7 @@ flate2 = "1" # Image processing image = { version = "0.25", default-features = false, features = ["png", "jpeg", "gif"] } +kamadak-exif = "0.5" # Misc bytes = "1" diff --git a/seaweed-volume/DEV_PLAN.md b/seaweed-volume/DEV_PLAN.md index 4083ec6d9..3df46631e 100644 --- a/seaweed-volume/DEV_PLAN.md +++ b/seaweed-volume/DEV_PLAN.md @@ -7,7 +7,7 @@ **Rust integration tests**: 8/8 pass **S3 remote storage tests**: 3/3 pass **Total**: 117/117 (100%) + 8 Rust + 3 S3 tests -**Rust unit tests**: 112 lib + 7 integration = 119 +**Rust unit tests**: 124 lib + 7 integration = 131 ## Completed Features @@ -57,30 +57,23 @@ All phases from the original plan are complete: - TestVolumeMoveHandlesInFlightWrites now uses Rust volume servers - CI skip list cleaned up (all tests pass with Rust) +- **Production Sprint 4** — Advanced Features: + - BatchDelete EC shard support (ecx index lookup + ecj journal deletion) + - JPEG EXIF orientation auto-fix on upload (kamadak-exif + image crate) + - Async batched write processing (mpsc queue, up to 128 entries per batch) + - VolumeTierMoveDatToRemote/FromRemote (S3 multipart upload/download) + - S3TierRegistry for managing remote storage backends + - VolumeInfo (.vif) persistence for remote file references + ## Remaining Work (Production Readiness) ### Medium Priority (nice to have) -1. **VolumeTierMoveDatToRemote/FromRemote** — Move volume data to/from remote storage - backends (S3, etc.). Currently returns error paths only. Would need to implement - full dat file upload/download to S3. - -2. **BatchDelete EC shards** — BatchDelete currently only handles regular volumes. - Go also checks EC volumes and calls DeleteEcShardNeedle. - -3. **TLS/HTTPS** — rustls + tokio-rustls for both HTTP and gRPC. - -4. **JPEG orientation fix** — Auto-fix EXIF orientation on upload. - -6. **Async request processing** — Batched writes with 128-entry queue. - -### Low Priority - -7. **LevelDB needle maps** — For volumes with millions of needles. +1. **LevelDB needle maps** — For volumes with millions of needles. -9. **Volume backup/sync** — Streaming backup, binary search. +2. **Volume backup/sync** — Streaming backup, binary search. -10. **EC distribution/rebalancing** — Advanced EC operations. +3. **EC distribution/rebalancing** — Advanced EC operations. ## Test Commands diff --git a/seaweed-volume/src/config.rs b/seaweed-volume/src/config.rs index f51eb7a04..6844a8bd0 100644 --- a/seaweed-volume/src/config.rs +++ b/seaweed-volume/src/config.rs @@ -232,6 +232,8 @@ pub struct VolumeServerConfig { pub https_key_file: String, pub grpc_cert_file: String, pub grpc_key_file: String, + /// Enable batched write queue for improved throughput under load. + pub enable_write_queue: bool, } pub use crate::storage::needle_map::NeedleMapKind; @@ -598,6 +600,7 @@ fn resolve_config(cli: Cli) -> VolumeServerConfig { https_key_file: sec.https_key_file, grpc_cert_file: sec.grpc_cert_file, grpc_key_file: sec.grpc_key_file, + enable_write_queue: std::env::var("SEAWEED_WRITE_QUEUE").map(|v| v == "1" || v == "true").unwrap_or(false), } } diff --git a/seaweed-volume/src/images.rs b/seaweed-volume/src/images.rs new file mode 100644 index 000000000..d534006b6 --- /dev/null +++ b/seaweed-volume/src/images.rs @@ -0,0 +1,275 @@ +//! JPEG EXIF orientation auto-fix, matching Go's `FixJpgOrientation`. +//! +//! Reads the EXIF orientation tag from JPEG data and rotates/flips the image +//! to normalize it to orientation 1 (top-left). If EXIF parsing fails or +//! orientation is already normal, returns the original data unchanged. + +use std::io::Cursor; + +use image::{DynamicImage, GenericImageView, ImageFormat, RgbaImage}; + +/// EXIF orientation tag values. +/// See: +const TOP_LEFT_SIDE: u32 = 1; +const TOP_RIGHT_SIDE: u32 = 2; +const BOTTOM_RIGHT_SIDE: u32 = 3; +const BOTTOM_LEFT_SIDE: u32 = 4; +const LEFT_SIDE_TOP: u32 = 5; +const RIGHT_SIDE_TOP: u32 = 6; +const RIGHT_SIDE_BOTTOM: u32 = 7; +const LEFT_SIDE_BOTTOM: u32 = 8; + +/// Fix JPEG orientation based on EXIF data. +/// +/// Reads the EXIF orientation tag and applies the appropriate rotation/flip +/// to normalize the image to orientation 1 (top-left). Re-encodes as JPEG. +/// +/// Returns the original data unchanged if: +/// - EXIF data cannot be parsed +/// - No orientation tag is present +/// - Orientation is already 1 (normal) +/// - Image decoding or re-encoding fails +pub fn fix_jpg_orientation(data: &[u8]) -> Vec { + // Parse EXIF data + let orientation = match read_exif_orientation(data) { + Some(o) => o, + None => return data.to_vec(), + }; + + // Orientation 1 means normal — no transformation needed + if orientation == TOP_LEFT_SIDE { + return data.to_vec(); + } + + // Determine rotation angle and flip mode + let (angle, flip_horizontal) = match orientation { + TOP_RIGHT_SIDE => (0, true), + BOTTOM_RIGHT_SIDE => (180, false), + BOTTOM_LEFT_SIDE => (180, true), + LEFT_SIDE_TOP => (-90, true), + RIGHT_SIDE_TOP => (-90, false), + RIGHT_SIDE_BOTTOM => (90, true), + LEFT_SIDE_BOTTOM => (90, false), + _ => return data.to_vec(), + }; + + // Decode the image + let src_image = match image::load_from_memory_with_format(data, ImageFormat::Jpeg) { + Ok(img) => img, + Err(_) => return data.to_vec(), + }; + + // Apply rotation then flip (matching Go's flip(rotate(img, angle), flipMode)) + let transformed = flip_horizontal_if(rotate(src_image, angle), flip_horizontal); + + // Re-encode as JPEG + let mut buf = Cursor::new(Vec::new()); + match transformed.write_to(&mut buf, ImageFormat::Jpeg) { + Ok(_) => buf.into_inner(), + Err(_) => data.to_vec(), + } +} + +/// Read the EXIF orientation tag from JPEG data. +/// Returns None if EXIF cannot be parsed or orientation tag is not present. +fn read_exif_orientation(data: &[u8]) -> Option { + let exif_reader = exif::Reader::new(); + let mut cursor = Cursor::new(data); + let exif_data = exif_reader.read_from_container(&mut cursor).ok()?; + + let orientation_field = exif_data.get_field(exif::Tag::Orientation, exif::In::PRIMARY)?; + match orientation_field.value { + exif::Value::Short(ref v) if !v.is_empty() => Some(v[0] as u32), + _ => orientation_field.value.get_uint(0), + } +} + +/// Rotate an image by the given angle (counter-clockwise, in degrees). +/// Matches Go's rotate function. +fn rotate(img: DynamicImage, angle: i32) -> DynamicImage { + let (width, height) = img.dimensions(); + + match angle { + 90 => { + // 90 degrees counter-clockwise + let new_w = height; + let new_h = width; + let mut out = RgbaImage::new(new_w, new_h); + for y in 0..new_h { + for x in 0..new_w { + out.put_pixel(x, y, img.get_pixel(new_h - 1 - y, x)); + } + } + DynamicImage::ImageRgba8(out) + } + -90 => { + // 90 degrees clockwise (or 270 counter-clockwise) + let new_w = height; + let new_h = width; + let mut out = RgbaImage::new(new_w, new_h); + for y in 0..new_h { + for x in 0..new_w { + out.put_pixel(x, y, img.get_pixel(y, new_w - 1 - x)); + } + } + DynamicImage::ImageRgba8(out) + } + 180 | -180 => { + let mut out = RgbaImage::new(width, height); + for y in 0..height { + for x in 0..width { + out.put_pixel(x, y, img.get_pixel(width - 1 - x, height - 1 - y)); + } + } + DynamicImage::ImageRgba8(out) + } + _ => img, + } +} + +/// Flip the image horizontally if requested. +/// In Go, flipMode 2 == FlipHorizontal. We simplify since only horizontal flip is used. +fn flip_horizontal_if(img: DynamicImage, do_flip: bool) -> DynamicImage { + if !do_flip { + return img; + } + let (width, height) = img.dimensions(); + let mut out = RgbaImage::new(width, height); + for y in 0..height { + for x in 0..width { + out.put_pixel(x, y, img.get_pixel(width - 1 - x, y)); + } + } + DynamicImage::ImageRgba8(out) +} + +/// Returns true if the given MIME type or file path extension indicates a JPEG file. +pub fn is_jpeg(mime_type: &str, path: &str) -> bool { + if mime_type == "image/jpeg" { + return true; + } + let lower = path.to_lowercase(); + lower.ends_with(".jpg") || lower.ends_with(".jpeg") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_non_jpeg_data_returned_unchanged() { + let data = b"not a jpeg file at all"; + let result = fix_jpg_orientation(data); + assert_eq!(result, data); + } + + #[test] + fn test_jpeg_without_exif_returned_unchanged() { + // Create a minimal JPEG without EXIF data + let img = DynamicImage::ImageRgba8(RgbaImage::new(2, 2)); + let mut buf = Cursor::new(Vec::new()); + img.write_to(&mut buf, ImageFormat::Jpeg).unwrap(); + let jpeg_data = buf.into_inner(); + + let result = fix_jpg_orientation(&jpeg_data); + // Should return data unchanged (no EXIF orientation tag) + // Just verify it's still valid JPEG + assert!(!result.is_empty()); + assert_eq!(&result[0..2], &[0xFF, 0xD8]); // JPEG magic bytes + } + + #[test] + fn test_is_jpeg() { + assert!(is_jpeg("image/jpeg", "")); + assert!(is_jpeg("", "/3,abc.jpg")); + assert!(is_jpeg("", "/3,abc.JPEG")); + assert!(is_jpeg("application/octet-stream", "/3,abc.JPG")); + assert!(!is_jpeg("image/png", "/3,abc.png")); + assert!(!is_jpeg("", "/3,abc.png")); + } + + #[test] + fn test_rotate_180() { + // Create a 2x2 image with distinct pixel colors + let mut img = RgbaImage::new(2, 2); + img.put_pixel(0, 0, image::Rgba([255, 0, 0, 255])); // red top-left + img.put_pixel(1, 0, image::Rgba([0, 255, 0, 255])); // green top-right + img.put_pixel(0, 1, image::Rgba([0, 0, 255, 255])); // blue bottom-left + img.put_pixel(1, 1, image::Rgba([255, 255, 0, 255])); // yellow bottom-right + let dynamic = DynamicImage::ImageRgba8(img); + + let rotated = rotate(dynamic, 180); + let (w, h) = rotated.dimensions(); + assert_eq!((w, h), (2, 2)); + // After 180 rotation: top-left should be yellow, top-right should be blue + assert_eq!(rotated.get_pixel(0, 0), image::Rgba([255, 255, 0, 255])); + assert_eq!(rotated.get_pixel(1, 0), image::Rgba([0, 0, 255, 255])); + assert_eq!(rotated.get_pixel(0, 1), image::Rgba([0, 255, 0, 255])); + assert_eq!(rotated.get_pixel(1, 1), image::Rgba([255, 0, 0, 255])); + } + + #[test] + fn test_rotate_90_ccw() { + // Create 3x2 image (width=3, height=2) + let mut img = RgbaImage::new(3, 2); + img.put_pixel(0, 0, image::Rgba([1, 0, 0, 255])); + img.put_pixel(1, 0, image::Rgba([2, 0, 0, 255])); + img.put_pixel(2, 0, image::Rgba([3, 0, 0, 255])); + img.put_pixel(0, 1, image::Rgba([4, 0, 0, 255])); + img.put_pixel(1, 1, image::Rgba([5, 0, 0, 255])); + img.put_pixel(2, 1, image::Rgba([6, 0, 0, 255])); + let dynamic = DynamicImage::ImageRgba8(img); + + let rotated = rotate(dynamic, 90); + let (w, h) = rotated.dimensions(); + // 90 CCW: width=3,height=2 -> new_w=2, new_h=3 + assert_eq!((w, h), (2, 3)); + // Top-right (2,0) should move to top-left (0,0) in CCW 90 + assert_eq!(rotated.get_pixel(0, 0)[0], 3); + assert_eq!(rotated.get_pixel(1, 0)[0], 6); + } + + #[test] + fn test_rotate_neg90_cw() { + // Create 3x2 image + let mut img = RgbaImage::new(3, 2); + img.put_pixel(0, 0, image::Rgba([1, 0, 0, 255])); + img.put_pixel(1, 0, image::Rgba([2, 0, 0, 255])); + img.put_pixel(2, 0, image::Rgba([3, 0, 0, 255])); + img.put_pixel(0, 1, image::Rgba([4, 0, 0, 255])); + img.put_pixel(1, 1, image::Rgba([5, 0, 0, 255])); + img.put_pixel(2, 1, image::Rgba([6, 0, 0, 255])); + let dynamic = DynamicImage::ImageRgba8(img); + + let rotated = rotate(dynamic, -90); + let (w, h) = rotated.dimensions(); + assert_eq!((w, h), (2, 3)); + // -90 (CW 90): top-left (0,0) should go to top-right + assert_eq!(rotated.get_pixel(0, 0)[0], 4); + assert_eq!(rotated.get_pixel(1, 0)[0], 1); + } + + #[test] + fn test_flip_horizontal() { + let mut img = RgbaImage::new(2, 1); + img.put_pixel(0, 0, image::Rgba([10, 0, 0, 255])); + img.put_pixel(1, 0, image::Rgba([20, 0, 0, 255])); + let dynamic = DynamicImage::ImageRgba8(img); + + let flipped = flip_horizontal_if(dynamic, true); + assert_eq!(flipped.get_pixel(0, 0)[0], 20); + assert_eq!(flipped.get_pixel(1, 0)[0], 10); + } + + #[test] + fn test_flip_horizontal_noop() { + let mut img = RgbaImage::new(2, 1); + img.put_pixel(0, 0, image::Rgba([10, 0, 0, 255])); + img.put_pixel(1, 0, image::Rgba([20, 0, 0, 255])); + let dynamic = DynamicImage::ImageRgba8(img); + + let not_flipped = flip_horizontal_if(dynamic, false); + assert_eq!(not_flipped.get_pixel(0, 0)[0], 10); + assert_eq!(not_flipped.get_pixel(1, 0)[0], 20); + } +} diff --git a/seaweed-volume/src/lib.rs b/seaweed-volume/src/lib.rs index 002c16e9c..cc0541b30 100644 --- a/seaweed-volume/src/lib.rs +++ b/seaweed-volume/src/lib.rs @@ -1,4 +1,5 @@ pub mod config; +pub mod images; pub mod storage; pub mod security; pub mod server; diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs index 3fbc20f56..c523ae898 100644 --- a/seaweed-volume/src/main.rs +++ b/seaweed-volume/src/main.rs @@ -7,6 +7,7 @@ use seaweed_volume::metrics; use seaweed_volume::security::{Guard, SigningKey}; use seaweed_volume::server::grpc_server::VolumeGrpcService; use seaweed_volume::server::volume_server::VolumeServerState; +use seaweed_volume::server::write_queue::WriteQueue; use seaweed_volume::storage::store::Store; use seaweed_volume::storage::types::DiskType; use seaweed_volume::pb::volume_server_pb::volume_server_server::VolumeServerServer; @@ -118,8 +119,19 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box Self { + let region = if config.region.is_empty() { + "us-east-1" + } else { + &config.region + }; + + let credentials = Credentials::new( + &config.access_key, + &config.secret_key, + None, + None, + "seaweedfs-volume-tier", + ); + + let mut s3_config = aws_sdk_s3::Config::builder() + .behavior_version(BehaviorVersion::latest()) + .region(Region::new(region.to_string())) + .credentials_provider(credentials) + .force_path_style(config.force_path_style); + + if !config.endpoint.is_empty() { + s3_config = s3_config.endpoint_url(&config.endpoint); + } + + let client = Client::from_conf(s3_config.build()); + + S3TierBackend { + client, + bucket: config.bucket.clone(), + storage_class: if config.storage_class.is_empty() { + "STANDARD_IA".to_string() + } else { + config.storage_class.clone() + }, + } + } + + /// Upload a local file to S3 using multipart upload with progress reporting. + /// + /// Returns (s3_key, file_size) on success. + /// The progress callback receives (bytes_uploaded, percentage). + pub async fn upload_file( + &self, + file_path: &str, + mut progress_fn: F, + ) -> Result<(String, u64), String> + where + F: FnMut(i64, f32) + Send + Sync + 'static, + { + let key = uuid::Uuid::new_v4().to_string(); + + let metadata = tokio::fs::metadata(file_path) + .await + .map_err(|e| format!("failed to stat file {}: {}", file_path, e))?; + let file_size = metadata.len(); + + // Calculate part size: start at 64MB, scale up for very large files + let mut part_size: u64 = 64 * 1024 * 1024; + while part_size * 1000 < file_size { + part_size *= 4; + } + + // Initiate multipart upload + let create_resp = self + .client + .create_multipart_upload() + .bucket(&self.bucket) + .key(&key) + .storage_class( + self.storage_class + .parse() + .unwrap_or(aws_sdk_s3::types::StorageClass::StandardIa), + ) + .send() + .await + .map_err(|e| format!("failed to create multipart upload: {}", e))?; + + let upload_id = create_resp + .upload_id() + .ok_or_else(|| "no upload_id in multipart upload response".to_string())? + .to_string(); + + let mut file = tokio::fs::File::open(file_path) + .await + .map_err(|e| format!("failed to open file {}: {}", file_path, e))?; + + let mut completed_parts = Vec::new(); + let mut offset: u64 = 0; + let mut part_number: i32 = 1; + + loop { + let remaining = file_size - offset; + if remaining == 0 { + break; + } + let this_part_size = std::cmp::min(part_size, remaining) as usize; + let mut buf = vec![0u8; this_part_size]; + file.read_exact(&mut buf) + .await + .map_err(|e| format!("failed to read file at offset {}: {}", offset, e))?; + + let upload_part_resp = self + .client + .upload_part() + .bucket(&self.bucket) + .key(&key) + .upload_id(&upload_id) + .part_number(part_number) + .body(buf.into()) + .send() + .await + .map_err(|e| { + format!( + "failed to upload part {} at offset {}: {}", + part_number, offset, e + ) + })?; + + let e_tag = upload_part_resp.e_tag().unwrap_or_default().to_string(); + completed_parts.push( + CompletedPart::builder() + .e_tag(e_tag) + .part_number(part_number) + .build(), + ); + + offset += this_part_size as u64; + + // Report progress + let pct = if file_size > 0 { + (offset as f32 * 100.0) / file_size as f32 + } else { + 100.0 + }; + progress_fn(offset as i64, pct); + + part_number += 1; + } + + // Complete multipart upload + let completed_upload = CompletedMultipartUpload::builder() + .set_parts(Some(completed_parts)) + .build(); + + self.client + .complete_multipart_upload() + .bucket(&self.bucket) + .key(&key) + .upload_id(&upload_id) + .multipart_upload(completed_upload) + .send() + .await + .map_err(|e| format!("failed to complete multipart upload: {}", e))?; + + Ok((key, file_size)) + } + + /// Download a file from S3 to a local path with progress reporting. + /// + /// Returns the file size on success. + pub async fn download_file( + &self, + dest_path: &str, + key: &str, + mut progress_fn: F, + ) -> Result + where + F: FnMut(i64, f32) + Send + Sync + 'static, + { + // Get file size first + let head_resp = self + .client + .head_object() + .bucket(&self.bucket) + .key(key) + .send() + .await + .map_err(|e| format!("failed to head object {}: {}", key, e))?; + + let file_size = head_resp.content_length().unwrap_or(0) as u64; + + // Download in chunks + let part_size: u64 = 64 * 1024 * 1024; + let mut file = tokio::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(dest_path) + .await + .map_err(|e| format!("failed to open dest file {}: {}", dest_path, e))?; + + let mut offset: u64 = 0; + + loop { + if offset >= file_size { + break; + } + let end = std::cmp::min(offset + part_size - 1, file_size - 1); + let range = format!("bytes={}-{}", offset, end); + + let get_resp = self + .client + .get_object() + .bucket(&self.bucket) + .key(key) + .range(&range) + .send() + .await + .map_err(|e| format!("failed to get object {} range {}: {}", key, range, e))?; + + let body = get_resp + .body + .collect() + .await + .map_err(|e| format!("failed to read body: {}", e))?; + let bytes = body.into_bytes(); + + use tokio::io::AsyncWriteExt; + file.write_all(&bytes) + .await + .map_err(|e| format!("failed to write to {}: {}", dest_path, e))?; + + offset += bytes.len() as u64; + + let pct = if file_size > 0 { + (offset as f32 * 100.0) / file_size as f32 + } else { + 100.0 + }; + progress_fn(offset as i64, pct); + } + + use tokio::io::AsyncWriteExt; + file.flush() + .await + .map_err(|e| format!("failed to flush {}: {}", dest_path, e))?; + + Ok(file_size) + } + + /// Delete a file from S3. + pub async fn delete_file(&self, key: &str) -> Result<(), String> { + self.client + .delete_object() + .bucket(&self.bucket) + .key(key) + .send() + .await + .map_err(|e| format!("failed to delete object {}: {}", key, e))?; + Ok(()) + } +} + +/// Parse a backend name like "s3" or "s3.default" into (backend_type, backend_id). +/// Matches Go's `BackendNameToTypeId`. +pub fn backend_name_to_type_id(backend_name: &str) -> (String, String) { + let parts: Vec<&str> = backend_name.split('.').collect(); + match parts.len() { + 1 => (backend_name.to_string(), "default".to_string()), + 2 => (parts[0].to_string(), parts[1].to_string()), + _ => (String::new(), String::new()), + } +} + +/// A registry of configured S3 tier backends, keyed by backend name (e.g., "s3.default"). +#[derive(Default)] +pub struct S3TierRegistry { + backends: HashMap>, +} + +impl S3TierRegistry { + pub fn new() -> Self { + Self { + backends: HashMap::new(), + } + } + + /// Register a backend with the given name. + pub fn register(&mut self, name: String, backend: S3TierBackend) { + self.backends.insert(name, Arc::new(backend)); + } + + /// Look up a backend by name. + pub fn get(&self, name: &str) -> Option> { + self.backends.get(name).cloned() + } + + /// List all registered backend names. + pub fn names(&self) -> Vec { + self.backends.keys().cloned().collect() + } +} diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 117babea0..d66b82ede 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -62,17 +62,66 @@ impl VolumeServer for VolumeGrpcService { ..Needle::default() }; + // Check if this is an EC volume + let is_ec_volume = { + let store = self.state.store.read().unwrap(); + store.ec_volumes.contains_key(&file_id.volume_id) + }; + // Cookie validation (unless skip_cookie_check) if !req.skip_cookie_check { let original_cookie = n.cookie; - let store = self.state.store.read().unwrap(); - match store.read_volume_needle(file_id.volume_id, &mut n) { - Ok(_) => {} - Err(e) => { + if !is_ec_volume { + let store = self.state.store.read().unwrap(); + match store.read_volume_needle(file_id.volume_id, &mut n) { + Ok(_) => {} + Err(e) => { + results.push(volume_server_pb::DeleteResult { + file_id: fid_str.clone(), + status: 404, + error: e.to_string(), + size: 0, + version: 0, + }); + continue; + } + } + } else { + // For EC volumes, verify needle exists in ecx index + let store = self.state.store.read().unwrap(); + if let Some(ec_vol) = store.ec_volumes.get(&file_id.volume_id) { + match ec_vol.find_needle_from_ecx(n.id) { + Ok(Some((_, size))) if !size.is_deleted() => { + // Needle exists and is not deleted — cookie check not possible + // for EC volumes without distributed read, so we accept it + n.data_size = size.0 as u32; + } + Ok(_) => { + results.push(volume_server_pb::DeleteResult { + file_id: fid_str.clone(), + status: 404, + error: format!("ec needle {} not found", fid_str), + size: 0, + version: 0, + }); + continue; + } + Err(e) => { + results.push(volume_server_pb::DeleteResult { + file_id: fid_str.clone(), + status: 404, + error: e.to_string(), + size: 0, + version: 0, + }); + continue; + } + } + } else { results.push(volume_server_pb::DeleteResult { file_id: fid_str.clone(), - status: 404, // Not Found - error: e.to_string(), + status: 404, + error: format!("ec volume {} not found", file_id.volume_id), size: 0, version: 0, }); @@ -82,12 +131,12 @@ impl VolumeServer for VolumeGrpcService { if n.cookie != original_cookie { results.push(volume_server_pb::DeleteResult { file_id: fid_str.clone(), - status: 400, // Bad Request + status: 400, error: "File Random Cookie does not match.".to_string(), size: 0, version: 0, }); - break; // Stop processing on cookie mismatch + break; } } @@ -95,7 +144,7 @@ impl VolumeServer for VolumeGrpcService { if n.is_chunk_manifest() { results.push(volume_server_pb::DeleteResult { file_id: fid_str.clone(), - status: 406, // Not Acceptable + status: 406, error: "ChunkManifest: not allowed in batch delete mode.".to_string(), size: 0, version: 0, @@ -106,32 +155,67 @@ impl VolumeServer for VolumeGrpcService { n.last_modified = now; n.set_has_last_modified_date(); - let mut store = self.state.store.write().unwrap(); - match store.delete_volume_needle(file_id.volume_id, &mut n) { - Ok(size) => { - if size.0 == 0 { + if !is_ec_volume { + let mut store = self.state.store.write().unwrap(); + match store.delete_volume_needle(file_id.volume_id, &mut n) { + Ok(size) => { + if size.0 == 0 { + results.push(volume_server_pb::DeleteResult { + file_id: fid_str.clone(), + status: 304, + error: String::new(), + size: 0, + version: 0, + }); + } else { + results.push(volume_server_pb::DeleteResult { + file_id: fid_str.clone(), + status: 202, + error: String::new(), + size: size.0 as u32, + version: 0, + }); + } + } + Err(e) => { results.push(volume_server_pb::DeleteResult { file_id: fid_str.clone(), - status: 304, // Not Modified - error: String::new(), + status: 500, + error: e.to_string(), size: 0, version: 0, }); - } else { - results.push(volume_server_pb::DeleteResult { - file_id: fid_str.clone(), - status: 202, // Accepted - error: String::new(), - size: size.0 as u32, - version: 0, - }); } } - Err(e) => { + } else { + // EC volume deletion: journal the delete locally + let mut store = self.state.store.write().unwrap(); + if let Some(ec_vol) = store.ec_volumes.get_mut(&file_id.volume_id) { + match ec_vol.journal_delete(n.id) { + Ok(()) => { + results.push(volume_server_pb::DeleteResult { + file_id: fid_str.clone(), + status: 202, + error: String::new(), + size: n.data_size, + version: 0, + }); + } + Err(e) => { + results.push(volume_server_pb::DeleteResult { + file_id: fid_str.clone(), + status: 500, + error: e.to_string(), + size: 0, + version: 0, + }); + } + } + } else { results.push(volume_server_pb::DeleteResult { file_id: fid_str.clone(), - status: 500, // Internal Server Error - error: e.to_string(), + status: 404, + error: format!("ec volume {} not found", file_id.volume_id), size: 0, version: 0, }); @@ -1695,21 +1779,132 @@ impl VolumeServer for VolumeGrpcService { let req = request.into_inner(); let vid = VolumeId(req.volume_id); - let store = self.state.store.read().unwrap(); - let (_, vol) = store.find_volume(vid) - .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?; + // Validate volume exists and collection matches + let (dat_path, dat_file_size) = { + let store = self.state.store.read().unwrap(); + let (_, vol) = store.find_volume(vid) + .ok_or_else(|| Status::not_found(format!("volume {} not found", req.volume_id)))?; - if vol.collection != req.collection { - return Err(Status::invalid_argument(format!( - "unexpected input {}, expected collection {}", req.collection, vol.collection - ))); - } - drop(store); + if vol.collection != req.collection { + return Err(Status::invalid_argument(format!( + "existing collection:{} unexpected input: {}", vol.collection, req.collection + ))); + } + + // Check if already on remote + if vol.has_remote_file { + // Already on remote -- return empty stream (matches Go: returns nil) + let stream = tokio_stream::empty(); + return Ok(Response::new(Box::pin(stream) as Self::VolumeTierMoveDatToRemoteStream)); + } + + // Check if the destination backend already exists in volume info + let (backend_type, backend_id) = crate::remote_storage::s3_tier::backend_name_to_type_id( + &req.destination_backend_name + ); + for rf in &vol.volume_info.files { + if rf.backend_type == backend_type && rf.backend_id == backend_id { + return Err(Status::already_exists(format!( + "destination {} already exists", req.destination_backend_name + ))); + } + } + + let dat_path = vol.dat_path(); + let dat_file_size = vol.dat_file_size().unwrap_or(0); + (dat_path, dat_file_size) + }; + + // Look up the S3 tier backend + let backend = { + let registry = self.state.s3_tier_registry.read().unwrap(); + registry.get(&req.destination_backend_name) + .ok_or_else(|| { + let keys = registry.names(); + Status::not_found(format!( + "destination {} not found, supported: {:?}", + req.destination_backend_name, keys + )) + })? + }; + + let (backend_type, backend_id) = crate::remote_storage::s3_tier::backend_name_to_type_id( + &req.destination_backend_name + ); + + let (tx, rx) = tokio::sync::mpsc::channel::>(16); + let state = self.state.clone(); + let keep_local = req.keep_local_dat_file; + let dest_backend_name = req.destination_backend_name.clone(); + + tokio::spawn(async move { + let result: Result<(), Status> = async { + // Upload the .dat file to S3 with progress + let tx_progress = tx.clone(); + let mut last_report = std::time::Instant::now(); + let (key, size) = backend.upload_file(&dat_path, move |processed, percentage| { + let now = std::time::Instant::now(); + if now.duration_since(last_report) >= std::time::Duration::from_secs(1) { + last_report = now; + let _ = tx_progress.try_send(Ok(volume_server_pb::VolumeTierMoveDatToRemoteResponse { + processed, + processed_percentage: percentage, + })); + } + }).await.map_err(|e| Status::internal(format!( + "backend {} copy file {}: {}", dest_backend_name, dat_path, e + )))?; + + // Update volume info with remote file reference + { + let mut store = state.store.write().unwrap(); + if let Some((_, vol)) = store.find_volume_mut(vid) { + let now_unix = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + vol.volume_info.files.push(volume_server_pb::RemoteFile { + backend_type: backend_type.clone(), + backend_id: backend_id.clone(), + key, + offset: 0, + file_size: size, + modified_time: now_unix, + extension: ".dat".to_string(), + }); + vol.has_remote_file = true; + + if let Err(e) = vol.save_volume_info() { + return Err(Status::internal(format!( + "volume {} failed to save remote file info: {}", vid, e + ))); + } + + // Optionally remove local .dat file + if !keep_local { + let dat = vol.dat_path(); + let _ = std::fs::remove_file(&dat); + } + } + } - // Backend not supported - Err(Status::internal(format!( - "destination {} not found", req.destination_backend_name - ))) + // Send final 100% progress + let _ = tx.send(Ok(volume_server_pb::VolumeTierMoveDatToRemoteResponse { + processed: dat_file_size as i64, + processed_percentage: 100.0, + })).await; + + Ok(()) + }.await; + + if let Err(e) = result { + let _ = tx.send(Err(e)).await; + } + }); + + let stream = tokio_stream::wrappers::ReceiverStream::new(rx); + Ok(Response::new(Box::pin(stream) as Self::VolumeTierMoveDatToRemoteStream)) } type VolumeTierMoveDatFromRemoteStream = BoxStream; @@ -1721,19 +1916,116 @@ impl VolumeServer for VolumeGrpcService { let req = request.into_inner(); let vid = VolumeId(req.volume_id); - let store = self.state.store.read().unwrap(); - let (_, vol) = store.find_volume(vid) - .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?; + // Validate volume and get remote storage info + let (dat_path, storage_name, storage_key) = { + let store = self.state.store.read().unwrap(); + let (_, vol) = store.find_volume(vid) + .ok_or_else(|| Status::not_found(format!("volume {} not found", req.volume_id)))?; - if vol.collection != req.collection { - return Err(Status::invalid_argument(format!( - "unexpected input {}, expected collection {}", req.collection, vol.collection - ))); - } - drop(store); + if vol.collection != req.collection { + return Err(Status::invalid_argument(format!( + "existing collection:{} unexpected input: {}", vol.collection, req.collection + ))); + } + + let (storage_name, storage_key) = vol.remote_storage_name_key(); + if storage_name.is_empty() || storage_key.is_empty() { + return Err(Status::failed_precondition(format!( + "volume {} is already on local disk", vid + ))); + } + + // Check if the dat file already exists locally with data + let dat_path = vol.dat_path(); + if std::path::Path::new(&dat_path).exists() { + if let Ok(m) = std::fs::metadata(&dat_path) { + if m.len() > 0 { + return Err(Status::failed_precondition(format!( + "volume {} is already on local disk", vid + ))); + } + } + } + + (dat_path, storage_name, storage_key) + }; + + // Look up the S3 tier backend + let backend = { + let registry = self.state.s3_tier_registry.read().unwrap(); + registry.get(&storage_name) + .ok_or_else(|| { + let keys = registry.names(); + Status::not_found(format!( + "remote storage {} not found from supported: {:?}", + storage_name, keys + )) + })? + }; + + let (tx, rx) = tokio::sync::mpsc::channel::>(16); + let state = self.state.clone(); + let keep_remote = req.keep_remote_dat_file; + + tokio::spawn(async move { + let result: Result<(), Status> = async { + // Download the .dat file from S3 with progress + let tx_progress = tx.clone(); + let mut last_report = std::time::Instant::now(); + let storage_name_clone = storage_name.clone(); + let _size = backend.download_file(&dat_path, &storage_key, move |processed, percentage| { + let now = std::time::Instant::now(); + if now.duration_since(last_report) >= std::time::Duration::from_secs(1) { + last_report = now; + let _ = tx_progress.try_send(Ok(volume_server_pb::VolumeTierMoveDatFromRemoteResponse { + processed, + processed_percentage: percentage, + })); + } + }).await.map_err(|e| Status::internal(format!( + "backend {} copy file {}: {}", storage_name_clone, dat_path, e + )))?; + + if !keep_remote { + // Delete remote file + backend.delete_file(&storage_key).await.map_err(|e| Status::internal(format!( + "volume {} failed to delete remote file {}: {}", vid, storage_key, e + )))?; + + // Update volume info: remove remote file reference + { + let mut store = state.store.write().unwrap(); + if let Some((_, vol)) = store.find_volume_mut(vid) { + if !vol.volume_info.files.is_empty() { + vol.volume_info.files.remove(0); + } + vol.has_remote_file = !vol.volume_info.files.is_empty(); - // Volume is already on local disk (no remote storage support) - Err(Status::internal(format!("volume {} already on local disk", vid))) + if let Err(e) = vol.save_volume_info() { + return Err(Status::internal(format!( + "volume {} failed to save remote file info: {}", vid, e + ))); + } + } + } + } + + // Send final 100% progress + let _ = tx.send(Ok(volume_server_pb::VolumeTierMoveDatFromRemoteResponse { + processed: 0, + processed_percentage: 100.0, + })).await; + + Ok(()) + }.await; + + if let Err(e) = result { + let _ = tx.send(Err(e)).await; + } + }); + + let stream = tokio_stream::wrappers::ReceiverStream::new(rx); + Ok(Response::new(Box::pin(stream) as Self::VolumeTierMoveDatFromRemoteStream)) } // ---- Server management ---- diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs index c4d0815d8..3a337493d 100644 --- a/seaweed-volume/src/server/handlers.rs +++ b/seaweed-volume/src/server/handlers.rs @@ -957,11 +957,31 @@ pub async fn post_handler( .map(|s| s == "gzip") .unwrap_or(false); + // Extract MIME type from Content-Type header (needed early for JPEG orientation fix) + let mime_type = headers.get(header::CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .map(|ct| { + if ct.starts_with("multipart/") { + "application/octet-stream".to_string() + } else { + ct.to_string() + } + }) + .unwrap_or_else(|| "application/octet-stream".to_string()); + + // Fix JPEG orientation from EXIF data before storing (matches Go behavior). + // Only for non-compressed uploads that are JPEG files. + let body_data = if !is_gzipped && crate::images::is_jpeg(&mime_type, &path) { + crate::images::fix_jpg_orientation(&body) + } else { + body.to_vec() + }; + let mut n = Needle { id: needle_id, cookie, - data: body.to_vec(), - data_size: body.len() as u32, + data_size: body_data.len() as u32, + data: body_data, last_modified: last_modified, ..Needle::default() }; @@ -973,24 +993,20 @@ pub async fn post_handler( n.set_is_compressed(); } - // Extract MIME type from Content-Type header - let mime_type = headers.get(header::CONTENT_TYPE) - .and_then(|v| v.to_str().ok()) - .map(|ct| { - if ct.starts_with("multipart/") { - "application/octet-stream".to_string() - } else { - ct.to_string() - } - }) - .unwrap_or_else(|| "application/octet-stream".to_string()); if !mime_type.is_empty() { n.mime = mime_type.as_bytes().to_vec(); n.set_has_mime(); } - let mut store = state.store.write().unwrap(); - let resp = match store.write_volume_needle(vid, &mut n) { + // Use the write queue if enabled, otherwise write directly. + let write_result = if let Some(wq) = state.write_queue.get() { + wq.submit(vid, n.clone()).await + } else { + let mut store = state.store.write().unwrap(); + store.write_volume_needle(vid, &mut n) + }; + + let resp = match write_result { Ok((_offset, _size, is_unchanged)) => { if is_unchanged { let etag = format!("\"{}\"", n.etag()); diff --git a/seaweed-volume/src/server/mod.rs b/seaweed-volume/src/server/mod.rs index 639a3e2bd..85a02500e 100644 --- a/seaweed-volume/src/server/mod.rs +++ b/seaweed-volume/src/server/mod.rs @@ -2,3 +2,4 @@ pub mod volume_server; pub mod handlers; pub mod grpc_server; pub mod heartbeat; +pub mod write_queue; diff --git a/seaweed-volume/src/server/volume_server.rs b/seaweed-volume/src/server/volume_server.rs index cd6f15711..ff5c71f05 100644 --- a/seaweed-volume/src/server/volume_server.rs +++ b/seaweed-volume/src/server/volume_server.rs @@ -25,6 +25,7 @@ use crate::security::Guard; use crate::storage::store::Store; use super::handlers; +use super::write_queue::WriteQueue; /// Shared state for the volume server. pub struct VolumeServerState { @@ -60,6 +61,10 @@ pub struct VolumeServerState { pub pre_stop_seconds: u32, /// Notify heartbeat to send an immediate update when volume state changes. pub volume_state_notify: tokio::sync::Notify, + /// Optional batched write queue for improved throughput under load. + pub write_queue: std::sync::OnceLock, + /// Registry of S3 tier backends for tiered storage operations. + pub s3_tier_registry: std::sync::RwLock, } impl VolumeServerState { diff --git a/seaweed-volume/src/server/write_queue.rs b/seaweed-volume/src/server/write_queue.rs new file mode 100644 index 000000000..bb2d51e8c --- /dev/null +++ b/seaweed-volume/src/server/write_queue.rs @@ -0,0 +1,315 @@ +//! Async batched write processing for the volume server. +//! +//! Instead of each upload handler directly calling `write_needle` and syncing, +//! writes are submitted to a queue. A background worker drains the queue in +//! batches (up to 128 entries), groups them by volume ID, processes them +//! together, and syncs once per volume for the entire batch. + +use std::sync::Arc; + +use tokio::sync::{mpsc, oneshot}; +use tracing::debug; + +use crate::storage::needle::needle::Needle; +use crate::storage::types::{Size, VolumeId}; +use crate::storage::volume::VolumeError; + +use super::volume_server::VolumeServerState; + +/// Result of a single write operation: (offset, size, is_unchanged). +pub type WriteResult = Result<(u64, Size, bool), VolumeError>; + +/// A request to write a needle, submitted to the write queue. +pub struct WriteRequest { + pub volume_id: VolumeId, + pub needle: Needle, + pub response_tx: oneshot::Sender, +} + +/// Maximum number of write requests to batch together. +const MAX_BATCH_SIZE: usize = 128; + +/// Handle for submitting write requests to the background worker. +#[derive(Clone)] +pub struct WriteQueue { + tx: mpsc::Sender, +} + +impl WriteQueue { + /// Create a new write queue and spawn the background worker. + /// + /// `capacity` controls the channel buffer size (backpressure kicks in when full). + /// The worker holds a reference to `state` for accessing the store. + pub fn new(state: Arc, capacity: usize) -> Self { + let (tx, rx) = mpsc::channel(capacity); + let worker = WriteQueueWorker { + rx, + state, + }; + tokio::spawn(worker.run()); + WriteQueue { tx } + } + + /// Submit a write request and wait for the result. + /// + /// Returns `Err` if the worker has shut down or the response channel was dropped. + pub async fn submit( + &self, + volume_id: VolumeId, + needle: Needle, + ) -> WriteResult { + let (response_tx, response_rx) = oneshot::channel(); + let request = WriteRequest { + volume_id, + needle, + response_tx, + }; + + // Send to queue; this awaits if the channel is full (backpressure). + if self.tx.send(request).await.is_err() { + return Err(VolumeError::Io(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "write queue worker has shut down", + ))); + } + + // Wait for the worker to process our request. + match response_rx.await { + Ok(result) => result, + Err(_) => Err(VolumeError::Io(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "write queue worker dropped response channel", + ))), + } + } +} + +/// Background worker that drains write requests and processes them in batches. +struct WriteQueueWorker { + rx: mpsc::Receiver, + state: Arc, +} + +impl WriteQueueWorker { + async fn run(mut self) { + debug!("write queue worker started"); + + loop { + // Wait for the first request (blocks until one arrives or channel closes). + let first = match self.rx.recv().await { + Some(req) => req, + None => { + debug!("write queue channel closed, worker exiting"); + return; + } + }; + + // Drain as many additional requests as available, up to MAX_BATCH_SIZE. + let mut batch = Vec::with_capacity(MAX_BATCH_SIZE); + batch.push(first); + + while batch.len() < MAX_BATCH_SIZE { + match self.rx.try_recv() { + Ok(req) => batch.push(req), + Err(_) => break, + } + } + + let batch_size = batch.len(); + debug!("processing write batch of {} requests", batch_size); + + // Process the batch in spawn_blocking since write_needle does file I/O. + let state = self.state.clone(); + let _ = tokio::task::spawn_blocking(move || { + process_batch(state, batch); + }) + .await; + } + } +} + +/// Process a batch of write requests, grouped by volume ID. +/// +/// Groups writes by volume to minimize the number of store lock acquisitions, +/// then sends results back via each request's oneshot channel. +fn process_batch(state: Arc, batch: Vec) { + // Group requests by volume ID for efficient processing. + // We use a Vec of (VolumeId, Vec<(Needle, Sender)>) to preserve order + // and avoid requiring Hash on VolumeId. + let mut groups: Vec<(VolumeId, Vec<(Needle, oneshot::Sender)>)> = Vec::new(); + + for req in batch { + let vid = req.volume_id; + if let Some(group) = groups.iter_mut().find(|(v, _)| *v == vid) { + group.1.push((req.needle, req.response_tx)); + } else { + groups.push((vid, vec![(req.needle, req.response_tx)])); + } + } + + // Process each volume group under a single store lock. + let mut store = state.store.write().unwrap(); + + for (vid, entries) in groups { + for (mut needle, response_tx) in entries { + let result = store.write_volume_needle(vid, &mut needle); + // Send result back; ignore error if receiver dropped. + let _ = response_tx.send(result); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::storage::types::VolumeId; + + /// Helper to create a minimal VolumeServerState for testing. + fn make_test_state() -> Arc { + use std::sync::RwLock; + use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32}; + use crate::security::{Guard, SigningKey}; + use crate::storage::store::Store; + use crate::storage::needle_map::NeedleMapKind; + + let store = Store::new(NeedleMapKind::InMemory); + let guard = Guard::new( + &[], + SigningKey(vec![]), + 0, + SigningKey(vec![]), + 0, + ); + + Arc::new(VolumeServerState { + store: RwLock::new(store), + guard, + is_stopping: RwLock::new(false), + maintenance: AtomicBool::new(false), + state_version: AtomicU32::new(0), + concurrent_upload_limit: 0, + concurrent_download_limit: 0, + inflight_upload_data_timeout: std::time::Duration::ZERO, + inflight_download_data_timeout: std::time::Duration::ZERO, + inflight_upload_bytes: AtomicI64::new(0), + inflight_download_bytes: AtomicI64::new(0), + upload_notify: tokio::sync::Notify::new(), + download_notify: tokio::sync::Notify::new(), + data_center: String::new(), + rack: String::new(), + file_size_limit_bytes: 0, + is_heartbeating: AtomicBool::new(false), + has_master: false, + pre_stop_seconds: 0, + volume_state_notify: tokio::sync::Notify::new(), + write_queue: std::sync::OnceLock::new(), + s3_tier_registry: std::sync::RwLock::new(crate::remote_storage::s3_tier::S3TierRegistry::new()), + }) + } + + #[tokio::test] + async fn test_write_queue_submit_no_volume() { + // Submit a write to a non-existent volume -- should return VolumeError::NotFound. + let state = make_test_state(); + let queue = WriteQueue::new(state, MAX_BATCH_SIZE); + + let needle = Needle { + id: 1.into(), + cookie: 0x12345678.into(), + data: vec![1, 2, 3], + data_size: 3, + ..Needle::default() + }; + + let result = queue.submit(VolumeId(999), needle).await; + assert!(result.is_err()); + match result { + Err(VolumeError::NotFound) => {} // expected + other => panic!("expected NotFound, got {:?}", other), + } + } + + #[tokio::test] + async fn test_write_queue_concurrent_submissions() { + // Submit multiple concurrent writes -- all should complete (with errors since no volume). + let state = make_test_state(); + let queue = WriteQueue::new(state, MAX_BATCH_SIZE); + + let mut handles = Vec::new(); + for i in 0..10u64 { + let q = queue.clone(); + handles.push(tokio::spawn(async move { + let needle = Needle { + id: i.into(), + cookie: 0xABCD.into(), + data: vec![i as u8; 10], + data_size: 10, + ..Needle::default() + }; + q.submit(VolumeId(1), needle).await + })); + } + + for handle in handles { + let result = handle.await.unwrap(); + // All should fail with NotFound since there's no volume 1 + assert!(matches!(result, Err(VolumeError::NotFound))); + } + } + + #[tokio::test] + async fn test_write_queue_batching() { + // Verify that many concurrent writes get processed (testing the batching path). + let state = make_test_state(); + let queue = WriteQueue::new(state, MAX_BATCH_SIZE); + + // Submit MAX_BATCH_SIZE requests concurrently + let mut handles = Vec::new(); + for i in 0..MAX_BATCH_SIZE as u64 { + let q = queue.clone(); + handles.push(tokio::spawn(async move { + let needle = Needle { + id: i.into(), + cookie: 0x1111.into(), + data: vec![0u8; 4], + data_size: 4, + ..Needle::default() + }; + q.submit(VolumeId(42), needle).await + })); + } + + let mut results = Vec::new(); + for handle in handles { + results.push(handle.await.unwrap()); + } + + // All should complete (with NotFound errors since no volume exists) + assert_eq!(results.len(), MAX_BATCH_SIZE); + for r in results { + assert!(matches!(r, Err(VolumeError::NotFound))); + } + } + + #[tokio::test] + async fn test_write_queue_dropped_sender() { + // When the queue is dropped, subsequent submits should fail gracefully. + let state = make_test_state(); + let queue = WriteQueue::new(state, 1); + + // Clone then drop the original -- the worker keeps running via its rx handle. + let queue2 = queue.clone(); + drop(queue); + + // This should still work since the worker is alive. + let needle = Needle { + id: 1.into(), + cookie: 0.into(), + data: vec![], + data_size: 0, + ..Needle::default() + }; + let result = queue2.submit(VolumeId(1), needle).await; + assert!(result.is_err()); // NotFound is fine -- the point is it doesn't panic + } +} diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 653f09376..5a2d1e2ee 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -71,11 +71,149 @@ pub enum VolumeError { // VolumeInfo (.vif persistence) // ============================================================================ +/// Legacy simple VolumeInfo for backward compat with old .vif files. #[derive(serde::Serialize, serde::Deserialize)] struct VolumeInfo { read_only: bool, } +/// Protobuf VolumeInfo type alias. +pub use crate::pb::volume_server_pb::VolumeInfo as PbVolumeInfo; +pub use crate::pb::volume_server_pb::RemoteFile as PbRemoteFile; + +/// Helper module for deserializing protojson uint64 fields that may be strings or numbers. +mod string_or_u64 { + use serde::{self, Deserialize, Deserializer, Serializer}; + + pub fn serialize(value: &u64, serializer: S) -> Result + where S: Serializer { + // Emit as string to match Go's protojson format for uint64 + serializer.serialize_str(&value.to_string()) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where D: Deserializer<'de> { + #[derive(Deserialize)] + #[serde(untagged)] + enum StringOrNum { + Str(String), + Num(u64), + } + match StringOrNum::deserialize(deserializer)? { + StringOrNum::Str(s) => s.parse::().map_err(serde::de::Error::custom), + StringOrNum::Num(n) => Ok(n), + } + } +} + +mod string_or_i64 { + use serde::{self, Deserialize, Deserializer, Serializer}; + + pub fn serialize(value: &i64, serializer: S) -> Result + where S: Serializer { + serializer.serialize_str(&value.to_string()) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where D: Deserializer<'de> { + #[derive(Deserialize)] + #[serde(untagged)] + enum StringOrNum { + Str(String), + Num(i64), + } + match StringOrNum::deserialize(deserializer)? { + StringOrNum::Str(s) => s.parse::().map_err(serde::de::Error::custom), + StringOrNum::Num(n) => Ok(n), + } + } +} + +/// Serde-compatible representation of RemoteFile for .vif JSON serialization. +/// Field names use snake_case to match Go's protobuf JSON output (jsonpb). +#[derive(serde::Serialize, serde::Deserialize, Default, Clone)] +pub struct VifRemoteFile { + #[serde(default, rename = "backendType")] + pub backend_type: String, + #[serde(default, rename = "backendId")] + pub backend_id: String, + #[serde(default)] + pub key: String, + #[serde(default, with = "string_or_u64")] + pub offset: u64, + #[serde(default, rename = "fileSize", with = "string_or_u64")] + pub file_size: u64, + #[serde(default, rename = "modifiedTime", with = "string_or_u64")] + pub modified_time: u64, + #[serde(default)] + pub extension: String, +} + +/// Serde-compatible representation of VolumeInfo for .vif JSON serialization. +/// Matches Go's protobuf JSON format (jsonpb with EmitUnpopulated=true). +#[derive(serde::Serialize, serde::Deserialize, Default, Clone)] +pub struct VifVolumeInfo { + #[serde(default)] + pub files: Vec, + #[serde(default)] + pub version: u32, + #[serde(default)] + pub replication: String, + #[serde(default, rename = "bytesOffset")] + pub bytes_offset: u32, + #[serde(default, rename = "datFileSize", with = "string_or_i64")] + pub dat_file_size: i64, + #[serde(default, rename = "expireAtSec", with = "string_or_u64")] + pub expire_at_sec: u64, + #[serde(default, rename = "readOnly")] + pub read_only: bool, +} + +impl VifVolumeInfo { + /// Convert from protobuf VolumeInfo to the serde-compatible struct. + pub fn from_pb(pb: &PbVolumeInfo) -> Self { + VifVolumeInfo { + files: pb.files.iter().map(|f| VifRemoteFile { + backend_type: f.backend_type.clone(), + backend_id: f.backend_id.clone(), + key: f.key.clone(), + offset: f.offset, + file_size: f.file_size, + modified_time: f.modified_time, + extension: f.extension.clone(), + }).collect(), + version: pb.version, + replication: pb.replication.clone(), + bytes_offset: pb.bytes_offset, + dat_file_size: pb.dat_file_size, + expire_at_sec: pb.expire_at_sec, + read_only: pb.read_only, + } + } + + /// Convert to protobuf VolumeInfo. + pub fn to_pb(&self) -> PbVolumeInfo { + PbVolumeInfo { + files: self.files.iter().map(|f| PbRemoteFile { + backend_type: f.backend_type.clone(), + backend_id: f.backend_id.clone(), + key: f.key.clone(), + offset: f.offset, + file_size: f.file_size, + modified_time: f.modified_time, + extension: f.extension.clone(), + }).collect(), + version: self.version, + replication: self.replication.clone(), + bytes_offset: self.bytes_offset, + dat_file_size: self.dat_file_size, + expire_at_sec: self.expire_at_sec, + read_only: self.read_only, + ec_shard_config: None, + } + } +} + // ============================================================================ // Streaming read support // ============================================================================ @@ -122,6 +260,12 @@ pub struct Volume { pub compaction_byte_per_second: i64, _last_io_error: Option, + + /// Protobuf VolumeInfo for tiered storage (.vif file). + pub volume_info: PbVolumeInfo, + + /// Whether this volume has a remote file reference. + pub has_remote_file: bool, } /// Windows helper: loop seek_read until buffer is fully filled. @@ -175,6 +319,8 @@ impl Volume { is_compacting: false, compaction_byte_per_second: 0, _last_io_error: None, + volume_info: PbVolumeInfo::default(), + has_remote_file: false, }; v.load(true, true, preallocate, version)?; @@ -925,9 +1071,24 @@ impl Volume { } /// Load volume info from .vif file. + /// Supports both the protobuf-JSON format (Go-compatible) and legacy JSON. fn load_vif(&mut self) { let path = self.vif_path(); if let Ok(content) = fs::read_to_string(&path) { + if content.trim().is_empty() { + return; + } + // Try protobuf-JSON (Go-compatible VolumeInfo via VifVolumeInfo) + if let Ok(vif_info) = serde_json::from_str::(&content) { + let pb_info = vif_info.to_pb(); + if pb_info.read_only { + self.no_write_or_delete = true; + } + self.has_remote_file = !pb_info.files.is_empty(); + self.volume_info = pb_info; + return; + } + // Fall back to legacy format if let Ok(info) = serde_json::from_str::(&content) { if info.read_only { self.no_write_or_delete = true; @@ -936,16 +1097,49 @@ impl Volume { } } - /// Save volume info to .vif file. + /// Save volume info to .vif file in protobuf-JSON format (Go-compatible). fn save_vif(&self) { - let info = VolumeInfo { - read_only: self.no_write_or_delete, - }; - if let Ok(content) = serde_json::to_string(&info) { + let mut vif = VifVolumeInfo::from_pb(&self.volume_info); + vif.read_only = self.no_write_or_delete; + if let Ok(content) = serde_json::to_string_pretty(&vif) { let _ = fs::write(&self.vif_path(), content); } } + /// Save full VolumeInfo to .vif file (for tiered storage). + pub fn save_volume_info(&mut self) -> Result<(), VolumeError> { + self.volume_info.read_only = self.no_write_or_delete; + let vif = VifVolumeInfo::from_pb(&self.volume_info); + let content = serde_json::to_string_pretty(&vif) + .map_err(|e| VolumeError::Io(io::Error::new(io::ErrorKind::Other, e.to_string())))?; + fs::write(&self.vif_path(), content)?; + Ok(()) + } + + /// Get the remote storage backend name and key from this volume .vif. + pub fn remote_storage_name_key(&self) -> (String, String) { + if self.volume_info.files.is_empty() { + return (String::new(), String::new()); + } + let rf = &self.volume_info.files[0]; + let backend_name = if rf.backend_id.is_empty() { + rf.backend_type.clone() + } else { + format!("{}.{}", rf.backend_type, rf.backend_id) + }; + (backend_name, rf.key.clone()) + } + + /// Get the dat file path for this volume. + pub fn dat_path(&self) -> String { + self.file_name(".dat") + } + + /// Get the directory this volume is stored in. + pub fn dir(&self) -> &str { + &self.dir + } + /// Throttle IO during compaction to avoid saturating disk. pub fn maybe_throttle_compaction(&self, bytes_written: u64) { if self.compaction_byte_per_second <= 0 || !self.is_compacting { diff --git a/seaweed-volume/tests/http_integration.rs b/seaweed-volume/tests/http_integration.rs index 9e443f8df..b81c92bac 100644 --- a/seaweed-volume/tests/http_integration.rs +++ b/seaweed-volume/tests/http_integration.rs @@ -53,6 +53,10 @@ fn test_state() -> (Arc, TempDir) { has_master: false, pre_stop_seconds: 0, volume_state_notify: tokio::sync::Notify::new(), + write_queue: std::sync::OnceLock::new(), + s3_tier_registry: std::sync::RwLock::new( + seaweed_volume::remote_storage::s3_tier::S3TierRegistry::new(), + ), }); (state, tmp) }