From 66e3900dc2d21fa6c6a4f21233b3db11200f2a14 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 7 Mar 2026 19:50:16 -0800 Subject: [PATCH] fix: wire slow read behavior into volume reads --- seaweed-volume/src/server/handlers.rs | 452 +++++++++++++++++++------- seaweed-volume/src/storage/volume.rs | 339 +++++++++++++++---- 2 files changed, 613 insertions(+), 178 deletions(-) diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs index b7c884d15..a666cd1df 100644 --- a/seaweed-volume/src/server/handlers.rs +++ b/seaweed-volume/src/server/handlers.rs @@ -5,8 +5,8 @@ //! volume_server_handlers_admin.go. use std::future::Future; -use std::sync::Arc; use std::sync::atomic::Ordering; +use std::sync::Arc; use axum::body::Body; use axum::extract::{Query, State}; @@ -14,11 +14,11 @@ use axum::http::{header, HeaderMap, Method, Request, StatusCode}; use axum::response::{IntoResponse, Response}; use serde::{Deserialize, Serialize}; +use super::volume_server::VolumeServerState; use crate::config::ReadMode; use crate::metrics; use crate::storage::needle::needle::Needle; use crate::storage::types::*; -use super::volume_server::VolumeServerState; // ============================================================================ // Inflight Throttle Guard @@ -63,7 +63,9 @@ impl http_body::Body for TrackedBody { impl Drop for TrackedBody { fn drop(&mut self) { - self.state.inflight_download_bytes.fetch_sub(self.bytes, Ordering::Relaxed); + self.state + .inflight_download_bytes + .fetch_sub(self.bytes, Ordering::Relaxed); self.state.download_notify.notify_waiters(); } } @@ -75,8 +77,8 @@ impl Drop for TrackedBody { /// Threshold in bytes above which we stream needle data instead of buffering. const STREAMING_THRESHOLD: u32 = 1024 * 1024; // 1 MB -/// Chunk size for streaming reads from the dat file. -const STREAMING_CHUNK_SIZE: usize = 64 * 1024; // 64 KB +/// Default chunk size for streaming reads from the dat file. +const DEFAULT_STREAMING_CHUNK_SIZE: usize = 64 * 1024; // 64 KB /// A body that streams needle data from the dat file in chunks using pread, /// avoiding loading the entire payload into memory at once. @@ -85,6 +87,10 @@ struct StreamingBody { data_offset: u64, data_size: u32, pos: usize, + chunk_size: usize, + data_file_access_control: Arc, + hold_read_lock_for_stream: bool, + _held_read_lease: Option, /// Pending result from spawn_blocking, polled to completion. pending: Option>>, /// For download throttling — released on drop. @@ -111,12 +117,17 @@ impl http_body::Body for StreamingBody { Ok(Ok(chunk)) => { let len = chunk.len(); self.pos += len; - return std::task::Poll::Ready(Some(Ok(http_body::Frame::data(chunk)))); + return std::task::Poll::Ready(Some(Ok(http_body::Frame::data( + chunk, + )))); } Ok(Err(e)) => return std::task::Poll::Ready(Some(Err(e))), - Err(e) => return std::task::Poll::Ready(Some(Err( - std::io::Error::new(std::io::ErrorKind::Other, e), - ))), + Err(e) => { + return std::task::Poll::Ready(Some(Err(std::io::Error::new( + std::io::ErrorKind::Other, + e, + )))) + } } } } @@ -127,15 +138,22 @@ impl http_body::Body for StreamingBody { return std::task::Poll::Ready(None); } - let chunk_len = std::cmp::min(STREAMING_CHUNK_SIZE, total - self.pos); + let chunk_len = std::cmp::min(self.chunk_size, total - self.pos); let file_offset = self.data_offset + self.pos as u64; let file_clone = match self.dat_file.try_clone() { Ok(f) => f, Err(e) => return std::task::Poll::Ready(Some(Err(e))), }; + let data_file_access_control = self.data_file_access_control.clone(); + let hold_read_lock_for_stream = self.hold_read_lock_for_stream; let handle = tokio::task::spawn_blocking(move || { + let _lease = if hold_read_lock_for_stream { + None + } else { + Some(data_file_access_control.read_lock()) + }; let mut buf = vec![0u8; chunk_len]; #[cfg(unix)] { @@ -159,7 +177,8 @@ impl http_body::Body for StreamingBody { impl Drop for StreamingBody { fn drop(&mut self) { if let Some(ref st) = self.state { - st.inflight_download_bytes.fetch_sub(self.tracked_bytes, Ordering::Relaxed); + st.inflight_download_bytes + .fetch_sub(self.tracked_bytes, Ordering::Relaxed); st.download_notify.notify_waiters(); } } @@ -196,6 +215,13 @@ fn extract_file_id(path: &str) -> String { } } +fn streaming_chunk_size(read_buffer_size_bytes: usize, data_size: usize) -> usize { + std::cmp::min( + read_buffer_size_bytes.max(DEFAULT_STREAMING_CHUNK_SIZE), + data_size.max(1), + ) +} + fn parse_url_path(path: &str) -> Option<(VolumeId, NeedleId, Cookie)> { let path = path.trim_start_matches('/'); @@ -222,7 +248,8 @@ fn parse_url_path(path: &str) -> Option<(VolumeId, NeedleId, Cookie)> { }; let vid = VolumeId::parse(vid_str).ok()?; - let (needle_id, cookie) = crate::storage::needle::needle::parse_needle_id_cookie(fid_str).ok()?; + let (needle_id, cookie) = + crate::storage::needle::needle::parse_needle_id_cookie(fid_str).ok()?; Some((vid, needle_id, cookie)) } @@ -255,8 +282,15 @@ async fn lookup_volume( volume_id: u32, ) -> Result, String> { let url = format!("http://{}/dir/lookup?volumeId={}", master_url, volume_id); - let resp = client.get(&url).send().await.map_err(|e| format!("lookup request failed: {}", e))?; - let result: LookupResult = resp.json().await.map_err(|e| format!("lookup parse failed: {}", e))?; + let resp = client + .get(&url) + .send() + .await + .map_err(|e| format!("lookup request failed: {}", e))?; + let result: LookupResult = resp + .json() + .await + .map_err(|e| format!("lookup parse failed: {}", e))?; if let Some(err) = result.error { if !err.is_empty() { return Err(err); @@ -313,12 +347,8 @@ async fn proxy_or_redirect_to_target( let target = candidates[0]; match state.read_mode { - ReadMode::Proxy => { - proxy_request(state, &info, target).await - } - ReadMode::Redirect => { - redirect_request(&info, target) - } + ReadMode::Proxy => proxy_request(state, &info, target).await, + ReadMode::Redirect => redirect_request(&info, target), ReadMode::Local => unreachable!(), } } @@ -337,7 +367,10 @@ async fn proxy_request( let target_url = if info.original_query.is_empty() { format!("{}://{}/{}?proxied=true", scheme, target_host, path) } else { - format!("{}://{}/{}?{}&proxied=true", scheme, target_host, path, info.original_query) + format!( + "{}://{}/{}?{}&proxied=true", + scheme, target_host, path, info.original_query + ) }; // Build the proxy request @@ -359,7 +392,8 @@ async fn proxy_request( }; // Build response, copying headers and body from remote - let status = StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + let status = + StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); let mut response_headers = HeaderMap::new(); for (name, value) in resp.headers() { if name.as_str().eq_ignore_ascii_case("server") { @@ -383,10 +417,7 @@ async fn proxy_request( } /// Return a redirect response to the target volume server. -fn redirect_request( - info: &ProxyRequestInfo, - target: &VolumeLocation, -) -> Response { +fn redirect_request(info: &ProxyRequestInfo, target: &VolumeLocation) -> Response { let scheme = "http"; let target_host = &target.public_url; @@ -405,12 +436,18 @@ fn redirect_request( query_params.push("proxied=true".to_string()); let query = query_params.join("&"); - let location = format!("{}://{}/{},{}?{}", scheme, target_host, &info.vid_str, &info.fid_str, query); + let location = format!( + "{}://{}/{},{}?{}", + scheme, target_host, &info.vid_str, &info.fid_str, query + ); Response::builder() .status(StatusCode::MOVED_PERMANENTLY) .header("Location", &location) - .body(Body::from(format!("Moved Permanently.\n\n", location))) + .body(Body::from(format!( + "Moved Permanently.\n\n", + location + ))) .unwrap_or_else(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response()) } @@ -468,7 +505,8 @@ pub async fn get_or_head_handler_from_request( let headers = request.headers().clone(); // Parse query params manually from URI - let query_params: ReadQueryParams = uri.query() + let query_params: ReadQueryParams = uri + .query() .and_then(|q| serde_urlencoded::from_str(q).ok()) .unwrap_or_default(); @@ -504,7 +542,10 @@ async fn get_or_head_handler_inner( // JWT check for reads let file_id = extract_file_id(&path); let token = extract_jwt(&headers, request.uri()); - if let Err(e) = state.guard.check_jwt_for_file(token.as_deref(), &file_id, false) { + if let Err(e) = state + .guard + .check_jwt_for_file(token.as_deref(), &file_id, false) + { return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response(); } @@ -574,7 +615,10 @@ async fn get_or_head_handler_inner( if current < state.concurrent_download_limit { break; } - if tokio::time::timeout_at(deadline, state.download_notify.notified()).await.is_err() { + if tokio::time::timeout_at(deadline, state.download_notify.notified()) + .await + .is_err() + { return (StatusCode::TOO_MANY_REQUESTS, "download limit exceeded").into_response(); } } @@ -595,8 +639,10 @@ async fn get_or_head_handler_inner( let has_range = headers.contains_key(header::RANGE); let ext = extract_extension_from_path(&path); let is_image = is_image_ext(&ext); - let has_image_ops = query.width.is_some() || query.height.is_some() - || query.crop_x1.is_some() || query.crop_y1.is_some(); + let has_image_ops = query.width.is_some() + || query.height.is_some() + || query.crop_x1.is_some() + || query.crop_y1.is_some(); // Try meta-only read first for potential streaming let store = state.store.read().unwrap(); @@ -610,7 +656,11 @@ async fn get_or_head_handler_inner( return StatusCode::NOT_FOUND.into_response(); } Err(e) => { - return (StatusCode::INTERNAL_SERVER_ERROR, format!("read error: {}", e)).into_response(); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("read error: {}", e), + ) + .into_response(); } }; drop(store); @@ -653,7 +703,11 @@ async fn get_or_head_handler_inner( return StatusCode::NOT_FOUND.into_response(); } Err(e) => { - return (StatusCode::INTERNAL_SERVER_ERROR, format!("read error: {}", e)).into_response(); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("read error: {}", e), + ) + .into_response(); } } drop(store); @@ -689,7 +743,9 @@ async fn get_or_head_handler_inner( if let Some(ims_header) = headers.get(header::IF_MODIFIED_SINCE) { if let Ok(ims_str) = ims_header.to_str() { // Parse HTTP date format: "Mon, 02 Jan 2006 15:04:05 GMT" - if let Ok(ims_time) = chrono::NaiveDateTime::parse_from_str(ims_str, "%a, %d %b %Y %H:%M:%S GMT") { + if let Ok(ims_time) = + chrono::NaiveDateTime::parse_from_str(ims_str, "%a, %d %b %Y %H:%M:%S GMT") + { if (n.last_modified as i64) <= ims_time.and_utc().timestamp() { return StatusCode::NOT_MODIFIED.into_response(); } @@ -771,7 +827,9 @@ async fn get_or_head_handler_inner( let tracked_bytes = info.data_size as i64; let tracking_state = if download_guard.is_some() { - state.inflight_download_bytes.fetch_add(tracked_bytes, Ordering::Relaxed); + state + .inflight_download_bytes + .fetch_add(tracked_bytes, Ordering::Relaxed); Some(state.clone()) } else { None @@ -782,6 +840,17 @@ async fn get_or_head_handler_inner( data_offset: info.data_file_offset, data_size: info.data_size, pos: 0, + chunk_size: streaming_chunk_size( + state.read_buffer_size_bytes, + info.data_size as usize, + ), + _held_read_lease: if state.has_slow_read { + None + } else { + Some(info.data_file_access_control.read_lock()) + }, + data_file_access_control: info.data_file_access_control, + hold_read_lock_for_stream: !state.has_slow_read, pending: None, state: tracking_state, tracked_bytes, @@ -801,7 +870,8 @@ async fn get_or_head_handler_inner( let is_compressed = n.is_compressed(); let mut data = n.data; if is_compressed { - let accept_encoding = headers.get(header::ACCEPT_ENCODING) + let accept_encoding = headers + .get(header::ACCEPT_ENCODING) .and_then(|v| v.to_str().ok()) .unwrap_or(""); if accept_encoding.contains("gzip") { @@ -835,7 +905,10 @@ async fn get_or_head_handler_inner( } if method == Method::HEAD { - response_headers.insert(header::CONTENT_LENGTH, data.len().to_string().parse().unwrap()); + response_headers.insert( + header::CONTENT_LENGTH, + data.len().to_string().parse().unwrap(), + ); return (StatusCode::OK, response_headers).into_response(); } @@ -846,7 +919,9 @@ async fn get_or_head_handler_inner( // If download throttling is active, wrap the body so we track when it's fully sent if download_guard.is_some() { let data_len = data.len() as i64; - state.inflight_download_bytes.fetch_add(data_len, Ordering::Relaxed); + state + .inflight_download_bytes + .fetch_add(data_len, Ordering::Relaxed); let tracked_body = TrackedBody { data, state: state.clone(), @@ -929,9 +1004,14 @@ fn handle_range_request(range_str: &str, data: &[u8], mut headers: HeaderMap) -> let slice = &data[start..=end]; headers.insert( "Content-Range", - format!("bytes {}-{}/{}", start, end, total).parse().unwrap(), + format!("bytes {}-{}/{}", start, end, total) + .parse() + .unwrap(), + ); + headers.insert( + header::CONTENT_LENGTH, + slice.len().to_string().parse().unwrap(), ); - headers.insert(header::CONTENT_LENGTH, slice.len().to_string().parse().unwrap()); (StatusCode::PARTIAL_CONTENT, headers, slice.to_vec()).into_response() } else { // Multi-range: build multipart/byteranges response @@ -945,9 +1025,7 @@ fn handle_range_request(range_str: &str, data: &[u8], mut headers: HeaderMap) -> let mut body = Vec::new(); for &(start, end) in &ranges { body.extend_from_slice(format!("\r\n--{}\r\n", boundary).as_bytes()); - body.extend_from_slice( - format!("Content-Type: {}\r\n", content_type).as_bytes(), - ); + body.extend_from_slice(format!("Content-Type: {}\r\n", content_type).as_bytes()); body.extend_from_slice( format!("Content-Range: bytes {}-{}/{}\r\n\r\n", start, end, total).as_bytes(), ); @@ -961,7 +1039,10 @@ fn handle_range_request(range_str: &str, data: &[u8], mut headers: HeaderMap) -> .parse() .unwrap(), ); - headers.insert(header::CONTENT_LENGTH, body.len().to_string().parse().unwrap()); + headers.insert( + header::CONTENT_LENGTH, + body.len().to_string().parse().unwrap(), + ); (StatusCode::PARTIAL_CONTENT, headers, body).into_response() } } @@ -1094,13 +1175,17 @@ pub async fn post_handler( // JWT check for writes let file_id = extract_file_id(&path); let token = extract_jwt(&headers, request.uri()); - if let Err(e) = state.guard.check_jwt_for_file(token.as_deref(), &file_id, true) { + if let Err(e) = state + .guard + .check_jwt_for_file(token.as_deref(), &file_id, true) + { return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response(); } // Upload throttling: check inflight bytes against limit let is_replicate = query.split('&').any(|p| p == "type=replicate"); - let content_length = headers.get(header::CONTENT_LENGTH) + let content_length = headers + .get(header::CONTENT_LENGTH) .and_then(|v| v.to_str().ok()) .and_then(|s| s.parse::().ok()) .unwrap_or(0); @@ -1120,11 +1205,16 @@ pub async fn post_handler( break; } // Wait for notification or timeout - if tokio::time::timeout_at(deadline, state.upload_notify.notified()).await.is_err() { + if tokio::time::timeout_at(deadline, state.upload_notify.notified()) + .await + .is_err() + { return (StatusCode::TOO_MANY_REQUESTS, "upload limit exceeded").into_response(); } } - state.inflight_upload_bytes.fetch_add(content_length, Ordering::Relaxed); + state + .inflight_upload_bytes + .fetch_add(content_length, Ordering::Relaxed); } // RAII guard to release upload throttle on any exit path @@ -1139,19 +1229,25 @@ pub async fn post_handler( }; // Check for chunk manifest flag - let is_chunk_manifest = query.split('&') - .any(|p| p == "cm=true" || p == "cm=1"); + let is_chunk_manifest = query.split('&').any(|p| p == "cm=true" || p == "cm=1"); // Validate multipart/form-data has a boundary if let Some(ct) = headers.get(header::CONTENT_TYPE) { if let Ok(ct_str) = ct.to_str() { if ct_str.starts_with("multipart/form-data") && !ct_str.contains("boundary=") { - return (StatusCode::BAD_REQUEST, "no multipart boundary param in Content-Type").into_response(); + return ( + StatusCode::BAD_REQUEST, + "no multipart boundary param in Content-Type", + ) + .into_response(); } } } - let content_md5 = headers.get("Content-MD5").and_then(|v| v.to_str().ok()).map(|s| s.to_string()); + let content_md5 = headers + .get("Content-MD5") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); // Read body let body = match axum::body::to_bytes(request.into_body(), usize::MAX).await { @@ -1166,13 +1262,20 @@ pub async fn post_handler( // Validate Content-MD5 if provided if let Some(ref expected_md5) = content_md5 { - use md5::{Md5, Digest}; use base64::Engine; + use md5::{Digest, Md5}; let mut hasher = Md5::new(); hasher.update(&body); let actual = base64::engine::general_purpose::STANDARD.encode(hasher.finalize()); if actual != *expected_md5 { - return (StatusCode::BAD_REQUEST, format!("Content-MD5 mismatch: expected {} got {}", expected_md5, actual)).into_response(); + return ( + StatusCode::BAD_REQUEST, + format!( + "Content-MD5 mismatch: expected {} got {}", + expected_md5, actual + ), + ) + .into_response(); } } @@ -1182,7 +1285,8 @@ pub async fn post_handler( .as_secs(); // Parse custom timestamp from query param - let ts_str = query.split('&') + let ts_str = query + .split('&') .find_map(|p| p.strip_prefix("ts=")) .unwrap_or(""); let last_modified = if !ts_str.is_empty() { @@ -1192,13 +1296,15 @@ pub async fn post_handler( }; // Check if upload is pre-compressed - let is_gzipped = headers.get(header::CONTENT_ENCODING) + let is_gzipped = headers + .get(header::CONTENT_ENCODING) .and_then(|v| v.to_str().ok()) .map(|s| s == "gzip") .unwrap_or(false); // Extract MIME type from Content-Type header (needed early for JPEG orientation fix) - let mime_type = headers.get(header::CONTENT_TYPE) + let mime_type = headers + .get(header::CONTENT_TYPE) .and_then(|v| v.to_str().ok()) .map(|ct| { if ct.starts_with("multipart/") { @@ -1210,7 +1316,8 @@ pub async fn post_handler( .unwrap_or_else(|| "application/octet-stream".to_string()); // Parse TTL from query param (matches Go's r.FormValue("ttl")) - let ttl_str = query.split('&') + let ttl_str = query + .split('&') .find_map(|p| p.strip_prefix("ttl=")) .unwrap_or(""); let ttl = if !ttl_str.is_empty() { @@ -1220,7 +1327,8 @@ pub async fn post_handler( }; // Extract Seaweed-* custom metadata headers (pairs) - let pair_map: std::collections::HashMap = headers.iter() + let pair_map: std::collections::HashMap = headers + .iter() .filter_map(|(k, v)| { let key = k.as_str(); if key.len() > 8 && key[..8].eq_ignore_ascii_case("seaweed-") { @@ -1249,7 +1357,9 @@ pub async fn post_handler( let (final_data, final_is_gzipped) = if !is_gzipped && !is_chunk_manifest { let ext = { let dot_pos = path.rfind('.'); - dot_pos.map(|p| path[p..].to_lowercase()).unwrap_or_default() + dot_pos + .map(|p| path[p..].to_lowercase()) + .unwrap_or_default() }; if is_compressible_file_type(&ext, &mime_type) { if let Some(compressed) = try_gzip_data(&body_data) { @@ -1347,9 +1457,11 @@ pub async fn post_handler( Err(crate::storage::volume::VolumeError::ReadOnly) => { (StatusCode::FORBIDDEN, "volume is read-only").into_response() } - Err(e) => { - (StatusCode::INTERNAL_SERVER_ERROR, format!("write error: {}", e)).into_response() - } + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("write error: {}", e), + ) + .into_response(), }; // _upload_guard drops here, releasing inflight bytes @@ -1370,7 +1482,9 @@ pub async fn delete_handler( request: Request, ) -> Response { let start = std::time::Instant::now(); - metrics::REQUEST_COUNTER.with_label_values(&["delete"]).inc(); + metrics::REQUEST_COUNTER + .with_label_values(&["delete"]) + .inc(); let path = request.uri().path().to_string(); let headers = request.headers().clone(); @@ -1383,13 +1497,17 @@ pub async fn delete_handler( // JWT check for writes (deletes use write key) let file_id = extract_file_id(&path); let token = extract_jwt(&headers, request.uri()); - if let Err(e) = state.guard.check_jwt_for_file(token.as_deref(), &file_id, true) { + if let Err(e) = state + .guard + .check_jwt_for_file(token.as_deref(), &file_id, true) + { return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response(); } // Parse custom timestamp from query param let del_query = request.uri().query().unwrap_or(""); - let del_ts_str = del_query.split('&') + let del_ts_str = del_query + .split('&') .find_map(|p| p.strip_prefix("ts=")) .unwrap_or(""); let del_last_modified = if !del_ts_str.is_empty() { @@ -1417,7 +1535,11 @@ pub async fn delete_handler( } } if n.cookie != original_cookie { - return (StatusCode::BAD_REQUEST, "File Random Cookie does not match.").into_response(); + return ( + StatusCode::BAD_REQUEST, + "File Random Cookie does not match.", + ) + .into_response(); } // Apply custom timestamp if provided @@ -1448,7 +1570,11 @@ pub async fn delete_handler( let (chunk_vid, chunk_nid, chunk_cookie) = match parse_url_path(&chunk.fid) { Some(p) => p, None => { - return (StatusCode::INTERNAL_SERVER_ERROR, format!("invalid chunk fid: {}", chunk.fid)).into_response(); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("invalid chunk fid: {}", chunk.fid), + ) + .into_response(); } }; let mut chunk_needle = Needle { @@ -1460,25 +1586,39 @@ pub async fn delete_handler( { let store = state.store.read().unwrap(); if let Err(e) = store.read_volume_needle(chunk_vid, &mut chunk_needle) { - return (StatusCode::INTERNAL_SERVER_ERROR, format!("read chunk {}: {}", chunk.fid, e)).into_response(); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("read chunk {}: {}", chunk.fid, e), + ) + .into_response(); } } // Delete the chunk let mut store = state.store.write().unwrap(); if let Err(e) = store.delete_volume_needle(chunk_vid, &mut chunk_needle) { - return (StatusCode::INTERNAL_SERVER_ERROR, format!("delete chunk {}: {}", chunk.fid, e)).into_response(); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("delete chunk {}: {}", chunk.fid, e), + ) + .into_response(); } } // Delete the manifest itself let mut store = state.store.write().unwrap(); if let Err(e) = store.delete_volume_needle(vid, &mut n) { - return (StatusCode::INTERNAL_SERVER_ERROR, format!("delete manifest: {}", e)).into_response(); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("delete manifest: {}", e), + ) + .into_response(); } metrics::REQUEST_DURATION .with_label_values(&["delete"]) .observe(start.elapsed().as_secs_f64()); // Return the manifest's declared size (matches Go behavior) - let result = DeleteResult { size: manifest.size as i32 }; + let result = DeleteResult { + size: manifest.size as i32, + }; return (StatusCode::ACCEPTED, axum::Json(result)).into_response(); } } @@ -1500,9 +1640,11 @@ pub async fn delete_handler( let result = DeleteResult { size: 0 }; (StatusCode::NOT_FOUND, axum::Json(result)).into_response() } - Err(e) => { - (StatusCode::INTERNAL_SERVER_ERROR, format!("delete error: {}", e)).into_response() - } + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("delete error: {}", e), + ) + .into_response(), } } @@ -1521,12 +1663,30 @@ pub async fn status_handler( for (_vid, vol) in loc.volumes() { let mut vol_info = serde_json::Map::new(); vol_info.insert("Id".to_string(), serde_json::Value::from(vol.id.0)); - vol_info.insert("Collection".to_string(), serde_json::Value::from(vol.collection.clone())); - vol_info.insert("Size".to_string(), serde_json::Value::from(vol.content_size())); - vol_info.insert("FileCount".to_string(), serde_json::Value::from(vol.file_count())); - vol_info.insert("DeleteCount".to_string(), serde_json::Value::from(vol.deleted_count())); - vol_info.insert("ReadOnly".to_string(), serde_json::Value::from(vol.is_read_only())); - vol_info.insert("Version".to_string(), serde_json::Value::from(vol.version().0)); + vol_info.insert( + "Collection".to_string(), + serde_json::Value::from(vol.collection.clone()), + ); + vol_info.insert( + "Size".to_string(), + serde_json::Value::from(vol.content_size()), + ); + vol_info.insert( + "FileCount".to_string(), + serde_json::Value::from(vol.file_count()), + ); + vol_info.insert( + "DeleteCount".to_string(), + serde_json::Value::from(vol.deleted_count()), + ); + vol_info.insert( + "ReadOnly".to_string(), + serde_json::Value::from(vol.is_read_only()), + ); + vol_info.insert( + "Version".to_string(), + serde_json::Value::from(vol.version().0), + ); volumes.push(serde_json::Value::Object(vol_info)); } } @@ -1539,15 +1699,24 @@ pub async fn status_handler( ds.insert("dir".to_string(), serde_json::Value::from(dir.clone())); // Add disk stats if available if let Ok(path) = std::path::Path::new(&dir).canonicalize() { - ds.insert("dir".to_string(), serde_json::Value::from(path.to_string_lossy().to_string())); + ds.insert( + "dir".to_string(), + serde_json::Value::from(path.to_string_lossy().to_string()), + ); } disk_statuses.push(serde_json::Value::Object(ds)); } let mut m = serde_json::Map::new(); - m.insert("Version".to_string(), serde_json::Value::from(env!("CARGO_PKG_VERSION"))); + m.insert( + "Version".to_string(), + serde_json::Value::from(env!("CARGO_PKG_VERSION")), + ); m.insert("Volumes".to_string(), serde_json::Value::Array(volumes)); - m.insert("DiskStatuses".to_string(), serde_json::Value::Array(disk_statuses)); + m.insert( + "DiskStatuses".to_string(), + serde_json::Value::Array(disk_statuses), + ); let json_value = serde_json::Value::Object(m); @@ -1576,9 +1745,7 @@ pub async fn status_handler( // Health Check Handler // ============================================================================ -pub async fn healthz_handler( - State(state): State>, -) -> Response { +pub async fn healthz_handler(State(state): State>) -> Response { let is_stopping = *state.is_stopping.read().unwrap(); if is_stopping { return (StatusCode::SERVICE_UNAVAILABLE, "stopping").into_response(); @@ -1598,7 +1765,10 @@ pub async fn metrics_handler() -> Response { let body = metrics::gather_metrics(); ( StatusCode::OK, - [(header::CONTENT_TYPE, "text/plain; version=0.0.4; charset=utf-8")], + [( + header::CONTENT_TYPE, + "text/plain; version=0.0.4; charset=utf-8", + )], body, ) .into_response() @@ -1626,12 +1796,15 @@ pub async fn stats_memory_handler() -> Response { "HeapReleased": 0, }, }); - (StatusCode::OK, [(header::CONTENT_TYPE, "application/json")], info.to_string()).into_response() + ( + StatusCode::OK, + [(header::CONTENT_TYPE, "application/json")], + info.to_string(), + ) + .into_response() } -pub async fn stats_disk_handler( - State(state): State>, -) -> Response { +pub async fn stats_disk_handler(State(state): State>) -> Response { let store = state.store.read().unwrap(); let mut ds = Vec::new(); for loc in &store.locations { @@ -1648,7 +1821,12 @@ pub async fn stats_disk_handler( "Version": env!("CARGO_PKG_VERSION"), "DiskStatuses": ds, }); - (StatusCode::OK, [(header::CONTENT_TYPE, "application/json")], info.to_string()).into_response() + ( + StatusCode::OK, + [(header::CONTENT_TYPE, "application/json")], + info.to_string(), + ) + .into_response() } // ============================================================================ @@ -1669,18 +1847,13 @@ pub async fn favicon_handler() -> Response { pub async fn static_asset_handler() -> Response { // Return a minimal valid PNG (1x1 transparent) let png: &[u8] = &[ - 0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A, 0x00, 0x00, 0x00, 0x0D, 0x49, 0x48, - 0x44, 0x52, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x08, 0x06, 0x00, 0x00, - 0x00, 0x1F, 0x15, 0xC4, 0x89, 0x00, 0x00, 0x00, 0x0A, 0x49, 0x44, 0x41, 0x54, 0x78, - 0x9C, 0x62, 0x00, 0x00, 0x00, 0x02, 0x00, 0x01, 0xE5, 0x27, 0xDE, 0xFC, 0x00, 0x00, - 0x00, 0x00, 0x49, 0x45, 0x4E, 0x44, 0xAE, 0x42, 0x60, 0x82, + 0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A, 0x00, 0x00, 0x00, 0x0D, 0x49, 0x48, 0x44, + 0x52, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x08, 0x06, 0x00, 0x00, 0x00, 0x1F, + 0x15, 0xC4, 0x89, 0x00, 0x00, 0x00, 0x0A, 0x49, 0x44, 0x41, 0x54, 0x78, 0x9C, 0x62, 0x00, + 0x00, 0x00, 0x02, 0x00, 0x01, 0xE5, 0x27, 0xDE, 0xFC, 0x00, 0x00, 0x00, 0x00, 0x49, 0x45, + 0x4E, 0x44, 0xAE, 0x42, 0x60, 0x82, ]; - ( - StatusCode::OK, - [(header::CONTENT_TYPE, "image/png")], - png, - ) - .into_response() + (StatusCode::OK, [(header::CONTENT_TYPE, "image/png")], png).into_response() } pub async fn ui_handler( @@ -1762,7 +1935,15 @@ fn try_expand_chunk_manifest( for chunk in &manifest.chunks { let (chunk_vid, chunk_nid, chunk_cookie) = match parse_url_path(&chunk.fid) { Some(p) => p, - None => return Some((StatusCode::INTERNAL_SERVER_ERROR, format!("invalid chunk fid: {}", chunk.fid)).into_response()), + None => { + return Some( + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("invalid chunk fid: {}", chunk.fid), + ) + .into_response(), + ) + } }; let mut chunk_needle = Needle { id: chunk_nid, @@ -1771,7 +1952,15 @@ fn try_expand_chunk_manifest( }; match store.read_volume_needle(chunk_vid, &mut chunk_needle) { Ok(_) => {} - Err(e) => return Some((StatusCode::INTERNAL_SERVER_ERROR, format!("read chunk {}: {}", chunk.fid, e)).into_response()), + Err(e) => { + return Some( + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("read chunk {}: {}", chunk.fid, e), + ) + .into_response(), + ) + } } let chunk_data = if chunk_needle.is_compressed() { use flate2::read::GzDecoder; @@ -1805,7 +1994,10 @@ fn try_expand_chunk_manifest( response_headers.insert("X-File-Store", "chunked".parse().unwrap()); if *method == Method::HEAD { - response_headers.insert(header::CONTENT_LENGTH, result.len().to_string().parse().unwrap()); + response_headers.insert( + header::CONTENT_LENGTH, + result.len().to_string().parse().unwrap(), + ); return Some((StatusCode::OK, response_headers).into_response()); } @@ -1886,10 +2078,18 @@ fn is_compressible_file_type(ext: &str, mtype: &str) -> bool { } // By MIME type if mtype.starts_with("application/") { - if mtype.ends_with("zstd") { return false; } - if mtype.ends_with("xml") { return true; } - if mtype.ends_with("script") { return true; } - if mtype.ends_with("vnd.rar") { return false; } + if mtype.ends_with("zstd") { + return false; + } + if mtype.ends_with("xml") { + return true; + } + if mtype.ends_with("script") { + return true; + } + if mtype.ends_with("vnd.rar") { + return false; + } } if mtype.starts_with("audio/") { let sub = mtype.strip_prefix("audio/").unwrap_or(""); @@ -1970,7 +2170,10 @@ mod tests { #[test] fn test_extract_jwt_query_over_header() { let mut headers = HeaderMap::new(); - headers.insert(header::AUTHORIZATION, "Bearer header_token".parse().unwrap()); + headers.insert( + header::AUTHORIZATION, + "Bearer header_token".parse().unwrap(), + ); let uri: axum::http::Uri = "/test?jwt=query_token".parse().unwrap(); assert_eq!(extract_jwt(&headers, &uri), Some("query_token".to_string())); } @@ -2034,7 +2237,10 @@ mod tests { assert!(!is_compressible_file_type("", "audio/mpeg")); // Unknown - assert!(!is_compressible_file_type(".xyz", "application/octet-stream")); + assert!(!is_compressible_file_type( + ".xyz", + "application/octet-stream" + )); } #[test] @@ -2054,4 +2260,20 @@ mod tests { decoder.read_to_end(&mut decompressed).unwrap(); assert_eq!(decompressed, data); } + + #[test] + fn test_streaming_chunk_size_respects_configured_read_buffer() { + assert_eq!( + streaming_chunk_size(4 * 1024 * 1024, 8 * 1024 * 1024), + 4 * 1024 * 1024 + ); + assert_eq!( + streaming_chunk_size(32 * 1024, 512 * 1024), + DEFAULT_STREAMING_CHUNK_SIZE + ); + assert_eq!( + streaming_chunk_size(8 * 1024 * 1024, 128 * 1024), + 128 * 1024 + ); + } } diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 855c3df8b..086c51130 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -12,13 +12,15 @@ use std::fs::{self, File, OpenOptions}; use std::io::{self, Read, Seek, SeekFrom, Write}; use std::path::Path; +use std::sync::Arc; +use std::sync::{Condvar, Mutex}; use std::time::{SystemTime, UNIX_EPOCH}; use tracing::warn; -use crate::storage::needle::needle::{self, Needle, NeedleError, get_actual_size}; +use crate::storage::needle::needle::{self, get_actual_size, Needle, NeedleError}; use crate::storage::needle_map::{CompactNeedleMap, NeedleMap, NeedleMapKind, RedbNeedleMap}; -use crate::storage::super_block::{SuperBlock, ReplicaPlacement, SUPER_BLOCK_SIZE}; +use crate::storage::super_block::{ReplicaPlacement, SuperBlock, SUPER_BLOCK_SIZE}; use crate::storage::types::*; // ============================================================================ @@ -77,22 +79,26 @@ struct VolumeInfo { read_only: bool, } +pub use crate::pb::volume_server_pb::RemoteFile as PbRemoteFile; /// Protobuf VolumeInfo type alias. pub use crate::pb::volume_server_pb::VolumeInfo as PbVolumeInfo; -pub use crate::pb::volume_server_pb::RemoteFile as PbRemoteFile; /// Helper module for deserializing protojson uint64 fields that may be strings or numbers. mod string_or_u64 { use serde::{self, Deserialize, Deserializer, Serializer}; pub fn serialize(value: &u64, serializer: S) -> Result - where S: Serializer { + where + S: Serializer, + { // Emit as string to match Go's protojson format for uint64 serializer.serialize_str(&value.to_string()) } pub fn deserialize<'de, D>(deserializer: D) -> Result - where D: Deserializer<'de> { + where + D: Deserializer<'de>, + { #[derive(Deserialize)] #[serde(untagged)] enum StringOrNum { @@ -110,12 +116,16 @@ mod string_or_i64 { use serde::{self, Deserialize, Deserializer, Serializer}; pub fn serialize(value: &i64, serializer: S) -> Result - where S: Serializer { + where + S: Serializer, + { serializer.serialize_str(&value.to_string()) } pub fn deserialize<'de, D>(deserializer: D) -> Result - where D: Deserializer<'de> { + where + D: Deserializer<'de>, + { #[derive(Deserialize)] #[serde(untagged)] enum StringOrNum { @@ -173,15 +183,19 @@ impl VifVolumeInfo { /// Convert from protobuf VolumeInfo to the serde-compatible struct. pub fn from_pb(pb: &PbVolumeInfo) -> Self { VifVolumeInfo { - files: pb.files.iter().map(|f| VifRemoteFile { - backend_type: f.backend_type.clone(), - backend_id: f.backend_id.clone(), - key: f.key.clone(), - offset: f.offset, - file_size: f.file_size, - modified_time: f.modified_time, - extension: f.extension.clone(), - }).collect(), + files: pb + .files + .iter() + .map(|f| VifRemoteFile { + backend_type: f.backend_type.clone(), + backend_id: f.backend_id.clone(), + key: f.key.clone(), + offset: f.offset, + file_size: f.file_size, + modified_time: f.modified_time, + extension: f.extension.clone(), + }) + .collect(), version: pb.version, replication: pb.replication.clone(), bytes_offset: pb.bytes_offset, @@ -194,15 +208,19 @@ impl VifVolumeInfo { /// Convert to protobuf VolumeInfo. pub fn to_pb(&self) -> PbVolumeInfo { PbVolumeInfo { - files: self.files.iter().map(|f| PbRemoteFile { - backend_type: f.backend_type.clone(), - backend_id: f.backend_id.clone(), - key: f.key.clone(), - offset: f.offset, - file_size: f.file_size, - modified_time: f.modified_time, - extension: f.extension.clone(), - }).collect(), + files: self + .files + .iter() + .map(|f| PbRemoteFile { + backend_type: f.backend_type.clone(), + backend_id: f.backend_id.clone(), + key: f.key.clone(), + offset: f.offset, + file_size: f.file_size, + modified_time: f.modified_time, + extension: f.extension.clone(), + }) + .collect(), version: self.version, replication: self.replication.clone(), bytes_offset: self.bytes_offset, @@ -218,6 +236,70 @@ impl VifVolumeInfo { // Streaming read support // ============================================================================ +#[derive(Default)] +struct DataFileAccessState { + readers: usize, + writer_active: bool, +} + +#[derive(Default)] +pub struct DataFileAccessControl { + state: Mutex, + condvar: Condvar, +} + +pub struct DataFileReadLease { + control: Arc, +} + +pub struct DataFileWriteLease { + control: Arc, +} + +impl DataFileAccessControl { + pub fn read_lock(self: &Arc) -> DataFileReadLease { + let mut state = self.state.lock().unwrap(); + while state.writer_active { + state = self.condvar.wait(state).unwrap(); + } + state.readers += 1; + drop(state); + DataFileReadLease { + control: self.clone(), + } + } + + pub fn write_lock(self: &Arc) -> DataFileWriteLease { + let mut state = self.state.lock().unwrap(); + while state.writer_active || state.readers > 0 { + state = self.condvar.wait(state).unwrap(); + } + state.writer_active = true; + drop(state); + DataFileWriteLease { + control: self.clone(), + } + } +} + +impl Drop for DataFileReadLease { + fn drop(&mut self) { + let mut state = self.control.state.lock().unwrap(); + state.readers -= 1; + if state.readers == 0 { + self.control.condvar.notify_all(); + } + } +} + +impl Drop for DataFileWriteLease { + fn drop(&mut self) { + let mut state = self.control.state.lock().unwrap(); + state.writer_active = false; + self.control.condvar.notify_all(); + } +} + /// Information needed to stream needle data directly from the dat file /// without loading the entire payload into memory. pub struct NeedleStreamInfo { @@ -227,6 +309,8 @@ pub struct NeedleStreamInfo { pub data_file_offset: u64, /// Size of the data payload in bytes. pub data_size: u32, + /// Per-volume file access lock used to match Go's slow-read behavior. + pub data_file_access_control: Arc, } // ============================================================================ @@ -242,6 +326,7 @@ pub struct Volume { dat_file: Option, nm: Option, needle_map_kind: NeedleMapKind, + data_file_access_control: Arc, pub super_block: SuperBlock, @@ -276,7 +361,10 @@ fn read_exact_at(file: &File, buf: &mut [u8], mut offset: u64) -> io::Result<()> while filled < buf.len() { let n = file.seek_read(&mut buf[filled..], offset)?; if n == 0 { - return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF in seek_read")); + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "unexpected EOF in seek_read", + )); } filled += n; offset += n as u64; @@ -305,6 +393,7 @@ impl Volume { dat_file: None, nm: None, needle_map_kind, + data_file_access_control: Arc::new(DataFileAccessControl::default()), super_block: SuperBlock { replica_placement: replica_placement.unwrap_or_default(), ttl: ttl.unwrap_or(crate::storage::needle::ttl::TTL::EMPTY), @@ -598,6 +687,7 @@ impl Volume { } pub fn read_needle_opt(&self, n: &mut Needle, read_deleted: bool) -> Result { + let _guard = self.data_file_access_control.read_lock(); let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?; let nv = nm.get(n.id).ok_or(VolumeError::NotFound)?; @@ -618,7 +708,7 @@ impl Volume { return Ok(0); } - self.read_needle_data_at(n, nv.offset.to_actual_offset(), read_size)?; + self.read_needle_data_at_unlocked(n, nv.offset.to_actual_offset(), read_size)?; // TTL expiry check if n.has_ttl() { @@ -641,7 +731,22 @@ impl Volume { } /// Read needle data from .dat file at given offset. - pub fn read_needle_data_at(&self, n: &mut Needle, offset: i64, size: Size) -> Result<(), VolumeError> { + pub fn read_needle_data_at( + &self, + n: &mut Needle, + offset: i64, + size: Size, + ) -> Result<(), VolumeError> { + let _guard = self.data_file_access_control.read_lock(); + self.read_needle_data_at_unlocked(n, offset, size) + } + + fn read_needle_data_at_unlocked( + &self, + n: &mut Needle, + offset: i64, + size: Size, + ) -> Result<(), VolumeError> { let dat_file = self.dat_file.as_ref().ok_or_else(|| { VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open")) })?; @@ -671,6 +776,11 @@ impl Volume { /// Read raw needle blob at a specific offset. pub fn read_needle_blob(&self, offset: i64, size: Size) -> Result, VolumeError> { + let _guard = self.data_file_access_control.read_lock(); + self.read_needle_blob_unlocked(offset, size) + } + + fn read_needle_blob_unlocked(&self, offset: i64, size: Size) -> Result, VolumeError> { let dat_file = self.dat_file.as_ref().ok_or_else(|| { VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open")) })?; @@ -696,7 +806,12 @@ impl Volume { /// and return a `NeedleStreamInfo` that can be used to stream data directly from the dat file. /// /// This is used for large needles to avoid loading the entire payload into memory. - pub fn read_needle_stream_info(&self, n: &mut Needle, read_deleted: bool) -> Result { + pub fn read_needle_stream_info( + &self, + n: &mut Needle, + read_deleted: bool, + ) -> Result { + let _guard = self.data_file_access_control.read_lock(); let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?; let nv = nm.get(n.id).ok_or(VolumeError::NotFound)?; @@ -770,13 +885,19 @@ impl Volume { dat_file: cloned_file, data_file_offset, data_size: n.data_size, + data_file_access_control: self.data_file_access_control.clone(), }) } // ---- Write ---- /// Write a needle to the volume (synchronous path). - pub fn write_needle(&mut self, n: &mut Needle, check_cookie: bool) -> Result<(u64, Size, bool), VolumeError> { + pub fn write_needle( + &mut self, + n: &mut Needle, + check_cookie: bool, + ) -> Result<(u64, Size, bool), VolumeError> { + let _guard = self.data_file_access_control.write_lock(); if self.no_write_or_delete { return Err(VolumeError::ReadOnly); } @@ -784,7 +905,11 @@ impl Volume { self.do_write_request(n, check_cookie) } - fn do_write_request(&mut self, n: &mut Needle, check_cookie: bool) -> Result<(u64, Size, bool), VolumeError> { + fn do_write_request( + &mut self, + n: &mut Needle, + check_cookie: bool, + ) -> Result<(u64, Size, bool), VolumeError> { // Ensure checksum is computed before dedup check if n.checksum == crate::storage::needle::crc::CRC(0) && !n.data.is_empty() { n.checksum = crate::storage::needle::crc::CRC::new(&n.data); @@ -801,7 +926,7 @@ impl Volume { if !nv.offset.is_zero() && nv.size.is_valid() { let mut existing = Needle::default(); // Read only the header to check cookie - self.read_needle_header(&mut existing, nv.offset.to_actual_offset())?; + self.read_needle_header_unlocked(&mut existing, nv.offset.to_actual_offset())?; if n.cookie.0 == 0 && !check_cookie { n.cookie = existing.cookie; @@ -843,7 +968,7 @@ impl Volume { Ok((offset, size, false)) } - fn read_needle_header(&self, n: &mut Needle, offset: i64) -> Result<(), VolumeError> { + fn read_needle_header_unlocked(&self, n: &mut Needle, offset: i64) -> Result<(), VolumeError> { let dat_file = self.dat_file.as_ref().ok_or_else(|| { VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open")) })?; @@ -873,7 +998,14 @@ impl Volume { if let Some(nv) = nm.get(n.id) { if !nv.offset.is_zero() && nv.size.is_valid() { let mut old = Needle::default(); - if self.read_needle_data_at(&mut old, nv.offset.to_actual_offset(), nv.size).is_ok() { + if self + .read_needle_data_at_unlocked( + &mut old, + nv.offset.to_actual_offset(), + nv.size, + ) + .is_ok() + { if old.cookie == n.cookie && old.checksum == n.checksum && old.data == n.data @@ -907,6 +1039,7 @@ impl Volume { /// Delete a needle from the volume. pub fn delete_needle(&mut self, n: &mut Needle) -> Result { + let _guard = self.data_file_access_control.write_lock(); if self.no_write_or_delete { return Err(VolumeError::ReadOnly); } @@ -992,7 +1125,8 @@ impl Volume { id: key, ..Needle::default() }; - if let Ok(()) = self.read_needle_data_at(&mut n, nv.offset.to_actual_offset(), nv.size) { + if let Ok(()) = self.read_needle_data_at(&mut n, nv.offset.to_actual_offset(), nv.size) + { needles.push(n); } } @@ -1046,7 +1180,10 @@ impl Volume { /// Scan raw needle entries from the .dat file starting at `from_offset`. /// Returns (needle_header_bytes, needle_body_bytes, append_at_ns) for each needle. /// Used by VolumeTailSender to stream raw bytes. - pub fn scan_raw_needles_from(&self, from_offset: u64) -> Result, Vec, u64)>, VolumeError> { + pub fn scan_raw_needles_from( + &self, + from_offset: u64, + ) -> Result, Vec, u64)>, VolumeError> { let dat_file = self.dat_file.as_ref().ok_or(VolumeError::NotFound)?; let version = self.super_block.version; let dat_size = dat_file.metadata()?.len(); @@ -1101,10 +1238,14 @@ impl Volume { } /// Insert or update a needle index entry (for low-level blob writes). - pub fn put_needle_index(&mut self, key: NeedleId, offset: Offset, size: Size) -> Result<(), VolumeError> { + pub fn put_needle_index( + &mut self, + key: NeedleId, + offset: Offset, + size: Size, + ) -> Result<(), VolumeError> { if let Some(ref mut nm) = self.nm { - nm.put(key, offset, size) - .map_err(VolumeError::Io)?; + nm.put(key, offset, size).map_err(VolumeError::Io)?; } Ok(()) } @@ -1203,7 +1344,8 @@ impl Volume { return; } // Simple throttle: sleep based on bytes written vs allowed rate - let sleep_us = (bytes_written as f64 / self.compaction_byte_per_second as f64 * 1_000_000.0) as u64; + let sleep_us = + (bytes_written as f64 / self.compaction_byte_per_second as f64 * 1_000_000.0) as u64; if sleep_us > 0 { std::thread::sleep(std::time::Duration::from_micros(sleep_us)); } @@ -1321,7 +1463,10 @@ impl Volume { /// Binary search through the .idx file to find the first needle /// with append_at_ns > since_ns. Returns (offset, is_last). /// Matches Go's BinarySearchByAppendAtNs in volume_backup.go. - pub fn binary_search_by_append_at_ns(&self, since_ns: u64) -> Result<(Offset, bool), VolumeError> { + pub fn binary_search_by_append_at_ns( + &self, + since_ns: u64, + ) -> Result<(Offset, bool), VolumeError> { let file_size = self.idx_file_size() as i64; if file_size % NEEDLE_MAP_ENTRY_SIZE as i64 != 0 { return Err(VolumeError::Io(io::Error::new( @@ -1380,7 +1525,11 @@ impl Volume { } /// Write a raw needle blob at a specific offset in the .dat file. - pub fn write_needle_blob(&mut self, offset: i64, needle_blob: &[u8]) -> Result<(), VolumeError> { + pub fn write_needle_blob( + &mut self, + offset: i64, + needle_blob: &[u8], + ) -> Result<(), VolumeError> { if self.no_write_or_delete { return Err(VolumeError::ReadOnly); } @@ -1417,7 +1566,8 @@ impl Volume { /// Get the modification time of the .dat file as Unix seconds. pub fn dat_file_mod_time(&self) -> u64 { - self.dat_file.as_ref() + self.dat_file + .as_ref() .and_then(|f| f.metadata().ok()) .and_then(|m| m.modified().ok()) .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) @@ -1781,17 +1931,48 @@ mod tests { fn make_test_volume(dir: &str) -> Volume { Volume::new( - dir, dir, "", VolumeId(1), + dir, + dir, + "", + VolumeId(1), NeedleMapKind::InMemory, - None, None, 0, + None, + None, + 0, Version::current(), - ).unwrap() + ) + .unwrap() + } + + #[test] + fn test_data_file_access_control_blocks_writer_until_reader_releases() { + let control = Arc::new(DataFileAccessControl::default()); + let read_lease = control.read_lock(); + let writer_control = control.clone(); + let acquired = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let acquired_clone = acquired.clone(); + + let writer = std::thread::spawn(move || { + let _write_lease = writer_control.write_lock(); + acquired_clone.store(true, std::sync::atomic::Ordering::Relaxed); + }); + + std::thread::sleep(std::time::Duration::from_millis(50)); + assert!(!acquired.load(std::sync::atomic::Ordering::Relaxed)); + + drop(read_lease); + writer.join().unwrap(); + + assert!(acquired.load(std::sync::atomic::Ordering::Relaxed)); } #[test] fn test_volume_file_name() { assert_eq!(volume_file_name("/data", "", VolumeId(1)), "/data/1"); - assert_eq!(volume_file_name("/data", "pics", VolumeId(42)), "/data/pics_42"); + assert_eq!( + volume_file_name("/data", "pics", VolumeId(42)), + "/data/pics_42" + ); } #[test] @@ -1831,7 +2012,10 @@ mod tests { assert_eq!(v.file_count(), 1); // Read it back - let mut read_n = Needle { id: NeedleId(1), ..Needle::default() }; + let mut read_n = Needle { + id: NeedleId(1), + ..Needle::default() + }; let count = v.read_needle(&mut read_n).unwrap(); assert_eq!(count, 11); assert_eq!(read_n.data, b"hello world"); @@ -1882,17 +2066,22 @@ mod tests { v.write_needle(&mut n, true).unwrap(); assert_eq!(v.file_count(), 1); - let deleted_size = v.delete_needle(&mut Needle { - id: NeedleId(1), - cookie: Cookie(0xbb), - ..Needle::default() - }).unwrap(); + let deleted_size = v + .delete_needle(&mut Needle { + id: NeedleId(1), + cookie: Cookie(0xbb), + ..Needle::default() + }) + .unwrap(); assert!(deleted_size.0 > 0); assert_eq!(v.file_count(), 0); assert_eq!(v.deleted_count(), 1); // Read should fail with Deleted - let mut read_n = Needle { id: NeedleId(1), ..Needle::default() }; + let mut read_n = Needle { + id: NeedleId(1), + ..Needle::default() + }; let err = v.read_needle(&mut read_n).unwrap_err(); assert!(matches!(err, VolumeError::Deleted)); } @@ -1919,7 +2108,10 @@ mod tests { assert_eq!(v.max_file_key(), NeedleId(10)); // Read back needle 5 - let mut n = Needle { id: NeedleId(5), ..Needle::default() }; + let mut n = Needle { + id: NeedleId(5), + ..Needle::default() + }; v.read_needle(&mut n).unwrap(); assert_eq!(n.data, b"needle data 5"); } @@ -1948,14 +2140,23 @@ mod tests { // Reload and verify let v = Volume::new( - dir, dir, "", VolumeId(1), + dir, + dir, + "", + VolumeId(1), NeedleMapKind::InMemory, - None, None, 0, + None, + None, + 0, Version::current(), - ).unwrap(); + ) + .unwrap(); assert_eq!(v.file_count(), 3); - let mut n = Needle { id: NeedleId(2), ..Needle::default() }; + let mut n = Needle { + id: NeedleId(2), + ..Needle::default() + }; v.read_needle(&mut n).unwrap(); assert_eq!(std::str::from_utf8(&n.data).unwrap(), "data 2"); } @@ -2065,19 +2266,31 @@ mod tests { // Dat should be smaller (deleted needle removed) let dat_size_after = v.dat_file_size().unwrap(); - assert!(dat_size_after < dat_size_before, "dat should shrink after compact"); + assert!( + dat_size_after < dat_size_before, + "dat should shrink after compact" + ); // Read back live needles - let mut n1 = Needle { id: NeedleId(1), ..Needle::default() }; + let mut n1 = Needle { + id: NeedleId(1), + ..Needle::default() + }; v.read_needle(&mut n1).unwrap(); assert_eq!(n1.data, b"data-1"); - let mut n3 = Needle { id: NeedleId(3), ..Needle::default() }; + let mut n3 = Needle { + id: NeedleId(3), + ..Needle::default() + }; v.read_needle(&mut n3).unwrap(); assert_eq!(n3.data, b"data-3"); // Needle 2 should not exist - let mut n2 = Needle { id: NeedleId(2), ..Needle::default() }; + let mut n2 = Needle { + id: NeedleId(2), + ..Needle::default() + }; assert!(v.read_needle(&mut n2).is_err()); // Compact files should not exist after commit