Browse Source

fix: Go parity review — align Rust volume server with Go behavior

Handlers (handlers.rs):
- Add needle pairs on read response, upload response fields (mime, contentMd5)
- Track original_data_size before compression for accurate size reporting
- Use json_error helper for consistent JSON error responses
- Switch to multer for multipart parsing, filter application/octet-stream
- Set Content-Disposition inline, extract needle name on read
- Delete sets LastModified to now, case-insensitive Bearer prefix

gRPC (grpc_server.rs):
- WriteNeedleBlob now updates needle map index (fixes round-trip test)
- CopyFile uses i64::MAX stop_offset, copies .ecj files for EC volumes
- EC reconstruction uses proper shard-level decoding
- TailSender uses binary search for since_ns > 0
- Drain timeout and collection validation on allocate

Storage:
- Fix needle padding (1-8 bytes, not 0-7) matching Go behavior
- Additive-only needle map metrics (never decrement counters)
- Track max_file_key on delete during idx loading
- Volume TTL inheritance for needles, garbage_level formula fix
- MAX_POSSIBLE_VOLUME_SIZE corrected to 32GB
- .vif path includes collection prefix

Config/Security:
- Parse [guard] white_list from security.toml
- Heartbeat leader redirect with 3s sleep
- Duplicate UUID exponential backoff retry

All tests pass: 82 gRPC + 61 HTTP + 9 Rust integration = 152/152
rust-volume-server
Chris Lu 2 days ago
parent
commit
216d193675
  1. 2
      seaweed-volume/Cargo.lock
  2. 6
      seaweed-volume/Cargo.toml
  3. 28
      seaweed-volume/src/config.rs
  4. 177
      seaweed-volume/src/server/grpc_server.rs
  5. 348
      seaweed-volume/src/server/handlers.rs
  6. 89
      seaweed-volume/src/server/heartbeat.rs
  7. 44
      seaweed-volume/src/storage/erasure_coding/ec_decoder.rs
  8. 4
      seaweed-volume/src/storage/needle/needle.rs
  9. 77
      seaweed-volume/src/storage/needle_map.rs
  10. 2
      seaweed-volume/src/storage/types.rs
  11. 73
      seaweed-volume/src/storage/volume.rs

2
seaweed-volume/Cargo.lock

