Browse Source

fix: round 3 Go parity — address all remaining behavioral differences

HTTP handlers (handlers.rs):
- Content-MD5 response now computed from uncompressed data (not compressed)
- JWT check moved before path parsing (invalid paths return 401 not 400)
- Delete always returns 202 ACCEPTED regardless of size
- JSON error responses support ?pretty=y and ?callback=fn (JSONP)
- DeleteResult size field changed from i32 to i64
- Removed If-None-Match wildcard "*" to match Go exact-match-only

Storage (volume.rs):
- Delete skips tombstone append for remote-tiered volumes (offset=0)

gRPC (grpc_server.rs):
- VolumeCopy preserves file timestamps via set_file_mtime()
- FetchAndWriteNeedle replication now concurrent (tokio::spawn)
- BatchDelete EC validates cookie before journal delete
- VolumeEcShardsGenerate reuses existing .vif EC config
- VolumeTailSender polling interval changed to 2s (was 1s)
- Removed final 100% progress from TierMoveDat to/from remote
- TierMoveDatFromRemote checks file exists (not exists+len>0)

Config/heartbeat:
- Duplicate UUID retry count fixed to 3 (was 4)
- S3 part sizes: 8MB upload, 4MB download (was 64MB)

Router (volume_server.rs):
- UI route conditionally registered based on both signing keys

All tests pass: 61 HTTP + 82 gRPC + 16 Rust = 159/159
rust-volume-server
Chris Lu 1 day ago
parent
commit
d65be83a25
  1. 6
      seaweed-volume/src/images.rs
  2. 6
      seaweed-volume/src/lib.rs
  3. 15
      seaweed-volume/src/remote_storage/mod.rs
  4. 46
      seaweed-volume/src/remote_storage/s3.rs
  5. 10
      seaweed-volume/src/remote_storage/s3_tier.rs
  6. 35
      seaweed-volume/src/security.rs
  7. 1370
      seaweed-volume/src/server/grpc_server.rs
  8. 231
      seaweed-volume/src/server/handlers.rs
  9. 24
      seaweed-volume/src/server/heartbeat.rs
  10. 5
      seaweed-volume/src/server/volume_server.rs
  11. 57
      seaweed-volume/src/storage/erasure_coding/ec_decoder.rs
  12. 87
      seaweed-volume/src/storage/erasure_coding/ec_encoder.rs
  13. 29
      seaweed-volume/src/storage/erasure_coding/ec_shard.rs
  14. 74
      seaweed-volume/src/storage/erasure_coding/ec_volume.rs
  15. 10
      seaweed-volume/src/storage/erasure_coding/mod.rs
  16. 24
      seaweed-volume/src/storage/idx/mod.rs
  17. 12
      seaweed-volume/src/storage/mod.rs
  18. 209
      seaweed-volume/src/storage/needle/needle.rs
  19. 128
      seaweed-volume/src/storage/needle/ttl.rs
  20. 281
      seaweed-volume/src/storage/needle_map.rs
  21. 9
      seaweed-volume/src/storage/store.rs
  22. 24
      seaweed-volume/src/storage/super_block.rs
  23. 56
      seaweed-volume/src/storage/types.rs
  24. 72
      seaweed-volume/src/storage/volume.rs
  25. 2
      seaweed-volume/tests/http_integration.rs

6
seaweed-volume/src/images.rs

