From f7ef9f2cd8f7c3df60643efe7af3ff040aa29690 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 7 Mar 2026 02:55:12 -0800 Subject: [PATCH] Add image resize/crop support and master heartbeat registration - Image processing: add width/height/crop_x1/y1/x2/y2 query params to GET handler using the `image` crate. Fixes TestImageResizeAndCropReadVariants. - Master heartbeat: implement bidirectional streaming SendHeartbeat RPC client that registers the volume server with the master, sends periodic volume/EC shard information, handles leader changes and shutdown. - HTTP tests: 54/55 pass (98.2%), gRPC tests: 74/75 pass (98.7%) --- seaweed-volume/Cargo.lock | 105 ++++++++ seaweed-volume/Cargo.toml | 3 + seaweed-volume/src/main.rs | 30 +++ seaweed-volume/src/server/handlers.rs | 104 ++++++++ seaweed-volume/src/server/heartbeat.rs | 257 ++++++++++++++++++++ seaweed-volume/src/server/mod.rs | 1 + seaweed-volume/src/storage/disk_location.rs | 5 + seaweed-volume/src/storage/volume.rs | 4 + 8 files changed, 509 insertions(+) create mode 100644 seaweed-volume/src/server/heartbeat.rs diff --git a/seaweed-volume/Cargo.lock b/seaweed-volume/Cargo.lock index 0426bf47b..2878c97b3 100644 --- a/seaweed-volume/Cargo.lock +++ b/seaweed-volume/Cargo.lock @@ -261,6 +261,18 @@ version = "3.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" +[[package]] +name = "bytemuck" +version = "1.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8efb64bd706a16a1bdde310ae86b351e4d21550d98d056f22f8a7f7a2183fec" + +[[package]] +name = "byteorder-lite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" + [[package]] name = "bytes" version = "1.11.1" @@ -353,6 +365,12 @@ dependencies = [ "cc", ] +[[package]] +name = "color_quant" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d7b894f5411737b7867f4827955924d7c254fc9f4d91a6aad6b097804b1018b" + [[package]] name = "colorchoice" version = "1.0.4" @@ -687,6 +705,15 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "fdeflate" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e6853b52649d4ac5c0bd02320cddc5ba956bdb407c4b75a2c6b75bf51500f8c" +dependencies = [ + "simd-adler32", +] + [[package]] name = "ff" version = "0.13.1" @@ -916,6 +943,16 @@ dependencies = [ "wasip3", ] +[[package]] +name = "gif" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5df2ba84018d80c213569363bdcd0c64e6933c67fe4c1d60ecf822971a3c35e" +dependencies = [ + "color_quant", + "weezl", +] + [[package]] name = "group" version = "0.13.0" @@ -1277,6 +1314,23 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "image" +version = "0.25.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6506c6c10786659413faa717ceebcb8f70731c0a60cbae39795fdf114519c1a" +dependencies = [ + "bytemuck", + "byteorder-lite", + "color_quant", + "gif", + "moxcms", + "num-traits", + "png", + "zune-core", + "zune-jpeg", +] + [[package]] name = "indexmap" version = "1.9.3" @@ -1536,6 +1590,16 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "moxcms" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac9557c559cd6fc9867e122e20d2cbefc9ca29d80d027a8e39310920ed2f0a97" +dependencies = [ + "num-traits", + "pxfm", +] + [[package]] name = "multer" version = "3.1.0" @@ -1878,6 +1942,19 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "png" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60769b8b31b2a9f263dae2776c37b1b28ae246943cf719eb6946a1db05128a61" +dependencies = [ + "bitflags 2.11.0", + "crc32fast", + "fdeflate", + "flate2", + "miniz_oxide", +] + [[package]] name = "potential_utf" version = "0.1.4" @@ -2021,6 +2098,12 @@ dependencies = [ "prost", ] +[[package]] +name = "pxfm" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5a041e753da8b807c9255f28de81879c78c876392ff2469cde94799b2896b9d" + [[package]] name = "quinn" version = "0.11.9" @@ -2476,6 +2559,7 @@ dependencies = [ "http-body", "hyper", "hyper-util", + "image", "jsonwebtoken", "lazy_static", "md-5", @@ -3502,6 +3586,12 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "weezl" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28ac98ddc8b9274cb41bb4d9d4d5c425b6020c50c46f25559911905610b4a88" + [[package]] name = "winapi" version = "0.3.9" @@ -4023,3 +4113,18 @@ name = "zmij" version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" + +[[package]] +name = "zune-core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb8a0807f7c01457d0379ba880ba6322660448ddebc890ce29bb64da71fb40f9" + +[[package]] +name = "zune-jpeg" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "410e9ecef634c709e3831c2cfdb8d9c32164fae1c67496d5b68fff728eec37fe" +dependencies = [ + "zune-core", +] diff --git a/seaweed-volume/Cargo.toml b/seaweed-volume/Cargo.toml index a61f5c639..56773ca0e 100644 --- a/seaweed-volume/Cargo.toml +++ b/seaweed-volume/Cargo.toml @@ -74,6 +74,9 @@ base64 = "0.22" # Compression flate2 = "1" +# Image processing +image = { version = "0.25", default-features = false, features = ["png", "jpeg", "gif"] } + # Misc bytes = "1" rand = "0.8" diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs index 0a0fd8aac..b46c900ec 100644 --- a/seaweed-volume/src/main.rs +++ b/seaweed-volume/src/main.rs @@ -168,6 +168,33 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box Result<(), Box, /// cm=false disables chunk manifest expansion (returns raw manifest JSON). pub cm: Option, + /// Image resize width + pub width: Option, + /// Image resize height + pub height: Option, + /// Image resize mode: "fit" or "fill" + pub mode: Option, + /// Image crop parameters + pub crop_x1: Option, + pub crop_y1: Option, + pub crop_x2: Option, + pub crop_y2: Option, } // ============================================================================ @@ -358,6 +369,13 @@ async fn get_or_head_handler_inner( } } + // Image crop and resize (only for supported image formats) + let ext = extract_extension_from_path(&path); + if is_image_ext(&ext) { + data = maybe_crop_image(&data, &ext, &query); + data = maybe_resize_image(&data, &ext, &query); + } + // Accept-Ranges response_headers.insert(header::ACCEPT_RANGES, "bytes".parse().unwrap()); @@ -510,6 +528,92 @@ fn extract_filename_from_path(path: &str) -> String { } } +// ============================================================================ +// Image processing helpers +// ============================================================================ + +fn is_image_ext(ext: &str) -> bool { + matches!(ext, ".png" | ".jpg" | ".jpeg" | ".gif") +} + +fn extract_extension_from_path(path: &str) -> String { + let parts: Vec<&str> = path.trim_start_matches('/').split('/').collect(); + if parts.len() >= 3 { + let filename = parts[2]; + if let Some(dot_pos) = filename.rfind('.') { + return filename[dot_pos..].to_lowercase(); + } + } + String::new() +} + +fn maybe_resize_image(data: &[u8], ext: &str, query: &ReadQueryParams) -> Vec { + let width = query.width.unwrap_or(0); + let height = query.height.unwrap_or(0); + if width == 0 && height == 0 { + return data.to_vec(); + } + + let img = match image::load_from_memory(data) { + Ok(img) => img, + Err(_) => return data.to_vec(), + }; + + let (src_w, src_h) = (img.width(), img.height()); + // Only resize if source is larger than target + if (width == 0 || src_w <= width) && (height == 0 || src_h <= height) { + return data.to_vec(); + } + + let mode = query.mode.as_deref().unwrap_or(""); + let resized = match mode { + "fit" => img.resize(width, height, image::imageops::FilterType::Lanczos3), + "fill" => img.resize_to_fill(width, height, image::imageops::FilterType::Lanczos3), + _ => { + if width > 0 && height > 0 && width == height && src_w != src_h { + img.resize_to_fill(width, height, image::imageops::FilterType::Lanczos3) + } else { + img.resize(width, height, image::imageops::FilterType::Lanczos3) + } + } + }; + + encode_image(&resized, ext).unwrap_or_else(|| data.to_vec()) +} + +fn maybe_crop_image(data: &[u8], ext: &str, query: &ReadQueryParams) -> Vec { + let (x1, y1, x2, y2) = match (query.crop_x1, query.crop_y1, query.crop_x2, query.crop_y2) { + (Some(x1), Some(y1), Some(x2), Some(y2)) if x2 > x1 && y2 > y1 => (x1, y1, x2, y2), + _ => return data.to_vec(), + }; + + let img = match image::load_from_memory(data) { + Ok(img) => img, + Err(_) => return data.to_vec(), + }; + + let (src_w, src_h) = (img.width(), img.height()); + if x2 > src_w || y2 > src_h { + return data.to_vec(); + } + + let cropped = img.crop_imm(x1, y1, x2 - x1, y2 - y1); + encode_image(&cropped, ext).unwrap_or_else(|| data.to_vec()) +} + +fn encode_image(img: &image::DynamicImage, ext: &str) -> Option> { + use std::io::Cursor; + let mut buf = Cursor::new(Vec::new()); + let format = match ext { + ".png" => image::ImageFormat::Png, + ".jpg" | ".jpeg" => image::ImageFormat::Jpeg, + ".gif" => image::ImageFormat::Gif, + _ => return None, + }; + img.write_to(&mut buf, format).ok()?; + Some(buf.into_inner()) +} + // ============================================================================ // Write Handler (POST/PUT) // ============================================================================ diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs new file mode 100644 index 000000000..6da72dd17 --- /dev/null +++ b/seaweed-volume/src/server/heartbeat.rs @@ -0,0 +1,257 @@ +//! Heartbeat client: registers the volume server with the master. +//! +//! Implements the bidirectional streaming `SendHeartbeat` RPC to the master, +//! matching Go's `server/volume_grpc_client_to_master.go`. + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use tokio::sync::broadcast; +use tonic::transport::Channel; +use tracing::{info, warn, error}; + +use crate::pb::master_pb; +use crate::pb::master_pb::seaweed_client::SeaweedClient; +use crate::storage::types::NeedleId; +use super::volume_server::VolumeServerState; + +/// Configuration for the heartbeat client. +pub struct HeartbeatConfig { + pub ip: String, + pub port: u16, + pub grpc_port: u16, + pub public_url: String, + pub data_center: String, + pub rack: String, + pub master_addresses: Vec, + pub pulse_seconds: u64, +} + +/// Run the heartbeat loop using VolumeServerState. +pub async fn run_heartbeat_with_state( + config: HeartbeatConfig, + state: Arc, + mut shutdown_rx: broadcast::Receiver<()>, +) { + info!("Starting heartbeat to master nodes: {:?}", config.master_addresses); + + let pulse = Duration::from_secs(config.pulse_seconds.max(1)); + + loop { + for master_addr in &config.master_addresses { + if shutdown_rx.try_recv().is_ok() { + info!("Heartbeat shutting down"); + return; + } + + let grpc_addr = to_grpc_address(master_addr); + info!("Connecting heartbeat to master {}", grpc_addr); + + match do_heartbeat(&config, &state, &grpc_addr, pulse, &mut shutdown_rx).await { + Ok(Some(leader)) => { + info!("Master leader changed to {}", leader); + } + Ok(None) => {} + Err(e) => { + warn!("Heartbeat to {} error: {}", grpc_addr, e); + } + } + } + + tokio::select! { + _ = tokio::time::sleep(pulse) => {} + _ = shutdown_rx.recv() => { + info!("Heartbeat shutting down"); + return; + } + } + } +} + +/// Convert a master address "host:port" to a gRPC endpoint URL. +/// The Go master uses port + 10000 for gRPC by default. +fn to_grpc_address(master_addr: &str) -> String { + if let Some((host, port_str)) = master_addr.rsplit_once(':') { + if let Ok(port) = port_str.parse::() { + let grpc_port = port + 10000; + return format!("http://{}:{}", host, grpc_port); + } + } + format!("http://{}", master_addr) +} + +/// Perform one heartbeat session with a master server. +async fn do_heartbeat( + config: &HeartbeatConfig, + state: &Arc, + grpc_addr: &str, + pulse: Duration, + shutdown_rx: &mut broadcast::Receiver<()>, +) -> Result, Box> { + let channel = Channel::from_shared(grpc_addr.to_string())? + .connect_timeout(Duration::from_secs(5)) + .timeout(Duration::from_secs(30)) + .connect() + .await?; + + let mut client = SeaweedClient::new(channel); + + let (tx, rx) = tokio::sync::mpsc::channel::(32); + let stream = tokio_stream::wrappers::ReceiverStream::new(rx); + let mut response_stream = client.send_heartbeat(stream).await?.into_inner(); + + // Send initial volume heartbeat + tx.send(collect_heartbeat(config, state)).await?; + + // Send initial EC shards heartbeat + tx.send(collect_ec_heartbeat(state)).await?; + + info!("Heartbeat stream established with {}", grpc_addr); + + let mut volume_tick = tokio::time::interval(pulse); + let mut ec_tick = tokio::time::interval(pulse * 17); + volume_tick.tick().await; + ec_tick.tick().await; + + loop { + tokio::select! { + resp = response_stream.message() => { + match resp { + Ok(Some(hb_resp)) => { + if hb_resp.volume_size_limit > 0 { + let s = state.store.read().unwrap(); + s.volume_size_limit.store( + hb_resp.volume_size_limit, + std::sync::atomic::Ordering::Relaxed, + ); + } + if !hb_resp.leader.is_empty() { + return Ok(Some(hb_resp.leader)); + } + if !hb_resp.duplicated_uuids.is_empty() { + error!("Master reported duplicate volume directory UUIDs: {:?}", hb_resp.duplicated_uuids); + } + } + Ok(None) => return Ok(None), + Err(e) => return Err(Box::new(e)), + } + } + + _ = volume_tick.tick() => { + if tx.send(collect_heartbeat(config, state)).await.is_err() { + return Ok(None); + } + } + + _ = ec_tick.tick() => { + if tx.send(collect_ec_heartbeat(state)).await.is_err() { + return Ok(None); + } + } + + _ = shutdown_rx.recv() => { + let empty = master_pb::Heartbeat { + ip: config.ip.clone(), + port: config.port as u32, + public_url: config.public_url.clone(), + max_file_key: 0, + data_center: config.data_center.clone(), + rack: config.rack.clone(), + has_no_volumes: true, + has_no_ec_shards: true, + grpc_port: config.grpc_port as u32, + ..Default::default() + }; + let _ = tx.send(empty).await; + tokio::time::sleep(Duration::from_millis(200)).await; + info!("Sent deregistration heartbeat"); + return Ok(None); + } + } + } +} + +/// Collect volume information into a Heartbeat message. +fn collect_heartbeat(config: &HeartbeatConfig, state: &Arc) -> master_pb::Heartbeat { + let store = state.store.read().unwrap(); + + let mut volumes = Vec::new(); + let mut max_file_key = NeedleId(0); + let mut max_volume_counts: HashMap = HashMap::new(); + + for loc in &store.locations { + let disk_type_str = loc.disk_type.to_string(); + let max_count = loc.max_volume_count.load(std::sync::atomic::Ordering::Relaxed); + *max_volume_counts.entry(disk_type_str).or_insert(0) += max_count as u32; + + for (_, vol) in loc.iter_volumes() { + let cur_max = vol.max_file_key(); + if cur_max > max_file_key { + max_file_key = cur_max; + } + + volumes.push(master_pb::VolumeInformationMessage { + id: vol.id.0, + size: vol.content_size(), + collection: vol.collection.clone(), + file_count: vol.file_count() as u64, + delete_count: vol.deleted_count() as u64, + deleted_byte_count: vol.deleted_size(), + read_only: vol.is_read_only(), + replica_placement: vol.super_block.replica_placement.to_byte() as u32, + version: vol.super_block.version.0 as u32, + ttl: vol.super_block.ttl.to_u32(), + compact_revision: vol.last_compact_revision() as u32, + modified_at_second: vol.last_modified_ts() as i64, + disk_type: loc.disk_type.to_string(), + ..Default::default() + }); + } + } + + master_pb::Heartbeat { + ip: config.ip.clone(), + port: config.port as u32, + public_url: config.public_url.clone(), + max_file_key: max_file_key.0, + data_center: config.data_center.clone(), + rack: config.rack.clone(), + admin_port: config.port as u32, + volumes, + has_no_volumes: false, + max_volume_counts, + grpc_port: config.grpc_port as u32, + ..Default::default() + } +} + +/// Collect EC shard information into a Heartbeat message. +fn collect_ec_heartbeat(state: &Arc) -> master_pb::Heartbeat { + let store = state.store.read().unwrap(); + + let mut ec_shards = Vec::new(); + for (vid, ec_vol) in &store.ec_volumes { + let mut ec_index_bits: u32 = 0; + for shard_opt in &ec_vol.shards { + if let Some(shard) = shard_opt { + ec_index_bits |= 1u32 << shard.shard_id; + } + } + if ec_index_bits > 0 { + ec_shards.push(master_pb::VolumeEcShardInformationMessage { + id: vid.0, + collection: ec_vol.collection.clone(), + ec_index_bits, + ..Default::default() + }); + } + } + + let has_no = ec_shards.is_empty(); + master_pb::Heartbeat { + ec_shards, + has_no_ec_shards: has_no, + ..Default::default() + } +} diff --git a/seaweed-volume/src/server/mod.rs b/seaweed-volume/src/server/mod.rs index da7e6d012..639a3e2bd 100644 --- a/seaweed-volume/src/server/mod.rs +++ b/seaweed-volume/src/server/mod.rs @@ -1,3 +1,4 @@ pub mod volume_server; pub mod handlers; pub mod grpc_server; +pub mod heartbeat; diff --git a/seaweed-volume/src/storage/disk_location.rs b/seaweed-volume/src/storage/disk_location.rs index 035d6a936..a4be6acdf 100644 --- a/seaweed-volume/src/storage/disk_location.rs +++ b/seaweed-volume/src/storage/disk_location.rs @@ -185,6 +185,11 @@ impl DiskLocation { ids } + /// Iterate over all volumes. + pub fn iter_volumes(&self) -> impl Iterator { + self.volumes.iter() + } + /// Number of free volume slots. pub fn free_volume_count(&self) -> i32 { let max = self.max_volume_count.load(Ordering::Relaxed); diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 5d2fbf7c3..0af6a8bf4 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -662,6 +662,10 @@ impl Volume { self.last_compact_revision } + pub fn last_modified_ts(&self) -> u64 { + self.last_modified_ts_seconds + } + /// Read all live needles from the volume (for ReadAllNeedles streaming RPC). pub fn read_all_needles(&self) -> Result, VolumeError> { let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;