From 216d193675d9e2fe42c2a940aae5e55e38ad1992 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 8 Mar 2026 13:21:37 -0700 Subject: [PATCH] =?UTF-8?q?fix:=20Go=20parity=20review=20=E2=80=94=20align?= =?UTF-8?q?=20Rust=20volume=20server=20with=20Go=20behavior?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Handlers (handlers.rs): - Add needle pairs on read response, upload response fields (mime, contentMd5) - Track original_data_size before compression for accurate size reporting - Use json_error helper for consistent JSON error responses - Switch to multer for multipart parsing, filter application/octet-stream - Set Content-Disposition inline, extract needle name on read - Delete sets LastModified to now, case-insensitive Bearer prefix gRPC (grpc_server.rs): - WriteNeedleBlob now updates needle map index (fixes round-trip test) - CopyFile uses i64::MAX stop_offset, copies .ecj files for EC volumes - EC reconstruction uses proper shard-level decoding - TailSender uses binary search for since_ns > 0 - Drain timeout and collection validation on allocate Storage: - Fix needle padding (1-8 bytes, not 0-7) matching Go behavior - Additive-only needle map metrics (never decrement counters) - Track max_file_key on delete during idx loading - Volume TTL inheritance for needles, garbage_level formula fix - MAX_POSSIBLE_VOLUME_SIZE corrected to 32GB - .vif path includes collection prefix Config/Security: - Parse [guard] white_list from security.toml - Heartbeat leader redirect with 3s sleep - Duplicate UUID exponential backoff retry All tests pass: 82 gRPC + 61 HTTP + 9 Rust integration = 152/152 --- seaweed-volume/Cargo.lock | 2 + seaweed-volume/Cargo.toml | 6 + seaweed-volume/src/config.rs | 28 +- seaweed-volume/src/server/grpc_server.rs | 177 +++++---- seaweed-volume/src/server/handlers.rs | 348 +++++++++++++----- seaweed-volume/src/server/heartbeat.rs | 89 ++++- .../src/storage/erasure_coding/ec_decoder.rs | 44 ++- seaweed-volume/src/storage/needle/needle.rs | 4 +- seaweed-volume/src/storage/needle_map.rs | 77 ++-- seaweed-volume/src/storage/types.rs | 2 +- seaweed-volume/src/storage/volume.rs | 73 +++- 11 files changed, 641 insertions(+), 209 deletions(-) diff --git a/seaweed-volume/Cargo.lock b/seaweed-volume/Cargo.lock index 2bc2f38ba..bb74aff2d 100644 --- a/seaweed-volume/Cargo.lock +++ b/seaweed-volume/Cargo.lock @@ -3526,6 +3526,8 @@ dependencies = [ "libc", "md-5", "memmap2", + "mime_guess", + "multer", "parking_lot 0.12.5", "pprof", "prometheus", diff --git a/seaweed-volume/Cargo.toml b/seaweed-volume/Cargo.toml index 7aab1bbfa..ca48e708a 100644 --- a/seaweed-volume/Cargo.toml +++ b/seaweed-volume/Cargo.toml @@ -82,6 +82,12 @@ flate2 = "1" image = { version = "0.25", default-features = false, features = ["png", "jpeg", "gif"] } kamadak-exif = "0.5" +# Multipart form-data parsing +multer = "3" + +# MIME type guessing from file extensions +mime_guess = "2" + # Misc bytes = "1" rand = "0.8" diff --git a/seaweed-volume/src/config.rs b/seaweed-volume/src/config.rs index 6844a8bd0..69ed838f6 100644 --- a/seaweed-volume/src/config.rs +++ b/seaweed-volume/src/config.rs @@ -538,21 +538,22 @@ fn resolve_config(cli: Cli) -> VolumeServerConfig { other => panic!("Unknown readMode: {}. Use local|proxy|redirect", other), }; - // Parse whitelist - let white_list: Vec = cli + // Parse security config from TOML file + let sec = parse_security_config(&cli.security_file); + + // Parse whitelist: merge CLI --whiteList with guard.white_list from security.toml + let mut white_list: Vec = cli .white_list .split(',') .map(|s| s.trim().to_string()) .filter(|s| !s.is_empty()) .collect(); + white_list.extend(sec.guard_white_list.iter().cloned()); // Parse durations let inflight_upload_data_timeout = parse_duration(&cli.inflight_upload_data_timeout); let inflight_download_data_timeout = parse_duration(&cli.inflight_download_data_timeout); - // Parse security config from TOML file - let sec = parse_security_config(&cli.security_file); - VolumeServerConfig { port: cli.port, grpc_port, @@ -615,6 +616,8 @@ pub struct SecurityConfig { pub https_key_file: String, pub grpc_cert_file: String, pub grpc_key_file: String, + /// IPs from [guard] white_list in security.toml + pub guard_white_list: Vec, } /// Parse a security.toml file to extract JWT signing keys and TLS configuration. @@ -654,6 +657,7 @@ fn parse_security_config(path: &str) -> SecurityConfig { JwtSigningRead, HttpsVolume, GrpcVolume, + Guard, } let mut section = Section::None; @@ -679,6 +683,10 @@ fn parse_security_config(path: &str) -> SecurityConfig { section = Section::GrpcVolume; continue; } + if trimmed == "[guard]" { + section = Section::Guard; + continue; + } if trimmed.starts_with('[') { section = Section::None; continue; @@ -708,6 +716,16 @@ fn parse_security_config(path: &str) -> SecurityConfig { "key" => cfg.grpc_key_file = value.to_string(), _ => {} }, + Section::Guard => match key { + "white_list" => { + cfg.guard_white_list = value + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + } + _ => {} + }, Section::None => {} } } diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 15813525c..c998d4a82 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -1089,22 +1089,15 @@ impl VolumeServer for VolumeGrpcService { self.state.check_maintenance()?; let req = request.into_inner(); let vid = VolumeId(req.volume_id); + let needle_id = NeedleId(req.needle_id); + let size = Size(req.size); let mut store = self.state.store.write().unwrap(); let (_, vol) = store.find_volume_mut(vid) .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?; - // Write the raw needle blob at the end of the dat file (append) - let dat_size = vol.dat_file_size() - .map_err(|e| Status::internal(e.to_string()))? as i64; - vol.write_needle_blob(dat_size, &req.needle_blob) - .map_err(|e| Status::internal(e.to_string()))?; - - // Update the needle index so the written blob is discoverable - let needle_id = NeedleId(req.needle_id); - let size = Size(req.size); - vol.put_needle_index(needle_id, Offset::from_actual_offset(dat_size), size) - .map_err(|e| Status::internal(e.to_string()))?; + vol.write_needle_blob_and_index(needle_id, &req.needle_blob, size) + .map_err(|e| Status::internal(format!("write blob needle {} size {}: {}", needle_id.0, size.0, e)))?; Ok(Response::new(volume_server_pb::WriteNeedleBlobResponse {})) } @@ -1184,11 +1177,25 @@ impl VolumeServer for VolumeGrpcService { let mut draining_seconds = idle_timeout; loop { - // Scan for new needles + // Use binary search to find starting offset, then scan from there let scan_result = { let store = state.store.read().unwrap(); if let Some((_, vol)) = store.find_volume(vid) { - vol.scan_raw_needles_from(sb_size) + let start_offset = if last_timestamp_ns > 0 { + match vol.binary_search_by_append_at_ns(last_timestamp_ns) { + Ok((offset, _is_last)) => { + if offset.is_zero() { + sb_size + } else { + offset.to_actual_offset() as u64 + } + } + Err(_) => sb_size, + } + } else { + sb_size + }; + vol.scan_raw_needles_from(start_offset) } else { break; } @@ -1240,7 +1247,7 @@ impl VolumeServer for VolumeGrpcService { } } - tokio::time::sleep(std::time::Duration::from_secs(2)).await; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; if idle_timeout == 0 { last_timestamp_ns = last_processed_ns; @@ -1351,12 +1358,19 @@ impl VolumeServer for VolumeGrpcService { let vid = VolumeId(req.volume_id); let collection = &req.collection; - // Find the volume's directory - let dir = { + // Find the volume's directory and validate collection + let (dir, vol_version, dat_file_size) = { let store = self.state.store.read().unwrap(); - let (loc_idx, _) = store.find_volume(vid) - .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?; - store.locations[loc_idx].directory.clone() + let (loc_idx, vol) = store.find_volume(vid) + .ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?; + if vol.collection != req.collection { + return Err(Status::internal(format!( + "existing collection:{} unexpected input: {}", vol.collection, req.collection + ))); + } + let version = vol.version().0 as u32; + let dat_size = vol.dat_file_size().unwrap_or(0) as i64; + (store.locations[loc_idx].directory.clone(), version, dat_size) }; let (data_shards, parity_shards) = crate::storage::erasure_coding::ec_volume::read_ec_shard_config(&dir, vid); @@ -1364,6 +1378,25 @@ impl VolumeServer for VolumeGrpcService { crate::storage::erasure_coding::ec_encoder::write_ec_files(&dir, collection, vid, data_shards as usize, parity_shards as usize) .map_err(|e| Status::internal(e.to_string()))?; + // Write .vif file with EC shard metadata + { + let base = crate::storage::volume::volume_file_name(&dir, collection, vid); + let vif_path = format!("{}.vif", base); + let vif = crate::storage::volume::VifVolumeInfo { + version: vol_version, + dat_file_size, + ec_shard_config: Some(crate::storage::volume::VifEcShardConfig { + data_shards: data_shards, + parity_shards: parity_shards, + }), + ..Default::default() + }; + let content = serde_json::to_string_pretty(&vif) + .map_err(|e| Status::internal(format!("serialize vif: {}", e)))?; + std::fs::write(&vif_path, content) + .map_err(|e| Status::internal(format!("write vif: {}", e)))?; + } + Ok(Response::new(volume_server_pb::VolumeEcShardsGenerateResponse {})) } @@ -1467,7 +1500,7 @@ impl VolumeServer for VolumeGrpcService { is_ec_volume: true, ext: ext.clone(), compaction_revision: u32::MAX, - stop_offset: 0, + stop_offset: i64::MAX as u64, ..Default::default() }; let mut stream = client.copy_file(copy_req).await @@ -1496,7 +1529,7 @@ impl VolumeServer for VolumeGrpcService { is_ec_volume: true, ext: ".ecx".to_string(), compaction_revision: u32::MAX, - stop_offset: 0, + stop_offset: i64::MAX as u64, ..Default::default() }; let mut stream = client.copy_file(copy_req).await @@ -1517,6 +1550,36 @@ impl VolumeServer for VolumeGrpcService { } } + // Copy .ecj file if requested + if req.copy_ecj_file { + let copy_req = volume_server_pb::CopyFileRequest { + volume_id: req.volume_id, + collection: req.collection.clone(), + is_ec_volume: true, + ext: ".ecj".to_string(), + compaction_revision: u32::MAX, + stop_offset: i64::MAX as u64, + ignore_source_file_not_found: true, + ..Default::default() + }; + let mut stream = client.copy_file(copy_req).await + .map_err(|e| Status::internal(format!("VolumeEcShardsCopy volume {} copy .ecj: {}", vid, e)))? + .into_inner(); + + let file_path = { + let base = crate::storage::volume::volume_file_name(&dest_dir, &req.collection, vid); + format!("{}.ecj", base) + }; + let mut file = std::fs::File::create(&file_path) + .map_err(|e| Status::internal(format!("create {}: {}", file_path, e)))?; + while let Some(chunk) = stream.message().await + .map_err(|e| Status::internal(format!("recv .ecj: {}", e)))? { + use std::io::Write; + file.write_all(&chunk.file_content) + .map_err(|e| Status::internal(format!("write {}: {}", file_path, e)))?; + } + } + // Copy .vif file if requested if req.copy_vif_file { let copy_req = volume_server_pb::CopyFileRequest { @@ -1525,7 +1588,7 @@ impl VolumeServer for VolumeGrpcService { is_ec_volume: true, ext: ".vif".to_string(), compaction_revision: u32::MAX, - stop_offset: 0, + stop_offset: i64::MAX as u64, ..Default::default() }; let mut stream = client.copy_file(copy_req).await @@ -1672,38 +1735,41 @@ impl VolumeServer for VolumeGrpcService { let ec_vol = store.ec_volumes.get(&vid) .ok_or_else(|| Status::not_found(format!("ec volume {} not found", req.volume_id)))?; - // Check that all 10 data shards are present - use crate::storage::erasure_coding::ec_shard::DATA_SHARDS_COUNT; - for shard_id in 0..DATA_SHARDS_COUNT as u8 { - if ec_vol.shards.get(shard_id as usize).map(|s| s.is_none()).unwrap_or(true) { + if ec_vol.collection != req.collection { + return Err(Status::internal(format!( + "existing collection:{} unexpected input: {}", ec_vol.collection, req.collection + ))); + } + + // Use EC context data shard count from the volume + let data_shards = ec_vol.data_shards as usize; + + // Check that all data shards are present + for shard_id in 0..data_shards { + if ec_vol.shards.get(shard_id).map(|s| s.is_none()).unwrap_or(true) { return Err(Status::internal(format!( "ec volume {} missing shard {}", req.volume_id, shard_id ))); } } - // Read the .ecx index to find all needles + // Read the .ecx index to check for live entries let ecx_path = ec_vol.ecx_file_name(); let ecx_data = std::fs::read(&ecx_path) .map_err(|e| Status::internal(format!("read ecx: {}", e)))?; let entry_count = ecx_data.len() / NEEDLE_MAP_ENTRY_SIZE; - // Read deleted needles from .ecj - let deleted_needles = ec_vol.read_deleted_needles() - .map_err(|e| Status::internal(e.to_string()))?; - - // Count live entries - let mut live_count = 0; + let mut has_live = false; for i in 0..entry_count { let start = i * NEEDLE_MAP_ENTRY_SIZE; - let (key, _offset, size) = idx_entry_from_bytes(&ecx_data[start..start + NEEDLE_MAP_ENTRY_SIZE]); - if size.is_deleted() || deleted_needles.contains(&key) { - continue; + let (_, _, size) = idx_entry_from_bytes(&ecx_data[start..start + NEEDLE_MAP_ENTRY_SIZE]); + if !size.is_deleted() { + has_live = true; + break; } - live_count += 1; } - if live_count == 0 { + if !has_live { return Err(Status::failed_precondition(format!( "ec volume {} has no live entries", req.volume_id ))); @@ -1714,34 +1780,17 @@ impl VolumeServer for VolumeGrpcService { let collection = ec_vol.collection.clone(); drop(store); - // Read all shard data and reconstruct the .dat file - // For simplicity, concatenate the first DATA_SHARDS_COUNT shards - let mut dat_data = Vec::new(); - { - let store = self.state.store.read().unwrap(); - let ec_vol = store.ec_volumes.get(&vid).unwrap(); - for shard_id in 0..DATA_SHARDS_COUNT as u8 { - if let Some(Some(shard)) = ec_vol.shards.get(shard_id as usize) { - let shard_size = shard.file_size() as usize; - let mut buf = vec![0u8; shard_size]; - let n = shard.read_at(&mut buf, 0) - .map_err(|e| Status::internal(format!("read shard {}: {}", shard_id, e)))?; - buf.truncate(n); - dat_data.extend_from_slice(&buf); - } - } - } + // Calculate .dat file size from .ecx entries + let dat_file_size = crate::storage::erasure_coding::ec_decoder::find_dat_file_size(&dir, &collection, vid) + .map_err(|e| Status::internal(format!("FindDatFileSize: {}", e)))?; - // Write the reconstructed .dat file - let base = crate::storage::volume::volume_file_name(&dir, &collection, vid); - let dat_path = format!("{}.dat", base); - std::fs::write(&dat_path, &dat_data) - .map_err(|e| Status::internal(format!("write dat: {}", e)))?; + // Write .dat file using block-interleaved reading from shards + crate::storage::erasure_coding::ec_decoder::write_dat_file_from_shards(&dir, &collection, vid, dat_file_size, data_shards) + .map_err(|e| Status::internal(format!("WriteDatFile: {}", e)))?; - // Copy the .ecx to .idx (they have the same format) - let idx_path = format!("{}.idx", base); - std::fs::copy(&ecx_path, &idx_path) - .map_err(|e| Status::internal(format!("copy ecx to idx: {}", e)))?; + // Write .idx file from .ecx and .ecj files + crate::storage::erasure_coding::ec_decoder::write_idx_file_from_ec_index(&dir, &collection, vid) + .map_err(|e| Status::internal(format!("WriteIdxFileFromEcIndex: {}", e)))?; // Unmount EC shards and mount the reconstructed volume { diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs index e16bb655f..6924a5075 100644 --- a/seaweed-volume/src/server/handlers.rs +++ b/seaweed-volume/src/server/handlers.rs @@ -844,15 +844,77 @@ async fn get_or_head_handler_inner( let mut response_headers = HeaderMap::new(); response_headers.insert(header::ETAG, etag.parse().unwrap()); - // Set Content-Type: use response-content-type query param override, else from needle mime + // H1: Emit pairs as response headers + if n.has_pairs() && !n.pairs.is_empty() { + if let Ok(pair_map) = + serde_json::from_slice::>(&n.pairs) + { + for (k, v) in &pair_map { + if let (Ok(hname), Ok(hval)) = ( + axum::http::HeaderName::from_bytes(k.as_bytes()), + axum::http::HeaderValue::from_str(v), + ) { + response_headers.insert(hname, hval); + } + } + } + } + + // H8: Use needle stored name when URL path has no filename (only vid,fid) + let mut filename = extract_filename_from_path(&path); + let mut ext = ext; + if n.name_size > 0 && filename.is_empty() { + filename = String::from_utf8_lossy(&n.name).to_string(); + if ext.is_empty() { + if let Some(dot_pos) = filename.rfind('.') { + ext = filename[dot_pos..].to_lowercase(); + } + } + } + + // H6: Determine Content-Type: filter application/octet-stream, use mime_guess let content_type = if let Some(ref ct) = query.response_content_type { - ct.clone() - } else if !n.mime.is_empty() { - String::from_utf8_lossy(&n.mime).to_string() + Some(ct.clone()) } else { - "application/octet-stream".to_string() + // Get MIME from needle, but filter out application/octet-stream + let needle_mime = if !n.mime.is_empty() { + let mt = String::from_utf8_lossy(&n.mime).to_string(); + if mt.starts_with("application/octet-stream") { + String::new() + } else { + mt + } + } else { + String::new() + }; + + if !needle_mime.is_empty() { + Some(needle_mime) + } else { + // Fall through to extension-based detection + let detect_ext = if !ext.is_empty() { + ext.clone() + } else if !filename.is_empty() { + if let Some(dot_pos) = filename.rfind('.') { + filename[dot_pos..].to_lowercase() + } else { + String::new() + } + } else { + String::new() + }; + if !detect_ext.is_empty() { + mime_guess::from_ext(detect_ext.trim_start_matches('.')) + .first() + .map(|m| m.to_string()) + } else { + None // Omit Content-Type entirely + } + } }; - response_headers.insert(header::CONTENT_TYPE, content_type.parse().unwrap()); + if let Some(ref ct) = content_type { + response_headers.insert(header::CONTENT_TYPE, ct.parse().unwrap()); + } // Cache-Control override from query param if let Some(ref cc) = query.response_cache_control { @@ -878,16 +940,23 @@ async fn get_or_head_handler_inner( response_headers.insert(header::LAST_MODIFIED, lm.parse().unwrap()); } - // Content-Disposition for download - if query.dl.is_some() { - // Extract filename from URL path - let filename = extract_filename_from_path(&path); - let disposition = if filename.is_empty() { - "attachment".to_string() + // H7: Content-Disposition — inline by default, attachment only when dl is truthy + // Only set if not already set by response-content-disposition query param + if !response_headers.contains_key(header::CONTENT_DISPOSITION) && !filename.is_empty() { + let disposition_type = if let Some(ref dl_val) = query.dl { + // Parse dl as bool: "true", "1" -> attachment; anything else -> inline + if dl_val == "true" || dl_val == "1" { + "attachment" + } else { + "inline" + } } else { - format!("attachment; filename=\"{}\"", filename) + "inline" }; - response_headers.insert(header::CONTENT_DISPOSITION, disposition.parse().unwrap()); + let disposition = format!("{}; filename=\"{}\"", disposition_type, filename); + if let Ok(hval) = disposition.parse() { + response_headers.insert(header::CONTENT_DISPOSITION, hval); + } } // ---- Streaming path: large uncompressed files ---- @@ -1232,6 +1301,10 @@ struct UploadResult { size: u32, #[serde(rename = "eTag")] etag: String, + #[serde(skip_serializing_if = "Option::is_none")] + mime: Option, + #[serde(rename = "contentMd5", skip_serializing_if = "Option::is_none")] + content_md5: Option, } pub async fn post_handler( @@ -1247,7 +1320,7 @@ pub async fn post_handler( let (vid, needle_id, cookie) = match parse_url_path(&path) { Some(parsed) => parsed, - None => return (StatusCode::BAD_REQUEST, "invalid URL path").into_response(), + None => return json_error(StatusCode::BAD_REQUEST, "invalid URL path"), }; // JWT check for writes @@ -1257,7 +1330,7 @@ pub async fn post_handler( .guard .check_jwt_for_file(token.as_deref(), &file_id, true) { - return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response(); + return json_error(StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)); } // Upload throttling: check inflight bytes against limit @@ -1287,7 +1360,7 @@ pub async fn post_handler( .await .is_err() { - return (StatusCode::TOO_MANY_REQUESTS, "upload limit exceeded").into_response(); + return json_error(StatusCode::TOO_MANY_REQUESTS, "upload limit exceeded"); } } state @@ -1313,11 +1386,10 @@ pub async fn post_handler( if let Some(ct) = headers.get(header::CONTENT_TYPE) { if let Ok(ct_str) = ct.to_str() { if ct_str.starts_with("multipart/form-data") && !ct_str.contains("boundary=") { - return ( + return json_error( StatusCode::BAD_REQUEST, "no multipart boundary param in Content-Type", - ) - .into_response(); + ); } } } @@ -1330,12 +1402,83 @@ pub async fn post_handler( // Read body let body = match axum::body::to_bytes(request.into_body(), usize::MAX).await { Ok(b) => b, - Err(e) => return (StatusCode::BAD_REQUEST, format!("read body: {}", e)).into_response(), + Err(e) => return json_error(StatusCode::BAD_REQUEST, format!("read body: {}", e)), }; + // H5: Multipart form-data parsing + let content_type_str = headers + .get(header::CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .unwrap_or("") + .to_string(); + + let (body_data_raw, parsed_filename, parsed_content_type, _parsed_content_encoding) = + if content_type_str.starts_with("multipart/form-data") { + // Extract boundary from Content-Type + let boundary = content_type_str + .split(';') + .find_map(|part| { + let part = part.trim(); + if let Some(val) = part.strip_prefix("boundary=") { + Some(val.trim_matches('"').to_string()) + } else { + None + } + }) + .unwrap_or_default(); + + let mut multipart = + multer::Multipart::new(futures::stream::once(async { Ok::<_, std::io::Error>(body.clone()) }), boundary); + + let mut file_data: Option> = None; + let mut file_name: Option = None; + let mut file_content_type: Option = None; + let mut file_content_encoding: Option = None; + + while let Ok(Some(field)) = multipart.next_field().await { + let field_name = field.name().map(|s| s.to_string()); + let fname = field.file_name().map(|s| { + // Clean Windows backslashes + let cleaned = s.replace('\\', "/"); + cleaned + .rsplit('/') + .next() + .unwrap_or(&cleaned) + .to_string() + }); + let fct = field.content_type().map(|m| m.to_string()); + + if let Ok(data) = field.bytes().await { + if file_data.is_none() { + // First file field + file_data = Some(data.to_vec()); + file_name = fname; + file_content_type = fct; + // Content-Encoding comes from headers, not multipart field + file_content_encoding = None; + } + let _ = field_name; // suppress unused warning + } + } + + if let Some(data) = file_data { + ( + data, + file_name.unwrap_or_default(), + file_content_type, + file_content_encoding, + ) + } else { + // No file field found, use raw body + (body.to_vec(), String::new(), None, None) + } + } else { + (body.to_vec(), String::new(), None, None) + }; + // Check file size limit - if state.file_size_limit_bytes > 0 && body.len() as i64 > state.file_size_limit_bytes { - return (StatusCode::BAD_REQUEST, "file size limit exceeded").into_response(); + if state.file_size_limit_bytes > 0 && body_data_raw.len() as i64 > state.file_size_limit_bytes { + return json_error(StatusCode::BAD_REQUEST, "file size limit exceeded"); } // Validate Content-MD5 if provided @@ -1343,17 +1486,16 @@ pub async fn post_handler( use base64::Engine; use md5::{Digest, Md5}; let mut hasher = Md5::new(); - hasher.update(&body); + hasher.update(&body_data_raw); let actual = base64::engine::general_purpose::STANDARD.encode(hasher.finalize()); if actual != *expected_md5 { - return ( + return json_error( StatusCode::BAD_REQUEST, format!( "Content-MD5 mismatch: expected {} got {}", expected_md5, actual ), - ) - .into_response(); + ); } } @@ -1380,18 +1522,22 @@ pub async fn post_handler( .map(|s| s == "gzip") .unwrap_or(false); - // Extract MIME type from Content-Type header (needed early for JPEG orientation fix) - let mime_type = headers - .get(header::CONTENT_TYPE) - .and_then(|v| v.to_str().ok()) - .map(|ct| { - if ct.starts_with("multipart/") { - "application/octet-stream".to_string() - } else { - ct.to_string() - } - }) - .unwrap_or_else(|| "application/octet-stream".to_string()); + // Extract MIME type: prefer multipart-parsed content type, else from Content-Type header + let mime_type = if let Some(ref pct) = parsed_content_type { + pct.clone() + } else { + headers + .get(header::CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .map(|ct| { + if ct.starts_with("multipart/") { + "application/octet-stream".to_string() + } else { + ct.to_string() + } + }) + .unwrap_or_else(|| "application/octet-stream".to_string()) + }; // Parse TTL from query param (matches Go's r.FormValue("ttl")) let ttl_str = query @@ -1425,11 +1571,14 @@ pub async fn post_handler( // Fix JPEG orientation from EXIF data before storing (matches Go behavior). // Only for non-compressed uploads that are JPEG files. let body_data = if !is_gzipped && crate::images::is_jpeg(&mime_type, &path) { - crate::images::fix_jpg_orientation(&body) + crate::images::fix_jpg_orientation(&body_data_raw) } else { - body.to_vec() + body_data_raw }; + // H3: Capture original data size BEFORE auto-compression + let original_data_size = body_data.len() as u32; + // Auto-compress compressible file types (matches Go's IsCompressableFileType). // Only compress if not already gzipped and compression saves >10%. let (final_data, final_is_gzipped) = if !is_gzipped && !is_chunk_manifest { @@ -1497,7 +1646,12 @@ pub async fn post_handler( } // Set filename on needle (matches Go: n.Name = []byte(pu.FileName)) - let filename = extract_filename_from_path(&path); + // Prefer multipart-parsed filename, else extract from URL path + let filename = if !parsed_filename.is_empty() { + parsed_filename + } else { + extract_filename_from_path(&path) + }; if !filename.is_empty() && filename.len() < 256 { n.name = filename.as_bytes().to_vec(); n.name_size = filename.len() as u8; @@ -1533,11 +1687,10 @@ pub async fn post_handler( .await { tracing::error!("replicated write failed: {}", e); - return ( + return json_error( StatusCode::INTERNAL_SERVER_ERROR, format!("replication failed: {}", e), - ) - .into_response(); + ); } } } @@ -1548,28 +1701,46 @@ pub async fn post_handler( let etag = format!("\"{}\"", n.etag()); (StatusCode::NO_CONTENT, [(header::ETAG, etag)]).into_response() } else { + // H2: Compute Content-MD5 as base64(md5(original_data)) + let content_md5_value = { + use base64::Engine; + use md5::{Digest, Md5}; + let mut hasher = Md5::new(); + hasher.update(&n.data); + base64::engine::general_purpose::STANDARD.encode(hasher.finalize()) + }; let result = UploadResult { name: filename.clone(), - size: n.data_size, + size: original_data_size, // H3: use original size, not compressed etag: n.etag(), + mime: if mime_type.is_empty() { + None + } else { + Some(mime_type.clone()) + }, + content_md5: Some(content_md5_value.clone()), }; metrics::REQUEST_DURATION .with_label_values(&["write"]) .observe(start.elapsed().as_secs_f64()); - (StatusCode::CREATED, axum::Json(result)).into_response() + let mut resp = (StatusCode::CREATED, axum::Json(result)).into_response(); + resp.headers_mut().insert( + "Content-MD5", + content_md5_value.parse().unwrap(), + ); + resp } } Err(crate::storage::volume::VolumeError::NotFound) => { - (StatusCode::NOT_FOUND, "volume not found").into_response() + json_error(StatusCode::NOT_FOUND, "volume not found") } Err(crate::storage::volume::VolumeError::ReadOnly) => { - (StatusCode::FORBIDDEN, "volume is read-only").into_response() + json_error(StatusCode::FORBIDDEN, "volume is read-only") } - Err(e) => ( + Err(e) => json_error( StatusCode::INTERNAL_SERVER_ERROR, format!("write error: {}", e), - ) - .into_response(), + ), }; // _upload_guard drops here, releasing inflight bytes @@ -1599,7 +1770,7 @@ pub async fn delete_handler( let (vid, needle_id, cookie) = match parse_url_path(&path) { Some(parsed) => parsed, - None => return (StatusCode::BAD_REQUEST, "invalid URL path").into_response(), + None => return json_error(StatusCode::BAD_REQUEST, "invalid URL path"), }; // JWT check for writes (deletes use write key) @@ -1609,19 +1780,27 @@ pub async fn delete_handler( .guard .check_jwt_for_file(token.as_deref(), &file_id, true) { - return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response(); + return json_error(StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)); } - // Parse custom timestamp from query param + // H9: Parse custom timestamp from query param; default to now (not 0) let del_query = request.uri().query().unwrap_or(""); let del_ts_str = del_query .split('&') .find_map(|p| p.strip_prefix("ts=")) .unwrap_or(""); let del_last_modified = if !del_ts_str.is_empty() { - del_ts_str.parse::().unwrap_or(0) + del_ts_str.parse::().unwrap_or_else(|_| { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + }) } else { - 0 + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() }; let mut n = Needle { @@ -1643,18 +1822,15 @@ pub async fn delete_handler( } } if n.cookie != original_cookie { - return ( + return json_error( StatusCode::BAD_REQUEST, "File Random Cookie does not match.", - ) - .into_response(); + ); } - // Apply custom timestamp if provided - if del_last_modified > 0 { - n.last_modified = del_last_modified; - n.set_has_last_modified_date(); - } + // Apply custom timestamp (always set — defaults to now per H9) + n.last_modified = del_last_modified; + n.set_has_last_modified_date(); // If this is a chunk manifest, delete child chunks first if n.is_chunk_manifest() { @@ -1678,11 +1854,10 @@ pub async fn delete_handler( let (chunk_vid, chunk_nid, chunk_cookie) = match parse_url_path(&chunk.fid) { Some(p) => p, None => { - return ( + return json_error( StatusCode::INTERNAL_SERVER_ERROR, format!("invalid chunk fid: {}", chunk.fid), - ) - .into_response(); + ); } }; let mut chunk_needle = Needle { @@ -1694,31 +1869,28 @@ pub async fn delete_handler( { let store = state.store.read().unwrap(); if let Err(e) = store.read_volume_needle(chunk_vid, &mut chunk_needle) { - return ( + return json_error( StatusCode::INTERNAL_SERVER_ERROR, format!("read chunk {}: {}", chunk.fid, e), - ) - .into_response(); + ); } } // Delete the chunk let mut store = state.store.write().unwrap(); if let Err(e) = store.delete_volume_needle(chunk_vid, &mut chunk_needle) { - return ( + return json_error( StatusCode::INTERNAL_SERVER_ERROR, format!("delete chunk {}: {}", chunk.fid, e), - ) - .into_response(); + ); } } // Delete the manifest itself let mut store = state.store.write().unwrap(); if let Err(e) = store.delete_volume_needle(vid, &mut n) { - return ( + return json_error( StatusCode::INTERNAL_SERVER_ERROR, format!("delete manifest: {}", e), - ) - .into_response(); + ); } metrics::REQUEST_DURATION .with_label_values(&["delete"]) @@ -1757,11 +1929,10 @@ pub async fn delete_handler( .await { tracing::error!("replicated delete failed: {}", e); - return ( + return json_error( StatusCode::INTERNAL_SERVER_ERROR, format!("replication failed: {}", e), - ) - .into_response(); + ); } } } @@ -1782,11 +1953,10 @@ pub async fn delete_handler( let result = DeleteResult { size: 0 }; (StatusCode::NOT_FOUND, axum::Json(result)).into_response() } - Err(e) => ( + Err(e) => json_error( StatusCode::INTERNAL_SERVER_ERROR, format!("delete error: {}", e), - ) - .into_response(), + ), } } @@ -2150,6 +2320,12 @@ fn try_expand_chunk_manifest( // Helpers // ============================================================================ +/// Return a JSON error response: `{"error": ""}`. +fn json_error(status: StatusCode, msg: impl Into) -> Response { + let body = serde_json::json!({"error": msg.into()}); + (status, axum::Json(body)).into_response() +} + /// Extract JWT token from query param, Authorization header, or Cookie. /// Query param takes precedence over header, header over cookie. fn extract_jwt(headers: &HeaderMap, uri: &axum::http::Uri) -> Option { @@ -2164,11 +2340,11 @@ fn extract_jwt(headers: &HeaderMap, uri: &axum::http::Uri) -> Option { } } - // 2. Check Authorization: Bearer + // 2. Check Authorization: Bearer (case-insensitive prefix) if let Some(auth) = headers.get(header::AUTHORIZATION) { if let Ok(auth_str) = auth.to_str() { - if let Some(token) = auth_str.strip_prefix("Bearer ") { - return Some(token.to_string()); + if auth_str.len() >= 7 && auth_str[..7].eq_ignore_ascii_case("bearer ") { + return Some(auth_str[7..].to_string()); } } } diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index 20837d8f0..d85ebc8cf 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -31,6 +31,11 @@ pub struct HeartbeatConfig { } /// Run the heartbeat loop using VolumeServerState. +/// +/// Mirrors Go's `volume_grpc_client_to_master.go` heartbeat(): +/// - On leader redirect: sleep 3s, then connect directly to the new leader +/// - On duplicate UUID error: exponential backoff (2s, 4s, 8s), exit after 3 retries +/// - On other errors: sleep pulse interval, reset to seed master list iteration pub async fn run_heartbeat_with_state( config: HeartbeatConfig, state: Arc, @@ -42,6 +47,8 @@ pub async fn run_heartbeat_with_state( ); let pulse = Duration::from_secs(config.pulse_seconds.max(1)); + let mut new_leader: Option = None; + let mut duplicate_retry_count: u32 = 0; loop { for master_addr in &config.master_addresses { @@ -51,21 +58,93 @@ pub async fn run_heartbeat_with_state( return; } - let grpc_addr = to_grpc_address(master_addr); + // If we have a leader redirect, sleep 3s then connect to the leader + // instead of iterating through the seed list + let target_addr = if let Some(ref leader) = new_leader { + tokio::time::sleep(Duration::from_secs(3)).await; + leader.clone() + } else { + master_addr.clone() + }; + + let grpc_addr = to_grpc_address(&target_addr); info!("Connecting heartbeat to master {}", grpc_addr); - match do_heartbeat(&config, &state, &grpc_addr, pulse, &mut shutdown_rx).await { + // Determine what action to take after the heartbeat attempt. + // We convert the error to a string immediately so the non-Send + // Box is dropped before any .await point. + enum PostAction { + LeaderRedirect(String), + Done, + SleepDuplicate(Duration), + SleepPulse, + } + let action = match do_heartbeat(&config, &state, &grpc_addr, pulse, &mut shutdown_rx).await { Ok(Some(leader)) => { info!("Master leader changed to {}", leader); + PostAction::LeaderRedirect(leader) + } + Ok(None) => { + duplicate_retry_count = 0; + PostAction::Done } - Ok(None) => {} Err(e) => { + let err_msg = e.to_string(); + // Drop `e` (non-Send) before any .await + drop(e); state.is_heartbeating.store(false, Ordering::Relaxed); - warn!("Heartbeat to {} error: {}", grpc_addr, e); + warn!("Heartbeat to {} error: {}", grpc_addr, err_msg); + + if err_msg.contains("duplicate") && err_msg.contains("UUID") { + duplicate_retry_count += 1; + if duplicate_retry_count > 3 { + error!("Shut down Volume Server due to persistent duplicate volume directories after 3 retries"); + error!("Please check if another volume server is using the same directory"); + std::process::exit(1); + } + let retry_delay = Duration::from_secs(2u64.pow(duplicate_retry_count)); + warn!( + "Waiting {:?} before retrying due to duplicate UUID detection (attempt {}/3)...", + retry_delay, duplicate_retry_count + ); + PostAction::SleepDuplicate(retry_delay) + } else { + duplicate_retry_count = 0; + PostAction::SleepPulse + } + } + }; + + match action { + PostAction::LeaderRedirect(leader) => { + new_leader = Some(leader); + break; + } + PostAction::Done => { + new_leader = None; } + PostAction::SleepDuplicate(delay) => { + new_leader = None; + tokio::time::sleep(delay).await; + } + PostAction::SleepPulse => { + new_leader = None; + tokio::time::sleep(pulse).await; + } + } + + // If we connected to a leader (not seed list), break out after one attempt + // so we either reconnect to the new leader or fall back to seed list + if new_leader.is_some() { + break; } } + // If we have a leader redirect, skip the sleep and reconnect immediately + if new_leader.is_some() { + continue; + } + tokio::select! { _ = tokio::time::sleep(pulse) => {} _ = shutdown_rx.recv() => { @@ -213,6 +292,8 @@ async fn do_heartbeat( let delta_hb = master_pb::Heartbeat { ip: config.ip.clone(), port: config.port as u32, + grpc_port: config.grpc_port as u32, + public_url: config.public_url.clone(), data_center: config.data_center.clone(), rack: config.rack.clone(), new_volumes: new_vols, diff --git a/seaweed-volume/src/storage/erasure_coding/ec_decoder.rs b/seaweed-volume/src/storage/erasure_coding/ec_decoder.rs index e897bcbf5..384d60228 100644 --- a/seaweed-volume/src/storage/erasure_coding/ec_decoder.rs +++ b/seaweed-volume/src/storage/erasure_coding/ec_decoder.rs @@ -4,13 +4,55 @@ //! and the sorted index (.ecx) + deletion journal (.ecj). use std::fs::File; -use std::io::{self, Write}; +use std::io::{self, Read, Write}; use crate::storage::erasure_coding::ec_shard::*; use crate::storage::idx; +use crate::storage::needle::needle::get_actual_size; +use crate::storage::super_block::SUPER_BLOCK_SIZE; use crate::storage::types::*; use crate::storage::volume::volume_file_name; +/// Calculate .dat file size from the max offset entry in .ecx. +/// Reads the volume version from the first EC shard (.ec00) superblock, +/// then scans .ecx entries to find the largest (offset + needle_actual_size). +pub fn find_dat_file_size( + dir: &str, + collection: &str, + volume_id: VolumeId, +) -> io::Result { + let base = volume_file_name(dir, collection, volume_id); + + // Read volume version from .ec00 superblock + let ec00_path = format!("{}.ec00", base); + let mut ec00 = File::open(&ec00_path)?; + let mut sb_buf = [0u8; SUPER_BLOCK_SIZE]; + ec00.read_exact(&mut sb_buf)?; + let version = Version(sb_buf[0]); + + // Start with at least the superblock size + let mut dat_size: i64 = SUPER_BLOCK_SIZE as i64; + + // Scan .ecx entries + let ecx_path = format!("{}.ecx", base); + let ecx_data = std::fs::read(&ecx_path)?; + let entry_count = ecx_data.len() / NEEDLE_MAP_ENTRY_SIZE; + + for i in 0..entry_count { + let start = i * NEEDLE_MAP_ENTRY_SIZE; + let (_, offset, size) = idx_entry_from_bytes(&ecx_data[start..start + NEEDLE_MAP_ENTRY_SIZE]); + if size.is_deleted() { + continue; + } + let entry_stop = offset.to_actual_offset() + get_actual_size(size, version); + if entry_stop > dat_size { + dat_size = entry_stop; + } + } + + Ok(dat_size) +} + /// Reconstruct a .dat file from EC data shards. /// /// Reads from .ec00-.ec09 and writes a new .dat file. diff --git a/seaweed-volume/src/storage/needle/needle.rs b/seaweed-volume/src/storage/needle/needle.rs index c6283846b..a805e40bd 100644 --- a/seaweed-volume/src/storage/needle/needle.rs +++ b/seaweed-volume/src/storage/needle/needle.rs @@ -446,9 +446,9 @@ impl Needle { /// Compute padding to align needle to NEEDLE_PADDING_SIZE (8 bytes). pub fn padding_length(needle_size: Size, version: Version) -> Size { if version == VERSION_3 { - Size((NEEDLE_PADDING_SIZE as i32 - ((NEEDLE_HEADER_SIZE as i32 + needle_size.0 + NEEDLE_CHECKSUM_SIZE as i32 + TIMESTAMP_SIZE as i32) % NEEDLE_PADDING_SIZE as i32)) % NEEDLE_PADDING_SIZE as i32) + Size(NEEDLE_PADDING_SIZE as i32 - ((NEEDLE_HEADER_SIZE as i32 + needle_size.0 + NEEDLE_CHECKSUM_SIZE as i32 + TIMESTAMP_SIZE as i32) % NEEDLE_PADDING_SIZE as i32)) } else { - Size((NEEDLE_PADDING_SIZE as i32 - ((NEEDLE_HEADER_SIZE as i32 + needle_size.0 + NEEDLE_CHECKSUM_SIZE as i32) % NEEDLE_PADDING_SIZE as i32)) % NEEDLE_PADDING_SIZE as i32) + Size(NEEDLE_PADDING_SIZE as i32 - ((NEEDLE_HEADER_SIZE as i32 + needle_size.0 + NEEDLE_CHECKSUM_SIZE as i32) % NEEDLE_PADDING_SIZE as i32)) } } diff --git a/seaweed-volume/src/storage/needle_map.rs b/seaweed-volume/src/storage/needle_map.rs index ed1f0f47d..c40a0fea1 100644 --- a/seaweed-volume/src/storage/needle_map.rs +++ b/seaweed-volume/src/storage/needle_map.rs @@ -59,21 +59,30 @@ pub struct NeedleMapMetric { } impl NeedleMapMetric { - /// Update metrics based on a Put operation. + /// Update metrics based on a Put operation (additive-only, matching Go's logPut). fn on_put(&self, key: NeedleId, old: Option<&NeedleValue>, new_size: Size) { - if new_size.is_valid() { - if old.is_none() || !old.unwrap().size.is_valid() { - self.file_count.fetch_add(1, Ordering::Relaxed); - } - self.file_byte_count.fetch_add(new_size.0 as u64, Ordering::Relaxed); - if let Some(old_val) = old { - if old_val.size.is_valid() { - self.file_byte_count.fetch_sub(old_val.size.0 as u64, Ordering::Relaxed); - // Track overwritten bytes as garbage for compaction (garbage_level) - self.deletion_byte_count.fetch_add(old_val.size.0 as u64, Ordering::Relaxed); - } + self.maybe_set_max_file_key(key); + // Go: always LogFileCounter(newSize) which does FileCounter++ and FileByteCounter += newSize + self.file_count.fetch_add(1, Ordering::Relaxed); + self.file_byte_count.fetch_add(new_size.0 as u64, Ordering::Relaxed); + // Go: if oldSize > 0 && oldSize.IsValid() { LogDeletionCounter(oldSize) } + if let Some(old_val) = old { + if old_val.size.0 > 0 && old_val.size.is_valid() { + self.deletion_count.fetch_add(1, Ordering::Relaxed); + self.deletion_byte_count.fetch_add(old_val.size.0 as u64, Ordering::Relaxed); } } + } + + /// Update metrics based on a Delete operation (additive-only, matching Go's logDelete). + fn on_delete(&self, old: &NeedleValue) { + if old.size.0 > 0 { + self.deletion_count.fetch_add(1, Ordering::Relaxed); + self.deletion_byte_count.fetch_add(old.size.0 as u64, Ordering::Relaxed); + } + } + + fn maybe_set_max_file_key(&self, key: NeedleId) { let key_val: u64 = key.into(); loop { let current = self.max_file_key.load(Ordering::Relaxed); @@ -85,16 +94,6 @@ impl NeedleMapMetric { } } } - - /// Update metrics based on a Delete operation. - fn on_delete(&self, old: &NeedleValue) { - if old.size.is_valid() { - self.deletion_count.fetch_add(1, Ordering::Relaxed); - self.deletion_byte_count.fetch_add(old.size.0 as u64, Ordering::Relaxed); - self.file_count.fetch_sub(1, Ordering::Relaxed); - self.file_byte_count.fetch_sub(old.size.0 as u64, Ordering::Relaxed); - } - } } // ============================================================================ @@ -221,6 +220,7 @@ impl CompactNeedleMap { /// Remove from map during loading (handle deletions in idx walk). fn delete_from_map(&mut self, key: NeedleId) { + self.metric.maybe_set_max_file_key(key); if let Some(old) = self.map.get(&key).cloned() { if old.size.is_valid() { self.metric.on_delete(&old); @@ -815,7 +815,8 @@ mod tests { let deleted = nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap(); assert_eq!(deleted, Some(Size(100))); - assert_eq!(nm.file_count(), 0); + // Additive-only: file_count stays at 1 after delete + assert_eq!(nm.file_count(), 1); assert_eq!(nm.deleted_count(), 1); assert_eq!(nm.deleted_size(), 100); } @@ -831,15 +832,15 @@ mod tests { assert_eq!(nm.content_size(), 600); assert_eq!(nm.max_file_key(), NeedleId(3)); - // Update existing + // Update existing — additive-only: file_count increments, content_size adds nm.put(NeedleId(2), Offset::from_actual_offset(700), Size(250)).unwrap(); - assert_eq!(nm.file_count(), 3); // still 3 - assert_eq!(nm.content_size(), 650); // 100 + 250 + 300 + assert_eq!(nm.file_count(), 4); // 3 + 1 (always increments) + assert_eq!(nm.content_size(), 850); // 600 + 250 (always adds) - // Delete + // Delete — additive-only: file_count unchanged nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap(); - assert_eq!(nm.file_count(), 2); - assert_eq!(nm.deleted_count(), 1); + assert_eq!(nm.file_count(), 4); // unchanged + assert_eq!(nm.deleted_count(), 2); // 1 from overwrite + 1 from delete } #[test] @@ -859,7 +860,8 @@ mod tests { assert!(nm.get(NeedleId(1)).is_some()); assert!(nm.get(NeedleId(2)).is_none()); // deleted assert!(nm.get(NeedleId(3)).is_some()); - assert_eq!(nm.file_count(), 2); + // Additive-only: put(1)+put(2)+put(3) = 3, delete doesn't decrement + assert_eq!(nm.file_count(), 3); } #[test] @@ -909,7 +911,8 @@ mod tests { let deleted = nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap(); assert_eq!(deleted, Some(Size(100))); - assert_eq!(nm.file_count(), 0); + // Additive-only: file_count stays at 1 after delete + assert_eq!(nm.file_count(), 1); assert_eq!(nm.deleted_count(), 1); assert_eq!(nm.deleted_size(), 100); @@ -932,15 +935,15 @@ mod tests { assert_eq!(nm.content_size(), 600); assert_eq!(nm.max_file_key(), NeedleId(3)); - // Update existing + // Update existing — additive-only: file_count increments, content_size adds nm.put(NeedleId(2), Offset::from_actual_offset(700), Size(250)).unwrap(); - assert_eq!(nm.file_count(), 3); - assert_eq!(nm.content_size(), 650); // 100 + 250 + 300 + assert_eq!(nm.file_count(), 4); // 3 + 1 (always increments) + assert_eq!(nm.content_size(), 850); // 600 + 250 (always adds) - // Delete + // Delete — additive-only: file_count unchanged nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap(); - assert_eq!(nm.file_count(), 2); - assert_eq!(nm.deleted_count(), 1); + assert_eq!(nm.file_count(), 4); // unchanged + assert_eq!(nm.deleted_count(), 2); // 1 from overwrite + 1 from delete } #[test] diff --git a/seaweed-volume/src/storage/types.rs b/seaweed-volume/src/storage/types.rs index 157f44c68..99354ed7d 100644 --- a/seaweed-volume/src/storage/types.rs +++ b/seaweed-volume/src/storage/types.rs @@ -23,7 +23,7 @@ pub const NEEDLE_CHECKSUM_SIZE: usize = 4; /// Maximum possible volume size with 4-byte offset: 32GB /// Formula: 4 * 1024 * 1024 * 1024 * 8 -pub const MAX_POSSIBLE_VOLUME_SIZE: u64 = 4 * 1024 * 1024 * 1024 * 8 * 256; +pub const MAX_POSSIBLE_VOLUME_SIZE: u64 = 4 * 1024 * 1024 * 1024 * 8; // ============================================================================ // NeedleId diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 7e02dc98e..36878c52c 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -927,6 +927,16 @@ impl Volume { n: &mut Needle, check_cookie: bool, ) -> Result<(u64, Size, bool), VolumeError> { + // TTL inheritance from volume (matching Go's writeNeedle2) + { + use crate::storage::needle::ttl::TTL; + let needle_ttl = n.ttl.unwrap_or(TTL::EMPTY); + if needle_ttl == TTL::EMPTY && self.super_block.ttl != TTL::EMPTY { + n.set_has_ttl(); + n.ttl = Some(self.super_block.ttl); + } + } + // Ensure checksum is computed before dedup check if n.checksum == crate::storage::needle::crc::CRC(0) && !n.data.is_empty() { n.checksum = crate::storage::needle::crc::CRC::new(&n.data); @@ -1047,7 +1057,20 @@ impl Volume { })?; let offset = dat_file.seek(SeekFrom::End(0))?; - dat_file.write_all(&bytes)?; + + // Check volume size limit before writing (matching Go's Append) + if offset >= MAX_POSSIBLE_VOLUME_SIZE && !n.data.is_empty() { + return Err(VolumeError::SizeLimitExceeded { + current: offset, + limit: MAX_POSSIBLE_VOLUME_SIZE, + }); + } + + if let Err(e) = dat_file.write_all(&bytes) { + // Truncate back to pre-write position on error (matching Go) + let _ = dat_file.set_len(offset); + return Err(VolumeError::Io(e)); + } Ok((offset, n.size, actual_size)) } @@ -1282,7 +1305,7 @@ impl Volume { /// Path to .vif file. fn vif_path(&self) -> String { - format!("{}/{}.vif", self.dir, self.id.0) + format!("{}.vif", self.data_file_name()) } /// Load volume info from .vif file. @@ -1558,19 +1581,49 @@ impl Volume { Ok(()) } + /// Write a needle blob and update the needle map index. + /// Matches Go's Volume.WriteNeedleBlob which appends to dat and calls nm.Put. + pub fn write_needle_blob_and_index( + &mut self, + needle_id: NeedleId, + needle_blob: &[u8], + size: Size, + ) -> Result<(), VolumeError> { + // Check volume size limit + let content_size = self.content_size(); + if MAX_POSSIBLE_VOLUME_SIZE < content_size + needle_blob.len() as u64 { + return Err(VolumeError::Io(io::Error::new( + io::ErrorKind::Other, + format!("volume size limit {} exceeded! current size is {}", MAX_POSSIBLE_VOLUME_SIZE, content_size), + ))); + } + + // Append blob at end of dat file + let dat_size = self.dat_file_size()? as i64; + self.write_needle_blob(dat_size, needle_blob)?; + + // Update needle map index + let offset = Offset::from_actual_offset(dat_size); + if let Some(ref mut nm) = self.nm { + nm.put(needle_id, offset, size)?; + } + + Ok(()) + } + pub fn needs_replication(&self) -> bool { self.super_block.replica_placement.get_copy_count() > 1 } - /// Garbage ratio: deleted_size / (content_size + deleted_size) + /// Garbage ratio: deleted_size / content_size (matching Go's garbageLevel). + /// content_size is the additive-only FileByteCounter. pub fn garbage_level(&self) -> f64 { let content = self.content_size(); - let deleted = self.deleted_size(); - let total = content + deleted; - if total == 0 { + if content == 0 { return 0.0; } - deleted as f64 / total as f64 + let deleted = self.deleted_size(); + deleted as f64 / content as f64 } pub fn dat_file_size(&self) -> io::Result { @@ -2213,7 +2266,8 @@ mod tests { }) .unwrap(); assert!(deleted_size.0 > 0); - assert_eq!(v.file_count(), 0); + // Additive-only: file_count stays at 1 after delete + assert_eq!(v.file_count(), 1); assert_eq!(v.deleted_count(), 1); // Read should fail with Deleted @@ -2384,7 +2438,8 @@ mod tests { ..Needle::default() }; v.delete_needle(&mut del).unwrap(); - assert_eq!(v.file_count(), 2); + // Additive-only: file_count stays at 3 after delete + assert_eq!(v.file_count(), 3); assert_eq!(v.deleted_count(), 1); let dat_size_before = v.dat_file_size().unwrap();