@ -192,9 +192,9 @@ mod tests {
fn test_rotate_180() {
// Create a 2x2 image with distinct pixel colors
let mut img = RgbaImage::new(2, 2);
img.put_pixel(0, 0, image::Rgba([255, 0, 0, 255])); // red top-left
img.put_pixel(1, 0, image::Rgba([0, 255, 0, 255])); // green top-right
img.put_pixel(0, 1, image::Rgba([0, 0, 255, 255])); // blue bottom-left
img.put_pixel(0, 0, image::Rgba([255, 0, 0, 255])); // red top-left
img.put_pixel(1, 0, image::Rgba([0, 255, 0, 255])); // green top-right
img.put_pixel(0, 1, image::Rgba([0, 0, 255, 255])); // blue bottom-left
img.put_pixel(1, 1, image::Rgba([255, 255, 0, 255])); // yellow bottom-right
let dynamic = DynamicImage::ImageRgba8(img);

6
seaweed-volume/src/lib.rs

@ -1,10 +1,10 @@
pub mod config;
pub mod images;
pub mod storage;
pub mod security;
pub mod server;
pub mod metrics;
pub mod remote_storage;
pub mod security;
pub mod server;
pub mod storage;
/// Generated protobuf modules.
pub mod pb {

15
seaweed-volume/src/remote_storage/mod.rs

@ -55,10 +55,7 @@ pub trait RemoteStorageClient: Send + Sync {
) -> Result<RemoteEntry, RemoteStorageError>;
/// Delete a file from remote storage.
async fn delete_file(
&self,
loc: &RemoteStorageLocation,
) -> Result<(), RemoteStorageError>;
async fn delete_file(&self, loc: &RemoteStorageLocation) -> Result<(), RemoteStorageError>;
/// List all buckets.
async fn list_buckets(&self) -> Result<Vec<String>, RemoteStorageError>;
@ -73,8 +70,8 @@ pub fn make_remote_storage_client(
) -> Result<Box<dyn RemoteStorageClient>, RemoteStorageError> {
match conf.r#type.as_str() {
// All S3-compatible backends use the same client with different credentials
"s3" | "wasabi" | "backblaze" | "aliyun" | "tencent" | "baidu"
| "filebase" | "storj" | "contabo" => {
"s3" | "wasabi" | "backblaze" | "aliyun" | "tencent" | "baidu" | "filebase" | "storj"
| "contabo" => {
let (access_key, secret_key, endpoint, region) = extract_s3_credentials(conf);
Ok(Box::new(s3::S3RemoteStorageClient::new(
conf.clone(),
@ -96,7 +93,11 @@ fn extract_s3_credentials(conf: &RemoteConf) -> (String, String, String, String)
conf.s3_access_key.clone(),
conf.s3_secret_key.clone(),
conf.s3_endpoint.clone(),
if conf.s3_region.is_empty() { "us-east-1".to_string() } else { conf.s3_region.clone() },
if conf.s3_region.is_empty() {
"us-east-1".to_string()
} else {
conf.s3_region.clone()
},
),
"wasabi" => (
conf.wasabi_access_key.clone(),

46
seaweed-volume/src/remote_storage/s3.rs

@ -2,12 +2,12 @@
//!
//! Works with AWS S3, MinIO, SeaweedFS S3, and all S3-compatible providers.
use aws_sdk_s3::Client;
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::Client;
use crate::pb::remote_pb::{RemoteConf, RemoteStorageLocation};
use super::{RemoteEntry, RemoteStorageClient, RemoteStorageError};
use crate::pb::remote_pb::{RemoteConf, RemoteStorageLocation};
/// S3-compatible remote storage client.
pub struct S3RemoteStorageClient {
@ -25,7 +25,11 @@ impl S3RemoteStorageClient {
endpoint: &str,
force_path_style: bool,
) -> Self {
let region = if region.is_empty() { "us-east-1" } else { region };
let region = if region.is_empty() {
"us-east-1"
} else {
region
};
let credentials = Credentials::new(
access_key,
@ -61,9 +65,7 @@ impl RemoteStorageClient for S3RemoteStorageClient {
) -> Result<Vec<u8>, RemoteStorageError> {
let key = loc.path.trim_start_matches('/');
let mut req = self.client.get_object()
.bucket(&loc.bucket)
.key(key);
let mut req = self.client.get_object().bucket(&loc.bucket).key(key);
// Set byte range if specified
if size > 0 {
@ -82,7 +84,10 @@ impl RemoteStorageClient for S3RemoteStorageClient {
}
})?;
let data = resp.body.collect().await
let data = resp
.body
.collect()
.await
.map_err(|e| RemoteStorageError::Other(format!("s3 read body: {}", e)))?;
Ok(data.into_bytes().to_vec())
@ -95,7 +100,9 @@ impl RemoteStorageClient for S3RemoteStorageClient {
) -> Result<RemoteEntry, RemoteStorageError> {
let key = loc.path.trim_start_matches('/');
let resp = self.client.put_object()
let resp = self
.client
.put_object()
.bucket(&loc.bucket)
.key(key)
.body(ByteStream::from(data.to_vec()))
@ -120,7 +127,9 @@ impl RemoteStorageClient for S3RemoteStorageClient {
) -> Result<RemoteEntry, RemoteStorageError> {
let key = loc.path.trim_start_matches('/');
let resp = self.client.head_object()
let resp = self
.client
.head_object()
.bucket(&loc.bucket)
.key(key)
.send()
@ -136,21 +145,17 @@ impl RemoteStorageClient for S3RemoteStorageClient {
Ok(RemoteEntry {
size: resp.content_length().unwrap_or(0),
last_modified_at: resp.last_modified()
.map(|t| t.secs())
.unwrap_or(0),
last_modified_at: resp.last_modified().map(|t| t.secs()).unwrap_or(0),
e_tag: resp.e_tag().unwrap_or_default().to_string(),
storage_name: loc.name.clone(),
})
}
async fn delete_file(
&self,
loc: &RemoteStorageLocation,
) -> Result<(), RemoteStorageError> {
async fn delete_file(&self, loc: &RemoteStorageLocation) -> Result<(), RemoteStorageError> {
let key = loc.path.trim_start_matches('/');
self.client.delete_object()
self.client
.delete_object()
.bucket(&loc.bucket)
.key(key)
.send()
@ -161,12 +166,15 @@ impl RemoteStorageClient for S3RemoteStorageClient {
}
async fn list_buckets(&self) -> Result<Vec<String>, RemoteStorageError> {
let resp = self.client.list_buckets()
let resp = self
.client
.list_buckets()
.send()
.await
.map_err(|e| RemoteStorageError::Other(format!("s3 list buckets: {}", e)))?;
Ok(resp.buckets()
Ok(resp
.buckets()
.iter()
.filter_map(|b| b.name().map(String::from))
.collect())

10
seaweed-volume/src/remote_storage/s3_tier.rs

@ -6,9 +6,9 @@
use std::collections::HashMap;
use std::sync::Arc;
use aws_sdk_s3::Client;
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
use aws_sdk_s3::Client;
use tokio::io::AsyncReadExt;
/// Configuration for an S3 tier backend.
@ -89,8 +89,8 @@ impl S3TierBackend {
.map_err(|e| format!("failed to stat file {}: {}", file_path, e))?;
let file_size = metadata.len();
// Calculate part size: start at 64MB, scale up for very large files
let mut part_size: u64 = 64 * 1024 * 1024;
// Calculate part size: start at 8MB, scale up for very large files (matches Go)
let mut part_size: u64 = 8 * 1024 * 1024;
while part_size * 1000 < file_size {
part_size *= 4;
}
@ -214,8 +214,8 @@ impl S3TierBackend {
let file_size = head_resp.content_length().unwrap_or(0) as u64;
// Download in chunks
let part_size: u64 = 64 * 1024 * 1024;
// Download in chunks (4MB to match Go)
let part_size: u64 = 4 * 1024 * 1024;
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.create(true)

35
seaweed-volume/src/security.rs

@ -8,7 +8,7 @@ use std::collections::HashSet;
use std::net::IpAddr;
use std::time::{SystemTime, UNIX_EPOCH};
use jsonwebtoken::{decode, encode, DecodingKey, EncodingKey, Header, Validation, Algorithm};
use jsonwebtoken::{decode, encode, Algorithm, DecodingKey, EncodingKey, Header, Validation};
use serde::{Deserialize, Serialize};
// ============================================================================
@ -81,10 +81,7 @@ pub fn gen_jwt(
}
/// Decode and validate a JWT token.
pub fn decode_jwt(
signing_key: &SigningKey,
token: &str,
) -> Result<FileIdClaims, JwtError> {
pub fn decode_jwt(signing_key: &SigningKey, token: &str) -> Result<FileIdClaims, JwtError> {
if signing_key.is_empty() {
return Err(JwtError::NoSigningKey);
}
@ -284,13 +281,25 @@ fn ip_in_cidr(ip: &IpAddr, network: &IpAddr, prefix_len: u8) -> bool {
(IpAddr::V4(ip), IpAddr::V4(net)) => {
let ip_bits = u32::from(*ip);
let net_bits = u32::from(*net);
let mask = if prefix_len == 0 { 0 } else if prefix_len >= 32 { u32::MAX } else { u32::MAX << (32 - prefix_len) };
let mask = if prefix_len == 0 {
0
} else if prefix_len >= 32 {
u32::MAX
} else {
u32::MAX << (32 - prefix_len)
};
(ip_bits & mask) == (net_bits & mask)
}
(IpAddr::V6(ip), IpAddr::V6(net)) => {
let ip_bits = u128::from(*ip);
let net_bits = u128::from(*net);
let mask = if prefix_len == 0 { 0 } else if prefix_len >= 128 { u128::MAX } else { u128::MAX << (128 - prefix_len) };
let mask = if prefix_len == 0 {
0
} else if prefix_len >= 128 {
u128::MAX
} else {
u128::MAX << (128 - prefix_len)
};
(ip_bits & mask) == (net_bits & mask)
}
_ => false, // V4/V6 mismatch
@ -358,13 +367,7 @@ mod tests {
#[test]
fn test_guard_empty_whitelist() {
let guard = Guard::new(
&[],
SigningKey(vec![]),
0,
SigningKey(vec![]),
0,
);
let guard = Guard::new(&[], SigningKey(vec![]), 0, SigningKey(vec![]), 0);
assert!(guard.check_whitelist("192.168.1.1:8080"));
}
@ -433,7 +436,9 @@ mod tests {
let token = gen_jwt(&key, 3600, "3,01637037d6").unwrap();
// Correct file ID
assert!(guard.check_jwt_for_file(Some(&token), "3,01637037d6", true).is_ok());
assert!(guard
.check_jwt_for_file(Some(&token), "3,01637037d6", true)
.is_ok());
// Wrong file ID
let err = guard.check_jwt_for_file(Some(&token), "4,deadbeef", true);

1370
seaweed-volume/src/server/grpc_server.rs
File diff suppressed because it is too large
View File

231
seaweed-volume/src/server/handlers.rs

@ -332,9 +332,9 @@ async fn do_replicated_request(
for loc in remote_locations {
let url = format!("http://{}{}?{}", loc.url, path, new_query);
let client = state.http_client.clone();
let mut req_builder = client.request(method.clone(), &url);
// Forward relevant headers
if let Some(ct) = headers.get(axum::http::header::CONTENT_TYPE) {
req_builder = req_builder.header(axum::http::header::CONTENT_TYPE, ct);
@ -612,12 +612,9 @@ async fn get_or_head_handler_inner(
let path = request.uri().path().to_string();
let method = request.method().clone();
let (vid, needle_id, cookie) = match parse_url_path(&path) {
Some(parsed) => parsed,
None => return (StatusCode::BAD_REQUEST, "invalid URL path").into_response(),
};
// JWT check for reads
// JWT check for reads — must happen BEFORE path parsing to match Go behavior.
// Go's GetOrHeadHandler calls maybeCheckJwtAuthorization before NewVolumeId,
// so invalid paths with JWT enabled return 401, not 400.
let file_id = extract_file_id(&path);
let token = extract_jwt(&headers, request.uri());
if let Err(e) = state
@ -627,6 +624,11 @@ async fn get_or_head_handler_inner(
return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
}
let (vid, needle_id, cookie) = match parse_url_path(&path) {
Some(parsed) => parsed,
None => return (StatusCode::BAD_REQUEST, "invalid URL path").into_response(),
};
// Check if volume exists locally; if not, proxy/redirect based on read_mode.
// This mirrors Go's hasVolume check in GetOrHeadHandler.
// NOTE: The RwLockReadGuard must be dropped before any .await to keep the future Send.
@ -835,7 +837,7 @@ async fn get_or_head_handler_inner(
// Check If-None-Match SECOND
if let Some(if_none_match) = headers.get(header::IF_NONE_MATCH) {
if let Ok(inm) = if_none_match.to_str() {
if inm == etag || inm == "*" {
if inm == etag {
return StatusCode::NOT_MODIFIED.into_response();
}
}
@ -1320,7 +1322,9 @@ pub async fn post_handler(
let (vid, needle_id, cookie) = match parse_url_path(&path) {
Some(parsed) => parsed,
None => return json_error(StatusCode::BAD_REQUEST, "invalid URL path"),
None => {
return json_error_with_query(StatusCode::BAD_REQUEST, "invalid URL path", Some(&query))
}
};
// JWT check for writes
@ -1330,7 +1334,11 @@ pub async fn post_handler(
.guard
.check_jwt_for_file(token.as_deref(), &file_id, true)
{
return json_error(StatusCode::UNAUTHORIZED, format!("JWT error: {}", e));
return json_error_with_query(
StatusCode::UNAUTHORIZED,
format!("JWT error: {}", e),
Some(&query),
);
}
// Upload throttling: check inflight bytes against limit
@ -1360,7 +1368,11 @@ pub async fn post_handler(
.await
.is_err()
{
return json_error(StatusCode::TOO_MANY_REQUESTS, "upload limit exceeded");
return json_error_with_query(
StatusCode::TOO_MANY_REQUESTS,
"upload limit exceeded",
Some(&query),
);
}
}
state
@ -1386,9 +1398,10 @@ pub async fn post_handler(
if let Some(ct) = headers.get(header::CONTENT_TYPE) {
if let Ok(ct_str) = ct.to_str() {
if ct_str.starts_with("multipart/form-data") && !ct_str.contains("boundary=") {
return json_error(
return json_error_with_query(
StatusCode::BAD_REQUEST,
"no multipart boundary param in Content-Type",
Some(&query),
);
}
}
@ -1402,7 +1415,13 @@ pub async fn post_handler(
// Read body
let body = match axum::body::to_bytes(request.into_body(), usize::MAX).await {
Ok(b) => b,
Err(e) => return json_error(StatusCode::BAD_REQUEST, format!("read body: {}", e)),
Err(e) => {
return json_error_with_query(
StatusCode::BAD_REQUEST,
format!("read body: {}", e),
Some(&query),
)
}
};
// H5: Multipart form-data parsing
@ -1427,8 +1446,10 @@ pub async fn post_handler(
})
.unwrap_or_default();
let mut multipart =
multer::Multipart::new(futures::stream::once(async { Ok::<_, std::io::Error>(body.clone()) }), boundary);
let mut multipart = multer::Multipart::new(
futures::stream::once(async { Ok::<_, std::io::Error>(body.clone()) }),
boundary,
);
let mut file_data: Option<Vec<u8>> = None;
let mut file_name: Option<String> = None;
@ -1440,11 +1461,7 @@ pub async fn post_handler(
let fname = field.file_name().map(|s| {
// Clean Windows backslashes
let cleaned = s.replace('\\', "/");
cleaned
.rsplit('/')
.next()
.unwrap_or(&cleaned)
.to_string()
cleaned.rsplit('/').next().unwrap_or(&cleaned).to_string()
});
let fct = field.content_type().map(|m| m.to_string());
@ -1478,7 +1495,11 @@ pub async fn post_handler(
// Check file size limit
if state.file_size_limit_bytes > 0 && body_data_raw.len() as i64 > state.file_size_limit_bytes {
return json_error(StatusCode::BAD_REQUEST, "file size limit exceeded");
return json_error_with_query(
StatusCode::BAD_REQUEST,
"file size limit exceeded",
Some(&query),
);
}
// Validate Content-MD5 if provided
@ -1489,12 +1510,13 @@ pub async fn post_handler(
hasher.update(&body_data_raw);
let actual = base64::engine::general_purpose::STANDARD.encode(hasher.finalize());
if actual != *expected_md5 {
return json_error(
return json_error_with_query(
StatusCode::BAD_REQUEST,
format!(
"Content-MD5 mismatch: expected {} got {}",
expected_md5, actual
),
Some(&query),
);
}
}
@ -1579,6 +1601,15 @@ pub async fn post_handler(
// H3: Capture original data size BEFORE auto-compression
let original_data_size = body_data.len() as u32;
// H1: Compute Content-MD5 from uncompressed data BEFORE auto-compression
let original_content_md5 = {
use base64::Engine;
use md5::{Digest, Md5};
let mut hasher = Md5::new();
hasher.update(&body_data);
base64::engine::general_purpose::STANDARD.encode(hasher.finalize())
};
// Auto-compress compressible file types (matches Go's IsCompressableFileType).
// Only compress if not already gzipped and compression saves >10%.
let (final_data, final_is_gzipped) = if !is_gzipped && !is_chunk_manifest {
@ -1675,23 +1706,24 @@ pub async fn post_handler(
})
};
if needs_replication {
if let Err(e) = do_replicated_request(
&state,
vid.0,
Method::POST,
&path,
&query,
&headers,
Some(body.clone()),
)
.await
{
tracing::error!("replicated write failed: {}", e);
return json_error(
StatusCode::INTERNAL_SERVER_ERROR,
format!("replication failed: {}", e),
);
}
if let Err(e) = do_replicated_request(
&state,
vid.0,
Method::POST,
&path,
&query,
&headers,
Some(body.clone()),
)
.await
{
tracing::error!("replicated write failed: {}", e);
return json_error_with_query(
StatusCode::INTERNAL_SERVER_ERROR,
format!("replication failed: {}", e),
Some(&query),
);
}
}
}
@ -1701,14 +1733,8 @@ pub async fn post_handler(
let etag = format!("\"{}\"", n.etag());
(StatusCode::NO_CONTENT, [(header::ETAG, etag)]).into_response()
} else {
// H2: Compute Content-MD5 as base64(md5(original_data))
let content_md5_value = {
use base64::Engine;
use md5::{Digest, Md5};
let mut hasher = Md5::new();
hasher.update(&n.data);
base64::engine::general_purpose::STANDARD.encode(hasher.finalize())
};
// H2: Use Content-MD5 computed from original uncompressed data
let content_md5_value = original_content_md5;
let result = UploadResult {
name: filename.clone(),
size: original_data_size, // H3: use original size, not compressed
@ -1724,22 +1750,21 @@ pub async fn post_handler(
.with_label_values(&["write"])
.observe(start.elapsed().as_secs_f64());
let mut resp = (StatusCode::CREATED, axum::Json(result)).into_response();
resp.headers_mut().insert(
"Content-MD5",
content_md5_value.parse().unwrap(),
);
resp.headers_mut()
.insert("Content-MD5", content_md5_value.parse().unwrap());
resp
}
}
Err(crate::storage::volume::VolumeError::NotFound) => {
json_error(StatusCode::NOT_FOUND, "volume not found")
json_error_with_query(StatusCode::NOT_FOUND, "volume not found", Some(&query))
}
Err(crate::storage::volume::VolumeError::ReadOnly) => {
json_error(StatusCode::FORBIDDEN, "volume is read-only")
json_error_with_query(StatusCode::FORBIDDEN, "volume is read-only", Some(&query))
}
Err(e) => json_error(
Err(e) => json_error_with_query(
StatusCode::INTERNAL_SERVER_ERROR,
format!("write error: {}", e),
Some(&query),
),
};
@ -1753,7 +1778,7 @@ pub async fn post_handler(
#[derive(Serialize)]
struct DeleteResult {
size: i32,
size: i64,
}
pub async fn delete_handler(
@ -1766,11 +1791,18 @@ pub async fn delete_handler(
.inc();
let path = request.uri().path().to_string();
let del_query = request.uri().query().unwrap_or("").to_string();
let headers = request.headers().clone();
let (vid, needle_id, cookie) = match parse_url_path(&path) {
Some(parsed) => parsed,
None => return json_error(StatusCode::BAD_REQUEST, "invalid URL path"),
None => {
return json_error_with_query(
StatusCode::BAD_REQUEST,
"invalid URL path",
Some(&del_query),
)
}
};
// JWT check for writes (deletes use write key)
@ -1780,11 +1812,14 @@ pub async fn delete_handler(
.guard
.check_jwt_for_file(token.as_deref(), &file_id, true)
{
return json_error(StatusCode::UNAUTHORIZED, format!("JWT error: {}", e));
return json_error_with_query(
StatusCode::UNAUTHORIZED,
format!("JWT error: {}", e),
Some(&del_query),
);
}
// H9: Parse custom timestamp from query param; default to now (not 0)
let del_query = request.uri().query().unwrap_or("");
let del_ts_str = del_query
.split('&')
.find_map(|p| p.strip_prefix("ts="))
@ -1822,9 +1857,10 @@ pub async fn delete_handler(
}
}
if n.cookie != original_cookie {
return json_error(
return json_error_with_query(
StatusCode::BAD_REQUEST,
"File Random Cookie does not match.",
Some(&del_query),
);
}
@ -1854,9 +1890,10 @@ pub async fn delete_handler(
let (chunk_vid, chunk_nid, chunk_cookie) = match parse_url_path(&chunk.fid) {
Some(p) => p,
None => {
return json_error(
return json_error_with_query(
StatusCode::INTERNAL_SERVER_ERROR,
format!("invalid chunk fid: {}", chunk.fid),
Some(&del_query),
);
}
};
@ -1869,27 +1906,30 @@ pub async fn delete_handler(
{
let store = state.store.read().unwrap();
if let Err(e) = store.read_volume_needle(chunk_vid, &mut chunk_needle) {
return json_error(
return json_error_with_query(
StatusCode::INTERNAL_SERVER_ERROR,
format!("read chunk {}: {}", chunk.fid, e),
Some(&del_query),
);
}
}
// Delete the chunk
let mut store = state.store.write().unwrap();
if let Err(e) = store.delete_volume_needle(chunk_vid, &mut chunk_needle) {
return json_error(
return json_error_with_query(
StatusCode::INTERNAL_SERVER_ERROR,
format!("delete chunk {}: {}", chunk.fid, e),
Some(&del_query),
);
}
}
// Delete the manifest itself
let mut store = state.store.write().unwrap();
if let Err(e) = store.delete_volume_needle(vid, &mut n) {
return json_error(
return json_error_with_query(
StatusCode::INTERNAL_SERVER_ERROR,
format!("delete manifest: {}", e),
Some(&del_query),
);
}
metrics::REQUEST_DURATION
@ -1897,7 +1937,7 @@ pub async fn delete_handler(
.observe(start.elapsed().as_secs_f64());
// Return the manifest's declared size (matches Go behavior)
let result = DeleteResult {
size: manifest.size as i32,
size: manifest.size as i64,
};
return (StatusCode::ACCEPTED, axum::Json(result)).into_response();
}
@ -1908,7 +1948,7 @@ pub async fn delete_handler(
store.delete_volume_needle(vid, &mut n)
};
let is_replicate = request.uri().query().unwrap_or("").split('&').any(|p| p == "type=replicate");
let is_replicate = del_query.split('&').any(|p| p == "type=replicate");
if !is_replicate && delete_result.is_ok() && !state.master_url.is_empty() {
let needs_replication = {
let store = state.store.read().unwrap();
@ -1922,16 +1962,17 @@ pub async fn delete_handler(
vid.0,
Method::DELETE,
&path,
request.uri().query().unwrap_or(""),
&del_query,
&headers,
None,
)
.await
{
tracing::error!("replicated delete failed: {}", e);
return json_error(
return json_error_with_query(
StatusCode::INTERNAL_SERVER_ERROR,
format!("replication failed: {}", e),
Some(&del_query),
);
}
}
@ -1939,23 +1980,22 @@ pub async fn delete_handler(
match delete_result {
Ok(size) => {
if size.0 == 0 {
let result = DeleteResult { size: 0 };
return (StatusCode::NOT_FOUND, axum::Json(result)).into_response();
}
metrics::REQUEST_DURATION
.with_label_values(&["delete"])
.observe(start.elapsed().as_secs_f64());
let result = DeleteResult { size: size.0 };
let result = DeleteResult {
size: size.0 as i64,
};
(StatusCode::ACCEPTED, axum::Json(result)).into_response()
}
Err(crate::storage::volume::VolumeError::NotFound) => {
let result = DeleteResult { size: 0 };
(StatusCode::NOT_FOUND, axum::Json(result)).into_response()
}
Err(e) => json_error(
Err(e) => json_error_with_query(
StatusCode::INTERNAL_SERVER_ERROR,
format!("delete error: {}", e),
Some(&del_query),
),
}
}
@ -2320,10 +2360,47 @@ fn try_expand_chunk_manifest(
// Helpers
// ============================================================================
/// Return a JSON error response: `{"error": "<msg>"}`.
fn json_error(status: StatusCode, msg: impl Into<String>) -> Response {
/// Return a JSON error response with optional query string for pretty/JSONP support.
/// Supports `?pretty=y` for pretty-printed JSON and `?callback=fn` for JSONP,
/// matching Go's writeJsonError behavior.
fn json_error_with_query(
status: StatusCode,
msg: impl Into<String>,
query: Option<&str>,
) -> Response {
let body = serde_json::json!({"error": msg.into()});
(status, axum::Json(body)).into_response()
let (is_pretty, callback) = if let Some(q) = query {
let pretty = q.split('&').any(|p| p == "pretty=y");
let cb = q
.split('&')
.find_map(|p| p.strip_prefix("callback="))
.map(|s| s.to_string());
(pretty, cb)
} else {
(false, None)
};
let json_body = if is_pretty {
serde_json::to_string_pretty(&body).unwrap()
} else {
serde_json::to_string(&body).unwrap()
};
if let Some(cb) = callback {
let jsonp = format!("{}({});\n", cb, json_body);
Response::builder()
.status(status)
.header(header::CONTENT_TYPE, "application/javascript")
.body(Body::from(jsonp))
.unwrap()
} else {
Response::builder()
.status(status)
.header(header::CONTENT_TYPE, "application/json")
.body(Body::from(json_body))
.unwrap()
}
}
/// Extract JWT token from query param, Authorization header, or Cookie.

24
seaweed-volume/src/server/heartbeat.rs

@ -79,7 +79,9 @@ pub async fn run_heartbeat_with_state(
SleepDuplicate(Duration),
SleepPulse,
}
let action = match do_heartbeat(&config, &state, &grpc_addr, pulse, &mut shutdown_rx).await {
let action = match do_heartbeat(&config, &state, &grpc_addr, pulse, &mut shutdown_rx)
.await
{
Ok(Some(leader)) => {
info!("Master leader changed to {}", leader);
PostAction::LeaderRedirect(leader)
@ -97,9 +99,11 @@ pub async fn run_heartbeat_with_state(
if err_msg.contains("duplicate") && err_msg.contains("UUID") {
duplicate_retry_count += 1;
if duplicate_retry_count > 3 {
if duplicate_retry_count >= 3 {
error!("Shut down Volume Server due to persistent duplicate volume directories after 3 retries");
error!("Please check if another volume server is using the same directory");
error!(
"Please check if another volume server is using the same directory"
);
std::process::exit(1);
}
let retry_delay = Duration::from_secs(2u64.pow(duplicate_retry_count));
@ -188,7 +192,11 @@ async fn do_heartbeat(
// Keep track of what we sent, to generate delta updates
let initial_hb = collect_heartbeat(config, state);
let mut last_volumes: HashMap<u32, master_pb::VolumeInformationMessage> = initial_hb.volumes.iter().map(|v| (v.id, v.clone())).collect();
let mut last_volumes: HashMap<u32, master_pb::VolumeInformationMessage> = initial_hb
.volumes
.iter()
.map(|v| (v.id, v.clone()))
.collect();
// Send initial heartbeats BEFORE calling send_heartbeat to avoid deadlock:
// the server won't send response headers until it receives the first message,
@ -256,10 +264,10 @@ async fn do_heartbeat(
_ = state.volume_state_notify.notified() => {
let current_hb = collect_heartbeat(config, state);
let current_volumes: HashMap<u32, _> = current_hb.volumes.iter().map(|v| (v.id, v.clone())).collect();
let mut new_vols = Vec::new();
let mut del_vols = Vec::new();
for (id, vol) in &current_volumes {
if !last_volumes.contains_key(id) {
new_vols.push(master_pb::VolumeShortInformationMessage {
@ -273,7 +281,7 @@ async fn do_heartbeat(
});
}
}
for (id, vol) in &last_volumes {
if !current_volumes.contains_key(id) {
del_vols.push(master_pb::VolumeShortInformationMessage {
@ -287,7 +295,7 @@ async fn do_heartbeat(
});
}
}
if !new_vols.is_empty() || !del_vols.is_empty() {
let delta_hb = master_pb::Heartbeat {
ip: config.ip.clone(),

5
seaweed-volume/src/server/volume_server.rs

@ -194,8 +194,11 @@ fn public_options_response() -> Response {
}
/// Build the admin (private) HTTP router — supports all operations.
/// UI route is only registered when no signing keys are configured,
/// matching Go's `if signingKey == "" || enableUiAccess` check.
pub fn build_admin_router(state: Arc<VolumeServerState>) -> Router {
build_admin_router_with_ui(state.clone(), state.guard.signing_key.0.is_empty())
let ui_enabled = state.guard.signing_key.0.is_empty() && !state.guard.has_read_signing_key();
build_admin_router_with_ui(state, ui_enabled)
}
/// Build the admin router with an explicit UI exposure flag.

57
seaweed-volume/src/storage/erasure_coding/ec_decoder.rs

@ -16,11 +16,7 @@ use crate::storage::volume::volume_file_name;
/// Calculate .dat file size from the max offset entry in .ecx.
/// Reads the volume version from the first EC shard (.ec00) superblock,
/// then scans .ecx entries to find the largest (offset + needle_actual_size).
pub fn find_dat_file_size(
dir: &str,
collection: &str,
volume_id: VolumeId,
) -> io::Result<i64> {
pub fn find_dat_file_size(dir: &str, collection: &str, volume_id: VolumeId) -> io::Result<i64> {
let base = volume_file_name(dir, collection, volume_id);
// Read volume version from .ec00 superblock
@ -40,7 +36,8 @@ pub fn find_dat_file_size(
for i in 0..entry_count {
let start = i * NEEDLE_MAP_ENTRY_SIZE;
let (_, offset, size) = idx_entry_from_bytes(&ecx_data[start..start + NEEDLE_MAP_ENTRY_SIZE]);
let (_, offset, size) =
idx_entry_from_bytes(&ecx_data[start..start + NEEDLE_MAP_ENTRY_SIZE]);
if size.is_deleted() {
continue;
}
@ -180,14 +177,24 @@ mod tests {
// Create volume with data
let mut v = Volume::new(
dir, dir, "", VolumeId(1),
NeedleMapKind::InMemory, None, None, 0, Version::current(),
).unwrap();
let test_data: Vec<(NeedleId, Vec<u8>)> = (1..=3).map(|i| {
let data = format!("EC round trip data for needle {}", i);
(NeedleId(i), data.into_bytes())
}).collect();
dir,
dir,
"",
VolumeId(1),
NeedleMapKind::InMemory,
None,
None,
0,
Version::current(),
)
.unwrap();
let test_data: Vec<(NeedleId, Vec<u8>)> = (1..=3)
.map(|i| {
let data = format!("EC round trip data for needle {}", i);
(NeedleId(i), data.into_bytes())
})
.collect();
for (id, data) in &test_data {
let mut n = Needle {
@ -216,7 +223,8 @@ mod tests {
std::fs::remove_file(format!("{}/1.idx", dir)).unwrap();
// Reconstruct from EC shards
write_dat_file_from_shards(dir, "", VolumeId(1), original_dat_size as i64, data_shards).unwrap();
write_dat_file_from_shards(dir, "", VolumeId(1), original_dat_size as i64, data_shards)
.unwrap();
write_idx_file_from_ec_index(dir, "", VolumeId(1)).unwrap();
// Verify reconstructed .dat matches original
@ -229,12 +237,23 @@ mod tests {
// Verify we can load and read from reconstructed volume
let v2 = Volume::new(
dir, dir, "", VolumeId(1),
NeedleMapKind::InMemory, None, None, 0, Version::current(),
).unwrap();
dir,
dir,
"",
VolumeId(1),
NeedleMapKind::InMemory,
None,
None,
0,
Version::current(),
)
.unwrap();
for (id, expected_data) in &test_data {
let mut n = Needle { id: *id, ..Needle::default() };
let mut n = Needle {
id: *id,
..Needle::default()
};
v2.read_needle(&mut n).unwrap();
assert_eq!(&n.data, expected_data, "needle {} data should match", id);
}

87
seaweed-volume/src/storage/erasure_coding/ec_encoder.rs

@ -37,9 +37,8 @@ pub fn write_ec_files(
let dat_file = File::open(&dat_path)?;
let dat_size = dat_file.metadata()?.len() as i64;
let rs = ReedSolomon::new(data_shards, parity_shards).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("reed-solomon init: {:?}", e))
})?;
let rs = ReedSolomon::new(data_shards, parity_shards)
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("reed-solomon init: {:?}", e)))?;
// Create shard files
let total_shards = data_shards + parity_shards;
@ -52,7 +51,14 @@ pub fn write_ec_files(
}
// Encode in large blocks, then small blocks
encode_dat_file(&dat_file, dat_size, &rs, &mut shards, data_shards, parity_shards)?;
encode_dat_file(
&dat_file,
dat_size,
&rs,
&mut shards,
data_shards,
parity_shards,
)?;
// Close all shards
for shard in &mut shards {
@ -77,9 +83,8 @@ pub fn rebuild_ec_files(
return Ok(());
}
let rs = ReedSolomon::new(data_shards, parity_shards).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("reed-solomon init: {:?}", e))
})?;
let rs = ReedSolomon::new(data_shards, parity_shards)
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("reed-solomon init: {:?}", e)))?;
let total_shards = data_shards + parity_shards;
let mut shards: Vec<EcVolumeShard> = (0..total_shards as u8)
@ -140,7 +145,10 @@ pub fn rebuild_ec_files(
// Reconstruct missing shards
rs.reconstruct(&mut buffers).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("reed-solomon reconstruct: {:?}", e))
io::Error::new(
io::ErrorKind::Other,
format!("reed-solomon reconstruct: {:?}", e),
)
})?;
// Write recovered data into the missing shards
@ -171,9 +179,8 @@ pub fn verify_ec_shards(
data_shards: usize,
parity_shards: usize,
) -> io::Result<(Vec<u32>, Vec<String>)> {
let rs = ReedSolomon::new(data_shards, parity_shards).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("reed-solomon init: {:?}", e))
})?;
let rs = ReedSolomon::new(data_shards, parity_shards)
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("reed-solomon init: {:?}", e)))?;
let total_shards = data_shards + parity_shards;
let mut shards: Vec<EcVolumeShard> = (0..total_shards as u8)
@ -232,7 +239,7 @@ pub fn verify_ec_shards(
// is corrupted without recalculating parities or syndromes, so we just
// log that this batch has corruption. Wait, we can test each parity shard!
// Let's re-encode from the first `data_shards` and compare to the actual `parity_shards`.
let mut verify_buffers = buffers.clone();
// Clear the parity parts
for i in data_shards..total_shards {
@ -242,7 +249,10 @@ pub fn verify_ec_shards(
for i in 0..total_shards {
if buffers[i] != verify_buffers[i] {
broken_shards.insert(i as u32);
details.push(format!("parity mismatch on shard {} at offset {}", i, offset));
details.push(format!(
"parity mismatch on shard {} at offset {}",
i, offset
));
}
}
}
@ -261,14 +271,17 @@ pub fn verify_ec_shards(
let mut broken_vec: Vec<u32> = broken_shards.into_iter().collect();
broken_vec.sort_unstable();
Ok((broken_vec, details))
}
/// Write sorted .ecx index from .idx file.
fn write_sorted_ecx_from_idx(idx_path: &str, ecx_path: &str) -> io::Result<()> {
if !std::path::Path::new(idx_path).exists() {
return Err(io::Error::new(io::ErrorKind::NotFound, "idx file not found"));
return Err(io::Error::new(
io::ErrorKind::NotFound,
"idx file not found",
));
}
// Read all idx entries
@ -317,7 +330,15 @@ fn encode_dat_file(
// Process all data in small blocks to avoid large memory allocations
while remaining > 0 {
let to_process = remaining.min(row_size as i64);
encode_one_batch(dat_file, offset, block_size, rs, shards, data_shards, parity_shards)?;
encode_one_batch(
dat_file,
offset,
block_size,
rs,
shards,
data_shards,
parity_shards,
)?;
offset += to_process as u64;
remaining -= to_process;
}
@ -339,7 +360,10 @@ fn encode_one_batch(
// Each batch allocates block_size * total_shards bytes.
// With large blocks (1 GiB) this is 14 GiB -- guard against OOM.
let total_alloc = block_size.checked_mul(total_shards).ok_or_else(|| {
io::Error::new(io::ErrorKind::InvalidInput, "block_size * shard count overflows usize")
io::Error::new(
io::ErrorKind::InvalidInput,
"block_size * shard count overflows usize",
)
})?;
const MAX_BATCH_ALLOC: usize = 1024 * 1024 * 1024; // 1 GiB safety limit
if total_alloc > MAX_BATCH_ALLOC {
@ -353,9 +377,7 @@ fn encode_one_batch(
}
// Allocate buffers for all shards
let mut buffers: Vec<Vec<u8>> = (0..total_shards)
.map(|_| vec![0u8; block_size])
.collect();
let mut buffers: Vec<Vec<u8>> = (0..total_shards).map(|_| vec![0u8; block_size]).collect();
// Read data shards from .dat file
for i in 0..data_shards {
@ -377,7 +399,10 @@ fn encode_one_batch(
// Encode parity shards
rs.encode(&mut buffers).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("reed-solomon encode: {:?}", e))
io::Error::new(
io::ErrorKind::Other,
format!("reed-solomon encode: {:?}", e),
)
})?;
// Write all shard buffers to files
@ -403,9 +428,17 @@ mod tests {
// Create a volume with some data
let mut v = Volume::new(
dir, dir, "", VolumeId(1),
NeedleMapKind::InMemory, None, None, 0, Version::current(),
).unwrap();
dir,
dir,
"",
VolumeId(1),
NeedleMapKind::InMemory,
None,
None,
0,
Version::current(),
)
.unwrap();
for i in 1..=5 {
let data = format!("test data for needle {}", i);
@ -432,7 +465,8 @@ mod tests {
let path = format!("{}/{}.ec{:02}", dir, 1, i);
assert!(
std::path::Path::new(&path).exists(),
"shard file {} should exist", path
"shard file {} should exist",
path
);
}
@ -462,7 +496,8 @@ mod tests {
rs.encode(&mut shards).unwrap();
// Verify parity is non-zero (at least some)
let parity_nonzero: bool = shards[data_shards..].iter()
let parity_nonzero: bool = shards[data_shards..]
.iter()
.any(|s| s.iter().any(|&b| b != 0));
assert!(parity_nonzero);

29
seaweed-volume/src/storage/erasure_coding/ec_shard.rs

@ -40,7 +40,8 @@ impl EcVolumeShard {
/// Shard file name, e.g. "dir/collection_42.ec03"
pub fn file_name(&self) -> String {
let base = crate::storage::volume::volume_file_name(&self.dir, &self.collection, self.volume_id);
let base =
crate::storage::volume::volume_file_name(&self.dir, &self.collection, self.volume_id);
format!("{}.ec{:02}", base, self.shard_id)
}
@ -69,9 +70,10 @@ impl EcVolumeShard {
/// Read data at a specific offset.
pub fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
let file = self.ecd_file.as_ref().ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "shard file not open")
})?;
let file = self
.ecd_file
.as_ref()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "shard file not open"))?;
#[cfg(unix)]
{
@ -92,9 +94,10 @@ impl EcVolumeShard {
/// Write data to the shard file (appends).
pub fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
let file = self.ecd_file.as_mut().ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "shard file not open")
})?;
let file = self
.ecd_file
.as_mut()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "shard file not open"))?;
file.write_all(data)?;
self.ecd_file_size += data.len() as i64;
Ok(())
@ -125,20 +128,12 @@ pub struct ShardBits(pub u32);
impl ShardBits {
pub fn add_shard_id(&mut self, id: ShardId) {
assert!(
(id as usize) < 32,
"shard id {} out of bounds (max 31)",
id,
);
assert!((id as usize) < 32, "shard id {} out of bounds (max 31)", id,);
self.0 |= 1 << id;
}
pub fn remove_shard_id(&mut self, id: ShardId) {
assert!(
(id as usize) < 32,
"shard id {} out of bounds (max 31)",
id,
);
assert!((id as usize) < 32, "shard id {} out of bounds (max 31)", id,);
self.0 &= !(1 << id);
}

74
seaweed-volume/src/storage/erasure_coding/ec_volume.rs

@ -32,7 +32,9 @@ pub fn read_ec_shard_config(dir: &str, volume_id: VolumeId) -> (u32, u32) {
let mut parity_shards = crate::storage::erasure_coding::ec_shard::PARITY_SHARDS_COUNT as u32;
let vif_path = format!("{}/{}.vif", dir, volume_id.0);
if let Ok(vif_content) = std::fs::read_to_string(&vif_path) {
if let Ok(vif_info) = serde_json::from_str::<crate::storage::volume::VifVolumeInfo>(&vif_content) {
if let Ok(vif_info) =
serde_json::from_str::<crate::storage::volume::VifVolumeInfo>(&vif_content)
{
if let Some(ec) = vif_info.ec_shard_config {
if ec.data_shards > 0 && ec.parity_shards > 0 {
data_shards = ec.data_shards;
@ -161,9 +163,10 @@ impl EcVolume {
/// Find a needle's offset and size in the sorted .ecx index via binary search.
pub fn find_needle_from_ecx(&self, needle_id: NeedleId) -> io::Result<Option<(Offset, Size)>> {
let ecx_file = self.ecx_file.as_ref().ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "ecx file not open")
})?;
let ecx_file = self
.ecx_file
.as_ref()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "ecx file not open"))?;
let entry_count = self.ecx_file_size as usize / NEEDLE_MAP_ENTRY_SIZE;
if entry_count == 0 {
@ -237,9 +240,10 @@ impl EcVolume {
/// Append a deleted needle ID to the .ecj journal.
pub fn journal_delete(&mut self, needle_id: NeedleId) -> io::Result<()> {
let ecj_file = self.ecj_file.as_mut().ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "ecj file not open")
})?;
let ecj_file = self
.ecj_file
.as_mut()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "ecj file not open"))?;
let mut buf = [0u8; NEEDLE_ID_SIZE];
needle_id.to_bytes(&mut buf);
@ -248,6 +252,52 @@ impl EcVolume {
Ok(())
}
/// Append a deleted needle ID to the .ecj journal, validating the cookie first.
/// Matches Go's DeleteEcShardNeedle which validates cookie before journaling.
/// A cookie of 0 means skip cookie check (e.g., orphan cleanup).
pub fn journal_delete_with_cookie(
&mut self,
needle_id: NeedleId,
cookie: crate::storage::types::Cookie,
) -> io::Result<()> {
// cookie == 0 indicates SkipCookieCheck was requested
if cookie.0 != 0 {
// Try to read the needle's cookie from the EC shards to validate
// Look up the needle in ecx index to find its offset, then read header from shard
if let Ok(Some((offset, size))) = self.find_needle_from_ecx(needle_id) {
if !size.is_deleted() && !offset.is_zero() {
let actual_offset = offset.to_actual_offset() as u64;
// Determine which shard contains this offset and read the cookie
let shard_size = self
.shards
.iter()
.filter_map(|s| s.as_ref())
.map(|s| s.file_size())
.next()
.unwrap_or(0) as u64;
if shard_size > 0 {
let shard_id = (actual_offset / shard_size) as usize;
let shard_offset = actual_offset % shard_size;
if let Some(Some(shard)) = self.shards.get(shard_id) {
let mut header_buf = [0u8; 4]; // cookie is first 4 bytes of needle
if shard.read_at(&mut header_buf, shard_offset).is_ok() {
let needle_cookie =
crate::storage::types::Cookie(u32::from_be_bytes(header_buf));
if needle_cookie != cookie {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("unexpected cookie {:x}", cookie.0),
));
}
}
}
}
}
}
}
self.journal_delete(needle_id)
}
/// Read all deleted needle IDs from the .ecj journal.
pub fn read_deleted_needles(&self) -> io::Result<Vec<NeedleId>> {
let ecj_path = self.ecj_file_name();
@ -299,7 +349,12 @@ mod tests {
use crate::storage::idx;
use tempfile::TempDir;
fn write_ecx_file(dir: &str, collection: &str, vid: VolumeId, entries: &[(NeedleId, Offset, Size)]) {
fn write_ecx_file(
dir: &str,
collection: &str,
vid: VolumeId,
entries: &[(NeedleId, Offset, Size)],
) {
let base = crate::storage::volume::volume_file_name(dir, collection, vid);
let ecx_path = format!("{}.ecx", base);
let mut file = File::create(&ecx_path).unwrap();
@ -371,7 +426,8 @@ mod tests {
shard.write_all(&[0u8; 100]).unwrap();
shard.close();
vol.add_shard(EcVolumeShard::new(dir, "", VolumeId(1), 3)).unwrap();
vol.add_shard(EcVolumeShard::new(dir, "", VolumeId(1), 3))
.unwrap();
assert_eq!(vol.shard_count(), 1);
assert!(vol.shard_bits().has_shard_id(3));
}

10
seaweed-volume/src/storage/erasure_coding/mod.rs

@ -3,11 +3,13 @@
//! Encodes a volume's .dat file into 10 data + 4 parity shards using
//! Reed-Solomon erasure coding. Can reconstruct from any 10 of 14 shards.
pub mod ec_shard;
pub mod ec_volume;
pub mod ec_encoder;
pub mod ec_decoder;
pub mod ec_encoder;
pub mod ec_locate;
pub mod ec_shard;
pub mod ec_volume;
pub use ec_shard::{ShardId, EcVolumeShard, TOTAL_SHARDS_COUNT, DATA_SHARDS_COUNT, PARITY_SHARDS_COUNT};
pub use ec_shard::{
EcVolumeShard, ShardId, DATA_SHARDS_COUNT, PARITY_SHARDS_COUNT, TOTAL_SHARDS_COUNT,
};
pub use ec_volume::EcVolume;

24
seaweed-volume/src/storage/idx/mod.rs

@ -37,7 +37,12 @@ where
}
/// Write a single index entry to a writer.
pub fn write_index_entry<W: io::Write>(writer: &mut W, key: NeedleId, offset: Offset, size: Size) -> io::Result<()> {
pub fn write_index_entry<W: io::Write>(
writer: &mut W,
key: NeedleId,
offset: Offset,
size: Size,
) -> io::Result<()> {
let mut buf = [0u8; NEEDLE_MAP_ENTRY_SIZE];
idx_entry_to_bytes(&mut buf, key, offset, size);
writer.write_all(&buf)
@ -68,7 +73,8 @@ mod tests {
walk_index_file(&mut cursor, 0, |key, offset, size| {
collected.push((key, offset.to_actual_offset(), size));
Ok(())
}).unwrap();
})
.unwrap();
assert_eq!(collected.len(), 3);
assert_eq!(collected[0].0, NeedleId(1));
@ -82,14 +88,24 @@ mod tests {
fn test_walk_empty() {
let mut cursor = Cursor::new(Vec::new());
let mut count = 0;
walk_index_file(&mut cursor, 0, |_, _, _| { count += 1; Ok(()) }).unwrap();
walk_index_file(&mut cursor, 0, |_, _, _| {
count += 1;
Ok(())
})
.unwrap();
assert_eq!(count, 0);
}
#[test]
fn test_write_index_entry() {
let mut buf = Vec::new();
write_index_entry(&mut buf, NeedleId(42), Offset::from_actual_offset(8 * 10), Size(512)).unwrap();
write_index_entry(
&mut buf,
NeedleId(42),
Offset::from_actual_offset(8 * 10),
Size(512),
)
.unwrap();
assert_eq!(buf.len(), NEEDLE_MAP_ENTRY_SIZE);
let (key, offset, size) = idx_entry_from_bytes(&buf);

12
seaweed-volume/src/storage/mod.rs

@ -1,9 +1,9 @@
pub mod types;
pub mod needle;
pub mod super_block;
pub mod disk_location;
pub mod erasure_coding;
pub mod idx;
pub mod needle;
pub mod needle_map;
pub mod volume;
pub mod disk_location;
pub mod store;
pub mod erasure_coding;
pub mod super_block;
pub mod types;
pub mod volume;

209
seaweed-volume/src/storage/needle/needle.rs

@ -39,11 +39,11 @@ pub struct Needle {
pub data: Vec<u8>,
pub flags: u8,
pub name_size: u8,
pub name: Vec<u8>, // max 255 bytes
pub name: Vec<u8>, // max 255 bytes
pub mime_size: u8,
pub mime: Vec<u8>, // max 255 bytes
pub mime: Vec<u8>, // max 255 bytes
pub pairs_size: u16,
pub pairs: Vec<u8>, // max 64KB, JSON
pub pairs: Vec<u8>, // max 64KB, JSON
pub last_modified: u64, // stored as 5 bytes on disk
pub ttl: Option<TTL>,
@ -56,26 +56,54 @@ pub struct Needle {
impl Needle {
// ---- Flag accessors (matching Go) ----
pub fn is_compressed(&self) -> bool { self.flags & FLAG_IS_COMPRESSED != 0 }
pub fn set_is_compressed(&mut self) { self.flags |= FLAG_IS_COMPRESSED; }
pub fn is_compressed(&self) -> bool {
self.flags & FLAG_IS_COMPRESSED != 0
}
pub fn set_is_compressed(&mut self) {
self.flags |= FLAG_IS_COMPRESSED;
}
pub fn has_name(&self) -> bool { self.flags & FLAG_HAS_NAME != 0 }
pub fn set_has_name(&mut self) { self.flags |= FLAG_HAS_NAME; }
pub fn has_name(&self) -> bool {
self.flags & FLAG_HAS_NAME != 0
}
pub fn set_has_name(&mut self) {
self.flags |= FLAG_HAS_NAME;
}
pub fn has_mime(&self) -> bool { self.flags & FLAG_HAS_MIME != 0 }
pub fn set_has_mime(&mut self) { self.flags |= FLAG_HAS_MIME; }
pub fn has_mime(&self) -> bool {
self.flags & FLAG_HAS_MIME != 0
}
pub fn set_has_mime(&mut self) {
self.flags |= FLAG_HAS_MIME;
}
pub fn has_last_modified_date(&self) -> bool { self.flags & FLAG_HAS_LAST_MODIFIED_DATE != 0 }
pub fn set_has_last_modified_date(&mut self) { self.flags |= FLAG_HAS_LAST_MODIFIED_DATE; }
pub fn has_last_modified_date(&self) -> bool {
self.flags & FLAG_HAS_LAST_MODIFIED_DATE != 0
}
pub fn set_has_last_modified_date(&mut self) {
self.flags |= FLAG_HAS_LAST_MODIFIED_DATE;
}
pub fn has_ttl(&self) -> bool { self.flags & FLAG_HAS_TTL != 0 }
pub fn set_has_ttl(&mut self) { self.flags |= FLAG_HAS_TTL; }
pub fn has_ttl(&self) -> bool {
self.flags & FLAG_HAS_TTL != 0
}
pub fn set_has_ttl(&mut self) {
self.flags |= FLAG_HAS_TTL;
}
pub fn has_pairs(&self) -> bool { self.flags & FLAG_HAS_PAIRS != 0 }
pub fn set_has_pairs(&mut self) { self.flags |= FLAG_HAS_PAIRS; }
pub fn has_pairs(&self) -> bool {
self.flags & FLAG_HAS_PAIRS != 0
}
pub fn set_has_pairs(&mut self) {
self.flags |= FLAG_HAS_PAIRS;
}
pub fn is_chunk_manifest(&self) -> bool { self.flags & FLAG_IS_CHUNK_MANIFEST != 0 }
pub fn set_is_chunk_manifest(&mut self) { self.flags |= FLAG_IS_CHUNK_MANIFEST; }
pub fn is_chunk_manifest(&self) -> bool {
self.flags & FLAG_IS_CHUNK_MANIFEST != 0
}
pub fn set_is_chunk_manifest(&mut self) {
self.flags |= FLAG_IS_CHUNK_MANIFEST;
}
// ---- Header parsing ----
@ -108,7 +136,12 @@ impl Needle {
if index + 4 > len_bytes {
return Err(NeedleError::IndexOutOfRange(1));
}
self.data_size = u32::from_be_bytes([bytes[index], bytes[index + 1], bytes[index + 2], bytes[index + 3]]);
self.data_size = u32::from_be_bytes([
bytes[index],
bytes[index + 1],
bytes[index + 2],
bytes[index + 3],
]);
index += 4;
// Skip data bytes (do NOT copy them)
@ -124,7 +157,13 @@ impl Needle {
/// Read full needle from bytes but skip copying the data payload.
/// Sets all metadata fields, checksum, etc. but leaves `data` empty.
pub fn read_bytes_meta_only(&mut self, bytes: &[u8], offset: i64, expected_size: Size, version: Version) -> Result<(), NeedleError> {
pub fn read_bytes_meta_only(
&mut self,
bytes: &[u8],
offset: i64,
expected_size: Size,
version: Version,
) -> Result<(), NeedleError> {
self.read_header(bytes);
if self.size != expected_size {
@ -152,13 +191,20 @@ impl Needle {
}
/// Read tail without CRC validation (used when data was not read).
fn read_tail_meta_only(&mut self, tail_bytes: &[u8], version: Version) -> Result<(), NeedleError> {
fn read_tail_meta_only(
&mut self,
tail_bytes: &[u8],
version: Version,
) -> Result<(), NeedleError> {
if tail_bytes.len() < NEEDLE_CHECKSUM_SIZE {
return Err(NeedleError::TailTooShort);
}
self.checksum = CRC(u32::from_be_bytes([
tail_bytes[0], tail_bytes[1], tail_bytes[2], tail_bytes[3],
tail_bytes[0],
tail_bytes[1],
tail_bytes[2],
tail_bytes[3],
]));
if version == VERSION_3 {
@ -190,7 +236,12 @@ impl Needle {
if index + 4 > len_bytes {
return Err(NeedleError::IndexOutOfRange(1));
}
self.data_size = u32::from_be_bytes([bytes[index], bytes[index + 1], bytes[index + 2], bytes[index + 3]]);
self.data_size = u32::from_be_bytes([
bytes[index],
bytes[index + 1],
bytes[index + 2],
bytes[index + 3],
]);
index += 4;
// Data
@ -282,7 +333,10 @@ impl Needle {
}
let expected_checksum = CRC(u32::from_be_bytes([
tail_bytes[0], tail_bytes[1], tail_bytes[2], tail_bytes[3],
tail_bytes[0],
tail_bytes[1],
tail_bytes[2],
tail_bytes[3],
]));
if !self.data.is_empty() {
@ -322,7 +376,13 @@ impl Needle {
// ---- Full read from bytes ----
/// Read a complete needle from its raw bytes (header + body + tail).
pub fn read_bytes(&mut self, bytes: &[u8], offset: i64, expected_size: Size, version: Version) -> Result<(), NeedleError> {
pub fn read_bytes(
&mut self,
bytes: &[u8],
offset: i64,
expected_size: Size,
version: Version,
) -> Result<(), NeedleError> {
self.read_header(bytes);
if self.size != expected_size {
@ -354,19 +414,33 @@ impl Needle {
let mut buf = Vec::with_capacity(256);
// Compute sizes and flags
if self.name_size >= 255 { self.name_size = 255; }
if self.name.len() < self.name_size as usize { self.name_size = self.name.len() as u8; }
if self.name_size >= 255 {
self.name_size = 255;
}
if self.name.len() < self.name_size as usize {
self.name_size = self.name.len() as u8;
}
self.data_size = self.data.len() as u32;
self.mime_size = self.mime.len() as u8;
// Compute n.Size (body size, excluding header)
if self.data_size > 0 {
let mut s: i32 = 4 + self.data_size as i32 + 1; // DataSize + Data + Flags
if self.has_name() { s += 1 + self.name_size as i32; }
if self.has_mime() { s += 1 + self.mime_size as i32; }
if self.has_last_modified_date() { s += LAST_MODIFIED_BYTES_LENGTH as i32; }
if self.has_ttl() { s += TTL_BYTES_LENGTH as i32; }
if self.has_pairs() { s += 2 + self.pairs_size as i32; }
if self.has_name() {
s += 1 + self.name_size as i32;
}
if self.has_mime() {
s += 1 + self.mime_size as i32;
}
if self.has_last_modified_date() {
s += LAST_MODIFIED_BYTES_LENGTH as i32;
}
if self.has_ttl() {
s += TTL_BYTES_LENGTH as i32;
}
if self.has_pairs() {
s += 2 + self.pairs_size as i32;
}
self.size = Size(s);
} else {
self.size = Size(0);
@ -375,8 +449,10 @@ impl Needle {
// Header: Cookie(4) + NeedleId(8) + Size(4) = 16 bytes
let mut header = [0u8; NEEDLE_HEADER_SIZE];
self.cookie.to_bytes(&mut header[0..COOKIE_SIZE]);
self.id.to_bytes(&mut header[COOKIE_SIZE..COOKIE_SIZE + NEEDLE_ID_SIZE]);
self.size.to_bytes(&mut header[COOKIE_SIZE + NEEDLE_ID_SIZE..NEEDLE_HEADER_SIZE]);
self.id
.to_bytes(&mut header[COOKIE_SIZE..COOKIE_SIZE + NEEDLE_ID_SIZE]);
self.size
.to_bytes(&mut header[COOKIE_SIZE + NEEDLE_ID_SIZE..NEEDLE_HEADER_SIZE]);
buf.extend_from_slice(&header);
// Body
@ -446,18 +522,34 @@ impl Needle {
/// Compute padding to align needle to NEEDLE_PADDING_SIZE (8 bytes).
pub fn padding_length(needle_size: Size, version: Version) -> Size {
if version == VERSION_3 {
Size(NEEDLE_PADDING_SIZE as i32 - ((NEEDLE_HEADER_SIZE as i32 + needle_size.0 + NEEDLE_CHECKSUM_SIZE as i32 + TIMESTAMP_SIZE as i32) % NEEDLE_PADDING_SIZE as i32))
Size(
NEEDLE_PADDING_SIZE as i32
- ((NEEDLE_HEADER_SIZE as i32
+ needle_size.0
+ NEEDLE_CHECKSUM_SIZE as i32
+ TIMESTAMP_SIZE as i32)
% NEEDLE_PADDING_SIZE as i32),
)
} else {
Size(NEEDLE_PADDING_SIZE as i32 - ((NEEDLE_HEADER_SIZE as i32 + needle_size.0 + NEEDLE_CHECKSUM_SIZE as i32) % NEEDLE_PADDING_SIZE as i32))
Size(
NEEDLE_PADDING_SIZE as i32
- ((NEEDLE_HEADER_SIZE as i32 + needle_size.0 + NEEDLE_CHECKSUM_SIZE as i32)
% NEEDLE_PADDING_SIZE as i32),
)
}
}
/// Body length = Size + Checksum + [Timestamp] + Padding.
pub fn needle_body_length(needle_size: Size, version: Version) -> i64 {
if version == VERSION_3 {
needle_size.0 as i64 + NEEDLE_CHECKSUM_SIZE as i64 + TIMESTAMP_SIZE as i64 + padding_length(needle_size, version).0 as i64
needle_size.0 as i64
+ NEEDLE_CHECKSUM_SIZE as i64
+ TIMESTAMP_SIZE as i64
+ padding_length(needle_size, version).0 as i64
} else {
needle_size.0 as i64 + NEEDLE_CHECKSUM_SIZE as i64 + padding_length(needle_size, version).0 as i64
needle_size.0 as i64
+ NEEDLE_CHECKSUM_SIZE as i64
+ padding_length(needle_size, version).0 as i64
}
}
@ -480,7 +572,10 @@ fn bytes_to_u64_5(bytes: &[u8]) -> u64 {
/// ETag formatted as Go: hex of big-endian u32 bytes.
pub fn etag_from_checksum(checksum: u32) -> String {
let bits = checksum.to_be_bytes();
format!("{:02x}{:02x}{:02x}{:02x}", bits[0], bits[1], bits[2], bits[3])
format!(
"{:02x}{:02x}{:02x}{:02x}",
bits[0], bits[1], bits[2], bits[3]
)
}
// ============================================================================
@ -498,7 +593,11 @@ pub struct FileId {
impl FileId {
pub fn new(volume_id: VolumeId, key: NeedleId, cookie: Cookie) -> Self {
FileId { volume_id, key, cookie }
FileId {
volume_id,
key,
cookie,
}
}
/// Parse "volume_id,needle_id_cookie" or "volume_id/needle_id_cookie".
@ -511,9 +610,14 @@ impl FileId {
return Err(format!("invalid file id: {}", s));
};
let volume_id = VolumeId::parse(vid_str).map_err(|e| format!("invalid volume id: {}", e))?;
let volume_id =
VolumeId::parse(vid_str).map_err(|e| format!("invalid volume id: {}", e))?;
let (key, cookie) = parse_needle_id_cookie(rest)?;
Ok(FileId { volume_id, key, cookie })
Ok(FileId {
volume_id,
key,
cookie,
})
}
/// Format the needle_id + cookie part as a hex string (stripping leading zeros).
@ -536,7 +640,10 @@ fn format_needle_id_cookie(key: NeedleId, cookie: Cookie) -> String {
cookie.to_bytes(&mut bytes[8..12]);
// Strip leading zero bytes
let first_nonzero = bytes.iter().position(|&b| b != 0).unwrap_or(bytes.len() - 1);
let first_nonzero = bytes
.iter()
.position(|&b| b != 0)
.unwrap_or(bytes.len() - 1);
hex::encode(&bytes[first_nonzero..])
}
@ -564,10 +671,19 @@ pub fn parse_needle_id_cookie(s: &str) -> Result<(NeedleId, Cookie), String> {
#[derive(Debug, thiserror::Error)]
pub enum NeedleError {
#[error("size mismatch at offset {offset}: found id={id} size={found:?}, expected size={expected:?}")]
SizeMismatch { offset: i64, id: NeedleId, found: Size, expected: Size },
SizeMismatch {
offset: i64,
id: NeedleId,
found: Size,
expected: Size,
},
#[error("CRC mismatch for needle {needle_id}: got {got:08x}, want {want:08x}")]
CrcMismatch { needle_id: NeedleId, got: u32, want: u32 },
CrcMismatch {
needle_id: NeedleId,
got: u32,
want: u32,
},
#[error("index out of range ({0})")]
IndexOutOfRange(u32),
@ -622,7 +738,10 @@ mod tests {
n.set_has_last_modified_date();
n.last_modified = 1234567890;
n.set_has_ttl();
n.ttl = Some(TTL { count: 5, unit: super::super::ttl::TTL_UNIT_DAY });
n.ttl = Some(TTL {
count: 5,
unit: super::super::ttl::TTL_UNIT_DAY,
});
n.append_at_ns = 999999999;
let bytes = n.write_bytes(VERSION_3);

128
seaweed-volume/src/storage/needle/ttl.rs

@ -78,7 +78,9 @@ impl TTL {
return Ok(TTL::EMPTY);
}
let (num_str, unit_char) = s.split_at(s.len() - 1);
let count: u32 = num_str.parse().map_err(|e| format!("invalid TTL count: {}", e))?;
let count: u32 = num_str
.parse()
.map_err(|e| format!("invalid TTL count: {}", e))?;
let unit = match unit_char {
"m" => TTL_UNIT_MINUTE,
"h" => TTL_UNIT_HOUR,
@ -113,33 +115,96 @@ fn unit_to_seconds(count: u64, unit: u8) -> u64 {
/// Fit a count into a single byte, converting to larger unit if needed.
fn fit_ttl_count(count: u32, unit: u8) -> TTL {
if count <= 255 {
return TTL { count: count as u8, unit };
return TTL {
count: count as u8,
unit,
};
}
// Try next larger unit
match unit {
TTL_UNIT_MINUTE => {
if count / 60 <= 255 { return TTL { count: (count / 60) as u8, unit: TTL_UNIT_HOUR }; }
if count / (60 * 24) <= 255 { return TTL { count: (count / (60 * 24)) as u8, unit: TTL_UNIT_DAY }; }
TTL { count: 255, unit: TTL_UNIT_DAY }
if count / 60 <= 255 {
return TTL {
count: (count / 60) as u8,
unit: TTL_UNIT_HOUR,
};
}
if count / (60 * 24) <= 255 {
return TTL {
count: (count / (60 * 24)) as u8,
unit: TTL_UNIT_DAY,
};
}
TTL {
count: 255,
unit: TTL_UNIT_DAY,
}
}
TTL_UNIT_HOUR => {
if count / 24 <= 255 { return TTL { count: (count / 24) as u8, unit: TTL_UNIT_DAY }; }
TTL { count: 255, unit: TTL_UNIT_DAY }
if count / 24 <= 255 {
return TTL {
count: (count / 24) as u8,
unit: TTL_UNIT_DAY,
};
}
TTL {
count: 255,
unit: TTL_UNIT_DAY,
}
}
TTL_UNIT_DAY => {
if count / 7 <= 255 { return TTL { count: (count / 7) as u8, unit: TTL_UNIT_WEEK }; }
if count / 30 <= 255 { return TTL { count: (count / 30) as u8, unit: TTL_UNIT_MONTH }; }
if count / 365 <= 255 { return TTL { count: (count / 365) as u8, unit: TTL_UNIT_YEAR }; }
TTL { count: 255, unit: TTL_UNIT_YEAR }
if count / 7 <= 255 {
return TTL {
count: (count / 7) as u8,
unit: TTL_UNIT_WEEK,
};
}
if count / 30 <= 255 {
return TTL {
count: (count / 30) as u8,
unit: TTL_UNIT_MONTH,
};
}
if count / 365 <= 255 {
return TTL {
count: (count / 365) as u8,
unit: TTL_UNIT_YEAR,
};
}
TTL {
count: 255,
unit: TTL_UNIT_YEAR,
}
}
TTL_UNIT_WEEK => {
if count * 7 / 30 <= 255 { return TTL { count: (count * 7 / 30) as u8, unit: TTL_UNIT_MONTH }; }
if count * 7 / 365 <= 255 { return TTL { count: (count * 7 / 365) as u8, unit: TTL_UNIT_YEAR }; }
TTL { count: 255, unit: TTL_UNIT_YEAR }
if count * 7 / 30 <= 255 {
return TTL {
count: (count * 7 / 30) as u8,
unit: TTL_UNIT_MONTH,
};
}
if count * 7 / 365 <= 255 {
return TTL {
count: (count * 7 / 365) as u8,
unit: TTL_UNIT_YEAR,
};
}
TTL {
count: 255,
unit: TTL_UNIT_YEAR,
}
}
TTL_UNIT_MONTH => {
if count / 12 <= 255 { return TTL { count: (count / 12) as u8, unit: TTL_UNIT_YEAR }; }
TTL { count: 255, unit: TTL_UNIT_YEAR }
if count / 12 <= 255 {
return TTL {
count: (count / 12) as u8,
unit: TTL_UNIT_YEAR,
};
}
TTL {
count: 255,
unit: TTL_UNIT_YEAR,
}
}
_ => TTL { count: 255, unit },
}
@ -173,7 +238,13 @@ mod tests {
#[test]
fn test_ttl_parse() {
let ttl = TTL::read("3m").unwrap();
assert_eq!(ttl, TTL { count: 3, unit: TTL_UNIT_MINUTE });
assert_eq!(
ttl,
TTL {
count: 3,
unit: TTL_UNIT_MINUTE
}
);
assert_eq!(ttl.to_seconds(), 180);
}
@ -185,13 +256,19 @@ mod tests {
#[test]
fn test_ttl_display() {
let ttl = TTL { count: 5, unit: TTL_UNIT_DAY };
let ttl = TTL {
count: 5,
unit: TTL_UNIT_DAY,
};
assert_eq!(ttl.to_string(), "5d");
}
#[test]
fn test_ttl_bytes_round_trip() {
let ttl = TTL { count: 10, unit: TTL_UNIT_WEEK };
let ttl = TTL {
count: 10,
unit: TTL_UNIT_WEEK,
};
let mut buf = [0u8; 2];
ttl.to_bytes(&mut buf);
let ttl2 = TTL::from_bytes(&buf);
@ -200,7 +277,10 @@ mod tests {
#[test]
fn test_ttl_u32_round_trip() {
let ttl = TTL { count: 42, unit: TTL_UNIT_HOUR };
let ttl = TTL {
count: 42,
unit: TTL_UNIT_HOUR,
};
let v = ttl.to_u32();
let ttl2 = TTL::from_u32(v);
assert_eq!(ttl, ttl2);
@ -217,6 +297,12 @@ mod tests {
fn test_ttl_overflow_fit() {
// 300 minutes should fit into 5 hours
let ttl = TTL::read("300m").unwrap();
assert_eq!(ttl, TTL { count: 5, unit: TTL_UNIT_HOUR });
assert_eq!(
ttl,
TTL {
count: 5,
unit: TTL_UNIT_HOUR
}
);
}
}

281
seaweed-volume/src/storage/needle_map.rs

@ -64,12 +64,14 @@ impl NeedleMapMetric {
self.maybe_set_max_file_key(key);
// Go: always LogFileCounter(newSize) which does FileCounter++ and FileByteCounter += newSize
self.file_count.fetch_add(1, Ordering::Relaxed);
self.file_byte_count.fetch_add(new_size.0 as u64, Ordering::Relaxed);
self.file_byte_count
.fetch_add(new_size.0 as u64, Ordering::Relaxed);
// Go: if oldSize > 0 && oldSize.IsValid() { LogDeletionCounter(oldSize) }
if let Some(old_val) = old {
if old_val.size.0 > 0 && old_val.size.is_valid() {
self.deletion_count.fetch_add(1, Ordering::Relaxed);
self.deletion_byte_count.fetch_add(old_val.size.0 as u64, Ordering::Relaxed);
self.deletion_byte_count
.fetch_add(old_val.size.0 as u64, Ordering::Relaxed);
}
}
}
@ -78,7 +80,8 @@ impl NeedleMapMetric {
fn on_delete(&self, old: &NeedleValue) {
if old.size.0 > 0 {
self.deletion_count.fetch_add(1, Ordering::Relaxed);
self.deletion_byte_count.fetch_add(old.size.0 as u64, Ordering::Relaxed);
self.deletion_byte_count
.fetch_add(old.size.0 as u64, Ordering::Relaxed);
}
}
@ -89,7 +92,11 @@ impl NeedleMapMetric {
if key_val <= current {
break;
}
if self.max_file_key.compare_exchange(current, key_val, Ordering::Relaxed, Ordering::Relaxed).is_ok() {
if self
.max_file_key
.compare_exchange(current, key_val, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
break;
}
}
@ -202,7 +209,13 @@ impl CompactNeedleMap {
self.metric.on_delete(&old);
let deleted_size = Size(-(old.size.0));
// Keep original offset so readDeleted can find original data (matching Go behavior)
self.map.insert(key, NeedleValue { offset: old.offset, size: deleted_size });
self.map.insert(
key,
NeedleValue {
offset: old.offset,
size: deleted_size,
},
);
return Ok(Some(old.size));
}
}
@ -276,7 +289,9 @@ impl CompactNeedleMap {
/// Save the in-memory map to an index file, sorted by needle ID ascending.
pub fn save_to_idx(&self, path: &str) -> io::Result<()> {
let mut entries: Vec<_> = self.map.iter()
let mut entries: Vec<_> = self
.map
.iter()
.filter(|(_, nv)| nv.size.is_valid())
.collect();
entries.sort_by_key(|(id, _)| **id);
@ -341,9 +356,8 @@ impl RedbNeedleMap {
io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e))
})?;
}
txn.commit().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb commit: {}", e))
})?;
txn.commit()
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb commit: {}", e)))?;
Ok(RedbNeedleMap {
db,
@ -393,9 +407,8 @@ impl RedbNeedleMap {
}
}
}
txn.commit().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb commit: {}", e))
})?;
txn.commit()
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb commit: {}", e)))?;
Ok(nm)
}
@ -430,13 +443,12 @@ impl RedbNeedleMap {
let mut table = txn.open_table(NEEDLE_TABLE).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e))
})?;
table.insert(key_u64, packed.as_slice()).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb insert: {}", e))
})?;
table
.insert(key_u64, packed.as_slice())
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb insert: {}", e)))?;
}
txn.commit().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb commit: {}", e))
})?;
txn.commit()
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb commit: {}", e)))?;
self.metric.on_put(key, old.as_ref(), size);
Ok(())
@ -450,12 +462,13 @@ impl RedbNeedleMap {
/// Internal get that returns io::Result for error propagation.
fn get_internal(&self, key_u64: u64) -> io::Result<Option<NeedleValue>> {
let txn = self.db.begin_read().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb begin_read: {}", e))
})?;
let table = txn.open_table(NEEDLE_TABLE).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e))
})?;
let txn = self
.db
.begin_read()
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb begin_read: {}", e)))?;
let table = txn
.open_table(NEEDLE_TABLE)
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e)))?;
match table.get(key_u64) {
Ok(Some(guard)) => {
let bytes: &[u8] = guard.value();
@ -468,7 +481,10 @@ impl RedbNeedleMap {
}
}
Ok(None) => Ok(None),
Err(e) => Err(io::Error::new(io::ErrorKind::Other, format!("redb get: {}", e))),
Err(e) => Err(io::Error::new(
io::ErrorKind::Other,
format!("redb get: {}", e),
)),
}
}
@ -487,7 +503,10 @@ impl RedbNeedleMap {
self.metric.on_delete(&old);
let deleted_size = Size(-(old.size.0));
// Keep original offset so readDeleted can find original data (matching Go behavior)
let deleted_nv = NeedleValue { offset: old.offset, size: deleted_size };
let deleted_nv = NeedleValue {
offset: old.offset,
size: deleted_size,
};
let packed = pack_needle_value(&deleted_nv);
let txn = self.db.begin_write().map_err(|e| {
@ -553,12 +572,13 @@ impl RedbNeedleMap {
/// Save the redb contents to an index file, sorted by needle ID ascending.
pub fn save_to_idx(&self, path: &str) -> io::Result<()> {
let txn = self.db.begin_read().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb begin_read: {}", e))
})?;
let table = txn.open_table(NEEDLE_TABLE).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e))
})?;
let txn = self
.db
.begin_read()
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb begin_read: {}", e)))?;
let table = txn
.open_table(NEEDLE_TABLE)
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e)))?;
let mut file = std::fs::OpenOptions::new()
.write(true)
@ -567,9 +587,9 @@ impl RedbNeedleMap {
.open(path)?;
// redb iterates in key order (u64 ascending)
let iter = table.iter().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("redb iter: {}", e))
})?;
let iter = table
.iter()
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb iter: {}", e)))?;
for entry in iter {
let (key_guard, val_guard) = entry.map_err(|e| {
@ -595,8 +615,13 @@ impl RedbNeedleMap {
where
F: FnMut(NeedleId, &NeedleValue) -> Result<(), String>,
{
let txn = self.db.begin_read().map_err(|e| format!("redb begin_read: {}", e))?;
let table = txn.open_table(NEEDLE_TABLE).map_err(|e| format!("redb open_table: {}", e))?;
let txn = self
.db
.begin_read()
.map_err(|e| format!("redb begin_read: {}", e))?;
let table = txn
.open_table(NEEDLE_TABLE)
.map_err(|e| format!("redb open_table: {}", e))?;
let iter = table.iter().map_err(|e| format!("redb iter: {}", e))?;
for entry in iter {
@ -792,8 +817,10 @@ mod tests {
#[test]
fn test_needle_map_put_get() {
let mut nm = CompactNeedleMap::new();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
nm.put(NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100))
.unwrap();
nm.put(NeedleId(2), Offset::from_actual_offset(128), Size(200))
.unwrap();
let v1 = nm.get(NeedleId(1)).unwrap();
assert_eq!(v1.size, Size(100));
@ -807,12 +834,15 @@ mod tests {
#[test]
fn test_needle_map_delete() {
let mut nm = CompactNeedleMap::new();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100))
.unwrap();
assert_eq!(nm.file_count(), 1);
assert_eq!(nm.content_size(), 100);
let deleted = nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap();
let deleted = nm
.delete(NeedleId(1), Offset::from_actual_offset(0))
.unwrap();
assert_eq!(deleted, Some(Size(100)));
// Additive-only: file_count stays at 1 after delete
@ -824,21 +854,26 @@ mod tests {
#[test]
fn test_needle_map_metrics() {
let mut nm = CompactNeedleMap::new();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
nm.put(NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap();
nm.put(NeedleId(3), Offset::from_actual_offset(384), Size(300)).unwrap();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100))
.unwrap();
nm.put(NeedleId(2), Offset::from_actual_offset(128), Size(200))
.unwrap();
nm.put(NeedleId(3), Offset::from_actual_offset(384), Size(300))
.unwrap();
assert_eq!(nm.file_count(), 3);
assert_eq!(nm.content_size(), 600);
assert_eq!(nm.max_file_key(), NeedleId(3));
// Update existing — additive-only: file_count increments, content_size adds
nm.put(NeedleId(2), Offset::from_actual_offset(700), Size(250)).unwrap();
nm.put(NeedleId(2), Offset::from_actual_offset(700), Size(250))
.unwrap();
assert_eq!(nm.file_count(), 4); // 3 + 1 (always increments)
assert_eq!(nm.content_size(), 850); // 600 + 250 (always adds)
// Delete — additive-only: file_count unchanged
nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap();
nm.delete(NeedleId(1), Offset::from_actual_offset(0))
.unwrap();
assert_eq!(nm.file_count(), 4); // unchanged
assert_eq!(nm.deleted_count(), 2); // 1 from overwrite + 1 from delete
}
@ -848,11 +883,35 @@ mod tests {
// Build an idx file in memory
// Note: offset 0 is reserved for the SuperBlock, so real needles start at offset >= 8
let mut idx_data = Vec::new();
idx::write_index_entry(&mut idx_data, NeedleId(1), Offset::from_actual_offset(8), Size(100)).unwrap();
idx::write_index_entry(&mut idx_data, NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap();
idx::write_index_entry(&mut idx_data, NeedleId(3), Offset::from_actual_offset(384), Size(300)).unwrap();
idx::write_index_entry(
&mut idx_data,
NeedleId(1),
Offset::from_actual_offset(8),
Size(100),
)
.unwrap();
idx::write_index_entry(
&mut idx_data,
NeedleId(2),
Offset::from_actual_offset(128),
Size(200),
)
.unwrap();
idx::write_index_entry(
&mut idx_data,
NeedleId(3),
Offset::from_actual_offset(384),
Size(300),
)
.unwrap();
// Delete needle 2
idx::write_index_entry(&mut idx_data, NeedleId(2), Offset::default(), TOMBSTONE_FILE_SIZE).unwrap();
idx::write_index_entry(
&mut idx_data,
NeedleId(2),
Offset::default(),
TOMBSTONE_FILE_SIZE,
)
.unwrap();
let mut cursor = Cursor::new(idx_data);
let nm = CompactNeedleMap::load_from_idx(&mut cursor).unwrap();
@ -867,13 +926,18 @@ mod tests {
#[test]
fn test_needle_map_double_delete() {
let mut nm = CompactNeedleMap::new();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100))
.unwrap();
let r1 = nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap();
let r1 = nm
.delete(NeedleId(1), Offset::from_actual_offset(0))
.unwrap();
assert_eq!(r1, Some(Size(100)));
// Second delete should return None (already deleted)
let r2 = nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap();
let r2 = nm
.delete(NeedleId(1), Offset::from_actual_offset(0))
.unwrap();
assert_eq!(r2, None);
assert_eq!(nm.deleted_count(), 1); // not double counted
}
@ -886,8 +950,10 @@ mod tests {
let db_path = dir.path().join("test.rdb");
let mut nm = RedbNeedleMap::new(db_path.to_str().unwrap()).unwrap();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
nm.put(NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100))
.unwrap();
nm.put(NeedleId(2), Offset::from_actual_offset(128), Size(200))
.unwrap();
let v1 = nm.get(NeedleId(1)).unwrap();
assert_eq!(v1.size, Size(100));
@ -904,11 +970,14 @@ mod tests {
let db_path = dir.path().join("test.rdb");
let mut nm = RedbNeedleMap::new(db_path.to_str().unwrap()).unwrap();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100))
.unwrap();
assert_eq!(nm.file_count(), 1);
assert_eq!(nm.content_size(), 100);
let deleted = nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap();
let deleted = nm
.delete(NeedleId(1), Offset::from_actual_offset(0))
.unwrap();
assert_eq!(deleted, Some(Size(100)));
// Additive-only: file_count stays at 1 after delete
@ -927,21 +996,26 @@ mod tests {
let db_path = dir.path().join("test.rdb");
let mut nm = RedbNeedleMap::new(db_path.to_str().unwrap()).unwrap();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
nm.put(NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap();
nm.put(NeedleId(3), Offset::from_actual_offset(384), Size(300)).unwrap();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100))
.unwrap();
nm.put(NeedleId(2), Offset::from_actual_offset(128), Size(200))
.unwrap();
nm.put(NeedleId(3), Offset::from_actual_offset(384), Size(300))
.unwrap();
assert_eq!(nm.file_count(), 3);
assert_eq!(nm.content_size(), 600);
assert_eq!(nm.max_file_key(), NeedleId(3));
// Update existing — additive-only: file_count increments, content_size adds
nm.put(NeedleId(2), Offset::from_actual_offset(700), Size(250)).unwrap();
nm.put(NeedleId(2), Offset::from_actual_offset(700), Size(250))
.unwrap();
assert_eq!(nm.file_count(), 4); // 3 + 1 (always increments)
assert_eq!(nm.content_size(), 850); // 600 + 250 (always adds)
// Delete — additive-only: file_count unchanged
nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap();
nm.delete(NeedleId(1), Offset::from_actual_offset(0))
.unwrap();
assert_eq!(nm.file_count(), 4); // unchanged
assert_eq!(nm.deleted_count(), 2); // 1 from overwrite + 1 from delete
}
@ -952,11 +1026,35 @@ mod tests {
let db_path = dir.path().join("test.rdb");
let mut idx_data = Vec::new();
idx::write_index_entry(&mut idx_data, NeedleId(1), Offset::from_actual_offset(8), Size(100)).unwrap();
idx::write_index_entry(&mut idx_data, NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap();
idx::write_index_entry(&mut idx_data, NeedleId(3), Offset::from_actual_offset(384), Size(300)).unwrap();
idx::write_index_entry(
&mut idx_data,
NeedleId(1),
Offset::from_actual_offset(8),
Size(100),
)
.unwrap();
idx::write_index_entry(
&mut idx_data,
NeedleId(2),
Offset::from_actual_offset(128),
Size(200),
)
.unwrap();
idx::write_index_entry(
&mut idx_data,
NeedleId(3),
Offset::from_actual_offset(384),
Size(300),
)
.unwrap();
// Delete needle 2
idx::write_index_entry(&mut idx_data, NeedleId(2), Offset::default(), TOMBSTONE_FILE_SIZE).unwrap();
idx::write_index_entry(
&mut idx_data,
NeedleId(2),
Offset::default(),
TOMBSTONE_FILE_SIZE,
)
.unwrap();
let mut cursor = Cursor::new(idx_data);
let nm = RedbNeedleMap::load_from_idx(db_path.to_str().unwrap(), &mut cursor).unwrap();
@ -973,13 +1071,18 @@ mod tests {
let db_path = dir.path().join("test.rdb");
let mut nm = RedbNeedleMap::new(db_path.to_str().unwrap()).unwrap();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100))
.unwrap();
let r1 = nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap();
let r1 = nm
.delete(NeedleId(1), Offset::from_actual_offset(0))
.unwrap();
assert_eq!(r1, Some(Size(100)));
// Second delete should return None (already deleted)
let r2 = nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap();
let r2 = nm
.delete(NeedleId(1), Offset::from_actual_offset(0))
.unwrap();
assert_eq!(r2, None);
assert_eq!(nm.deleted_count(), 1); // not double counted
}
@ -990,15 +1093,19 @@ mod tests {
let db_path = dir.path().join("test.rdb");
let mut nm = RedbNeedleMap::new(db_path.to_str().unwrap()).unwrap();
nm.put(NeedleId(3), Offset::from_actual_offset(384), Size(300)).unwrap();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
nm.put(NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap();
nm.put(NeedleId(3), Offset::from_actual_offset(384), Size(300))
.unwrap();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100))
.unwrap();
nm.put(NeedleId(2), Offset::from_actual_offset(128), Size(200))
.unwrap();
let mut visited = Vec::new();
nm.ascending_visit(|id, nv| {
visited.push((id, nv.size));
Ok(())
}).unwrap();
})
.unwrap();
assert_eq!(visited.len(), 3);
assert_eq!(visited[0], (NeedleId(1), Size(100)));
@ -1013,11 +1120,15 @@ mod tests {
let idx_path = dir.path().join("test.idx");
let mut nm = RedbNeedleMap::new(db_path.to_str().unwrap()).unwrap();
nm.put(NeedleId(1), Offset::from_actual_offset(8), Size(100)).unwrap();
nm.put(NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap();
nm.put(NeedleId(3), Offset::from_actual_offset(384), Size(300)).unwrap();
nm.put(NeedleId(1), Offset::from_actual_offset(8), Size(100))
.unwrap();
nm.put(NeedleId(2), Offset::from_actual_offset(128), Size(200))
.unwrap();
nm.put(NeedleId(3), Offset::from_actual_offset(384), Size(300))
.unwrap();
// Delete needle 2
nm.delete(NeedleId(2), Offset::from_actual_offset(128)).unwrap();
nm.delete(NeedleId(2), Offset::from_actual_offset(128))
.unwrap();
nm.save_to_idx(idx_path.to_str().unwrap()).unwrap();
@ -1038,7 +1149,10 @@ mod tests {
};
let packed = pack_needle_value(&nv);
let unpacked = unpack_needle_value(&packed);
assert_eq!(nv.offset.to_actual_offset(), unpacked.offset.to_actual_offset());
assert_eq!(
nv.offset.to_actual_offset(),
unpacked.offset.to_actual_offset()
);
assert_eq!(nv.size, unpacked.size);
}
@ -1050,7 +1164,10 @@ mod tests {
};
let packed = pack_needle_value(&nv);
let unpacked = unpack_needle_value(&packed);
assert_eq!(nv.offset.to_actual_offset(), unpacked.offset.to_actual_offset());
assert_eq!(
nv.offset.to_actual_offset(),
unpacked.offset.to_actual_offset()
);
assert_eq!(nv.size, unpacked.size);
}
@ -1059,7 +1176,8 @@ mod tests {
#[test]
fn test_needle_map_enum_inmemory() {
let mut nm = NeedleMap::InMemory(CompactNeedleMap::new());
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100))
.unwrap();
assert_eq!(nm.get(NeedleId(1)).unwrap().size, Size(100));
assert_eq!(nm.file_count(), 1);
}
@ -1069,7 +1187,8 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("test.rdb");
let mut nm = NeedleMap::Redb(RedbNeedleMap::new(db_path.to_str().unwrap()).unwrap());
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100))
.unwrap();
assert_eq!(nm.get(NeedleId(1)).unwrap().size, Size(100));
assert_eq!(nm.file_count(), 1);
}

9
seaweed-volume/src/storage/store.rs

@ -459,13 +459,14 @@ impl Store {
where
F: Fn(i64) -> bool,
{
let loc_idx = self.find_volume(vid).map(|(i, _)| i).ok_or_else(|| {
format!("volume id {} is not found during compact", vid.0)
})?;
let loc_idx = self
.find_volume(vid)
.map(|(i, _)| i)
.ok_or_else(|| format!("volume id {} is not found during compact", vid.0))?;
let dir = self.locations[loc_idx].directory.clone();
let (_, free) = crate::storage::disk_location::get_disk_stats(&dir);
// Compute required space from current volume sizes
let required_space = {
let (_, v) = self.find_volume(vid).unwrap();

24
seaweed-volume/src/storage/super_block.rs

@ -62,7 +62,8 @@ impl SuperBlock {
let compaction_revision = u16::from_be_bytes([bytes[4], bytes[5]]);
let extra_size = u16::from_be_bytes([bytes[6], bytes[7]]);
let extra_data = if extra_size > 0 && bytes.len() >= SUPER_BLOCK_SIZE + extra_size as usize {
let extra_data = if extra_size > 0 && bytes.len() >= SUPER_BLOCK_SIZE + extra_size as usize
{
bytes[SUPER_BLOCK_SIZE..SUPER_BLOCK_SIZE + extra_size as usize].to_vec()
} else {
vec![]
@ -127,9 +128,18 @@ impl ReplicaPlacement {
return Err(SuperBlockError::InvalidReplicaPlacement(s.to_string()));
}
let chars: Vec<char> = s.chars().collect();
let dc = chars[0].to_digit(10).ok_or_else(|| SuperBlockError::InvalidReplicaPlacement(s.to_string()))? as u8;
let rack = chars[1].to_digit(10).ok_or_else(|| SuperBlockError::InvalidReplicaPlacement(s.to_string()))? as u8;
let same = chars[2].to_digit(10).ok_or_else(|| SuperBlockError::InvalidReplicaPlacement(s.to_string()))? as u8;
let dc = chars[0]
.to_digit(10)
.ok_or_else(|| SuperBlockError::InvalidReplicaPlacement(s.to_string()))?
as u8;
let rack = chars[1]
.to_digit(10)
.ok_or_else(|| SuperBlockError::InvalidReplicaPlacement(s.to_string()))?
as u8;
let same = chars[2]
.to_digit(10)
.ok_or_else(|| SuperBlockError::InvalidReplicaPlacement(s.to_string()))?
as u8;
Ok(ReplicaPlacement {
diff_data_center_count: dc,
diff_rack_count: rack,
@ -164,7 +174,11 @@ impl ReplicaPlacement {
impl std::fmt::Display for ReplicaPlacement {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}{}{}", self.diff_data_center_count, self.diff_rack_count, self.same_rack_count)
write!(
f,
"{}{}{}",
self.diff_data_center_count, self.diff_rack_count, self.same_rack_count
)
}
}

56
seaweed-volume/src/storage/types.rs

@ -42,8 +42,7 @@ impl NeedleId {
pub fn from_bytes(bytes: &[u8]) -> Self {
assert!(bytes.len() >= NEEDLE_ID_SIZE);
NeedleId(u64::from_be_bytes([
bytes[0], bytes[1], bytes[2], bytes[3],
bytes[4], bytes[5], bytes[6], bytes[7],
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
]))
}
@ -64,11 +63,15 @@ impl fmt::Display for NeedleId {
}
impl From<u64> for NeedleId {
fn from(v: u64) -> Self { NeedleId(v) }
fn from(v: u64) -> Self {
NeedleId(v)
}
}
impl From<NeedleId> for u64 {
fn from(v: NeedleId) -> Self { v.0 }
fn from(v: NeedleId) -> Self {
v.0
}
}
// ============================================================================
@ -103,7 +106,9 @@ impl fmt::Display for Cookie {
}
impl From<u32> for Cookie {
fn from(v: u32) -> Self { Cookie(v) }
fn from(v: u32) -> Self {
Cookie(v)
}
}
// ============================================================================
@ -158,11 +163,15 @@ impl Size {
}
impl From<i32> for Size {
fn from(v: i32) -> Self { Size(v) }
fn from(v: i32) -> Self {
Size(v)
}
}
impl From<Size> for i32 {
fn from(v: Size) -> Self { v.0 }
fn from(v: Size) -> Self {
v.0
}
}
// ============================================================================
@ -281,7 +290,9 @@ impl fmt::Display for DiskType {
}
impl Default for DiskType {
fn default() -> Self { DiskType::HardDrive }
fn default() -> Self {
DiskType::HardDrive
}
}
// ============================================================================
@ -309,7 +320,9 @@ impl fmt::Display for VolumeId {
}
impl From<u32> for VolumeId {
fn from(v: u32) -> Self { VolumeId(v) }
fn from(v: u32) -> Self {
VolumeId(v)
}
}
// ============================================================================
@ -325,7 +338,9 @@ pub const VERSION_2: Version = Version(2);
pub const VERSION_3: Version = Version(3);
impl Version {
pub fn current() -> Self { VERSION_3 }
pub fn current() -> Self {
VERSION_3
}
pub fn is_supported(&self) -> bool {
self.0 >= 1 && self.0 <= 3
@ -333,11 +348,15 @@ impl Version {
}
impl Default for Version {
fn default() -> Self { VERSION_3 }
fn default() -> Self {
VERSION_3
}
}
impl From<u8> for Version {
fn from(v: u8) -> Self { Version(v) }
fn from(v: u8) -> Self {
Version(v)
}
}
// ============================================================================
@ -349,7 +368,9 @@ pub fn idx_entry_from_bytes(bytes: &[u8]) -> (NeedleId, Offset, Size) {
assert!(bytes.len() >= NEEDLE_MAP_ENTRY_SIZE);
let key = NeedleId::from_bytes(&bytes[..NEEDLE_ID_SIZE]);
let offset = Offset::from_bytes(&bytes[NEEDLE_ID_SIZE..NEEDLE_ID_SIZE + OFFSET_SIZE]);
let size = Size::from_bytes(&bytes[NEEDLE_ID_SIZE + OFFSET_SIZE..NEEDLE_ID_SIZE + OFFSET_SIZE + SIZE_SIZE]);
let size = Size::from_bytes(
&bytes[NEEDLE_ID_SIZE + OFFSET_SIZE..NEEDLE_ID_SIZE + OFFSET_SIZE + SIZE_SIZE],
);
(key, offset, size)
}
@ -358,7 +379,9 @@ pub fn idx_entry_to_bytes(bytes: &mut [u8], key: NeedleId, offset: Offset, size:
assert!(bytes.len() >= NEEDLE_MAP_ENTRY_SIZE);
key.to_bytes(&mut bytes[..NEEDLE_ID_SIZE]);
offset.to_bytes(&mut bytes[NEEDLE_ID_SIZE..NEEDLE_ID_SIZE + OFFSET_SIZE]);
size.to_bytes(&mut bytes[NEEDLE_ID_SIZE + OFFSET_SIZE..NEEDLE_ID_SIZE + OFFSET_SIZE + SIZE_SIZE]);
size.to_bytes(
&mut bytes[NEEDLE_ID_SIZE + OFFSET_SIZE..NEEDLE_ID_SIZE + OFFSET_SIZE + SIZE_SIZE],
);
}
// ============================================================================
@ -505,7 +528,10 @@ mod tests {
assert_eq!(DiskType::from_string(""), DiskType::HardDrive);
assert_eq!(DiskType::from_string("hdd"), DiskType::Hdd);
assert_eq!(DiskType::from_string("SSD"), DiskType::Ssd);
assert_eq!(DiskType::from_string("nvme"), DiskType::Custom("nvme".to_string()));
assert_eq!(
DiskType::from_string("nvme"),
DiskType::Custom("nvme".to_string())
);
assert_eq!(DiskType::HardDrive.readable_string(), "hdd");
assert_eq!(DiskType::Ssd.readable_string(), "ssd");
}

72
seaweed-volume/src/storage/volume.rs

@ -185,7 +185,11 @@ pub struct VifVolumeInfo {
pub expire_at_sec: u64,
#[serde(default, rename = "readOnly")]
pub read_only: bool,
#[serde(default, rename = "ecShardConfig", skip_serializing_if = "Option::is_none")]
#[serde(
default,
rename = "ecShardConfig",
skip_serializing_if = "Option::is_none"
)]
pub ec_shard_config: Option<VifEcShardConfig>,
}
@ -241,9 +245,11 @@ impl VifVolumeInfo {
dat_file_size: self.dat_file_size,
expire_at_sec: self.expire_at_sec,
read_only: self.read_only,
ec_shard_config: self.ec_shard_config.as_ref().map(|c| crate::pb::volume_server_pb::EcShardConfig {
data_shards: c.data_shards,
parity_shards: c.parity_shards,
ec_shard_config: self.ec_shard_config.as_ref().map(|c| {
crate::pb::volume_server_pb::EcShardConfig {
data_shards: c.data_shards,
parity_shards: c.parity_shards,
}
}),
}
}
@ -1108,7 +1114,15 @@ impl Volume {
// Write tombstone: append needle with empty data
n.data = vec![];
n.append_at_ns = get_append_at_ns(self.last_append_at_ns);
let (offset, _, _) = self.append_needle(n)?;
let offset = if !self.has_remote_file {
// Normal volume: append tombstone to .dat file
let (offset, _, _) = self.append_needle(n)?;
offset
} else {
// Remote-tiered volume: skip .dat append, use offset 0
0
};
self.last_append_at_ns = n.append_at_ns;
// Update index
@ -1594,7 +1608,10 @@ impl Volume {
if MAX_POSSIBLE_VOLUME_SIZE < content_size + needle_blob.len() as u64 {
return Err(VolumeError::Io(io::Error::new(
io::ErrorKind::Other,
format!("volume size limit {} exceeded! current size is {}", MAX_POSSIBLE_VOLUME_SIZE, content_size),
format!(
"volume size limit {} exceeded! current size is {}",
MAX_POSSIBLE_VOLUME_SIZE, content_size
),
)));
}
@ -1863,11 +1880,11 @@ impl Volume {
let old_idx_path = self.file_name(".idx");
let mut old_idx_file = File::open(&old_idx_path)?;
// Read new entries from .idx
let mut incremented_entries = std::collections::HashMap::new();
let offset = self.last_compact_index_offset;
old_idx_file.seek(SeekFrom::Start(offset))?;
let entry_count = (old_idx_size - offset) / NEEDLE_MAP_ENTRY_SIZE as u64;
for _ in 0..entry_count {
@ -1883,9 +1900,12 @@ impl Volume {
let cpd_path = self.file_name(".cpd");
let cpx_path = self.file_name(".cpx");
let mut dst_dat = OpenOptions::new().read(true).write(true).open(&cpd_path)?;
let mut dst_idx = OpenOptions::new().write(true).append(true).open(&cpx_path)?;
let mut dst_idx = OpenOptions::new()
.write(true)
.append(true)
.open(&cpx_path)?;
let mut dat_offset = dst_dat.seek(SeekFrom::End(0))?;
let padding_rem = dat_offset % NEEDLE_PADDING_SIZE as u64;
@ -1902,19 +1922,24 @@ impl Volume {
if !needle_offset.is_zero() && !size.is_deleted() && size.0 > 0 {
let actual_size = crate::storage::needle::needle::get_actual_size(size, version);
let mut blob = vec![0u8; actual_size as usize];
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
old_dat_file.read_exact_at(&mut blob, needle_offset.to_actual_offset() as u64)?;
old_dat_file
.read_exact_at(&mut blob, needle_offset.to_actual_offset() as u64)?;
}
#[cfg(windows)]
{
crate::storage::volume::read_exact_at(&old_dat_file, &mut blob, needle_offset.to_actual_offset() as u64)?;
crate::storage::volume::read_exact_at(
&old_dat_file,
&mut blob,
needle_offset.to_actual_offset() as u64,
)?;
}
dst_dat.write_all(&blob)?;
let mut idx_entry_buf = [0u8; NEEDLE_MAP_ENTRY_SIZE];
crate::storage::types::idx_entry_to_bytes(
&mut idx_entry_buf,
@ -1923,18 +1948,21 @@ impl Volume {
size,
);
dst_idx.write_all(&idx_entry_buf)?;
dat_offset += actual_size as u64;
} else {
let mut fake_del_needle = Needle {
id: key,
cookie: Cookie(0x12345678),
append_at_ns: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as u64,
append_at_ns: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos() as u64,
..Needle::default()
};
let bytes = fake_del_needle.write_bytes(version);
dst_dat.write_all(&bytes)?;
let mut idx_entry_buf = [0u8; NEEDLE_MAP_ENTRY_SIZE];
crate::storage::types::idx_entry_to_bytes(
&mut idx_entry_buf,
@ -1943,20 +1971,20 @@ impl Volume {
Size(crate::storage::types::TOMBSTONE_FILE_SIZE.into()),
);
dst_idx.write_all(&idx_entry_buf)?;
dat_offset += bytes.len() as u64;
}
let padding = NEEDLE_PADDING_SIZE as u64 - (dat_offset % NEEDLE_PADDING_SIZE as u64);
if padding != NEEDLE_PADDING_SIZE as u64 {
dat_offset += padding;
dst_dat.seek(SeekFrom::Start(dat_offset))?;
}
}
dst_dat.sync_all()?;
dst_idx.sync_all()?;
Ok(())
}

2
seaweed-volume/tests/http_integration.rs

@ -515,5 +515,7 @@ async fn admin_router_can_expose_ui_with_explicit_override() {
.await
.unwrap();
// UI handler does JWT check inside but read_signing_key is empty in this test,
// so it returns 200 (auth is only enforced when read key is set)
assert_eq!(response.status(), StatusCode::OK);
}
Loading…
Cancel
Save