Browse Source

fix: wire slow read behavior into volume reads

rust-volume-server
Chris Lu 2 days ago
parent
commit
66e3900dc2
  1. 452
      seaweed-volume/src/server/handlers.rs
  2. 339
      seaweed-volume/src/storage/volume.rs

452
seaweed-volume/src/server/handlers.rs

@ -5,8 +5,8 @@
//! volume_server_handlers_admin.go.
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use axum::body::Body;
use axum::extract::{Query, State};
@ -14,11 +14,11 @@ use axum::http::{header, HeaderMap, Method, Request, StatusCode};
use axum::response::{IntoResponse, Response};
use serde::{Deserialize, Serialize};
use super::volume_server::VolumeServerState;
use crate::config::ReadMode;
use crate::metrics;
use crate::storage::needle::needle::Needle;
use crate::storage::types::*;
use super::volume_server::VolumeServerState;
// ============================================================================
// Inflight Throttle Guard
@ -63,7 +63,9 @@ impl http_body::Body for TrackedBody {
impl Drop for TrackedBody {
fn drop(&mut self) {
self.state.inflight_download_bytes.fetch_sub(self.bytes, Ordering::Relaxed);
self.state
.inflight_download_bytes
.fetch_sub(self.bytes, Ordering::Relaxed);
self.state.download_notify.notify_waiters();
}
}
@ -75,8 +77,8 @@ impl Drop for TrackedBody {
/// Threshold in bytes above which we stream needle data instead of buffering.
const STREAMING_THRESHOLD: u32 = 1024 * 1024; // 1 MB
/// Chunk size for streaming reads from the dat file.
const STREAMING_CHUNK_SIZE: usize = 64 * 1024; // 64 KB
/// Default chunk size for streaming reads from the dat file.
const DEFAULT_STREAMING_CHUNK_SIZE: usize = 64 * 1024; // 64 KB
/// A body that streams needle data from the dat file in chunks using pread,
/// avoiding loading the entire payload into memory at once.
@ -85,6 +87,10 @@ struct StreamingBody {
data_offset: u64,
data_size: u32,
pos: usize,
chunk_size: usize,
data_file_access_control: Arc<crate::storage::volume::DataFileAccessControl>,
hold_read_lock_for_stream: bool,
_held_read_lease: Option<crate::storage::volume::DataFileReadLease>,
/// Pending result from spawn_blocking, polled to completion.
pending: Option<tokio::task::JoinHandle<Result<bytes::Bytes, std::io::Error>>>,
/// For download throttling — released on drop.
@ -111,12 +117,17 @@ impl http_body::Body for StreamingBody {
Ok(Ok(chunk)) => {
let len = chunk.len();
self.pos += len;
return std::task::Poll::Ready(Some(Ok(http_body::Frame::data(chunk))));
return std::task::Poll::Ready(Some(Ok(http_body::Frame::data(
chunk,
))));
}
Ok(Err(e)) => return std::task::Poll::Ready(Some(Err(e))),
Err(e) => return std::task::Poll::Ready(Some(Err(
std::io::Error::new(std::io::ErrorKind::Other, e),
))),
Err(e) => {
return std::task::Poll::Ready(Some(Err(std::io::Error::new(
std::io::ErrorKind::Other,
e,
))))
}
}
}
}
@ -127,15 +138,22 @@ impl http_body::Body for StreamingBody {
return std::task::Poll::Ready(None);
}
let chunk_len = std::cmp::min(STREAMING_CHUNK_SIZE, total - self.pos);
let chunk_len = std::cmp::min(self.chunk_size, total - self.pos);
let file_offset = self.data_offset + self.pos as u64;
let file_clone = match self.dat_file.try_clone() {
Ok(f) => f,
Err(e) => return std::task::Poll::Ready(Some(Err(e))),
};
let data_file_access_control = self.data_file_access_control.clone();
let hold_read_lock_for_stream = self.hold_read_lock_for_stream;
let handle = tokio::task::spawn_blocking(move || {
let _lease = if hold_read_lock_for_stream {
None
} else {
Some(data_file_access_control.read_lock())
};
let mut buf = vec![0u8; chunk_len];
#[cfg(unix)]
{
@ -159,7 +177,8 @@ impl http_body::Body for StreamingBody {
impl Drop for StreamingBody {
fn drop(&mut self) {
if let Some(ref st) = self.state {
st.inflight_download_bytes.fetch_sub(self.tracked_bytes, Ordering::Relaxed);
st.inflight_download_bytes
.fetch_sub(self.tracked_bytes, Ordering::Relaxed);
st.download_notify.notify_waiters();
}
}
@ -196,6 +215,13 @@ fn extract_file_id(path: &str) -> String {
}
}
/// Pick the per-read chunk size used when streaming a needle payload from
/// the dat file.
///
/// The configured read buffer size is first raised to at least
/// `DEFAULT_STREAMING_CHUNK_SIZE`, then capped at the payload size so a
/// small needle is fetched in a single read. A floor of one byte keeps the
/// result non-zero even when `data_size` is 0.
fn streaming_chunk_size(read_buffer_size_bytes: usize, data_size: usize) -> usize {
    let configured = read_buffer_size_bytes.max(DEFAULT_STREAMING_CHUNK_SIZE);
    let payload_cap = data_size.max(1);
    if configured < payload_cap {
        configured
    } else {
        payload_cap
    }
}
fn parse_url_path(path: &str) -> Option<(VolumeId, NeedleId, Cookie)> {
let path = path.trim_start_matches('/');
@ -222,7 +248,8 @@ fn parse_url_path(path: &str) -> Option<(VolumeId, NeedleId, Cookie)> {
};
let vid = VolumeId::parse(vid_str).ok()?;
let (needle_id, cookie) = crate::storage::needle::needle::parse_needle_id_cookie(fid_str).ok()?;
let (needle_id, cookie) =
crate::storage::needle::needle::parse_needle_id_cookie(fid_str).ok()?;
Some((vid, needle_id, cookie))
}
@ -255,8 +282,15 @@ async fn lookup_volume(
volume_id: u32,
) -> Result<Vec<VolumeLocation>, String> {
let url = format!("http://{}/dir/lookup?volumeId={}", master_url, volume_id);
let resp = client.get(&url).send().await.map_err(|e| format!("lookup request failed: {}", e))?;
let result: LookupResult = resp.json().await.map_err(|e| format!("lookup parse failed: {}", e))?;
let resp = client
.get(&url)
.send()
.await
.map_err(|e| format!("lookup request failed: {}", e))?;
let result: LookupResult = resp
.json()
.await
.map_err(|e| format!("lookup parse failed: {}", e))?;
if let Some(err) = result.error {
if !err.is_empty() {
return Err(err);
@ -313,12 +347,8 @@ async fn proxy_or_redirect_to_target(
let target = candidates[0];
match state.read_mode {
ReadMode::Proxy => {
proxy_request(state, &info, target).await
}
ReadMode::Redirect => {
redirect_request(&info, target)
}
ReadMode::Proxy => proxy_request(state, &info, target).await,
ReadMode::Redirect => redirect_request(&info, target),
ReadMode::Local => unreachable!(),
}
}
@ -337,7 +367,10 @@ async fn proxy_request(
let target_url = if info.original_query.is_empty() {
format!("{}://{}/{}?proxied=true", scheme, target_host, path)
} else {
format!("{}://{}/{}?{}&proxied=true", scheme, target_host, path, info.original_query)
format!(
"{}://{}/{}?{}&proxied=true",
scheme, target_host, path, info.original_query
)
};
// Build the proxy request
@ -359,7 +392,8 @@ async fn proxy_request(
};
// Build response, copying headers and body from remote
let status = StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let status =
StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let mut response_headers = HeaderMap::new();
for (name, value) in resp.headers() {
if name.as_str().eq_ignore_ascii_case("server") {
@ -383,10 +417,7 @@ async fn proxy_request(
}
/// Return a redirect response to the target volume server.
fn redirect_request(
info: &ProxyRequestInfo,
target: &VolumeLocation,
) -> Response {
fn redirect_request(info: &ProxyRequestInfo, target: &VolumeLocation) -> Response {
let scheme = "http";
let target_host = &target.public_url;
@ -405,12 +436,18 @@ fn redirect_request(
query_params.push("proxied=true".to_string());
let query = query_params.join("&");
let location = format!("{}://{}/{},{}?{}", scheme, target_host, &info.vid_str, &info.fid_str, query);
let location = format!(
"{}://{}/{},{}?{}",
scheme, target_host, &info.vid_str, &info.fid_str, query
);
Response::builder()
.status(StatusCode::MOVED_PERMANENTLY)
.header("Location", &location)
.body(Body::from(format!("<a href=\"{}\">Moved Permanently</a>.\n\n", location)))
.body(Body::from(format!(
"<a href=\"{}\">Moved Permanently</a>.\n\n",
location
)))
.unwrap_or_else(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response())
}
@ -468,7 +505,8 @@ pub async fn get_or_head_handler_from_request(
let headers = request.headers().clone();
// Parse query params manually from URI
let query_params: ReadQueryParams = uri.query()
let query_params: ReadQueryParams = uri
.query()
.and_then(|q| serde_urlencoded::from_str(q).ok())
.unwrap_or_default();
@ -504,7 +542,10 @@ async fn get_or_head_handler_inner(
// JWT check for reads
let file_id = extract_file_id(&path);
let token = extract_jwt(&headers, request.uri());
if let Err(e) = state.guard.check_jwt_for_file(token.as_deref(), &file_id, false) {
if let Err(e) = state
.guard
.check_jwt_for_file(token.as_deref(), &file_id, false)
{
return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
}
@ -574,7 +615,10 @@ async fn get_or_head_handler_inner(
if current < state.concurrent_download_limit {
break;
}
if tokio::time::timeout_at(deadline, state.download_notify.notified()).await.is_err() {
if tokio::time::timeout_at(deadline, state.download_notify.notified())
.await
.is_err()
{
return (StatusCode::TOO_MANY_REQUESTS, "download limit exceeded").into_response();
}
}
@ -595,8 +639,10 @@ async fn get_or_head_handler_inner(
let has_range = headers.contains_key(header::RANGE);
let ext = extract_extension_from_path(&path);
let is_image = is_image_ext(&ext);
let has_image_ops = query.width.is_some() || query.height.is_some()
|| query.crop_x1.is_some() || query.crop_y1.is_some();
let has_image_ops = query.width.is_some()
|| query.height.is_some()
|| query.crop_x1.is_some()
|| query.crop_y1.is_some();
// Try meta-only read first for potential streaming
let store = state.store.read().unwrap();
@ -610,7 +656,11 @@ async fn get_or_head_handler_inner(
return StatusCode::NOT_FOUND.into_response();
}
Err(e) => {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("read error: {}", e)).into_response();
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("read error: {}", e),
)
.into_response();
}
};
drop(store);
@ -653,7 +703,11 @@ async fn get_or_head_handler_inner(
return StatusCode::NOT_FOUND.into_response();
}
Err(e) => {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("read error: {}", e)).into_response();
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("read error: {}", e),
)
.into_response();
}
}
drop(store);
@ -689,7 +743,9 @@ async fn get_or_head_handler_inner(
if let Some(ims_header) = headers.get(header::IF_MODIFIED_SINCE) {
if let Ok(ims_str) = ims_header.to_str() {
// Parse HTTP date format: "Mon, 02 Jan 2006 15:04:05 GMT"
if let Ok(ims_time) = chrono::NaiveDateTime::parse_from_str(ims_str, "%a, %d %b %Y %H:%M:%S GMT") {
if let Ok(ims_time) =
chrono::NaiveDateTime::parse_from_str(ims_str, "%a, %d %b %Y %H:%M:%S GMT")
{
if (n.last_modified as i64) <= ims_time.and_utc().timestamp() {
return StatusCode::NOT_MODIFIED.into_response();
}
@ -771,7 +827,9 @@ async fn get_or_head_handler_inner(
let tracked_bytes = info.data_size as i64;
let tracking_state = if download_guard.is_some() {
state.inflight_download_bytes.fetch_add(tracked_bytes, Ordering::Relaxed);
state
.inflight_download_bytes
.fetch_add(tracked_bytes, Ordering::Relaxed);
Some(state.clone())
} else {
None
@ -782,6 +840,17 @@ async fn get_or_head_handler_inner(
data_offset: info.data_file_offset,
data_size: info.data_size,
pos: 0,
chunk_size: streaming_chunk_size(
state.read_buffer_size_bytes,
info.data_size as usize,
),
_held_read_lease: if state.has_slow_read {
None
} else {
Some(info.data_file_access_control.read_lock())
},
data_file_access_control: info.data_file_access_control,
hold_read_lock_for_stream: !state.has_slow_read,
pending: None,
state: tracking_state,
tracked_bytes,
@ -801,7 +870,8 @@ async fn get_or_head_handler_inner(
let is_compressed = n.is_compressed();
let mut data = n.data;
if is_compressed {
let accept_encoding = headers.get(header::ACCEPT_ENCODING)
let accept_encoding = headers
.get(header::ACCEPT_ENCODING)
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if accept_encoding.contains("gzip") {
@ -835,7 +905,10 @@ async fn get_or_head_handler_inner(
}
if method == Method::HEAD {
response_headers.insert(header::CONTENT_LENGTH, data.len().to_string().parse().unwrap());
response_headers.insert(
header::CONTENT_LENGTH,
data.len().to_string().parse().unwrap(),
);
return (StatusCode::OK, response_headers).into_response();
}
@ -846,7 +919,9 @@ async fn get_or_head_handler_inner(
// If download throttling is active, wrap the body so we track when it's fully sent
if download_guard.is_some() {
let data_len = data.len() as i64;
state.inflight_download_bytes.fetch_add(data_len, Ordering::Relaxed);
state
.inflight_download_bytes
.fetch_add(data_len, Ordering::Relaxed);
let tracked_body = TrackedBody {
data,
state: state.clone(),
@ -929,9 +1004,14 @@ fn handle_range_request(range_str: &str, data: &[u8], mut headers: HeaderMap) ->
let slice = &data[start..=end];
headers.insert(
"Content-Range",
format!("bytes {}-{}/{}", start, end, total).parse().unwrap(),
format!("bytes {}-{}/{}", start, end, total)
.parse()
.unwrap(),
);
headers.insert(
header::CONTENT_LENGTH,
slice.len().to_string().parse().unwrap(),
);
headers.insert(header::CONTENT_LENGTH, slice.len().to_string().parse().unwrap());
(StatusCode::PARTIAL_CONTENT, headers, slice.to_vec()).into_response()
} else {
// Multi-range: build multipart/byteranges response
@ -945,9 +1025,7 @@ fn handle_range_request(range_str: &str, data: &[u8], mut headers: HeaderMap) ->
let mut body = Vec::new();
for &(start, end) in &ranges {
body.extend_from_slice(format!("\r\n--{}\r\n", boundary).as_bytes());
body.extend_from_slice(
format!("Content-Type: {}\r\n", content_type).as_bytes(),
);
body.extend_from_slice(format!("Content-Type: {}\r\n", content_type).as_bytes());
body.extend_from_slice(
format!("Content-Range: bytes {}-{}/{}\r\n\r\n", start, end, total).as_bytes(),
);
@ -961,7 +1039,10 @@ fn handle_range_request(range_str: &str, data: &[u8], mut headers: HeaderMap) ->
.parse()
.unwrap(),
);
headers.insert(header::CONTENT_LENGTH, body.len().to_string().parse().unwrap());
headers.insert(
header::CONTENT_LENGTH,
body.len().to_string().parse().unwrap(),
);
(StatusCode::PARTIAL_CONTENT, headers, body).into_response()
}
}
@ -1094,13 +1175,17 @@ pub async fn post_handler(
// JWT check for writes
let file_id = extract_file_id(&path);
let token = extract_jwt(&headers, request.uri());
if let Err(e) = state.guard.check_jwt_for_file(token.as_deref(), &file_id, true) {
if let Err(e) = state
.guard
.check_jwt_for_file(token.as_deref(), &file_id, true)
{
return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
}
// Upload throttling: check inflight bytes against limit
let is_replicate = query.split('&').any(|p| p == "type=replicate");
let content_length = headers.get(header::CONTENT_LENGTH)
let content_length = headers
.get(header::CONTENT_LENGTH)
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(0);
@ -1120,11 +1205,16 @@ pub async fn post_handler(
break;
}
// Wait for notification or timeout
if tokio::time::timeout_at(deadline, state.upload_notify.notified()).await.is_err() {
if tokio::time::timeout_at(deadline, state.upload_notify.notified())
.await
.is_err()
{
return (StatusCode::TOO_MANY_REQUESTS, "upload limit exceeded").into_response();
}
}
state.inflight_upload_bytes.fetch_add(content_length, Ordering::Relaxed);
state
.inflight_upload_bytes
.fetch_add(content_length, Ordering::Relaxed);
}
// RAII guard to release upload throttle on any exit path
@ -1139,19 +1229,25 @@ pub async fn post_handler(
};
// Check for chunk manifest flag
let is_chunk_manifest = query.split('&')
.any(|p| p == "cm=true" || p == "cm=1");
let is_chunk_manifest = query.split('&').any(|p| p == "cm=true" || p == "cm=1");
// Validate multipart/form-data has a boundary
if let Some(ct) = headers.get(header::CONTENT_TYPE) {
if let Ok(ct_str) = ct.to_str() {
if ct_str.starts_with("multipart/form-data") && !ct_str.contains("boundary=") {
return (StatusCode::BAD_REQUEST, "no multipart boundary param in Content-Type").into_response();
return (
StatusCode::BAD_REQUEST,
"no multipart boundary param in Content-Type",
)
.into_response();
}
}
}
let content_md5 = headers.get("Content-MD5").and_then(|v| v.to_str().ok()).map(|s| s.to_string());
let content_md5 = headers
.get("Content-MD5")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
// Read body
let body = match axum::body::to_bytes(request.into_body(), usize::MAX).await {
@ -1166,13 +1262,20 @@ pub async fn post_handler(
// Validate Content-MD5 if provided
if let Some(ref expected_md5) = content_md5 {
use md5::{Md5, Digest};
use base64::Engine;
use md5::{Digest, Md5};
let mut hasher = Md5::new();
hasher.update(&body);
let actual = base64::engine::general_purpose::STANDARD.encode(hasher.finalize());
if actual != *expected_md5 {
return (StatusCode::BAD_REQUEST, format!("Content-MD5 mismatch: expected {} got {}", expected_md5, actual)).into_response();
return (
StatusCode::BAD_REQUEST,
format!(
"Content-MD5 mismatch: expected {} got {}",
expected_md5, actual
),
)
.into_response();
}
}
@ -1182,7 +1285,8 @@ pub async fn post_handler(
.as_secs();
// Parse custom timestamp from query param
let ts_str = query.split('&')
let ts_str = query
.split('&')
.find_map(|p| p.strip_prefix("ts="))
.unwrap_or("");
let last_modified = if !ts_str.is_empty() {
@ -1192,13 +1296,15 @@ pub async fn post_handler(
};
// Check if upload is pre-compressed
let is_gzipped = headers.get(header::CONTENT_ENCODING)
let is_gzipped = headers
.get(header::CONTENT_ENCODING)
.and_then(|v| v.to_str().ok())
.map(|s| s == "gzip")
.unwrap_or(false);
// Extract MIME type from Content-Type header (needed early for JPEG orientation fix)
let mime_type = headers.get(header::CONTENT_TYPE)
let mime_type = headers
.get(header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.map(|ct| {
if ct.starts_with("multipart/") {
@ -1210,7 +1316,8 @@ pub async fn post_handler(
.unwrap_or_else(|| "application/octet-stream".to_string());
// Parse TTL from query param (matches Go's r.FormValue("ttl"))
let ttl_str = query.split('&')
let ttl_str = query
.split('&')
.find_map(|p| p.strip_prefix("ttl="))
.unwrap_or("");
let ttl = if !ttl_str.is_empty() {
@ -1220,7 +1327,8 @@ pub async fn post_handler(
};
// Extract Seaweed-* custom metadata headers (pairs)
let pair_map: std::collections::HashMap<String, String> = headers.iter()
let pair_map: std::collections::HashMap<String, String> = headers
.iter()
.filter_map(|(k, v)| {
let key = k.as_str();
if key.len() > 8 && key[..8].eq_ignore_ascii_case("seaweed-") {
@ -1249,7 +1357,9 @@ pub async fn post_handler(
let (final_data, final_is_gzipped) = if !is_gzipped && !is_chunk_manifest {
let ext = {
let dot_pos = path.rfind('.');
dot_pos.map(|p| path[p..].to_lowercase()).unwrap_or_default()
dot_pos
.map(|p| path[p..].to_lowercase())
.unwrap_or_default()
};
if is_compressible_file_type(&ext, &mime_type) {
if let Some(compressed) = try_gzip_data(&body_data) {
@ -1347,9 +1457,11 @@ pub async fn post_handler(
Err(crate::storage::volume::VolumeError::ReadOnly) => {
(StatusCode::FORBIDDEN, "volume is read-only").into_response()
}
Err(e) => {
(StatusCode::INTERNAL_SERVER_ERROR, format!("write error: {}", e)).into_response()
}
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("write error: {}", e),
)
.into_response(),
};
// _upload_guard drops here, releasing inflight bytes
@ -1370,7 +1482,9 @@ pub async fn delete_handler(
request: Request<Body>,
) -> Response {
let start = std::time::Instant::now();
metrics::REQUEST_COUNTER.with_label_values(&["delete"]).inc();
metrics::REQUEST_COUNTER
.with_label_values(&["delete"])
.inc();
let path = request.uri().path().to_string();
let headers = request.headers().clone();
@ -1383,13 +1497,17 @@ pub async fn delete_handler(
// JWT check for writes (deletes use write key)
let file_id = extract_file_id(&path);
let token = extract_jwt(&headers, request.uri());
if let Err(e) = state.guard.check_jwt_for_file(token.as_deref(), &file_id, true) {
if let Err(e) = state
.guard
.check_jwt_for_file(token.as_deref(), &file_id, true)
{
return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
}
// Parse custom timestamp from query param
let del_query = request.uri().query().unwrap_or("");
let del_ts_str = del_query.split('&')
let del_ts_str = del_query
.split('&')
.find_map(|p| p.strip_prefix("ts="))
.unwrap_or("");
let del_last_modified = if !del_ts_str.is_empty() {
@ -1417,7 +1535,11 @@ pub async fn delete_handler(
}
}
if n.cookie != original_cookie {
return (StatusCode::BAD_REQUEST, "File Random Cookie does not match.").into_response();
return (
StatusCode::BAD_REQUEST,
"File Random Cookie does not match.",
)
.into_response();
}
// Apply custom timestamp if provided
@ -1448,7 +1570,11 @@ pub async fn delete_handler(
let (chunk_vid, chunk_nid, chunk_cookie) = match parse_url_path(&chunk.fid) {
Some(p) => p,
None => {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("invalid chunk fid: {}", chunk.fid)).into_response();
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("invalid chunk fid: {}", chunk.fid),
)
.into_response();
}
};
let mut chunk_needle = Needle {
@ -1460,25 +1586,39 @@ pub async fn delete_handler(
{
let store = state.store.read().unwrap();
if let Err(e) = store.read_volume_needle(chunk_vid, &mut chunk_needle) {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("read chunk {}: {}", chunk.fid, e)).into_response();
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("read chunk {}: {}", chunk.fid, e),
)
.into_response();
}
}
// Delete the chunk
let mut store = state.store.write().unwrap();
if let Err(e) = store.delete_volume_needle(chunk_vid, &mut chunk_needle) {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("delete chunk {}: {}", chunk.fid, e)).into_response();
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("delete chunk {}: {}", chunk.fid, e),
)
.into_response();
}
}
// Delete the manifest itself
let mut store = state.store.write().unwrap();
if let Err(e) = store.delete_volume_needle(vid, &mut n) {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("delete manifest: {}", e)).into_response();
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("delete manifest: {}", e),
)
.into_response();
}
metrics::REQUEST_DURATION
.with_label_values(&["delete"])
.observe(start.elapsed().as_secs_f64());
// Return the manifest's declared size (matches Go behavior)
let result = DeleteResult { size: manifest.size as i32 };
let result = DeleteResult {
size: manifest.size as i32,
};
return (StatusCode::ACCEPTED, axum::Json(result)).into_response();
}
}
@ -1500,9 +1640,11 @@ pub async fn delete_handler(
let result = DeleteResult { size: 0 };
(StatusCode::NOT_FOUND, axum::Json(result)).into_response()
}
Err(e) => {
(StatusCode::INTERNAL_SERVER_ERROR, format!("delete error: {}", e)).into_response()
}
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("delete error: {}", e),
)
.into_response(),
}
}
@ -1521,12 +1663,30 @@ pub async fn status_handler(
for (_vid, vol) in loc.volumes() {
let mut vol_info = serde_json::Map::new();
vol_info.insert("Id".to_string(), serde_json::Value::from(vol.id.0));
vol_info.insert("Collection".to_string(), serde_json::Value::from(vol.collection.clone()));
vol_info.insert("Size".to_string(), serde_json::Value::from(vol.content_size()));
vol_info.insert("FileCount".to_string(), serde_json::Value::from(vol.file_count()));
vol_info.insert("DeleteCount".to_string(), serde_json::Value::from(vol.deleted_count()));
vol_info.insert("ReadOnly".to_string(), serde_json::Value::from(vol.is_read_only()));
vol_info.insert("Version".to_string(), serde_json::Value::from(vol.version().0));
vol_info.insert(
"Collection".to_string(),
serde_json::Value::from(vol.collection.clone()),
);
vol_info.insert(
"Size".to_string(),
serde_json::Value::from(vol.content_size()),
);
vol_info.insert(
"FileCount".to_string(),
serde_json::Value::from(vol.file_count()),
);
vol_info.insert(
"DeleteCount".to_string(),
serde_json::Value::from(vol.deleted_count()),
);
vol_info.insert(
"ReadOnly".to_string(),
serde_json::Value::from(vol.is_read_only()),
);
vol_info.insert(
"Version".to_string(),
serde_json::Value::from(vol.version().0),
);
volumes.push(serde_json::Value::Object(vol_info));
}
}
@ -1539,15 +1699,24 @@ pub async fn status_handler(
ds.insert("dir".to_string(), serde_json::Value::from(dir.clone()));
// Add disk stats if available
if let Ok(path) = std::path::Path::new(&dir).canonicalize() {
ds.insert("dir".to_string(), serde_json::Value::from(path.to_string_lossy().to_string()));
ds.insert(
"dir".to_string(),
serde_json::Value::from(path.to_string_lossy().to_string()),
);
}
disk_statuses.push(serde_json::Value::Object(ds));
}
let mut m = serde_json::Map::new();
m.insert("Version".to_string(), serde_json::Value::from(env!("CARGO_PKG_VERSION")));
m.insert(
"Version".to_string(),
serde_json::Value::from(env!("CARGO_PKG_VERSION")),
);
m.insert("Volumes".to_string(), serde_json::Value::Array(volumes));
m.insert("DiskStatuses".to_string(), serde_json::Value::Array(disk_statuses));
m.insert(
"DiskStatuses".to_string(),
serde_json::Value::Array(disk_statuses),
);
let json_value = serde_json::Value::Object(m);
@ -1576,9 +1745,7 @@ pub async fn status_handler(
// Health Check Handler
// ============================================================================
pub async fn healthz_handler(
State(state): State<Arc<VolumeServerState>>,
) -> Response {
pub async fn healthz_handler(State(state): State<Arc<VolumeServerState>>) -> Response {
let is_stopping = *state.is_stopping.read().unwrap();
if is_stopping {
return (StatusCode::SERVICE_UNAVAILABLE, "stopping").into_response();
@ -1598,7 +1765,10 @@ pub async fn metrics_handler() -> Response {
let body = metrics::gather_metrics();
(
StatusCode::OK,
[(header::CONTENT_TYPE, "text/plain; version=0.0.4; charset=utf-8")],
[(
header::CONTENT_TYPE,
"text/plain; version=0.0.4; charset=utf-8",
)],
body,
)
.into_response()
@ -1626,12 +1796,15 @@ pub async fn stats_memory_handler() -> Response {
"HeapReleased": 0,
},
});
(StatusCode::OK, [(header::CONTENT_TYPE, "application/json")], info.to_string()).into_response()
(
StatusCode::OK,
[(header::CONTENT_TYPE, "application/json")],
info.to_string(),
)
.into_response()
}
pub async fn stats_disk_handler(
State(state): State<Arc<VolumeServerState>>,
) -> Response {
pub async fn stats_disk_handler(State(state): State<Arc<VolumeServerState>>) -> Response {
let store = state.store.read().unwrap();
let mut ds = Vec::new();
for loc in &store.locations {
@ -1648,7 +1821,12 @@ pub async fn stats_disk_handler(
"Version": env!("CARGO_PKG_VERSION"),
"DiskStatuses": ds,
});
(StatusCode::OK, [(header::CONTENT_TYPE, "application/json")], info.to_string()).into_response()
(
StatusCode::OK,
[(header::CONTENT_TYPE, "application/json")],
info.to_string(),
)
.into_response()
}
// ============================================================================
@ -1669,18 +1847,13 @@ pub async fn favicon_handler() -> Response {
pub async fn static_asset_handler() -> Response {
// Return a minimal valid PNG (1x1 transparent)
let png: &[u8] = &[
0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A, 0x00, 0x00, 0x00, 0x0D, 0x49, 0x48,
0x44, 0x52, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x08, 0x06, 0x00, 0x00,
0x00, 0x1F, 0x15, 0xC4, 0x89, 0x00, 0x00, 0x00, 0x0A, 0x49, 0x44, 0x41, 0x54, 0x78,
0x9C, 0x62, 0x00, 0x00, 0x00, 0x02, 0x00, 0x01, 0xE5, 0x27, 0xDE, 0xFC, 0x00, 0x00,
0x00, 0x00, 0x49, 0x45, 0x4E, 0x44, 0xAE, 0x42, 0x60, 0x82,
0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A, 0x00, 0x00, 0x00, 0x0D, 0x49, 0x48, 0x44,
0x52, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x08, 0x06, 0x00, 0x00, 0x00, 0x1F,
0x15, 0xC4, 0x89, 0x00, 0x00, 0x00, 0x0A, 0x49, 0x44, 0x41, 0x54, 0x78, 0x9C, 0x62, 0x00,
0x00, 0x00, 0x02, 0x00, 0x01, 0xE5, 0x27, 0xDE, 0xFC, 0x00, 0x00, 0x00, 0x00, 0x49, 0x45,
0x4E, 0x44, 0xAE, 0x42, 0x60, 0x82,
];
(
StatusCode::OK,
[(header::CONTENT_TYPE, "image/png")],
png,
)
.into_response()
(StatusCode::OK, [(header::CONTENT_TYPE, "image/png")], png).into_response()
}
pub async fn ui_handler(
@ -1762,7 +1935,15 @@ fn try_expand_chunk_manifest(
for chunk in &manifest.chunks {
let (chunk_vid, chunk_nid, chunk_cookie) = match parse_url_path(&chunk.fid) {
Some(p) => p,
None => return Some((StatusCode::INTERNAL_SERVER_ERROR, format!("invalid chunk fid: {}", chunk.fid)).into_response()),
None => {
return Some(
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("invalid chunk fid: {}", chunk.fid),
)
.into_response(),
)
}
};
let mut chunk_needle = Needle {
id: chunk_nid,
@ -1771,7 +1952,15 @@ fn try_expand_chunk_manifest(
};
match store.read_volume_needle(chunk_vid, &mut chunk_needle) {
Ok(_) => {}
Err(e) => return Some((StatusCode::INTERNAL_SERVER_ERROR, format!("read chunk {}: {}", chunk.fid, e)).into_response()),
Err(e) => {
return Some(
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("read chunk {}: {}", chunk.fid, e),
)
.into_response(),
)
}
}
let chunk_data = if chunk_needle.is_compressed() {
use flate2::read::GzDecoder;
@ -1805,7 +1994,10 @@ fn try_expand_chunk_manifest(
response_headers.insert("X-File-Store", "chunked".parse().unwrap());
if *method == Method::HEAD {
response_headers.insert(header::CONTENT_LENGTH, result.len().to_string().parse().unwrap());
response_headers.insert(
header::CONTENT_LENGTH,
result.len().to_string().parse().unwrap(),
);
return Some((StatusCode::OK, response_headers).into_response());
}
@ -1886,10 +2078,18 @@ fn is_compressible_file_type(ext: &str, mtype: &str) -> bool {
}
// By MIME type
if mtype.starts_with("application/") {
if mtype.ends_with("zstd") { return false; }
if mtype.ends_with("xml") { return true; }
if mtype.ends_with("script") { return true; }
if mtype.ends_with("vnd.rar") { return false; }
if mtype.ends_with("zstd") {
return false;
}
if mtype.ends_with("xml") {
return true;
}
if mtype.ends_with("script") {
return true;
}
if mtype.ends_with("vnd.rar") {
return false;
}
}
if mtype.starts_with("audio/") {
let sub = mtype.strip_prefix("audio/").unwrap_or("");
@ -1970,7 +2170,10 @@ mod tests {
#[test]
fn test_extract_jwt_query_over_header() {
let mut headers = HeaderMap::new();
headers.insert(header::AUTHORIZATION, "Bearer header_token".parse().unwrap());
headers.insert(
header::AUTHORIZATION,
"Bearer header_token".parse().unwrap(),
);
let uri: axum::http::Uri = "/test?jwt=query_token".parse().unwrap();
assert_eq!(extract_jwt(&headers, &uri), Some("query_token".to_string()));
}
@ -2034,7 +2237,10 @@ mod tests {
assert!(!is_compressible_file_type("", "audio/mpeg"));
// Unknown
assert!(!is_compressible_file_type(".xyz", "application/octet-stream"));
assert!(!is_compressible_file_type(
".xyz",
"application/octet-stream"
));
}
#[test]
@ -2054,4 +2260,20 @@ mod tests {
decoder.read_to_end(&mut decompressed).unwrap();
assert_eq!(decompressed, data);
}
#[test]
fn test_streaming_chunk_size_respects_configured_read_buffer() {
    // A configured buffer larger than the default wins when the payload is
    // big enough to fill it (4 MiB buffer, 8 MiB payload -> 4 MiB chunks).
    assert_eq!(
        streaming_chunk_size(4 * 1024 * 1024, 8 * 1024 * 1024),
        4 * 1024 * 1024
    );
    // A configured buffer below the default is raised to the default
    // (32 KiB requested, 512 KiB payload -> default chunk size).
    assert_eq!(
        streaming_chunk_size(32 * 1024, 512 * 1024),
        DEFAULT_STREAMING_CHUNK_SIZE
    );
    // The chunk size is capped at the payload size so a small needle is
    // read in a single chunk (8 MiB buffer, 128 KiB payload -> 128 KiB).
    assert_eq!(
        streaming_chunk_size(8 * 1024 * 1024, 128 * 1024),
        128 * 1024
    );
}
}

339
seaweed-volume/src/storage/volume.rs

@ -12,13 +12,15 @@
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::Arc;
use std::sync::{Condvar, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::warn;
use crate::storage::needle::needle::{self, Needle, NeedleError, get_actual_size};
use crate::storage::needle::needle::{self, get_actual_size, Needle, NeedleError};
use crate::storage::needle_map::{CompactNeedleMap, NeedleMap, NeedleMapKind, RedbNeedleMap};
use crate::storage::super_block::{SuperBlock, ReplicaPlacement, SUPER_BLOCK_SIZE};
use crate::storage::super_block::{ReplicaPlacement, SuperBlock, SUPER_BLOCK_SIZE};
use crate::storage::types::*;
// ============================================================================
@ -77,22 +79,26 @@ struct VolumeInfo {
read_only: bool,
}
pub use crate::pb::volume_server_pb::RemoteFile as PbRemoteFile;
/// Protobuf VolumeInfo type alias.
pub use crate::pb::volume_server_pb::VolumeInfo as PbVolumeInfo;
pub use crate::pb::volume_server_pb::RemoteFile as PbRemoteFile;
/// Helper module for deserializing protojson uint64 fields that may be strings or numbers.
mod string_or_u64 {
use serde::{self, Deserialize, Deserializer, Serializer};
pub fn serialize<S>(value: &u64, serializer: S) -> Result<S::Ok, S::Error>
where S: Serializer {
where
S: Serializer,
{
// Emit as string to match Go's protojson format for uint64
serializer.serialize_str(&value.to_string())
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<u64, D::Error>
where D: Deserializer<'de> {
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum StringOrNum {
@ -110,12 +116,16 @@ mod string_or_i64 {
use serde::{self, Deserialize, Deserializer, Serializer};
pub fn serialize<S>(value: &i64, serializer: S) -> Result<S::Ok, S::Error>
where S: Serializer {
where
S: Serializer,
{
serializer.serialize_str(&value.to_string())
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<i64, D::Error>
where D: Deserializer<'de> {
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum StringOrNum {
@ -173,15 +183,19 @@ impl VifVolumeInfo {
/// Convert from protobuf VolumeInfo to the serde-compatible struct.
pub fn from_pb(pb: &PbVolumeInfo) -> Self {
VifVolumeInfo {
files: pb.files.iter().map(|f| VifRemoteFile {
backend_type: f.backend_type.clone(),
backend_id: f.backend_id.clone(),
key: f.key.clone(),
offset: f.offset,
file_size: f.file_size,
modified_time: f.modified_time,
extension: f.extension.clone(),
}).collect(),
files: pb
.files
.iter()
.map(|f| VifRemoteFile {
backend_type: f.backend_type.clone(),
backend_id: f.backend_id.clone(),
key: f.key.clone(),
offset: f.offset,
file_size: f.file_size,
modified_time: f.modified_time,
extension: f.extension.clone(),
})
.collect(),
version: pb.version,
replication: pb.replication.clone(),
bytes_offset: pb.bytes_offset,
@ -194,15 +208,19 @@ impl VifVolumeInfo {
/// Convert to protobuf VolumeInfo.
pub fn to_pb(&self) -> PbVolumeInfo {
PbVolumeInfo {
files: self.files.iter().map(|f| PbRemoteFile {
backend_type: f.backend_type.clone(),
backend_id: f.backend_id.clone(),
key: f.key.clone(),
offset: f.offset,
file_size: f.file_size,
modified_time: f.modified_time,
extension: f.extension.clone(),
}).collect(),
files: self
.files
.iter()
.map(|f| PbRemoteFile {
backend_type: f.backend_type.clone(),
backend_id: f.backend_id.clone(),
key: f.key.clone(),
offset: f.offset,
file_size: f.file_size,
modified_time: f.modified_time,
extension: f.extension.clone(),
})
.collect(),
version: self.version,
replication: self.replication.clone(),
bytes_offset: self.bytes_offset,
@ -218,6 +236,70 @@ impl VifVolumeInfo {
// Streaming read support
// ============================================================================
/// Shared reader/writer bookkeeping for a volume's `.dat` file.
#[derive(Default)]
struct DataFileAccessState {
    // Number of read leases currently outstanding.
    readers: usize,
    // Whether the exclusive write lease is currently held.
    writer_active: bool,
}

/// Per-volume readers-writer gate over the `.dat` file, used to match Go's
/// slow-read behavior: slow readers hold off writers for the duration of
/// the read.
///
/// Unlike `std::sync::RwLock`, the leases returned here own an
/// `Arc<DataFileAccessControl>` rather than borrowing the lock, so a lease
/// can be stored in another struct (e.g. a streaming response body) and
/// released from whichever thread drops it.
///
/// Fairness note: `read_lock` only waits while a writer is *active*, not
/// while one is merely waiting, so a continuous stream of readers can delay
/// a writer indefinitely (Go's `sync.RWMutex` blocks new readers once a
/// writer is waiting — TODO confirm whether that difference matters here).
#[derive(Default)]
pub struct DataFileAccessControl {
    state: Mutex<DataFileAccessState>,
    condvar: Condvar,
}

/// Shared read lease; the reader count is decremented when it is dropped.
#[must_use = "dropping the lease immediately releases the read lock"]
pub struct DataFileReadLease {
    control: Arc<DataFileAccessControl>,
}

/// Exclusive write lease; the writer flag is cleared when it is dropped.
#[must_use = "dropping the lease immediately releases the write lock"]
pub struct DataFileWriteLease {
    control: Arc<DataFileAccessControl>,
}

impl DataFileAccessControl {
    /// Block until no writer is active, then take a shared read lease.
    /// Multiple read leases may be held concurrently.
    pub fn read_lock(self: &Arc<Self>) -> DataFileReadLease {
        // A poisoned mutex only means another thread panicked while holding
        // it; the counters remain usable, so recover the guard instead of
        // propagating the panic.
        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
        while state.writer_active {
            state = self
                .condvar
                .wait(state)
                .unwrap_or_else(|e| e.into_inner());
        }
        state.readers += 1;
        drop(state);
        DataFileReadLease {
            control: Arc::clone(self),
        }
    }

    /// Block until there is no active writer and no outstanding readers,
    /// then take the exclusive write lease.
    pub fn write_lock(self: &Arc<Self>) -> DataFileWriteLease {
        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
        while state.writer_active || state.readers > 0 {
            state = self
                .condvar
                .wait(state)
                .unwrap_or_else(|e| e.into_inner());
        }
        state.writer_active = true;
        drop(state);
        DataFileWriteLease {
            control: Arc::clone(self),
        }
    }
}

impl Drop for DataFileReadLease {
    fn drop(&mut self) {
        // Never panic in Drop: `lock().unwrap()` on a poisoned mutex would
        // panic, and panicking during an unwind aborts the process.
        let mut state = self
            .control
            .state
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        state.readers -= 1;
        if state.readers == 0 {
            // Only the last departing reader can unblock a waiting writer.
            self.control.condvar.notify_all();
        }
    }
}

impl Drop for DataFileWriteLease {
    fn drop(&mut self) {
        let mut state = self
            .control
            .state
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        state.writer_active = false;
        // Wake all waiters: any number of readers, or one writer, may proceed.
        self.control.condvar.notify_all();
    }
}
/// Information needed to stream needle data directly from the dat file
/// without loading the entire payload into memory.
pub struct NeedleStreamInfo {
@ -227,6 +309,8 @@ pub struct NeedleStreamInfo {
pub data_file_offset: u64,
/// Size of the data payload in bytes.
pub data_size: u32,
/// Per-volume file access lock used to match Go's slow-read behavior.
pub data_file_access_control: Arc<DataFileAccessControl>,
}
// ============================================================================
@ -242,6 +326,7 @@ pub struct Volume {
dat_file: Option<File>,
nm: Option<NeedleMap>,
needle_map_kind: NeedleMapKind,
data_file_access_control: Arc<DataFileAccessControl>,
pub super_block: SuperBlock,
@ -276,7 +361,10 @@ fn read_exact_at(file: &File, buf: &mut [u8], mut offset: u64) -> io::Result<()>
while filled < buf.len() {
let n = file.seek_read(&mut buf[filled..], offset)?;
if n == 0 {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF in seek_read"));
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected EOF in seek_read",
));
}
filled += n;
offset += n as u64;
@ -305,6 +393,7 @@ impl Volume {
dat_file: None,
nm: None,
needle_map_kind,
data_file_access_control: Arc::new(DataFileAccessControl::default()),
super_block: SuperBlock {
replica_placement: replica_placement.unwrap_or_default(),
ttl: ttl.unwrap_or(crate::storage::needle::ttl::TTL::EMPTY),
@ -598,6 +687,7 @@ impl Volume {
}
pub fn read_needle_opt(&self, n: &mut Needle, read_deleted: bool) -> Result<i32, VolumeError> {
let _guard = self.data_file_access_control.read_lock();
let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
let nv = nm.get(n.id).ok_or(VolumeError::NotFound)?;
@ -618,7 +708,7 @@ impl Volume {
return Ok(0);
}
self.read_needle_data_at(n, nv.offset.to_actual_offset(), read_size)?;
self.read_needle_data_at_unlocked(n, nv.offset.to_actual_offset(), read_size)?;
// TTL expiry check
if n.has_ttl() {
@ -641,7 +731,22 @@ impl Volume {
}
/// Read needle data from .dat file at given offset.
pub fn read_needle_data_at(&self, n: &mut Needle, offset: i64, size: Size) -> Result<(), VolumeError> {
pub fn read_needle_data_at(
&self,
n: &mut Needle,
offset: i64,
size: Size,
) -> Result<(), VolumeError> {
let _guard = self.data_file_access_control.read_lock();
self.read_needle_data_at_unlocked(n, offset, size)
}
fn read_needle_data_at_unlocked(
&self,
n: &mut Needle,
offset: i64,
size: Size,
) -> Result<(), VolumeError> {
let dat_file = self.dat_file.as_ref().ok_or_else(|| {
VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
})?;
@ -671,6 +776,11 @@ impl Volume {
/// Read raw needle blob at a specific offset.
///
/// Takes a shared read lease on the per-volume data-file access control, so
/// concurrent writers are held off for the duration of the read, then
/// delegates to the unlocked variant that does the actual file I/O.
pub fn read_needle_blob(&self, offset: i64, size: Size) -> Result<Vec<u8>, VolumeError> {
// Lease is released when `_guard` drops at the end of this scope.
let _guard = self.data_file_access_control.read_lock();
self.read_needle_blob_unlocked(offset, size)
}
fn read_needle_blob_unlocked(&self, offset: i64, size: Size) -> Result<Vec<u8>, VolumeError> {
let dat_file = self.dat_file.as_ref().ok_or_else(|| {
VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
})?;
@ -696,7 +806,12 @@ impl Volume {
/// and return a `NeedleStreamInfo` that can be used to stream data directly from the dat file.
///
/// This is used for large needles to avoid loading the entire payload into memory.
pub fn read_needle_stream_info(&self, n: &mut Needle, read_deleted: bool) -> Result<NeedleStreamInfo, VolumeError> {
pub fn read_needle_stream_info(
&self,
n: &mut Needle,
read_deleted: bool,
) -> Result<NeedleStreamInfo, VolumeError> {
let _guard = self.data_file_access_control.read_lock();
let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
let nv = nm.get(n.id).ok_or(VolumeError::NotFound)?;
@ -770,13 +885,19 @@ impl Volume {
dat_file: cloned_file,
data_file_offset,
data_size: n.data_size,
data_file_access_control: self.data_file_access_control.clone(),
})
}
// ---- Write ----
/// Write a needle to the volume (synchronous path).
pub fn write_needle(&mut self, n: &mut Needle, check_cookie: bool) -> Result<(u64, Size, bool), VolumeError> {
pub fn write_needle(
&mut self,
n: &mut Needle,
check_cookie: bool,
) -> Result<(u64, Size, bool), VolumeError> {
let _guard = self.data_file_access_control.write_lock();
if self.no_write_or_delete {
return Err(VolumeError::ReadOnly);
}
@ -784,7 +905,11 @@ impl Volume {
self.do_write_request(n, check_cookie)
}
fn do_write_request(&mut self, n: &mut Needle, check_cookie: bool) -> Result<(u64, Size, bool), VolumeError> {
fn do_write_request(
&mut self,
n: &mut Needle,
check_cookie: bool,
) -> Result<(u64, Size, bool), VolumeError> {
// Ensure checksum is computed before dedup check
if n.checksum == crate::storage::needle::crc::CRC(0) && !n.data.is_empty() {
n.checksum = crate::storage::needle::crc::CRC::new(&n.data);
@ -801,7 +926,7 @@ impl Volume {
if !nv.offset.is_zero() && nv.size.is_valid() {
let mut existing = Needle::default();
// Read only the header to check cookie
self.read_needle_header(&mut existing, nv.offset.to_actual_offset())?;
self.read_needle_header_unlocked(&mut existing, nv.offset.to_actual_offset())?;
if n.cookie.0 == 0 && !check_cookie {
n.cookie = existing.cookie;
@ -843,7 +968,7 @@ impl Volume {
Ok((offset, size, false))
}
fn read_needle_header(&self, n: &mut Needle, offset: i64) -> Result<(), VolumeError> {
fn read_needle_header_unlocked(&self, n: &mut Needle, offset: i64) -> Result<(), VolumeError> {
let dat_file = self.dat_file.as_ref().ok_or_else(|| {
VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
})?;
@ -873,7 +998,14 @@ impl Volume {
if let Some(nv) = nm.get(n.id) {
if !nv.offset.is_zero() && nv.size.is_valid() {
let mut old = Needle::default();
if self.read_needle_data_at(&mut old, nv.offset.to_actual_offset(), nv.size).is_ok() {
if self
.read_needle_data_at_unlocked(
&mut old,
nv.offset.to_actual_offset(),
nv.size,
)
.is_ok()
{
if old.cookie == n.cookie
&& old.checksum == n.checksum
&& old.data == n.data
@ -907,6 +1039,7 @@ impl Volume {
/// Delete a needle from the volume.
pub fn delete_needle(&mut self, n: &mut Needle) -> Result<Size, VolumeError> {
let _guard = self.data_file_access_control.write_lock();
if self.no_write_or_delete {
return Err(VolumeError::ReadOnly);
}
@ -992,7 +1125,8 @@ impl Volume {
id: key,
..Needle::default()
};
if let Ok(()) = self.read_needle_data_at(&mut n, nv.offset.to_actual_offset(), nv.size) {
if let Ok(()) = self.read_needle_data_at(&mut n, nv.offset.to_actual_offset(), nv.size)
{
needles.push(n);
}
}
@ -1046,7 +1180,10 @@ impl Volume {
/// Scan raw needle entries from the .dat file starting at `from_offset`.
/// Returns (needle_header_bytes, needle_body_bytes, append_at_ns) for each needle.
/// Used by VolumeTailSender to stream raw bytes.
pub fn scan_raw_needles_from(&self, from_offset: u64) -> Result<Vec<(Vec<u8>, Vec<u8>, u64)>, VolumeError> {
pub fn scan_raw_needles_from(
&self,
from_offset: u64,
) -> Result<Vec<(Vec<u8>, Vec<u8>, u64)>, VolumeError> {
let dat_file = self.dat_file.as_ref().ok_or(VolumeError::NotFound)?;
let version = self.super_block.version;
let dat_size = dat_file.metadata()?.len();
@ -1101,10 +1238,14 @@ impl Volume {
}
/// Insert or update a needle index entry (for low-level blob writes).
pub fn put_needle_index(&mut self, key: NeedleId, offset: Offset, size: Size) -> Result<(), VolumeError> {
pub fn put_needle_index(
&mut self,
key: NeedleId,
offset: Offset,
size: Size,
) -> Result<(), VolumeError> {
if let Some(ref mut nm) = self.nm {
nm.put(key, offset, size)
.map_err(VolumeError::Io)?;
nm.put(key, offset, size).map_err(VolumeError::Io)?;
}
Ok(())
}
@ -1203,7 +1344,8 @@ impl Volume {
return;
}
// Simple throttle: sleep based on bytes written vs allowed rate
let sleep_us = (bytes_written as f64 / self.compaction_byte_per_second as f64 * 1_000_000.0) as u64;
let sleep_us =
(bytes_written as f64 / self.compaction_byte_per_second as f64 * 1_000_000.0) as u64;
if sleep_us > 0 {
std::thread::sleep(std::time::Duration::from_micros(sleep_us));
}
@ -1321,7 +1463,10 @@ impl Volume {
/// Binary search through the .idx file to find the first needle
/// with append_at_ns > since_ns. Returns (offset, is_last).
/// Matches Go's BinarySearchByAppendAtNs in volume_backup.go.
pub fn binary_search_by_append_at_ns(&self, since_ns: u64) -> Result<(Offset, bool), VolumeError> {
pub fn binary_search_by_append_at_ns(
&self,
since_ns: u64,
) -> Result<(Offset, bool), VolumeError> {
let file_size = self.idx_file_size() as i64;
if file_size % NEEDLE_MAP_ENTRY_SIZE as i64 != 0 {
return Err(VolumeError::Io(io::Error::new(
@ -1380,7 +1525,11 @@ impl Volume {
}
/// Write a raw needle blob at a specific offset in the .dat file.
pub fn write_needle_blob(&mut self, offset: i64, needle_blob: &[u8]) -> Result<(), VolumeError> {
pub fn write_needle_blob(
&mut self,
offset: i64,
needle_blob: &[u8],
) -> Result<(), VolumeError> {
if self.no_write_or_delete {
return Err(VolumeError::ReadOnly);
}
@ -1417,7 +1566,8 @@ impl Volume {
/// Get the modification time of the .dat file as Unix seconds.
pub fn dat_file_mod_time(&self) -> u64 {
self.dat_file.as_ref()
self.dat_file
.as_ref()
.and_then(|f| f.metadata().ok())
.and_then(|m| m.modified().ok())
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
@ -1781,17 +1931,48 @@ mod tests {
fn make_test_volume(dir: &str) -> Volume {
Volume::new(
dir, dir, "", VolumeId(1),
dir,
dir,
"",
VolumeId(1),
NeedleMapKind::InMemory,
None, None, 0,
None,
None,
0,
Version::current(),
).unwrap()
)
.unwrap()
}
#[test]
fn test_data_file_access_control_blocks_writer_until_reader_releases() {
    let control = Arc::new(DataFileAccessControl::default());

    // Hold a read lease while a writer thread tries to acquire exclusively.
    let reader = control.read_lock();
    let writer_entered = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let writer_thread = {
        let control = control.clone();
        let writer_entered = writer_entered.clone();
        std::thread::spawn(move || {
            let _lease = control.write_lock();
            writer_entered.store(true, std::sync::atomic::Ordering::Relaxed);
        })
    };

    // Give the writer a chance to run; it must still be blocked on the reader.
    std::thread::sleep(std::time::Duration::from_millis(50));
    assert!(!writer_entered.load(std::sync::atomic::Ordering::Relaxed));

    // Releasing the last read lease lets the writer proceed.
    drop(reader);
    writer_thread.join().unwrap();
    assert!(writer_entered.load(std::sync::atomic::Ordering::Relaxed));
}
#[test]
fn test_volume_file_name() {
    // Without a collection, the path is "<dir>/<volume id>".
    assert_eq!(volume_file_name("/data", "", VolumeId(1)), "/data/1");
    // With a collection, the file name is "<collection>_<volume id>".
    // (A duplicate of this assertion — merge artifact — was removed.)
    assert_eq!(
        volume_file_name("/data", "pics", VolumeId(42)),
        "/data/pics_42"
    );
}
#[test]
@ -1831,7 +2012,10 @@ mod tests {
assert_eq!(v.file_count(), 1);
// Read it back
let mut read_n = Needle { id: NeedleId(1), ..Needle::default() };
let mut read_n = Needle {
id: NeedleId(1),
..Needle::default()
};
let count = v.read_needle(&mut read_n).unwrap();
assert_eq!(count, 11);
assert_eq!(read_n.data, b"hello world");
@ -1882,17 +2066,22 @@ mod tests {
v.write_needle(&mut n, true).unwrap();
assert_eq!(v.file_count(), 1);
let deleted_size = v.delete_needle(&mut Needle {
id: NeedleId(1),
cookie: Cookie(0xbb),
..Needle::default()
}).unwrap();
let deleted_size = v
.delete_needle(&mut Needle {
id: NeedleId(1),
cookie: Cookie(0xbb),
..Needle::default()
})
.unwrap();
assert!(deleted_size.0 > 0);
assert_eq!(v.file_count(), 0);
assert_eq!(v.deleted_count(), 1);
// Read should fail with Deleted
let mut read_n = Needle { id: NeedleId(1), ..Needle::default() };
let mut read_n = Needle {
id: NeedleId(1),
..Needle::default()
};
let err = v.read_needle(&mut read_n).unwrap_err();
assert!(matches!(err, VolumeError::Deleted));
}
@ -1919,7 +2108,10 @@ mod tests {
assert_eq!(v.max_file_key(), NeedleId(10));
// Read back needle 5
let mut n = Needle { id: NeedleId(5), ..Needle::default() };
let mut n = Needle {
id: NeedleId(5),
..Needle::default()
};
v.read_needle(&mut n).unwrap();
assert_eq!(n.data, b"needle data 5");
}
@ -1948,14 +2140,23 @@ mod tests {
// Reload and verify
let v = Volume::new(
dir, dir, "", VolumeId(1),
dir,
dir,
"",
VolumeId(1),
NeedleMapKind::InMemory,
None, None, 0,
None,
None,
0,
Version::current(),
).unwrap();
)
.unwrap();
assert_eq!(v.file_count(), 3);
let mut n = Needle { id: NeedleId(2), ..Needle::default() };
let mut n = Needle {
id: NeedleId(2),
..Needle::default()
};
v.read_needle(&mut n).unwrap();
assert_eq!(std::str::from_utf8(&n.data).unwrap(), "data 2");
}
@ -2065,19 +2266,31 @@ mod tests {
// Dat should be smaller (deleted needle removed)
let dat_size_after = v.dat_file_size().unwrap();
assert!(dat_size_after < dat_size_before, "dat should shrink after compact");
assert!(
dat_size_after < dat_size_before,
"dat should shrink after compact"
);
// Read back live needles
let mut n1 = Needle { id: NeedleId(1), ..Needle::default() };
let mut n1 = Needle {
id: NeedleId(1),
..Needle::default()
};
v.read_needle(&mut n1).unwrap();
assert_eq!(n1.data, b"data-1");
let mut n3 = Needle { id: NeedleId(3), ..Needle::default() };
let mut n3 = Needle {
id: NeedleId(3),
..Needle::default()
};
v.read_needle(&mut n3).unwrap();
assert_eq!(n3.data, b"data-3");
// Needle 2 should not exist
let mut n2 = Needle { id: NeedleId(2), ..Needle::default() };
let mut n2 = Needle {
id: NeedleId(2),
..Needle::default()
};
assert!(v.read_needle(&mut n2).is_err());
// Compact files should not exist after commit

Loading…
Cancel
Save