@@ -3526,6 +3526,8 @@ dependencies = [
"libc",
"md-5",
"memmap2",
"mime_guess",
"multer",
"parking_lot 0.12.5",
"pprof",
"prometheus",

6
seaweed-volume/Cargo.toml

@ -82,6 +82,12 @@ flate2 = "1"
image = { version = "0.25", default-features = false, features = ["png", "jpeg", "gif"] }
kamadak-exif = "0.5"
# Multipart form-data parsing
multer = "3"
# MIME type guessing from file extensions
mime_guess = "2"
# Misc
bytes = "1"
rand = "0.8"

28
seaweed-volume/src/config.rs

@ -538,21 +538,22 @@ fn resolve_config(cli: Cli) -> VolumeServerConfig {
other => panic!("Unknown readMode: {}. Use local|proxy|redirect", other),
};
// Parse whitelist
let white_list: Vec<String> = cli
// Parse security config from TOML file
let sec = parse_security_config(&cli.security_file);
// Parse whitelist: merge CLI --whiteList with guard.white_list from security.toml
let mut white_list: Vec<String> = cli
.white_list
.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect();
white_list.extend(sec.guard_white_list.iter().cloned());
// Parse durations
let inflight_upload_data_timeout = parse_duration(&cli.inflight_upload_data_timeout);
let inflight_download_data_timeout = parse_duration(&cli.inflight_download_data_timeout);
// Parse security config from TOML file
let sec = parse_security_config(&cli.security_file);
VolumeServerConfig {
port: cli.port,
grpc_port,
@ -615,6 +616,8 @@ pub struct SecurityConfig {
pub https_key_file: String,
pub grpc_cert_file: String,
pub grpc_key_file: String,
/// IPs from [guard] white_list in security.toml
pub guard_white_list: Vec<String>,
}
/// Parse a security.toml file to extract JWT signing keys and TLS configuration.
@ -654,6 +657,7 @@ fn parse_security_config(path: &str) -> SecurityConfig {
JwtSigningRead,
HttpsVolume,
GrpcVolume,
Guard,
}
let mut section = Section::None;
@ -679,6 +683,10 @@ fn parse_security_config(path: &str) -> SecurityConfig {
section = Section::GrpcVolume;
continue;
}
if trimmed == "[guard]" {
section = Section::Guard;
continue;
}
if trimmed.starts_with('[') {
section = Section::None;
continue;
@ -708,6 +716,16 @@ fn parse_security_config(path: &str) -> SecurityConfig {
"key" => cfg.grpc_key_file = value.to_string(),
_ => {}
},
Section::Guard => match key {
"white_list" => {
cfg.guard_white_list = value
.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect();
}
_ => {}
},
Section::None => {}
}
}

177
seaweed-volume/src/server/grpc_server.rs

@@ -1089,22 +1089,15 @@ impl VolumeServer for VolumeGrpcService {
self.state.check_maintenance()?;
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let needle_id = NeedleId(req.needle_id);
let size = Size(req.size);
let mut store = self.state.store.write().unwrap();
let (_, vol) = store.find_volume_mut(vid)
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
// Write the raw needle blob at the end of the dat file (append)
let dat_size = vol.dat_file_size()
.map_err(|e| Status::internal(e.to_string()))? as i64;
vol.write_needle_blob(dat_size, &req.needle_blob)
.map_err(|e| Status::internal(e.to_string()))?;
// Update the needle index so the written blob is discoverable
let needle_id = NeedleId(req.needle_id);
let size = Size(req.size);
vol.put_needle_index(needle_id, Offset::from_actual_offset(dat_size), size)
.map_err(|e| Status::internal(e.to_string()))?;
vol.write_needle_blob_and_index(needle_id, &req.needle_blob, size)
.map_err(|e| Status::internal(format!("write blob needle {} size {}: {}", needle_id.0, size.0, e)))?;
Ok(Response::new(volume_server_pb::WriteNeedleBlobResponse {}))
}
@ -1184,11 +1177,25 @@ impl VolumeServer for VolumeGrpcService {
let mut draining_seconds = idle_timeout;
loop {
// Scan for new needles
// Use binary search to find starting offset, then scan from there
let scan_result = {
let store = state.store.read().unwrap();
if let Some((_, vol)) = store.find_volume(vid) {
vol.scan_raw_needles_from(sb_size)
let start_offset = if last_timestamp_ns > 0 {
match vol.binary_search_by_append_at_ns(last_timestamp_ns) {
Ok((offset, _is_last)) => {
if offset.is_zero() {
sb_size
} else {
offset.to_actual_offset() as u64
}
}
Err(_) => sb_size,
}
} else {
sb_size
};
vol.scan_raw_needles_from(start_offset)
} else {
break;
}
@ -1240,7 +1247,7 @@ impl VolumeServer for VolumeGrpcService {
}
}
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
if idle_timeout == 0 {
last_timestamp_ns = last_processed_ns;
@ -1351,12 +1358,19 @@ impl VolumeServer for VolumeGrpcService {
let vid = VolumeId(req.volume_id);
let collection = &req.collection;
// Find the volume's directory
let dir = {
// Find the volume's directory and validate collection
let (dir, vol_version, dat_file_size) = {
let store = self.state.store.read().unwrap();
let (loc_idx, _) = store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
store.locations[loc_idx].directory.clone()
let (loc_idx, vol) = store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
if vol.collection != req.collection {
return Err(Status::internal(format!(
"existing collection:{} unexpected input: {}", vol.collection, req.collection
)));
}
let version = vol.version().0 as u32;
let dat_size = vol.dat_file_size().unwrap_or(0) as i64;
(store.locations[loc_idx].directory.clone(), version, dat_size)
};
let (data_shards, parity_shards) = crate::storage::erasure_coding::ec_volume::read_ec_shard_config(&dir, vid);
@ -1364,6 +1378,25 @@ impl VolumeServer for VolumeGrpcService {
crate::storage::erasure_coding::ec_encoder::write_ec_files(&dir, collection, vid, data_shards as usize, parity_shards as usize)
.map_err(|e| Status::internal(e.to_string()))?;
// Write .vif file with EC shard metadata
{
let base = crate::storage::volume::volume_file_name(&dir, collection, vid);
let vif_path = format!("{}.vif", base);
let vif = crate::storage::volume::VifVolumeInfo {
version: vol_version,
dat_file_size,
ec_shard_config: Some(crate::storage::volume::VifEcShardConfig {
data_shards: data_shards,
parity_shards: parity_shards,
}),
..Default::default()
};
let content = serde_json::to_string_pretty(&vif)
.map_err(|e| Status::internal(format!("serialize vif: {}", e)))?;
std::fs::write(&vif_path, content)
.map_err(|e| Status::internal(format!("write vif: {}", e)))?;
}
Ok(Response::new(volume_server_pb::VolumeEcShardsGenerateResponse {}))
}
@ -1467,7 +1500,7 @@ impl VolumeServer for VolumeGrpcService {
is_ec_volume: true,
ext: ext.clone(),
compaction_revision: u32::MAX,
stop_offset: 0,
stop_offset: i64::MAX as u64,
..Default::default()
};
let mut stream = client.copy_file(copy_req).await
@ -1496,7 +1529,7 @@ impl VolumeServer for VolumeGrpcService {
is_ec_volume: true,
ext: ".ecx".to_string(),
compaction_revision: u32::MAX,
stop_offset: 0,
stop_offset: i64::MAX as u64,
..Default::default()
};
let mut stream = client.copy_file(copy_req).await
@ -1517,6 +1550,36 @@ impl VolumeServer for VolumeGrpcService {
}
}
// Copy .ecj file if requested
if req.copy_ecj_file {
let copy_req = volume_server_pb::CopyFileRequest {
volume_id: req.volume_id,
collection: req.collection.clone(),
is_ec_volume: true,
ext: ".ecj".to_string(),
compaction_revision: u32::MAX,
stop_offset: i64::MAX as u64,
ignore_source_file_not_found: true,
..Default::default()
};
let mut stream = client.copy_file(copy_req).await
.map_err(|e| Status::internal(format!("VolumeEcShardsCopy volume {} copy .ecj: {}", vid, e)))?
.into_inner();
let file_path = {
let base = crate::storage::volume::volume_file_name(&dest_dir, &req.collection, vid);
format!("{}.ecj", base)
};
let mut file = std::fs::File::create(&file_path)
.map_err(|e| Status::internal(format!("create {}: {}", file_path, e)))?;
while let Some(chunk) = stream.message().await
.map_err(|e| Status::internal(format!("recv .ecj: {}", e)))? {
use std::io::Write;
file.write_all(&chunk.file_content)
.map_err(|e| Status::internal(format!("write {}: {}", file_path, e)))?;
}
}
// Copy .vif file if requested
if req.copy_vif_file {
let copy_req = volume_server_pb::CopyFileRequest {
@ -1525,7 +1588,7 @@ impl VolumeServer for VolumeGrpcService {
is_ec_volume: true,
ext: ".vif".to_string(),
compaction_revision: u32::MAX,
stop_offset: 0,
stop_offset: i64::MAX as u64,
..Default::default()
};
let mut stream = client.copy_file(copy_req).await
@ -1672,38 +1735,41 @@ impl VolumeServer for VolumeGrpcService {
let ec_vol = store.ec_volumes.get(&vid)
.ok_or_else(|| Status::not_found(format!("ec volume {} not found", req.volume_id)))?;
// Check that all 10 data shards are present
use crate::storage::erasure_coding::ec_shard::DATA_SHARDS_COUNT;
for shard_id in 0..DATA_SHARDS_COUNT as u8 {
if ec_vol.shards.get(shard_id as usize).map(|s| s.is_none()).unwrap_or(true) {
if ec_vol.collection != req.collection {
return Err(Status::internal(format!(
"existing collection:{} unexpected input: {}", ec_vol.collection, req.collection
)));
}
// Use EC context data shard count from the volume
let data_shards = ec_vol.data_shards as usize;
// Check that all data shards are present
for shard_id in 0..data_shards {
if ec_vol.shards.get(shard_id).map(|s| s.is_none()).unwrap_or(true) {
return Err(Status::internal(format!(
"ec volume {} missing shard {}", req.volume_id, shard_id
)));
}
}
// Read the .ecx index to find all needles
// Read the .ecx index to check for live entries
let ecx_path = ec_vol.ecx_file_name();
let ecx_data = std::fs::read(&ecx_path)
.map_err(|e| Status::internal(format!("read ecx: {}", e)))?;
let entry_count = ecx_data.len() / NEEDLE_MAP_ENTRY_SIZE;
// Read deleted needles from .ecj
let deleted_needles = ec_vol.read_deleted_needles()
.map_err(|e| Status::internal(e.to_string()))?;
// Count live entries
let mut live_count = 0;
let mut has_live = false;
for i in 0..entry_count {
let start = i * NEEDLE_MAP_ENTRY_SIZE;
let (key, _offset, size) = idx_entry_from_bytes(&ecx_data[start..start + NEEDLE_MAP_ENTRY_SIZE]);
if size.is_deleted() || deleted_needles.contains(&key) {
continue;
let (_, _, size) = idx_entry_from_bytes(&ecx_data[start..start + NEEDLE_MAP_ENTRY_SIZE]);
if !size.is_deleted() {
has_live = true;
break;
}
live_count += 1;
}
if live_count == 0 {
if !has_live {
return Err(Status::failed_precondition(format!(
"ec volume {} has no live entries", req.volume_id
)));
@ -1714,34 +1780,17 @@ impl VolumeServer for VolumeGrpcService {
let collection = ec_vol.collection.clone();
drop(store);
// Read all shard data and reconstruct the .dat file
// For simplicity, concatenate the first DATA_SHARDS_COUNT shards
let mut dat_data = Vec::new();
{
let store = self.state.store.read().unwrap();
let ec_vol = store.ec_volumes.get(&vid).unwrap();
for shard_id in 0..DATA_SHARDS_COUNT as u8 {
if let Some(Some(shard)) = ec_vol.shards.get(shard_id as usize) {
let shard_size = shard.file_size() as usize;
let mut buf = vec![0u8; shard_size];
let n = shard.read_at(&mut buf, 0)
.map_err(|e| Status::internal(format!("read shard {}: {}", shard_id, e)))?;
buf.truncate(n);
dat_data.extend_from_slice(&buf);
}
}
}
// Calculate .dat file size from .ecx entries
let dat_file_size = crate::storage::erasure_coding::ec_decoder::find_dat_file_size(&dir, &collection, vid)
.map_err(|e| Status::internal(format!("FindDatFileSize: {}", e)))?;
// Write the reconstructed .dat file
let base = crate::storage::volume::volume_file_name(&dir, &collection, vid);
let dat_path = format!("{}.dat", base);
std::fs::write(&dat_path, &dat_data)
.map_err(|e| Status::internal(format!("write dat: {}", e)))?;
// Write .dat file using block-interleaved reading from shards
crate::storage::erasure_coding::ec_decoder::write_dat_file_from_shards(&dir, &collection, vid, dat_file_size, data_shards)
.map_err(|e| Status::internal(format!("WriteDatFile: {}", e)))?;
// Copy the .ecx to .idx (they have the same format)
let idx_path = format!("{}.idx", base);
std::fs::copy(&ecx_path, &idx_path)
.map_err(|e| Status::internal(format!("copy ecx to idx: {}", e)))?;
// Write .idx file from .ecx and .ecj files
crate::storage::erasure_coding::ec_decoder::write_idx_file_from_ec_index(&dir, &collection, vid)
.map_err(|e| Status::internal(format!("WriteIdxFileFromEcIndex: {}", e)))?;
// Unmount EC shards and mount the reconstructed volume
{

348
seaweed-volume/src/server/handlers.rs

@@ -844,15 +844,77 @@ async fn get_or_head_handler_inner(
let mut response_headers = HeaderMap::new();
response_headers.insert(header::ETAG, etag.parse().unwrap());
// Set Content-Type: use response-content-type query param override, else from needle mime
// H1: Emit pairs as response headers
if n.has_pairs() && !n.pairs.is_empty() {
if let Ok(pair_map) =
serde_json::from_slice::<std::collections::HashMap<String, String>>(&n.pairs)
{
for (k, v) in &pair_map {
if let (Ok(hname), Ok(hval)) = (
axum::http::HeaderName::from_bytes(k.as_bytes()),
axum::http::HeaderValue::from_str(v),
) {
response_headers.insert(hname, hval);
}
}
}
}
// H8: Use needle stored name when URL path has no filename (only vid,fid)
let mut filename = extract_filename_from_path(&path);
let mut ext = ext;
if n.name_size > 0 && filename.is_empty() {
filename = String::from_utf8_lossy(&n.name).to_string();
if ext.is_empty() {
if let Some(dot_pos) = filename.rfind('.') {
ext = filename[dot_pos..].to_lowercase();
}
}
}
// H6: Determine Content-Type: filter application/octet-stream, use mime_guess
let content_type = if let Some(ref ct) = query.response_content_type {
ct.clone()
} else if !n.mime.is_empty() {
String::from_utf8_lossy(&n.mime).to_string()
Some(ct.clone())
} else {
"application/octet-stream".to_string()
// Get MIME from needle, but filter out application/octet-stream
let needle_mime = if !n.mime.is_empty() {
let mt = String::from_utf8_lossy(&n.mime).to_string();
if mt.starts_with("application/octet-stream") {
String::new()
} else {
mt
}
} else {
String::new()
};
if !needle_mime.is_empty() {
Some(needle_mime)
} else {
// Fall through to extension-based detection
let detect_ext = if !ext.is_empty() {
ext.clone()
} else if !filename.is_empty() {
if let Some(dot_pos) = filename.rfind('.') {
filename[dot_pos..].to_lowercase()
} else {
String::new()
}
} else {
String::new()
};
if !detect_ext.is_empty() {
mime_guess::from_ext(detect_ext.trim_start_matches('.'))
.first()
.map(|m| m.to_string())
} else {
None // Omit Content-Type entirely
}
}
};
response_headers.insert(header::CONTENT_TYPE, content_type.parse().unwrap());
if let Some(ref ct) = content_type {
response_headers.insert(header::CONTENT_TYPE, ct.parse().unwrap());
}
// Cache-Control override from query param
if let Some(ref cc) = query.response_cache_control {
@ -878,16 +940,23 @@ async fn get_or_head_handler_inner(
response_headers.insert(header::LAST_MODIFIED, lm.parse().unwrap());
}
// Content-Disposition for download
if query.dl.is_some() {
// Extract filename from URL path
let filename = extract_filename_from_path(&path);
let disposition = if filename.is_empty() {
"attachment".to_string()
// H7: Content-Disposition — inline by default, attachment only when dl is truthy
// Only set if not already set by response-content-disposition query param
if !response_headers.contains_key(header::CONTENT_DISPOSITION) && !filename.is_empty() {
let disposition_type = if let Some(ref dl_val) = query.dl {
// Parse dl as bool: "true", "1" -> attachment; anything else -> inline
if dl_val == "true" || dl_val == "1" {
"attachment"
} else {
"inline"
}
} else {
format!("attachment; filename=\"{}\"", filename)
"inline"
};
response_headers.insert(header::CONTENT_DISPOSITION, disposition.parse().unwrap());
let disposition = format!("{}; filename=\"{}\"", disposition_type, filename);
if let Ok(hval) = disposition.parse() {
response_headers.insert(header::CONTENT_DISPOSITION, hval);
}
}
// ---- Streaming path: large uncompressed files ----
@ -1232,6 +1301,10 @@ struct UploadResult {
size: u32,
#[serde(rename = "eTag")]
etag: String,
#[serde(skip_serializing_if = "Option::is_none")]
mime: Option<String>,
#[serde(rename = "contentMd5", skip_serializing_if = "Option::is_none")]
content_md5: Option<String>,
}
pub async fn post_handler(
@ -1247,7 +1320,7 @@ pub async fn post_handler(
let (vid, needle_id, cookie) = match parse_url_path(&path) {
Some(parsed) => parsed,
None => return (StatusCode::BAD_REQUEST, "invalid URL path").into_response(),
None => return json_error(StatusCode::BAD_REQUEST, "invalid URL path"),
};
// JWT check for writes
@ -1257,7 +1330,7 @@ pub async fn post_handler(
.guard
.check_jwt_for_file(token.as_deref(), &file_id, true)
{
return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
return json_error(StatusCode::UNAUTHORIZED, format!("JWT error: {}", e));
}
// Upload throttling: check inflight bytes against limit
@ -1287,7 +1360,7 @@ pub async fn post_handler(
.await
.is_err()
{
return (StatusCode::TOO_MANY_REQUESTS, "upload limit exceeded").into_response();
return json_error(StatusCode::TOO_MANY_REQUESTS, "upload limit exceeded");
}
}
state
@ -1313,11 +1386,10 @@ pub async fn post_handler(
if let Some(ct) = headers.get(header::CONTENT_TYPE) {
if let Ok(ct_str) = ct.to_str() {
if ct_str.starts_with("multipart/form-data") && !ct_str.contains("boundary=") {
return (
return json_error(
StatusCode::BAD_REQUEST,
"no multipart boundary param in Content-Type",
)
.into_response();
);
}
}
}
@ -1330,12 +1402,83 @@ pub async fn post_handler(
// Read body
let body = match axum::body::to_bytes(request.into_body(), usize::MAX).await {
Ok(b) => b,
Err(e) => return (StatusCode::BAD_REQUEST, format!("read body: {}", e)).into_response(),
Err(e) => return json_error(StatusCode::BAD_REQUEST, format!("read body: {}", e)),
};
// H5: Multipart form-data parsing
let content_type_str = headers
.get(header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.unwrap_or("")
.to_string();
let (body_data_raw, parsed_filename, parsed_content_type, _parsed_content_encoding) =
if content_type_str.starts_with("multipart/form-data") {
// Extract boundary from Content-Type
let boundary = content_type_str
.split(';')
.find_map(|part| {
let part = part.trim();
if let Some(val) = part.strip_prefix("boundary=") {
Some(val.trim_matches('"').to_string())
} else {
None
}
})
.unwrap_or_default();
let mut multipart =
multer::Multipart::new(futures::stream::once(async { Ok::<_, std::io::Error>(body.clone()) }), boundary);
let mut file_data: Option<Vec<u8>> = None;
let mut file_name: Option<String> = None;
let mut file_content_type: Option<String> = None;
let mut file_content_encoding: Option<String> = None;
while let Ok(Some(field)) = multipart.next_field().await {
let field_name = field.name().map(|s| s.to_string());
let fname = field.file_name().map(|s| {
// Clean Windows backslashes
let cleaned = s.replace('\\', "/");
cleaned
.rsplit('/')
.next()
.unwrap_or(&cleaned)
.to_string()
});
let fct = field.content_type().map(|m| m.to_string());
if let Ok(data) = field.bytes().await {
if file_data.is_none() {
// First file field
file_data = Some(data.to_vec());
file_name = fname;
file_content_type = fct;
// Content-Encoding comes from headers, not multipart field
file_content_encoding = None;
}
let _ = field_name; // suppress unused warning
}
}
if let Some(data) = file_data {
(
data,
file_name.unwrap_or_default(),
file_content_type,
file_content_encoding,
)
} else {
// No file field found, use raw body
(body.to_vec(), String::new(), None, None)
}
} else {
(body.to_vec(), String::new(), None, None)
};
// Check file size limit
if state.file_size_limit_bytes > 0 && body.len() as i64 > state.file_size_limit_bytes {
return (StatusCode::BAD_REQUEST, "file size limit exceeded").into_response();
if state.file_size_limit_bytes > 0 && body_data_raw.len() as i64 > state.file_size_limit_bytes {
return json_error(StatusCode::BAD_REQUEST, "file size limit exceeded");
}
// Validate Content-MD5 if provided
@ -1343,17 +1486,16 @@ pub async fn post_handler(
use base64::Engine;
use md5::{Digest, Md5};
let mut hasher = Md5::new();
hasher.update(&body);
hasher.update(&body_data_raw);
let actual = base64::engine::general_purpose::STANDARD.encode(hasher.finalize());
if actual != *expected_md5 {
return (
return json_error(
StatusCode::BAD_REQUEST,
format!(
"Content-MD5 mismatch: expected {} got {}",
expected_md5, actual
),
)
.into_response();
);
}
}
@ -1380,18 +1522,22 @@ pub async fn post_handler(
.map(|s| s == "gzip")
.unwrap_or(false);
// Extract MIME type from Content-Type header (needed early for JPEG orientation fix)
let mime_type = headers
.get(header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.map(|ct| {
if ct.starts_with("multipart/") {
"application/octet-stream".to_string()
} else {
ct.to_string()
}
})
.unwrap_or_else(|| "application/octet-stream".to_string());
// Extract MIME type: prefer multipart-parsed content type, else from Content-Type header
let mime_type = if let Some(ref pct) = parsed_content_type {
pct.clone()
} else {
headers
.get(header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.map(|ct| {
if ct.starts_with("multipart/") {
"application/octet-stream".to_string()
} else {
ct.to_string()
}
})
.unwrap_or_else(|| "application/octet-stream".to_string())
};
// Parse TTL from query param (matches Go's r.FormValue("ttl"))
let ttl_str = query
@ -1425,11 +1571,14 @@ pub async fn post_handler(
// Fix JPEG orientation from EXIF data before storing (matches Go behavior).
// Only for non-compressed uploads that are JPEG files.
let body_data = if !is_gzipped && crate::images::is_jpeg(&mime_type, &path) {
crate::images::fix_jpg_orientation(&body)
crate::images::fix_jpg_orientation(&body_data_raw)
} else {
body.to_vec()
body_data_raw
};
// H3: Capture original data size BEFORE auto-compression
let original_data_size = body_data.len() as u32;
// Auto-compress compressible file types (matches Go's IsCompressableFileType).
// Only compress if not already gzipped and compression saves >10%.
let (final_data, final_is_gzipped) = if !is_gzipped && !is_chunk_manifest {
@ -1497,7 +1646,12 @@ pub async fn post_handler(
}
// Set filename on needle (matches Go: n.Name = []byte(pu.FileName))
let filename = extract_filename_from_path(&path);
// Prefer multipart-parsed filename, else extract from URL path
let filename = if !parsed_filename.is_empty() {
parsed_filename
} else {
extract_filename_from_path(&path)
};
if !filename.is_empty() && filename.len() < 256 {
n.name = filename.as_bytes().to_vec();
n.name_size = filename.len() as u8;
@ -1533,11 +1687,10 @@ pub async fn post_handler(
.await
{
tracing::error!("replicated write failed: {}", e);
return (
return json_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("replication failed: {}", e),
)
.into_response();
);
}
}
}
@ -1548,28 +1701,46 @@ pub async fn post_handler(
let etag = format!("\"{}\"", n.etag());
(StatusCode::NO_CONTENT, [(header::ETAG, etag)]).into_response()
} else {
// H2: Compute Content-MD5 as base64(md5(original_data))
let content_md5_value = {
use base64::Engine;
use md5::{Digest, Md5};
let mut hasher = Md5::new();
hasher.update(&n.data);
base64::engine::general_purpose::STANDARD.encode(hasher.finalize())
};
let result = UploadResult {
name: filename.clone(),
size: n.data_size,
size: original_data_size, // H3: use original size, not compressed
etag: n.etag(),
mime: if mime_type.is_empty() {
None
} else {
Some(mime_type.clone())
},
content_md5: Some(content_md5_value.clone()),
};
metrics::REQUEST_DURATION
.with_label_values(&["write"])
.observe(start.elapsed().as_secs_f64());
(StatusCode::CREATED, axum::Json(result)).into_response()
let mut resp = (StatusCode::CREATED, axum::Json(result)).into_response();
resp.headers_mut().insert(
"Content-MD5",
content_md5_value.parse().unwrap(),
);
resp
}
}
Err(crate::storage::volume::VolumeError::NotFound) => {
(StatusCode::NOT_FOUND, "volume not found").into_response()
json_error(StatusCode::NOT_FOUND, "volume not found")
}
Err(crate::storage::volume::VolumeError::ReadOnly) => {
(StatusCode::FORBIDDEN, "volume is read-only").into_response()
json_error(StatusCode::FORBIDDEN, "volume is read-only")
}
Err(e) => (
Err(e) => json_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("write error: {}", e),
)
.into_response(),
),
};
// _upload_guard drops here, releasing inflight bytes
@ -1599,7 +1770,7 @@ pub async fn delete_handler(
let (vid, needle_id, cookie) = match parse_url_path(&path) {
Some(parsed) => parsed,
None => return (StatusCode::BAD_REQUEST, "invalid URL path").into_response(),
None => return json_error(StatusCode::BAD_REQUEST, "invalid URL path"),
};
// JWT check for writes (deletes use write key)
@ -1609,19 +1780,27 @@ pub async fn delete_handler(
.guard
.check_jwt_for_file(token.as_deref(), &file_id, true)
{
return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
return json_error(StatusCode::UNAUTHORIZED, format!("JWT error: {}", e));
}
// Parse custom timestamp from query param
// H9: Parse custom timestamp from query param; default to now (not 0)
let del_query = request.uri().query().unwrap_or("");
let del_ts_str = del_query
.split('&')
.find_map(|p| p.strip_prefix("ts="))
.unwrap_or("");
let del_last_modified = if !del_ts_str.is_empty() {
del_ts_str.parse::<u64>().unwrap_or(0)
del_ts_str.parse::<u64>().unwrap_or_else(|_| {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
})
} else {
0
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
};
let mut n = Needle {
@ -1643,18 +1822,15 @@ pub async fn delete_handler(
}
}
if n.cookie != original_cookie {
return (
return json_error(
StatusCode::BAD_REQUEST,
"File Random Cookie does not match.",
)
.into_response();
);
}
// Apply custom timestamp if provided
if del_last_modified > 0 {
n.last_modified = del_last_modified;
n.set_has_last_modified_date();
}
// Apply custom timestamp (always set — defaults to now per H9)
n.last_modified = del_last_modified;
n.set_has_last_modified_date();
// If this is a chunk manifest, delete child chunks first
if n.is_chunk_manifest() {
@ -1678,11 +1854,10 @@ pub async fn delete_handler(
let (chunk_vid, chunk_nid, chunk_cookie) = match parse_url_path(&chunk.fid) {
Some(p) => p,
None => {
return (
return json_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("invalid chunk fid: {}", chunk.fid),
)
.into_response();
);
}
};
let mut chunk_needle = Needle {
@ -1694,31 +1869,28 @@ pub async fn delete_handler(
{
let store = state.store.read().unwrap();
if let Err(e) = store.read_volume_needle(chunk_vid, &mut chunk_needle) {
return (
return json_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("read chunk {}: {}", chunk.fid, e),
)
.into_response();
);
}
}
// Delete the chunk
let mut store = state.store.write().unwrap();
if let Err(e) = store.delete_volume_needle(chunk_vid, &mut chunk_needle) {
return (
return json_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("delete chunk {}: {}", chunk.fid, e),
)
.into_response();
);
}
}
// Delete the manifest itself
let mut store = state.store.write().unwrap();
if let Err(e) = store.delete_volume_needle(vid, &mut n) {
return (
return json_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("delete manifest: {}", e),
)
.into_response();
);
}
metrics::REQUEST_DURATION
.with_label_values(&["delete"])
@ -1757,11 +1929,10 @@ pub async fn delete_handler(
.await
{
tracing::error!("replicated delete failed: {}", e);
return (
return json_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("replication failed: {}", e),
)
.into_response();
);
}
}
}
@ -1782,11 +1953,10 @@ pub async fn delete_handler(
let result = DeleteResult { size: 0 };
(StatusCode::NOT_FOUND, axum::Json(result)).into_response()
}
Err(e) => (
Err(e) => json_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("delete error: {}", e),
)
.into_response(),
),
}
}
@ -2150,6 +2320,12 @@ fn try_expand_chunk_manifest(
// Helpers
// ============================================================================
/// Return a JSON error response: `{"error": "<msg>"}`.
///
/// Helper used by the HTTP handlers so that every error path replies with a
/// consistent JSON body (see the commit note: "Use json_error helper for
/// consistent JSON error responses") instead of ad-hoc plain-text tuples.
///
/// * `status` - the HTTP status code to return.
/// * `msg`    - the error text; anything convertible into a `String`.
fn json_error(status: StatusCode, msg: impl Into<String>) -> Response {
// Wrap the message in a {"error": ...} JSON object; axum::Json also sets
// the Content-Type: application/json header on the response.
let body = serde_json::json!({"error": msg.into()});
(status, axum::Json(body)).into_response()
}
/// Extract JWT token from query param, Authorization header, or Cookie.
/// Query param takes precedence over header, header over cookie.
fn extract_jwt(headers: &HeaderMap, uri: &axum::http::Uri) -> Option<String> {
@ -2164,11 +2340,11 @@ fn extract_jwt(headers: &HeaderMap, uri: &axum::http::Uri) -> Option<String> {
}
}
// 2. Check Authorization: Bearer <token>
// 2. Check Authorization: Bearer <token> (case-insensitive prefix)
if let Some(auth) = headers.get(header::AUTHORIZATION) {
if let Ok(auth_str) = auth.to_str() {
if let Some(token) = auth_str.strip_prefix("Bearer ") {
return Some(token.to_string());
if auth_str.len() >= 7 && auth_str[..7].eq_ignore_ascii_case("bearer ") {
return Some(auth_str[7..].to_string());
}
}
}

89
seaweed-volume/src/server/heartbeat.rs

@ -31,6 +31,11 @@ pub struct HeartbeatConfig {
}
/// Run the heartbeat loop using VolumeServerState.
///
/// Mirrors Go's `volume_grpc_client_to_master.go` heartbeat():
/// - On leader redirect: sleep 3s, then connect directly to the new leader
/// - On duplicate UUID error: exponential backoff (2s, 4s, 8s), exit after 3 retries
/// - On other errors: sleep pulse interval, reset to seed master list iteration
pub async fn run_heartbeat_with_state(
config: HeartbeatConfig,
state: Arc<VolumeServerState>,
@ -42,6 +47,8 @@ pub async fn run_heartbeat_with_state(
);
let pulse = Duration::from_secs(config.pulse_seconds.max(1));
let mut new_leader: Option<String> = None;
let mut duplicate_retry_count: u32 = 0;
loop {
for master_addr in &config.master_addresses {
@ -51,21 +58,93 @@ pub async fn run_heartbeat_with_state(
return;
}
let grpc_addr = to_grpc_address(master_addr);
// If we have a leader redirect, sleep 3s then connect to the leader
// instead of iterating through the seed list
let target_addr = if let Some(ref leader) = new_leader {
tokio::time::sleep(Duration::from_secs(3)).await;
leader.clone()
} else {
master_addr.clone()
};
let grpc_addr = to_grpc_address(&target_addr);
info!("Connecting heartbeat to master {}", grpc_addr);
match do_heartbeat(&config, &state, &grpc_addr, pulse, &mut shutdown_rx).await {
// Determine what action to take after the heartbeat attempt.
// We convert the error to a string immediately so the non-Send
// Box<dyn Error> is dropped before any .await point.
enum PostAction {
LeaderRedirect(String),
Done,
SleepDuplicate(Duration),
SleepPulse,
}
let action = match do_heartbeat(&config, &state, &grpc_addr, pulse, &mut shutdown_rx).await {
Ok(Some(leader)) => {
info!("Master leader changed to {}", leader);
PostAction::LeaderRedirect(leader)
}
Ok(None) => {
duplicate_retry_count = 0;
PostAction::Done
}
Ok(None) => {}
Err(e) => {
let err_msg = e.to_string();
// Drop `e` (non-Send) before any .await
drop(e);
state.is_heartbeating.store(false, Ordering::Relaxed);
warn!("Heartbeat to {} error: {}", grpc_addr, e);
warn!("Heartbeat to {} error: {}", grpc_addr, err_msg);
if err_msg.contains("duplicate") && err_msg.contains("UUID") {
duplicate_retry_count += 1;
if duplicate_retry_count > 3 {
error!("Shut down Volume Server due to persistent duplicate volume directories after 3 retries");
error!("Please check if another volume server is using the same directory");
std::process::exit(1);
}
let retry_delay = Duration::from_secs(2u64.pow(duplicate_retry_count));
warn!(
"Waiting {:?} before retrying due to duplicate UUID detection (attempt {}/3)...",
retry_delay, duplicate_retry_count
);
PostAction::SleepDuplicate(retry_delay)
} else {
duplicate_retry_count = 0;
PostAction::SleepPulse
}
}
};
match action {
PostAction::LeaderRedirect(leader) => {
new_leader = Some(leader);
break;
}
PostAction::Done => {
new_leader = None;
}
PostAction::SleepDuplicate(delay) => {
new_leader = None;
tokio::time::sleep(delay).await;
}
PostAction::SleepPulse => {
new_leader = None;
tokio::time::sleep(pulse).await;
}
}
// If we connected to a leader (not seed list), break out after one attempt
// so we either reconnect to the new leader or fall back to seed list
if new_leader.is_some() {
break;
}
}
// If we have a leader redirect, skip the sleep and reconnect immediately
if new_leader.is_some() {
continue;
}
tokio::select! {
_ = tokio::time::sleep(pulse) => {}
_ = shutdown_rx.recv() => {
@ -213,6 +292,8 @@ async fn do_heartbeat(
let delta_hb = master_pb::Heartbeat {
ip: config.ip.clone(),
port: config.port as u32,
grpc_port: config.grpc_port as u32,
public_url: config.public_url.clone(),
data_center: config.data_center.clone(),
rack: config.rack.clone(),
new_volumes: new_vols,

44
seaweed-volume/src/storage/erasure_coding/ec_decoder.rs

@ -4,13 +4,55 @@
//! and the sorted index (.ecx) + deletion journal (.ecj).
use std::fs::File;
use std::io::{self, Write};
use std::io::{self, Read, Write};
use crate::storage::erasure_coding::ec_shard::*;
use crate::storage::idx;
use crate::storage::needle::needle::get_actual_size;
use crate::storage::super_block::SUPER_BLOCK_SIZE;
use crate::storage::types::*;
use crate::storage::volume::volume_file_name;
/// Calculate .dat file size from the max offset entry in .ecx.
/// Reads the volume version from the first EC shard (.ec00) superblock,
/// then scans .ecx entries to find the largest (offset + needle_actual_size).
pub fn find_dat_file_size(
    dir: &str,
    collection: &str,
    volume_id: VolumeId,
) -> io::Result<i64> {
    let base = volume_file_name(dir, collection, volume_id);

    // The needle layout depends on the volume version, which is stored in
    // the first superblock byte; any shard carries it, so use .ec00.
    let mut shard0 = File::open(format!("{}.ec00", base))?;
    let mut superblock = [0u8; SUPER_BLOCK_SIZE];
    shard0.read_exact(&mut superblock)?;
    let version = Version(superblock[0]);

    // Walk every live .ecx entry and keep the furthest end position.
    // chunks_exact ignores any trailing partial entry, matching the
    // len / NEEDLE_MAP_ENTRY_SIZE loop bound of the original.
    let ecx_bytes = std::fs::read(format!("{}.ecx", base))?;
    let max_entry_end = ecx_bytes
        .chunks_exact(NEEDLE_MAP_ENTRY_SIZE)
        .filter_map(|entry| {
            let (_, offset, size) = idx_entry_from_bytes(entry);
            if size.is_deleted() {
                None
            } else {
                Some(offset.to_actual_offset() + get_actual_size(size, version))
            }
        })
        .max()
        .unwrap_or(0);

    // The .dat file is never smaller than its superblock.
    Ok(max_entry_end.max(SUPER_BLOCK_SIZE as i64))
}
/// Reconstruct a .dat file from EC data shards.
///
/// Reads from .ec00-.ec09 and writes a new .dat file.

4
seaweed-volume/src/storage/needle/needle.rs

@ -446,9 +446,9 @@ impl Needle {
/// Compute padding to align needle to NEEDLE_PADDING_SIZE (8 bytes).
///
/// Matches Go's `PaddingLength`: no second modulo is applied, so when the
/// record already ends on an 8-byte boundary the padding is a full 8 bytes.
/// The result is therefore always in 1..=8, never 0 — every needle record
/// carries at least one padding byte, as in the Go volume format.
pub fn padding_length(needle_size: Size, version: Version) -> Size {
    // Version 3 appends a timestamp after the checksum, so it is part of
    // the span being aligned.
    if version == VERSION_3 {
        Size(NEEDLE_PADDING_SIZE as i32 - ((NEEDLE_HEADER_SIZE as i32 + needle_size.0 + NEEDLE_CHECKSUM_SIZE as i32 + TIMESTAMP_SIZE as i32) % NEEDLE_PADDING_SIZE as i32))
    } else {
        Size(NEEDLE_PADDING_SIZE as i32 - ((NEEDLE_HEADER_SIZE as i32 + needle_size.0 + NEEDLE_CHECKSUM_SIZE as i32) % NEEDLE_PADDING_SIZE as i32))
    }
}

77
seaweed-volume/src/storage/needle_map.rs

@ -59,21 +59,30 @@ pub struct NeedleMapMetric {
}
impl NeedleMapMetric {
/// Update metrics based on a Put operation.
/// Update metrics based on a Put operation (additive-only, matching Go's logPut).
fn on_put(&self, key: NeedleId, old: Option<&NeedleValue>, new_size: Size) {
if new_size.is_valid() {
if old.is_none() || !old.unwrap().size.is_valid() {
self.file_count.fetch_add(1, Ordering::Relaxed);
}
self.file_byte_count.fetch_add(new_size.0 as u64, Ordering::Relaxed);
if let Some(old_val) = old {
if old_val.size.is_valid() {
self.file_byte_count.fetch_sub(old_val.size.0 as u64, Ordering::Relaxed);
// Track overwritten bytes as garbage for compaction (garbage_level)
self.deletion_byte_count.fetch_add(old_val.size.0 as u64, Ordering::Relaxed);
}
self.maybe_set_max_file_key(key);
// Go: always LogFileCounter(newSize) which does FileCounter++ and FileByteCounter += newSize
self.file_count.fetch_add(1, Ordering::Relaxed);
self.file_byte_count.fetch_add(new_size.0 as u64, Ordering::Relaxed);
// Go: if oldSize > 0 && oldSize.IsValid() { LogDeletionCounter(oldSize) }
if let Some(old_val) = old {
if old_val.size.0 > 0 && old_val.size.is_valid() {
self.deletion_count.fetch_add(1, Ordering::Relaxed);
self.deletion_byte_count.fetch_add(old_val.size.0 as u64, Ordering::Relaxed);
}
}
}
/// Update metrics based on a Delete operation (additive-only, matching Go's logDelete).
///
/// Only the deletion counters grow; file_count / file_byte_count are never
/// decremented, mirroring Go's LogDeletionCounter.
fn on_delete(&self, old: &NeedleValue) {
    let reclaimed = old.size.0;
    if reclaimed <= 0 {
        return;
    }
    self.deletion_count.fetch_add(1, Ordering::Relaxed);
    self.deletion_byte_count.fetch_add(reclaimed as u64, Ordering::Relaxed);
}
fn maybe_set_max_file_key(&self, key: NeedleId) {
let key_val: u64 = key.into();
loop {
let current = self.max_file_key.load(Ordering::Relaxed);
@ -85,16 +94,6 @@ impl NeedleMapMetric {
}
}
}
/// Update metrics based on a Delete operation.
fn on_delete(&self, old: &NeedleValue) {
if old.size.is_valid() {
self.deletion_count.fetch_add(1, Ordering::Relaxed);
self.deletion_byte_count.fetch_add(old.size.0 as u64, Ordering::Relaxed);
self.file_count.fetch_sub(1, Ordering::Relaxed);
self.file_byte_count.fetch_sub(old.size.0 as u64, Ordering::Relaxed);
}
}
}
// ============================================================================
@ -221,6 +220,7 @@ impl CompactNeedleMap {
/// Remove from map during loading (handle deletions in idx walk).
fn delete_from_map(&mut self, key: NeedleId) {
self.metric.maybe_set_max_file_key(key);
if let Some(old) = self.map.get(&key).cloned() {
if old.size.is_valid() {
self.metric.on_delete(&old);
@ -815,7 +815,8 @@ mod tests {
let deleted = nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap();
assert_eq!(deleted, Some(Size(100)));
assert_eq!(nm.file_count(), 0);
// Additive-only: file_count stays at 1 after delete
assert_eq!(nm.file_count(), 1);
assert_eq!(nm.deleted_count(), 1);
assert_eq!(nm.deleted_size(), 100);
}
@ -831,15 +832,15 @@ mod tests {
assert_eq!(nm.content_size(), 600);
assert_eq!(nm.max_file_key(), NeedleId(3));
// Update existing
// Update existing — additive-only: file_count increments, content_size adds
nm.put(NeedleId(2), Offset::from_actual_offset(700), Size(250)).unwrap();
assert_eq!(nm.file_count(), 3); // still 3
assert_eq!(nm.content_size(), 650); // 100 + 250 + 300
assert_eq!(nm.file_count(), 4); // 3 + 1 (always increments)
assert_eq!(nm.content_size(), 850); // 600 + 250 (always adds)
// Delete
// Delete — additive-only: file_count unchanged
nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap();
assert_eq!(nm.file_count(), 2);
assert_eq!(nm.deleted_count(), 1);
assert_eq!(nm.file_count(), 4); // unchanged
assert_eq!(nm.deleted_count(), 2); // 1 from overwrite + 1 from delete
}
#[test]
@ -859,7 +860,8 @@ mod tests {
assert!(nm.get(NeedleId(1)).is_some());
assert!(nm.get(NeedleId(2)).is_none()); // deleted
assert!(nm.get(NeedleId(3)).is_some());
assert_eq!(nm.file_count(), 2);
// Additive-only: put(1)+put(2)+put(3) = 3, delete doesn't decrement
assert_eq!(nm.file_count(), 3);
}
#[test]
@ -909,7 +911,8 @@ mod tests {
let deleted = nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap();
assert_eq!(deleted, Some(Size(100)));
assert_eq!(nm.file_count(), 0);
// Additive-only: file_count stays at 1 after delete
assert_eq!(nm.file_count(), 1);
assert_eq!(nm.deleted_count(), 1);
assert_eq!(nm.deleted_size(), 100);
@ -932,15 +935,15 @@ mod tests {
assert_eq!(nm.content_size(), 600);
assert_eq!(nm.max_file_key(), NeedleId(3));
// Update existing
// Update existing — additive-only: file_count increments, content_size adds
nm.put(NeedleId(2), Offset::from_actual_offset(700), Size(250)).unwrap();
assert_eq!(nm.file_count(), 3);
assert_eq!(nm.content_size(), 650); // 100 + 250 + 300
assert_eq!(nm.file_count(), 4); // 3 + 1 (always increments)
assert_eq!(nm.content_size(), 850); // 600 + 250 (always adds)
// Delete
// Delete — additive-only: file_count unchanged
nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap();
assert_eq!(nm.file_count(), 2);
assert_eq!(nm.deleted_count(), 1);
assert_eq!(nm.file_count(), 4); // unchanged
assert_eq!(nm.deleted_count(), 2); // 1 from overwrite + 1 from delete
}
#[test]

2
seaweed-volume/src/storage/types.rs

@ -23,7 +23,7 @@ pub const NEEDLE_CHECKSUM_SIZE: usize = 4;
/// Maximum possible volume size with 4-byte offset: 32GB
/// Formula: 4 * 1024 * 1024 * 1024 * 8
/// (4Gi distinct offset values, each presumably addressing an 8-byte-aligned
/// unit — NEEDLE_PADDING_SIZE — hence the final * 8.)
pub const MAX_POSSIBLE_VOLUME_SIZE: u64 = 4 * 1024 * 1024 * 1024 * 8;
// ============================================================================
// NeedleId

73
seaweed-volume/src/storage/volume.rs

@ -927,6 +927,16 @@ impl Volume {
n: &mut Needle,
check_cookie: bool,
) -> Result<(u64, Size, bool), VolumeError> {
// TTL inheritance from volume (matching Go's writeNeedle2)
{
use crate::storage::needle::ttl::TTL;
let needle_ttl = n.ttl.unwrap_or(TTL::EMPTY);
if needle_ttl == TTL::EMPTY && self.super_block.ttl != TTL::EMPTY {
n.set_has_ttl();
n.ttl = Some(self.super_block.ttl);
}
}
// Ensure checksum is computed before dedup check
if n.checksum == crate::storage::needle::crc::CRC(0) && !n.data.is_empty() {
n.checksum = crate::storage::needle::crc::CRC::new(&n.data);
@ -1047,7 +1057,20 @@ impl Volume {
})?;
let offset = dat_file.seek(SeekFrom::End(0))?;
dat_file.write_all(&bytes)?;
// Check volume size limit before writing (matching Go's Append)
if offset >= MAX_POSSIBLE_VOLUME_SIZE && !n.data.is_empty() {
return Err(VolumeError::SizeLimitExceeded {
current: offset,
limit: MAX_POSSIBLE_VOLUME_SIZE,
});
}
if let Err(e) = dat_file.write_all(&bytes) {
// Truncate back to pre-write position on error (matching Go)
let _ = dat_file.set_len(offset);
return Err(VolumeError::Io(e));
}
Ok((offset, n.size, actual_size))
}
@ -1282,7 +1305,7 @@ impl Volume {
/// Path to .vif file.
fn vif_path(&self) -> String {
format!("{}/{}.vif", self.dir, self.id.0)
format!("{}.vif", self.data_file_name())
}
/// Load volume info from .vif file.
@ -1558,19 +1581,49 @@ impl Volume {
Ok(())
}
/// Write a needle blob and update the needle map index.
/// Matches Go's Volume.WriteNeedleBlob which appends to dat and calls nm.Put.
///
/// `needle_blob` is a fully serialized needle record; it is appended
/// verbatim at the current end of the .dat file and the needle map entry
/// for `needle_id` is pointed at that position with the given `size`.
///
/// # Errors
/// Returns `VolumeError::Io` when the volume size limit would be
/// exceeded, and propagates any underlying file-write or needle-map error.
pub fn write_needle_blob_and_index(
    &mut self,
    needle_id: NeedleId,
    needle_blob: &[u8],
    size: Size,
) -> Result<(), VolumeError> {
    // Check volume size limit
    // content_size() is the additive-only byte counter, so this check is
    // conservative: it can reject early but never under-counts.
    let content_size = self.content_size();
    if MAX_POSSIBLE_VOLUME_SIZE < content_size + needle_blob.len() as u64 {
        return Err(VolumeError::Io(io::Error::new(
            io::ErrorKind::Other,
            format!("volume size limit {} exceeded! current size is {}", MAX_POSSIBLE_VOLUME_SIZE, content_size),
        )));
    }
    // Append blob at end of dat file
    let dat_size = self.dat_file_size()? as i64;
    self.write_needle_blob(dat_size, needle_blob)?;
    // Update needle map index
    // from_actual_offset converts the byte position to the map's stored
    // offset representation (presumably 8-byte units — confirm in Offset).
    // nm is None for volumes without an in-memory index; then only the
    // .dat append happens, as in Go when the needle map is absent.
    let offset = Offset::from_actual_offset(dat_size);
    if let Some(ref mut nm) = self.nm {
        nm.put(needle_id, offset, size)?;
    }
    Ok(())
}
/// True when the volume's replica placement calls for more than one copy,
/// i.e. writes to this volume must also be applied to replicas.
pub fn needs_replication(&self) -> bool {
    let copy_count = self.super_block.replica_placement.get_copy_count();
    copy_count > 1
}
/// Garbage ratio: deleted_size / (content_size + deleted_size)
/// Garbage ratio: deleted_size / content_size (matching Go's garbageLevel).
/// content_size is the additive-only FileByteCounter.
pub fn garbage_level(&self) -> f64 {
let content = self.content_size();
let deleted = self.deleted_size();
let total = content + deleted;
if total == 0 {
if content == 0 {
return 0.0;
}
deleted as f64 / total as f64
let deleted = self.deleted_size();
deleted as f64 / content as f64
}
pub fn dat_file_size(&self) -> io::Result<u64> {
@ -2213,7 +2266,8 @@ mod tests {
})
.unwrap();
assert!(deleted_size.0 > 0);
assert_eq!(v.file_count(), 0);
// Additive-only: file_count stays at 1 after delete
assert_eq!(v.file_count(), 1);
assert_eq!(v.deleted_count(), 1);
// Read should fail with Deleted
@ -2384,7 +2438,8 @@ mod tests {
..Needle::default()
};
v.delete_needle(&mut del).unwrap();
assert_eq!(v.file_count(), 2);
// Additive-only: file_count stays at 3 after delete
assert_eq!(v.file_count(), 3);
assert_eq!(v.deleted_count(), 1);
let dat_size_before = v.dat_file_size().unwrap();

Loading…
Cancel
Save