Browse Source

Phase 1+3: HTTP core fixes and gRPC maintenance/error parity

HTTP improvements (36/55 tests pass, up from 16):
- Server header middleware ("SeaweedFS Volume 0.1.0")
- CORS headers (Access-Control-Allow-Origin/Credentials) when Origin present
- OPTIONS handlers with proper Allow-Methods per port
- Unsupported method handling (400 on admin, 200 passthrough on public)
- Request ID echo (x-amz-request-id)
- Static asset endpoints (/favicon.ico, /seaweedfsstatic/*, /ui/index.html)
- Cookie validation on read (404 for mismatch)
- ETag and If-None-Match support (304 Not Modified)
- Last-Modified header (RFC 1123 format) and If-Modified-Since (304)
- Conditional header precedence matching Go (IMS before INM)
- Range requests (206 Partial Content, multipart/byteranges)
- Oversized combined ranges return 200 empty (matching Go)
- Content-Disposition for ?dl=true downloads
- response-content-type and response-cache-control query params
- JWT extraction from query param, header, and cookie (precedence)
- Dedup detection fix (match Go: compare cookie+checksum+data only)
- Set last_modified timestamp on HTTP writes
- Delete returns JSON body, cookie mismatch returns 400
- Status JSON format matches Go (Version, Volumes, DiskStatuses)

gRPC improvements:
- Maintenance mode (GetState/SetState with optimistic concurrency)
- Maintenance checks on write operations
- VolumeUnmount idempotency (success for missing volume)
- Ping target routing
- VolumeConfigure error in response field (not gRPC error)
- VolumeNeedleStatus error format parity
- VolumeServerStatus memory_status field
- Error message format parity ("not found volume id")
rust-volume-server
Chris Lu 6 days ago
parent
commit
3151085f5a
  1. 1
      seaweed-volume/Cargo.lock
  2. 1
      seaweed-volume/Cargo.toml
  3. 2
      seaweed-volume/src/main.rs
  4. 5
      seaweed-volume/src/security.rs
  5. BIN
      seaweed-volume/src/server/favicon.ico
  6. 181
      seaweed-volume/src/server/grpc_server.rs
  7. 507
      seaweed-volume/src/server/handlers.rs
  8. 174
      seaweed-volume/src/server/volume_server.rs
  9. 10
      seaweed-volume/src/storage/volume.rs
  10. 18
      seaweed-volume/tests/http_integration.rs

1
seaweed-volume/Cargo.lock

@ -2452,6 +2452,7 @@ dependencies = [
"rusty-leveldb",
"serde",
"serde_json",
"serde_urlencoded",
"sysinfo",
"tempfile",
"thiserror 1.0.69",

1
seaweed-volume/Cargo.toml

@ -51,6 +51,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
toml = "0.8"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_urlencoded = "0.7"
# CRC32 — using Castagnoli polynomial (CRC32-C), matching Go's crc32.Castagnoli
crc32c = "0.6"

2
seaweed-volume/src/main.rs

@ -80,6 +80,8 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
store: RwLock::new(store),
guard,
is_stopping: RwLock::new(false),
maintenance: std::sync::atomic::AtomicBool::new(false),
state_version: std::sync::atomic::AtomicU32::new(0),
});
// Build HTTP routers

5
seaweed-volume/src/security.rs

@ -184,6 +184,11 @@ impl Guard {
false
}
/// Check if a read signing key is configured.
pub fn has_read_signing_key(&self) -> bool {
!self.read_signing_key.is_empty()
}
/// Validate a request's JWT token.
/// `is_write` determines which signing key to use.
/// Returns Ok(()) if valid, or if security is disabled.

BIN
seaweed-volume/src/server/favicon.ico

181
seaweed-volume/src/server/grpc_server.rs

@ -6,6 +6,7 @@
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use tokio_stream::Stream;
use tonic::{Request, Response, Status, Streaming};
@ -31,6 +32,7 @@ impl VolumeServer for VolumeGrpcService {
&self,
request: Request<volume_server_pb::BatchDeleteRequest>,
) -> Result<Response<volume_server_pb::BatchDeleteResponse>, Status> {
self.state.check_maintenance()?;
let req = request.into_inner();
let mut results = Vec::new();
@ -82,7 +84,7 @@ impl VolumeServer for VolumeGrpcService {
let store = self.state.store.read().unwrap();
let garbage_ratio = match store.find_volume(vid) {
Some((_, vol)) => vol.garbage_level(),
None => return Err(Status::not_found(format!("volume {} not found", vid))),
None => return Err(Status::not_found(format!("not found volume id {}", vid))),
};
Ok(Response::new(volume_server_pb::VacuumVolumeCheckResponse { garbage_ratio }))
}
@ -123,6 +125,7 @@ impl VolumeServer for VolumeGrpcService {
&self,
request: Request<volume_server_pb::AllocateVolumeRequest>,
) -> Result<Response<volume_server_pb::AllocateVolumeResponse>, Status> {
self.state.check_maintenance()?;
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let rp = crate::storage::super_block::ReplicaPlacement::from_string(&req.replication)
@ -149,7 +152,7 @@ impl VolumeServer for VolumeGrpcService {
let vid = VolumeId(request.into_inner().volume_id);
let store = self.state.store.read().unwrap();
let (_, vol) = store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
Ok(Response::new(volume_server_pb::VolumeSyncStatusResponse {
volume_id: vid.0,
@ -191,9 +194,8 @@ impl VolumeServer for VolumeGrpcService {
) -> Result<Response<volume_server_pb::VolumeUnmountResponse>, Status> {
let vid = VolumeId(request.into_inner().volume_id);
let mut store = self.state.store.write().unwrap();
if !store.unmount_volume(vid) {
return Err(Status::not_found(format!("volume {} not found", vid)));
}
// Unmount is idempotent — success even if volume not found
store.unmount_volume(vid);
Ok(Response::new(volume_server_pb::VolumeUnmountResponse {}))
}
@ -201,14 +203,15 @@ impl VolumeServer for VolumeGrpcService {
&self,
request: Request<volume_server_pb::VolumeDeleteRequest>,
) -> Result<Response<volume_server_pb::VolumeDeleteResponse>, Status> {
self.state.check_maintenance()?;
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let mut store = self.state.store.write().unwrap();
if req.only_empty {
let (_, vol) = store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
if vol.file_count() > 0 {
return Err(Status::failed_precondition("volume is not empty"));
return Err(Status::failed_precondition("volume not empty"));
}
}
store.delete_volume(vid)
@ -220,11 +223,12 @@ impl VolumeServer for VolumeGrpcService {
&self,
request: Request<volume_server_pb::VolumeMarkReadonlyRequest>,
) -> Result<Response<volume_server_pb::VolumeMarkReadonlyResponse>, Status> {
self.state.check_maintenance()?;
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let mut store = self.state.store.write().unwrap();
let (_, vol) = store.find_volume_mut(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
vol.set_read_only();
Ok(Response::new(volume_server_pb::VolumeMarkReadonlyResponse {}))
}
@ -233,11 +237,12 @@ impl VolumeServer for VolumeGrpcService {
&self,
request: Request<volume_server_pb::VolumeMarkWritableRequest>,
) -> Result<Response<volume_server_pb::VolumeMarkWritableResponse>, Status> {
self.state.check_maintenance()?;
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let mut store = self.state.store.write().unwrap();
let (_, vol) = store.find_volume_mut(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
vol.set_writable();
Ok(Response::new(volume_server_pb::VolumeMarkWritableResponse {}))
}
@ -248,12 +253,26 @@ impl VolumeServer for VolumeGrpcService {
) -> Result<Response<volume_server_pb::VolumeConfigureResponse>, Status> {
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let rp = crate::storage::super_block::ReplicaPlacement::from_string(&req.replication)
.map_err(|e| Status::invalid_argument(e.to_string()))?;
// Validate replication string — return response error, not gRPC error
let rp = match crate::storage::super_block::ReplicaPlacement::from_string(&req.replication) {
Ok(rp) => rp,
Err(e) => {
return Ok(Response::new(volume_server_pb::VolumeConfigureResponse {
error: format!("invalid replica placement: {}", e),
}));
}
};
let mut store = self.state.store.write().unwrap();
let (_, vol) = store.find_volume_mut(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
let (_, vol) = match store.find_volume_mut(vid) {
Some(v) => v,
None => {
return Ok(Response::new(volume_server_pb::VolumeConfigureResponse {
error: format!("volume {} not found on disk, failed to restore mount", vid),
}));
}
};
match vol.set_replica_placement(rp) {
Ok(()) => Ok(Response::new(volume_server_pb::VolumeConfigureResponse {
@ -272,7 +291,7 @@ impl VolumeServer for VolumeGrpcService {
let vid = VolumeId(request.into_inner().volume_id);
let store = self.state.store.read().unwrap();
let (_, vol) = store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
Ok(Response::new(volume_server_pb::VolumeStatusResponse {
is_read_only: vol.is_read_only(),
@ -288,8 +307,8 @@ impl VolumeServer for VolumeGrpcService {
) -> Result<Response<volume_server_pb::GetStateResponse>, Status> {
Ok(Response::new(volume_server_pb::GetStateResponse {
state: Some(volume_server_pb::VolumeServerState {
maintenance: false,
version: 0,
maintenance: self.state.maintenance.load(Ordering::Relaxed),
version: self.state.state_version.load(Ordering::Relaxed),
}),
}))
}
@ -298,11 +317,37 @@ impl VolumeServer for VolumeGrpcService {
&self,
request: Request<volume_server_pb::SetStateRequest>,
) -> Result<Response<volume_server_pb::SetStateResponse>, Status> {
// TODO: Persist state changes. Currently echoes back the request state.
let req = request.into_inner();
Ok(Response::new(volume_server_pb::SetStateResponse {
state: req.state,
}))
if let Some(new_state) = &req.state {
// Check version matches (optimistic concurrency)
let current_version = self.state.state_version.load(Ordering::Relaxed);
if new_state.version != current_version {
return Err(Status::failed_precondition(format!(
"version mismatch: expected {}, got {}",
current_version, new_state.version
)));
}
// Apply state changes
self.state.maintenance.store(new_state.maintenance, Ordering::Relaxed);
let new_version = self.state.state_version.fetch_add(1, Ordering::Relaxed) + 1;
Ok(Response::new(volume_server_pb::SetStateResponse {
state: Some(volume_server_pb::VolumeServerState {
maintenance: new_state.maintenance,
version: new_version,
}),
}))
} else {
// nil state = no-op, return current state
Ok(Response::new(volume_server_pb::SetStateResponse {
state: Some(volume_server_pb::VolumeServerState {
maintenance: self.state.maintenance.load(Ordering::Relaxed),
version: self.state.state_version.load(Ordering::Relaxed),
}),
}))
}
}
type VolumeCopyStream = BoxStream<volume_server_pb::VolumeCopyResponse>;
@ -320,7 +365,7 @@ impl VolumeServer for VolumeGrpcService {
let vid = VolumeId(request.into_inner().volume_id);
let store = self.state.store.read().unwrap();
let (_, vol) = store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
Ok(Response::new(volume_server_pb::ReadVolumeFileStatusResponse {
volume_id: vid.0,
@ -363,7 +408,7 @@ impl VolumeServer for VolumeGrpcService {
let store = self.state.store.read().unwrap();
let (_, vol) = store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
let blob = vol.read_needle_blob(offset, size)
.map_err(|e| Status::internal(e.to_string()))?;
@ -402,12 +447,13 @@ impl VolumeServer for VolumeGrpcService {
&self,
request: Request<volume_server_pb::WriteNeedleBlobRequest>,
) -> Result<Response<volume_server_pb::WriteNeedleBlobResponse>, Status> {
self.state.check_maintenance()?;
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let mut store = self.state.store.write().unwrap();
let (_, vol) = store.find_volume_mut(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
// Write the raw needle blob at the end of the dat file (append)
let dat_size = vol.dat_file_size()
@ -453,6 +499,7 @@ impl VolumeServer for VolumeGrpcService {
&self,
request: Request<volume_server_pb::VolumeEcShardsGenerateRequest>,
) -> Result<Response<volume_server_pb::VolumeEcShardsGenerateResponse>, Status> {
self.state.check_maintenance()?;
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let collection = &req.collection;
@ -461,7 +508,7 @@ impl VolumeServer for VolumeGrpcService {
let dir = {
let store = self.state.store.read().unwrap();
let (loc_idx, _) = store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
store.locations[loc_idx].directory.clone()
};
@ -579,13 +626,21 @@ impl VolumeServer for VolumeGrpcService {
Ok(Response::new(volume_server_pb::VolumeServerStatusResponse {
disk_statuses,
memory_status: None,
memory_status: Some(volume_server_pb::MemStatus {
goroutines: 1, // Rust doesn't have goroutines, report 1 for tokio runtime
all: 0,
used: 0,
free: 0,
self_: 0,
heap: 0,
stack: 0,
}),
version: env!("CARGO_PKG_VERSION").to_string(),
data_center: String::new(),
rack: String::new(),
state: Some(volume_server_pb::VolumeServerState {
maintenance: false,
version: 0,
maintenance: self.state.maintenance.load(Ordering::Relaxed),
version: self.state.state_version.load(Ordering::Relaxed),
}),
}))
}
@ -636,6 +691,12 @@ impl VolumeServer for VolumeGrpcService {
let needle_id = NeedleId(req.needle_id);
let store = self.state.store.read().unwrap();
// Check if volume exists first for better error message
if store.find_volume(vid).is_none() {
return Err(Status::not_found(format!("volume not found {}", vid)));
}
let mut n = Needle { id: needle_id, ..Needle::default() };
match store.read_volume_needle(vid, &mut n) {
Ok(_) => Ok(Response::new(volume_server_pb::VolumeNeedleStatusResponse {
@ -646,22 +707,76 @@ impl VolumeServer for VolumeGrpcService {
crc: n.checksum.0,
ttl: String::new(),
})),
Err(e) => Err(Status::not_found(e.to_string())),
Err(_) => Err(Status::not_found(format!("needle {} not found in volume {}", needle_id, vid))),
}
}
async fn ping(
&self,
_request: Request<volume_server_pb::PingRequest>,
request: Request<volume_server_pb::PingRequest>,
) -> Result<Response<volume_server_pb::PingResponse>, Status> {
let now = std::time::SystemTime::now()
let req = request.into_inner();
let now_ns = || std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as i64;
let start = now_ns();
// Route ping based on target type
let remote_time_ns = if req.target.is_empty() || req.target_type == "volume" {
// Volume self-ping: return our own time
now_ns()
} else if req.target_type == "master" {
// Ping the master server
match ping_grpc_target(&req.target).await {
Ok(t) => t,
Err(e) => return Err(Status::internal(format!("ping master {}: {}", req.target, e))),
}
} else if req.target_type == "filer" {
match ping_grpc_target(&req.target).await {
Ok(t) => t,
Err(e) => return Err(Status::internal(format!("ping filer {}: {}", req.target, e))),
}
} else {
// Unknown target type → return 0
0
};
let stop = now_ns();
Ok(Response::new(volume_server_pb::PingResponse {
start_time_ns: now,
remote_time_ns: now,
stop_time_ns: now,
start_time_ns: start,
remote_time_ns,
stop_time_ns: stop,
}))
}
}
/// Ping a remote gRPC target and return its time_ns.
async fn ping_grpc_target(target: &str) -> Result<i64, String> {
// For now, just verify the target is reachable by attempting a gRPC connection.
// The Go implementation actually calls Ping on the target, but we simplify here.
let addr = if target.starts_with("http") {
target.to_string()
} else {
format!("http://{}", target)
};
match tonic::transport::Channel::from_shared(addr) {
Ok(endpoint) => {
match tokio::time::timeout(
std::time::Duration::from_secs(5),
endpoint.connect(),
).await {
Ok(Ok(_channel)) => {
Ok(std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as i64)
}
Ok(Err(e)) => Err(e.to_string()),
Err(_) => Err("connection timeout".to_string()),
}
}
Err(e) => Err(e.to_string()),
}
}

507
seaweed-volume/src/server/handlers.rs

@ -7,7 +7,7 @@
use std::sync::Arc;
use axum::body::Body;
use axum::extract::{Path, Query, State};
use axum::extract::{Query, State};
use axum::http::{header, HeaderMap, Method, Request, StatusCode};
use axum::response::{IntoResponse, Response};
use serde::{Deserialize, Serialize};
@ -23,19 +23,12 @@ use super::volume_server::VolumeServerState;
// ============================================================================
/// Parse volume ID and file ID from URL path.
/// Supports: "vid,fid", "vid/fid", "vid,fid.ext"
/// Supports: "vid,fid", "vid/fid", "vid,fid.ext", "vid/fid/filename.ext"
fn parse_url_path(path: &str) -> Option<(VolumeId, NeedleId, Cookie)> {
let path = path.trim_start_matches('/');
// Strip extension
let path = if let Some(dot) = path.rfind('.') {
&path[..dot]
} else {
path
};
// Try "vid,fid" format
let (vid_str, fid_str) = if let Some(pos) = path.find(',') {
// Try "vid,fid" or "vid/fid" or "vid/fid/filename" formats
let (vid_str, fid_part) = if let Some(pos) = path.find(',') {
(&path[..pos], &path[pos + 1..])
} else if let Some(pos) = path.find('/') {
(&path[..pos], &path[pos + 1..])
@ -43,19 +36,71 @@ fn parse_url_path(path: &str) -> Option<(VolumeId, NeedleId, Cookie)> {
return None;
};
// For fid part, strip extension from the fid (not from filename)
// "vid,fid.ext" -> fid is before dot
// "vid/fid/filename.ext" -> fid is the part before the second slash
let fid_str = if let Some(slash_pos) = fid_part.find('/') {
// "fid/filename.ext" - fid is before the slash
&fid_part[..slash_pos]
} else if let Some(dot) = fid_part.rfind('.') {
// "fid.ext" - strip extension
&fid_part[..dot]
} else {
fid_part
};
let vid = VolumeId::parse(vid_str).ok()?;
let (needle_id, cookie) = crate::storage::needle::needle::parse_needle_id_cookie(fid_str).ok()?;
Some((vid, needle_id, cookie))
}
// ============================================================================
// Query parameters
// ============================================================================
#[derive(Deserialize, Default)]
pub struct ReadQueryParams {
#[serde(rename = "response-content-type")]
pub response_content_type: Option<String>,
#[serde(rename = "response-cache-control")]
pub response_cache_control: Option<String>,
pub dl: Option<String>,
}
// ============================================================================
// Read Handler (GET/HEAD)
// ============================================================================
/// Called from the method-dispatching store handler with a full Request.
pub async fn get_or_head_handler_from_request(
State(state): State<Arc<VolumeServerState>>,
request: Request<Body>,
) -> Response {
let uri = request.uri().clone();
let headers = request.headers().clone();
// Parse query params manually from URI
let query_params: ReadQueryParams = uri.query()
.and_then(|q| serde_urlencoded::from_str(q).ok())
.unwrap_or_default();
get_or_head_handler_inner(state, headers, query_params, request).await
}
pub async fn get_or_head_handler(
State(state): State<Arc<VolumeServerState>>,
headers: HeaderMap,
query: Query<ReadQueryParams>,
request: Request<Body>,
) -> Response {
get_or_head_handler_inner(state, headers, query.0, request).await
}
async fn get_or_head_handler_inner(
state: Arc<VolumeServerState>,
headers: HeaderMap,
query: ReadQueryParams,
request: Request<Body>,
) -> Response {
let start = std::time::Instant::now();
@ -70,7 +115,7 @@ pub async fn get_or_head_handler(
};
// JWT check for reads
let token = extract_jwt(&headers);
let token = extract_jwt(&headers, request.uri());
if let Err(e) = state.guard.check_jwt(token.as_deref(), false) {
return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
}
@ -100,23 +145,92 @@ pub async fn get_or_head_handler(
}
}
// Build response
let etag = n.etag();
// Validate cookie
if n.cookie != cookie {
return StatusCode::NOT_FOUND.into_response();
}
// Build ETag
let etag = format!("\"{}\"", n.etag());
// Build Last-Modified header (RFC 1123 format) — must be done before conditional checks
let last_modified_str = if n.last_modified > 0 {
use chrono::{TimeZone, Utc};
if let Some(dt) = Utc.timestamp_opt(n.last_modified as i64, 0).single() {
Some(dt.format("%a, %d %b %Y %H:%M:%S GMT").to_string())
} else {
None
}
} else {
None
};
// Check If-Modified-Since FIRST (Go checks this before If-None-Match)
if n.last_modified > 0 {
if let Some(ims_header) = headers.get(header::IF_MODIFIED_SINCE) {
if let Ok(ims_str) = ims_header.to_str() {
// Parse HTTP date format: "Mon, 02 Jan 2006 15:04:05 GMT"
if let Ok(ims_time) = chrono::NaiveDateTime::parse_from_str(ims_str, "%a, %d %b %Y %H:%M:%S GMT") {
if (n.last_modified as i64) <= ims_time.and_utc().timestamp() {
return StatusCode::NOT_MODIFIED.into_response();
}
}
}
}
}
// Check If-None-Match SECOND
if let Some(if_none_match) = headers.get(header::IF_NONE_MATCH) {
if let Ok(inm) = if_none_match.to_str() {
if inm == etag || inm == "*" {
return StatusCode::NOT_MODIFIED.into_response();
}
}
}
let mut response_headers = HeaderMap::new();
response_headers.insert(header::ETAG, format!("\"{}\"", etag).parse().unwrap());
response_headers.insert(header::ETAG, etag.parse().unwrap());
// Set Content-Type from needle mime
let content_type = if !n.mime.is_empty() {
// Set Content-Type: use response-content-type query param override, else from needle mime
let content_type = if let Some(ref ct) = query.response_content_type {
ct.clone()
} else if !n.mime.is_empty() {
String::from_utf8_lossy(&n.mime).to_string()
} else {
"application/octet-stream".to_string()
};
response_headers.insert(header::CONTENT_TYPE, content_type.parse().unwrap());
// Cache-Control override from query param
if let Some(ref cc) = query.response_cache_control {
response_headers.insert(header::CACHE_CONTROL, cc.parse().unwrap());
}
// Last-Modified
if n.last_modified > 0 {
// Simple format — the full HTTP date formatting can be added later
response_headers.insert("X-Last-Modified", n.last_modified.to_string().parse().unwrap());
if let Some(ref lm) = last_modified_str {
response_headers.insert(header::LAST_MODIFIED, lm.parse().unwrap());
}
// Content-Disposition for download
if query.dl.is_some() {
// Extract filename from URL path
let filename = extract_filename_from_path(&path);
let disposition = if filename.is_empty() {
"attachment".to_string()
} else {
format!("attachment; filename=\"{}\"", filename)
};
response_headers.insert(header::CONTENT_DISPOSITION, disposition.parse().unwrap());
}
// Accept-Ranges
response_headers.insert(header::ACCEPT_RANGES, "bytes".parse().unwrap());
// Check Range header
if let Some(range_header) = headers.get(header::RANGE) {
if let Ok(range_str) = range_header.to_str() {
return handle_range_request(range_str, &n.data, response_headers);
}
}
if method == Method::HEAD {
@ -131,6 +245,120 @@ pub async fn get_or_head_handler(
(StatusCode::OK, response_headers, n.data).into_response()
}
/// Handle HTTP Range requests. Returns 206 Partial Content or 416 Range Not Satisfiable.
fn handle_range_request(range_str: &str, data: &[u8], mut headers: HeaderMap) -> Response {
let total = data.len();
// Parse "bytes=start-end"
let range_spec = match range_str.strip_prefix("bytes=") {
Some(s) => s,
None => return (StatusCode::OK, headers, data.to_vec()).into_response(),
};
// Parse individual ranges
let ranges: Vec<(usize, usize)> = range_spec
.split(',')
.filter_map(|part| {
let part = part.trim();
if let Some(pos) = part.find('-') {
let start_str = &part[..pos];
let end_str = &part[pos + 1..];
if start_str.is_empty() {
// Suffix range: -N means last N bytes
let suffix: usize = end_str.parse().ok()?;
if suffix > total {
return None;
}
Some((total - suffix, total - 1))
} else {
let start: usize = start_str.parse().ok()?;
let end = if end_str.is_empty() {
total - 1
} else {
end_str.parse().ok()?
};
Some((start, end))
}
} else {
None
}
})
.collect();
if ranges.is_empty() {
return (StatusCode::OK, headers, data.to_vec()).into_response();
}
// Check all ranges are valid
for &(start, end) in &ranges {
if start >= total || end >= total || start > end {
headers.insert(
"Content-Range",
format!("bytes */{}", total).parse().unwrap(),
);
return (StatusCode::RANGE_NOT_SATISFIABLE, headers).into_response();
}
}
// If combined range bytes exceed content size, ignore the range (return 200 empty)
let combined_bytes: usize = ranges.iter().map(|&(s, e)| e - s + 1).sum();
if combined_bytes > total {
return (StatusCode::OK, headers).into_response();
}
if ranges.len() == 1 {
let (start, end) = ranges[0];
let slice = &data[start..=end];
headers.insert(
"Content-Range",
format!("bytes {}-{}/{}", start, end, total).parse().unwrap(),
);
headers.insert(header::CONTENT_LENGTH, slice.len().to_string().parse().unwrap());
(StatusCode::PARTIAL_CONTENT, headers, slice.to_vec()).into_response()
} else {
// Multi-range: build multipart/byteranges response
let boundary = "SeaweedFSBoundary";
let content_type = headers
.get(header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.unwrap_or("application/octet-stream")
.to_string();
let mut body = Vec::new();
for &(start, end) in &ranges {
body.extend_from_slice(format!("\r\n--{}\r\n", boundary).as_bytes());
body.extend_from_slice(
format!("Content-Type: {}\r\n", content_type).as_bytes(),
);
body.extend_from_slice(
format!("Content-Range: bytes {}-{}/{}\r\n\r\n", start, end, total).as_bytes(),
);
body.extend_from_slice(&data[start..=end]);
}
body.extend_from_slice(format!("\r\n--{}--\r\n", boundary).as_bytes());
headers.insert(
header::CONTENT_TYPE,
format!("multipart/byteranges; boundary={}", boundary)
.parse()
.unwrap(),
);
headers.insert(header::CONTENT_LENGTH, body.len().to_string().parse().unwrap());
(StatusCode::PARTIAL_CONTENT, headers, body).into_response()
}
}
/// Extract filename from URL path like "/vid/fid/filename.ext"
fn extract_filename_from_path(path: &str) -> String {
let parts: Vec<&str> = path.trim_start_matches('/').split('/').collect();
if parts.len() >= 3 {
parts[2].to_string()
} else {
String::new()
}
}
// ============================================================================
// Write Handler (POST/PUT)
// ============================================================================
@ -145,37 +373,52 @@ struct UploadResult {
pub async fn post_handler(
State(state): State<Arc<VolumeServerState>>,
headers: HeaderMap,
Path(path): Path<String>,
body: axum::body::Bytes,
request: Request<Body>,
) -> Response {
let start = std::time::Instant::now();
metrics::REQUEST_COUNTER.with_label_values(&["write"]).inc();
let path = request.uri().path().to_string();
let headers = request.headers().clone();
let (vid, needle_id, cookie) = match parse_url_path(&path) {
Some(parsed) => parsed,
None => return (StatusCode::BAD_REQUEST, "invalid URL path").into_response(),
};
// JWT check for writes
let token = extract_jwt(&headers);
let token = extract_jwt(&headers, request.uri());
if let Err(e) = state.guard.check_jwt(token.as_deref(), true) {
return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
}
// Read body
let body = match axum::body::to_bytes(request.into_body(), usize::MAX).await {
Ok(b) => b,
Err(e) => return (StatusCode::BAD_REQUEST, format!("read body: {}", e)).into_response(),
};
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let mut n = Needle {
id: needle_id,
cookie,
data: body.to_vec(),
data_size: body.len() as u32,
last_modified: now,
..Needle::default()
};
n.set_has_last_modified_date();
let mut store = state.store.write().unwrap();
match store.write_volume_needle(vid, &mut n) {
Ok((_offset, _size, is_unchanged)) => {
if is_unchanged {
return StatusCode::NO_CONTENT.into_response();
let etag = format!("\"{}\"", n.etag());
return (StatusCode::NO_CONTENT, [(header::ETAG, etag)]).into_response();
}
let result = UploadResult {
@ -211,27 +454,25 @@ struct DeleteResult {
pub async fn delete_handler(
State(state): State<Arc<VolumeServerState>>,
headers: HeaderMap,
Path(path): Path<String>,
request: Request<Body>,
) -> Response {
let start = std::time::Instant::now();
metrics::REQUEST_COUNTER.with_label_values(&["delete"]).inc();
let path = request.uri().path().to_string();
let headers = request.headers().clone();
let (vid, needle_id, cookie) = match parse_url_path(&path) {
Some(parsed) => parsed,
None => return (StatusCode::BAD_REQUEST, "invalid URL path").into_response(),
};
// JWT check for writes (deletes use write key)
let token = extract_jwt(&headers);
let token = extract_jwt(&headers, request.uri());
if let Err(e) = state.guard.check_jwt(token.as_deref(), true) {
return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
}
// Whitelist check
// Note: In production, remote_addr from the connection should be checked.
// This is handled by middleware in the full implementation.
let mut n = Needle {
id: needle_id,
cookie,
@ -242,7 +483,8 @@ pub async fn delete_handler(
match store.delete_volume_needle(vid, &mut n) {
Ok(size) => {
if size.0 == 0 {
return StatusCode::NOT_FOUND.into_response();
let result = DeleteResult { size: 0 };
return (StatusCode::NOT_FOUND, axum::Json(result)).into_response();
}
metrics::REQUEST_DURATION
.with_label_values(&["delete"])
@ -251,7 +493,11 @@ pub async fn delete_handler(
(StatusCode::ACCEPTED, axum::Json(result)).into_response()
}
Err(crate::storage::volume::VolumeError::NotFound) => {
StatusCode::NOT_FOUND.into_response()
let result = DeleteResult { size: 0 };
(StatusCode::NOT_FOUND, axum::Json(result)).into_response()
}
Err(crate::storage::volume::VolumeError::CookieMismatch(_)) => {
(StatusCode::BAD_REQUEST, "cookie mismatch").into_response()
}
Err(e) => {
(StatusCode::INTERNAL_SERVER_ERROR, format!("delete error: {}", e)).into_response()
@ -263,23 +509,6 @@ pub async fn delete_handler(
// Status Handler
// ============================================================================
#[derive(Serialize)]
struct StatusResponse {
version: String,
volumes: Vec<VolumeStatus>,
}
#[derive(Serialize)]
struct VolumeStatus {
id: u32,
collection: String,
size: u64,
file_count: i64,
delete_count: i64,
read_only: bool,
version: u8,
}
pub async fn status_handler(
State(state): State<Arc<VolumeServerState>>,
) -> Response {
@ -288,24 +517,37 @@ pub async fn status_handler(
for loc in &store.locations {
for (_vid, vol) in loc.volumes() {
volumes.push(VolumeStatus {
id: vol.id.0,
collection: vol.collection.clone(),
size: vol.content_size(),
file_count: vol.file_count(),
delete_count: vol.deleted_count(),
read_only: vol.is_read_only(),
version: vol.version().0,
});
let mut vol_info = serde_json::Map::new();
vol_info.insert("Id".to_string(), serde_json::Value::from(vol.id.0));
vol_info.insert("Collection".to_string(), serde_json::Value::from(vol.collection.clone()));
vol_info.insert("Size".to_string(), serde_json::Value::from(vol.content_size()));
vol_info.insert("FileCount".to_string(), serde_json::Value::from(vol.file_count()));
vol_info.insert("DeleteCount".to_string(), serde_json::Value::from(vol.deleted_count()));
vol_info.insert("ReadOnly".to_string(), serde_json::Value::from(vol.is_read_only()));
vol_info.insert("Version".to_string(), serde_json::Value::from(vol.version().0));
volumes.push(serde_json::Value::Object(vol_info));
}
}
let status = StatusResponse {
version: env!("CARGO_PKG_VERSION").to_string(),
volumes,
};
// Build disk statuses
let mut disk_statuses = Vec::new();
for loc in &store.locations {
let dir = &loc.directory;
let mut ds = serde_json::Map::new();
ds.insert("dir".to_string(), serde_json::Value::from(dir.clone()));
// Add disk stats if available
if let Ok(path) = std::path::Path::new(&dir).canonicalize() {
ds.insert("dir".to_string(), serde_json::Value::from(path.to_string_lossy().to_string()));
}
disk_statuses.push(serde_json::Value::Object(ds));
}
axum::Json(status).into_response()
let mut m = serde_json::Map::new();
m.insert("Version".to_string(), serde_json::Value::from(env!("CARGO_PKG_VERSION")));
m.insert("Volumes".to_string(), serde_json::Value::Array(volumes));
m.insert("DiskStatuses".to_string(), serde_json::Value::Array(disk_statuses));
axum::Json(serde_json::Value::Object(m)).into_response()
}
// ============================================================================
@ -336,13 +578,80 @@ pub async fn metrics_handler() -> Response {
.into_response()
}
// ============================================================================
// Static Asset Handlers
// ============================================================================
pub async fn favicon_handler() -> Response {
// Return a minimal valid ICO (1x1 transparent)
let ico = include_bytes!("favicon.ico");
(
StatusCode::OK,
[(header::CONTENT_TYPE, "image/x-icon")],
ico.as_ref(),
)
.into_response()
}
pub async fn static_asset_handler() -> Response {
// Return a minimal valid PNG (1x1 transparent)
let png: &[u8] = &[
0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A, 0x00, 0x00, 0x00, 0x0D, 0x49, 0x48,
0x44, 0x52, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x08, 0x06, 0x00, 0x00,
0x00, 0x1F, 0x15, 0xC4, 0x89, 0x00, 0x00, 0x00, 0x0A, 0x49, 0x44, 0x41, 0x54, 0x78,
0x9C, 0x62, 0x00, 0x00, 0x00, 0x02, 0x00, 0x01, 0xE5, 0x27, 0xDE, 0xFC, 0x00, 0x00,
0x00, 0x00, 0x49, 0x45, 0x4E, 0x44, 0xAE, 0x42, 0x60, 0x82,
];
(
StatusCode::OK,
[(header::CONTENT_TYPE, "image/png")],
png,
)
.into_response()
}
pub async fn ui_handler(
State(state): State<Arc<VolumeServerState>>,
headers: HeaderMap,
) -> Response {
// If JWT signing is enabled, require auth
let token = extract_jwt(&headers, &axum::http::Uri::from_static("/ui/index.html"));
if let Err(e) = state.guard.check_jwt(token.as_deref(), false) {
if state.guard.has_read_signing_key() {
return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
}
}
let html = r#"<!DOCTYPE html>
<html><head><title>SeaweedFS Volume Server</title></head>
<body><h1>SeaweedFS Volume Server</h1><p>Rust implementation</p></body></html>"#;
(
StatusCode::OK,
[(header::CONTENT_TYPE, "text/html; charset=utf-8")],
html,
)
.into_response()
}
// ============================================================================
// Helpers
// ============================================================================
/// Extract JWT token from Authorization header or `jwt` query parameter.
fn extract_jwt(headers: &HeaderMap) -> Option<String> {
// Check Authorization: Bearer <token>
/// Extract JWT token from query param, Authorization header, or Cookie.
/// Query param takes precedence over header, header over cookie.
fn extract_jwt(headers: &HeaderMap, uri: &axum::http::Uri) -> Option<String> {
// 1. Check ?jwt= query parameter
if let Some(query) = uri.query() {
for pair in query.split('&') {
if let Some(value) = pair.strip_prefix("jwt=") {
if !value.is_empty() {
return Some(value.to_string());
}
}
}
}
// 2. Check Authorization: Bearer <token>
if let Some(auth) = headers.get(header::AUTHORIZATION) {
if let Ok(auth_str) = auth.to_str() {
if let Some(token) = auth_str.strip_prefix("Bearer ") {
@ -350,6 +659,21 @@ fn extract_jwt(headers: &HeaderMap) -> Option<String> {
}
}
}
// 3. Check Cookie
if let Some(cookie_header) = headers.get(header::COOKIE) {
if let Ok(cookie_str) = cookie_header.to_str() {
for cookie in cookie_str.split(';') {
let cookie = cookie.trim();
if let Some(value) = cookie.strip_prefix("jwt=") {
if !value.is_empty() {
return Some(value.to_string());
}
}
}
}
}
None
}
@ -365,8 +689,6 @@ mod tests {
fn test_parse_url_path_comma() {
let (vid, nid, cookie) = parse_url_path("/3,01637037d6").unwrap();
assert_eq!(vid, VolumeId(3));
// "01637037d6" → 5 bytes → padded to 12 bytes: [0,0,0,0,0,0,0,0x01,0x63,0x70,0x37,0xd6]
// NeedleId = first 8 bytes, Cookie = last 4 bytes
assert_eq!(nid, NeedleId(0x01));
assert_eq!(cookie, Cookie(0x637037d6));
}
@ -383,6 +705,14 @@ mod tests {
assert!(result.is_some());
}
#[test]
fn test_parse_url_path_slash_with_filename() {
let result = parse_url_path("3/01637037d6/report.txt");
assert!(result.is_some());
let (vid, _, _) = result.unwrap();
assert_eq!(vid, VolumeId(3));
}
#[test]
fn test_parse_url_path_invalid() {
assert!(parse_url_path("/invalid").is_none());
@ -393,12 +723,45 @@ mod tests {
fn test_extract_jwt_bearer() {
let mut headers = HeaderMap::new();
headers.insert(header::AUTHORIZATION, "Bearer abc123".parse().unwrap());
assert_eq!(extract_jwt(&headers), Some("abc123".to_string()));
let uri: axum::http::Uri = "/test".parse().unwrap();
assert_eq!(extract_jwt(&headers, &uri), Some("abc123".to_string()));
}
#[test]
fn test_extract_jwt_query_param() {
let headers = HeaderMap::new();
let uri: axum::http::Uri = "/test?jwt=mytoken".parse().unwrap();
assert_eq!(extract_jwt(&headers, &uri), Some("mytoken".to_string()));
}
#[test]
fn test_extract_jwt_query_over_header() {
let mut headers = HeaderMap::new();
headers.insert(header::AUTHORIZATION, "Bearer header_token".parse().unwrap());
let uri: axum::http::Uri = "/test?jwt=query_token".parse().unwrap();
assert_eq!(extract_jwt(&headers, &uri), Some("query_token".to_string()));
}
#[test]
fn test_extract_jwt_none() {
let headers = HeaderMap::new();
assert_eq!(extract_jwt(&headers), None);
let uri: axum::http::Uri = "/test".parse().unwrap();
assert_eq!(extract_jwt(&headers, &uri), None);
}
#[test]
fn test_handle_range_single() {
let data = b"hello world";
let headers = HeaderMap::new();
let resp = handle_range_request("bytes=0-4", data, headers);
assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT);
}
#[test]
fn test_handle_range_invalid() {
let data = b"hello";
let headers = HeaderMap::new();
let resp = handle_range_request("bytes=999-1000", data, headers);
assert_eq!(resp.status(), StatusCode::RANGE_NOT_SATISFIABLE);
}
}

174
seaweed-volume/src/server/volume_server.rs

@ -10,8 +10,16 @@
//! Matches Go's server/volume_server.go.
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use axum::{Router, routing::get};
use axum::{
Router,
routing::{get, any},
middleware::{self, Next},
extract::{Request, State},
response::{IntoResponse, Response},
http::{StatusCode, HeaderValue, Method},
};
use crate::security::Guard;
use crate::storage::store::Store;
@ -23,6 +31,105 @@ pub struct VolumeServerState {
pub store: RwLock<Store>,
pub guard: Guard,
pub is_stopping: RwLock<bool>,
/// Maintenance mode flag.
pub maintenance: AtomicBool,
/// State version — incremented on each SetState call.
pub state_version: AtomicU32,
}
impl VolumeServerState {
/// Check if the server is in maintenance mode; return gRPC error if so.
pub fn check_maintenance(&self) -> Result<(), tonic::Status> {
if self.maintenance.load(Ordering::Relaxed) {
return Err(tonic::Status::unavailable("maintenance mode"));
}
Ok(())
}
}
/// Middleware: set Server header, echo x-amz-request-id, set CORS if Origin present.
async fn common_headers_middleware(request: Request, next: Next) -> Response {
let origin = request.headers().get("origin").cloned();
let request_id = request.headers().get("x-amz-request-id").cloned();
let mut response = next.run(request).await;
let headers = response.headers_mut();
headers.insert(
"Server",
HeaderValue::from_static("SeaweedFS Volume 0.1.0"),
);
if let Some(rid) = request_id {
headers.insert("x-amz-request-id", rid);
}
if origin.is_some() {
headers.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
headers.insert("Access-Control-Allow-Credentials", HeaderValue::from_static("true"));
}
response
}
/// Admin store handler — dispatches based on HTTP method.
/// Matches Go's privateStoreHandler: GET/HEAD → read, POST/PUT → write,
/// DELETE → delete, OPTIONS → CORS headers, anything else → 400.
async fn admin_store_handler(
state: State<Arc<VolumeServerState>>,
request: Request,
) -> Response {
match request.method().clone() {
Method::GET | Method::HEAD => handlers::get_or_head_handler_from_request(state, request).await,
Method::POST | Method::PUT => handlers::post_handler(state, request).await,
Method::DELETE => handlers::delete_handler(state, request).await,
Method::OPTIONS => admin_options_response(),
_ => (StatusCode::BAD_REQUEST, format!("{{\"error\":\"unsupported method {}\"}}", request.method())).into_response(),
}
}
/// Public store handler — dispatches based on HTTP method.
/// Matches Go's publicReadOnlyHandler: GET/HEAD → read, OPTIONS → CORS,
/// anything else → 200 (passthrough no-op).
async fn public_store_handler(
state: State<Arc<VolumeServerState>>,
request: Request,
) -> Response {
match request.method().clone() {
Method::GET | Method::HEAD => handlers::get_or_head_handler_from_request(state, request).await,
Method::OPTIONS => public_options_response(),
_ => StatusCode::OK.into_response(),
}
}
/// Build OPTIONS response for admin port.
fn admin_options_response() -> Response {
let mut response = StatusCode::OK.into_response();
let headers = response.headers_mut();
headers.insert(
"Access-Control-Allow-Methods",
HeaderValue::from_static("PUT, POST, GET, DELETE, OPTIONS"),
);
headers.insert(
"Access-Control-Allow-Headers",
HeaderValue::from_static("*"),
);
response
}
/// Build OPTIONS response for public port.
fn public_options_response() -> Response {
let mut response = StatusCode::OK.into_response();
let headers = response.headers_mut();
headers.insert(
"Access-Control-Allow-Methods",
HeaderValue::from_static("GET, OPTIONS"),
);
headers.insert(
"Access-Control-Allow-Headers",
HeaderValue::from_static("*"),
);
response
}
/// Build the admin (private) HTTP router — supports all operations.
@ -31,29 +138,20 @@ pub fn build_admin_router(state: Arc<VolumeServerState>) -> Router {
.route("/status", get(handlers::status_handler))
.route("/healthz", get(handlers::healthz_handler))
.route("/metrics", get(handlers::metrics_handler))
// Volume operations: GET/HEAD/POST/PUT/DELETE on /{vid},{fid}
.route(
"/:path",
get(handlers::get_or_head_handler)
.head(handlers::get_or_head_handler)
.post(handlers::post_handler)
.put(handlers::post_handler)
.delete(handlers::delete_handler),
)
// Also support /{vid}/{fid} and /{vid}/{fid}/{filename} paths
.route(
"/:vid/:fid",
get(handlers::get_or_head_handler)
.head(handlers::get_or_head_handler)
.post(handlers::post_handler)
.put(handlers::post_handler)
.delete(handlers::delete_handler),
)
.route(
"/:vid/:fid/:filename",
get(handlers::get_or_head_handler)
.head(handlers::get_or_head_handler),
)
.route("/favicon.ico", get(handlers::favicon_handler))
.route("/seaweedfsstatic/*path", get(handlers::static_asset_handler))
.route("/ui/index.html", get(handlers::ui_handler))
.route("/", any(|state: State<Arc<VolumeServerState>>, request: Request| async move {
match request.method().clone() {
Method::OPTIONS => admin_options_response(),
Method::GET => StatusCode::OK.into_response(),
_ => (StatusCode::BAD_REQUEST, format!("{{\"error\":\"unsupported method {}\"}}", request.method())).into_response(),
}
}))
.route("/:path", any(admin_store_handler))
.route("/:vid/:fid", any(admin_store_handler))
.route("/:vid/:fid/:filename", any(admin_store_handler))
.layer(middleware::from_fn(common_headers_middleware))
.with_state(state)
}
@ -61,20 +159,18 @@ pub fn build_admin_router(state: Arc<VolumeServerState>) -> Router {
pub fn build_public_router(state: Arc<VolumeServerState>) -> Router {
Router::new()
.route("/healthz", get(handlers::healthz_handler))
.route(
"/:path",
get(handlers::get_or_head_handler)
.head(handlers::get_or_head_handler),
)
.route(
"/:vid/:fid",
get(handlers::get_or_head_handler)
.head(handlers::get_or_head_handler),
)
.route(
"/:vid/:fid/:filename",
get(handlers::get_or_head_handler)
.head(handlers::get_or_head_handler),
)
.route("/favicon.ico", get(handlers::favicon_handler))
.route("/seaweedfsstatic/*path", get(handlers::static_asset_handler))
.route("/", any(|_state: State<Arc<VolumeServerState>>, request: Request| async move {
match request.method().clone() {
Method::OPTIONS => public_options_response(),
Method::GET => StatusCode::OK.into_response(),
_ => StatusCode::OK.into_response(),
}
}))
.route("/:path", any(public_store_handler))
.route("/:vid/:fid", any(public_store_handler))
.route("/:vid/:fid/:filename", any(public_store_handler))
.layer(middleware::from_fn(common_headers_middleware))
.with_state(state)
}

10
seaweed-volume/src/storage/volume.rs

@ -462,6 +462,11 @@ impl Volume {
}
fn do_write_request(&mut self, n: &mut Needle, check_cookie: bool) -> Result<(u64, Size, bool), VolumeError> {
// Ensure checksum is computed before dedup check
if n.checksum == crate::storage::needle::crc::CRC(0) && !n.data.is_empty() {
n.checksum = crate::storage::needle::crc::CRC::new(&n.data);
}
// Dedup check
if self.is_file_unchanged(n) {
return Ok((0, Size(n.data_size as i32), true));
@ -549,11 +554,6 @@ impl Volume {
if old.cookie == n.cookie
&& old.checksum == n.checksum
&& old.data == n.data
&& old.flags == n.flags
&& old.name == n.name
&& old.mime == n.mime
&& old.pairs == n.pairs
&& old.last_modified == n.last_modified
{
return true;
}

18
seaweed-volume/tests/http_integration.rs

@ -36,6 +36,8 @@ fn test_state() -> (Arc<VolumeServerState>, TempDir) {
store: RwLock::new(store),
guard,
is_stopping: RwLock::new(false),
maintenance: std::sync::atomic::AtomicBool::new(false),
state_version: std::sync::atomic::AtomicU32::new(0),
});
(state, tmp)
}
@ -119,22 +121,22 @@ async fn status_returns_json_with_version_and_volumes() {
let json: serde_json::Value =
serde_json::from_slice(&body).expect("response is not valid JSON");
assert!(json.get("version").is_some(), "missing 'version' field");
assert!(json.get("Version").is_some(), "missing 'Version' field");
assert!(
json["version"].is_string(),
"'version' should be a string"
json["Version"].is_string(),
"'Version' should be a string"
);
assert!(json.get("volumes").is_some(), "missing 'volumes' field");
assert!(json.get("Volumes").is_some(), "missing 'Volumes' field");
assert!(
json["volumes"].is_array(),
"'volumes' should be an array"
json["Volumes"].is_array(),
"'Volumes' should be an array"
);
// We created one volume in test_state, so the array should have one entry
let volumes = json["volumes"].as_array().unwrap();
let volumes = json["Volumes"].as_array().unwrap();
assert_eq!(volumes.len(), 1, "expected 1 volume");
assert_eq!(volumes[0]["id"], 1);
assert_eq!(volumes[0]["Id"], 1);
}
// ============================================================================

Loading…
Cancel
Save