Browse Source

fix: round 5 Go parity — range clamping, ETag on 201, image decompress, tier maintenance

- Range requests: clamp end to file size instead of returning 416 (matches Go)
- Suffix ranges larger than file size: clamp instead of skipping (matches Go)
- Set ETag response header on 201 Created write responses (matches Go SetEtag)
- Always decompress before image resize/crop even when client accepts gzip
- VolumeTierMoveDatFromRemote: don't check maintenance mode (Go doesn't)
- Fix stale integration test: stats routes are now exposed on admin router
rust-volume-server
Chris Lu 1 day ago
parent
commit
6de2d09869
  1. 1
      seaweed-volume/src/lib.rs
  2. 2
      seaweed-volume/src/main.rs
  3. 4
      seaweed-volume/src/server/grpc_server.rs
  4. 64
      seaweed-volume/src/server/handlers.rs
  5. 4
      seaweed-volume/src/server/volume_server.rs
  6. 70
      seaweed-volume/src/version.rs
  7. 4
      seaweed-volume/tests/http_integration.rs

1
seaweed-volume/src/lib.rs

@ -5,6 +5,7 @@ pub mod remote_storage;
pub mod security;
pub mod server;
pub mod storage;
pub mod version;
/// Generated protobuf modules.
pub mod pb {

2
seaweed-volume/src/main.rs

@ -29,7 +29,7 @@ fn main() {
let config = config::parse_cli();
info!(
"SeaweedFS Volume Server (Rust) v{}",
env!("CARGO_PKG_VERSION")
seaweed_volume::version::full_version()
);
// Register Prometheus metrics

4
seaweed-volume/src/server/grpc_server.rs

@ -2350,7 +2350,7 @@ impl VolumeServer for VolumeGrpcService {
&self,
request: Request<volume_server_pb::VolumeTierMoveDatFromRemoteRequest>,
) -> Result<Response<Self::VolumeTierMoveDatFromRemoteStream>, Status> {
self.state.check_maintenance()?;
// Note: Go does NOT check maintenance mode for TierMoveDatFromRemote
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
@ -2522,7 +2522,7 @@ impl VolumeServer for VolumeGrpcService {
heap: 0,
stack: 0,
}),
version: env!("CARGO_PKG_VERSION").to_string(),
version: crate::version::full_version().to_string(),
data_center: self.state.data_center.clone(),
rack: self.state.rack.clone(),
state: Some(volume_server_pb::VolumeServerState {

64
seaweed-volume/src/server/handlers.rs

@ -1038,15 +1038,14 @@ async fn get_or_head_handler_inner(
// Handle compressed data: if needle is compressed, either pass through or decompress
let is_compressed = n.is_compressed();
let mut data = n.data;
// Check if image operations are needed — must decompress first regardless of Accept-Encoding
let needs_image_ops = is_image
&& (query.width.is_some() || query.height.is_some() || query.mode.is_some());
if is_compressed {
let accept_encoding = headers
.get(header::ACCEPT_ENCODING)
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if accept_encoding.contains("gzip") {
response_headers.insert(header::CONTENT_ENCODING, "gzip".parse().unwrap());
} else {
// Decompress for client
if needs_image_ops {
// Always decompress for image operations (Go decompresses before resize/crop)
use flate2::read::GzDecoder;
use std::io::Read as _;
let mut decoder = GzDecoder::new(&data[..]);
@ -1054,6 +1053,23 @@ async fn get_or_head_handler_inner(
if decoder.read_to_end(&mut decompressed).is_ok() {
data = decompressed;
}
} else {
let accept_encoding = headers
.get(header::ACCEPT_ENCODING)
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if accept_encoding.contains("gzip") {
response_headers.insert(header::CONTENT_ENCODING, "gzip".parse().unwrap());
} else {
// Decompress for client
use flate2::read::GzDecoder;
use std::io::Read as _;
let mut decoder = GzDecoder::new(&data[..]);
let mut decompressed = Vec::new();
if decoder.read_to_end(&mut decompressed).is_ok() {
data = decompressed;
}
}
}
}
@ -1127,9 +1143,10 @@ fn handle_range_request(range_str: &str, data: &[u8], mut headers: HeaderMap) ->
if start_str.is_empty() {
// Suffix range: -N means last N bytes
let suffix: usize = end_str.parse().ok()?;
let mut suffix: usize = end_str.parse().ok()?;
// Go clamps suffix to file size
if suffix > total {
return None;
suffix = total;
}
Some((total - suffix, total - 1))
} else {
@ -1151,9 +1168,18 @@ fn handle_range_request(range_str: &str, data: &[u8], mut headers: HeaderMap) ->
return (StatusCode::OK, headers, data.to_vec()).into_response();
}
// Check all ranges are valid
// Clamp range ends and validate (Go clamps end to size-1 instead of returning 416)
let ranges: Vec<(usize, usize)> = ranges
.into_iter()
.map(|(start, mut end)| {
if end >= total {
end = total - 1;
}
(start, end)
})
.collect();
for &(start, end) in &ranges {
if start >= total || end >= total || start > end {
if start >= total || start > end {
headers.insert(
"Content-Range",
format!("bytes */{}", total).parse().unwrap(),
@ -1774,7 +1800,15 @@ pub async fn post_handler(
metrics::REQUEST_DURATION
.with_label_values(&["write"])
.observe(start.elapsed().as_secs_f64());
let etag = n.etag();
let etag_header = if etag.starts_with('"') {
etag.clone()
} else {
format!("\"{}\"", etag)
};
let mut resp = (StatusCode::CREATED, axum::Json(result)).into_response();
resp.headers_mut()
.insert(header::ETAG, etag_header.parse().unwrap());
resp.headers_mut()
.insert("Content-MD5", content_md5_value.parse().unwrap());
resp
@ -2087,7 +2121,7 @@ pub async fn status_handler(
let mut m = serde_json::Map::new();
m.insert(
"Version".to_string(),
serde_json::Value::from(env!("CARGO_PKG_VERSION")),
serde_json::Value::from(crate::version::full_version()),
);
m.insert("Volumes".to_string(), serde_json::Value::Array(volumes));
m.insert(
@ -2163,7 +2197,7 @@ pub async fn stats_counter_handler() -> Response {
pub async fn stats_memory_handler() -> Response {
// Basic memory stats - Rust doesn't have GC stats like Go
let info = serde_json::json!({
"Version": env!("CARGO_PKG_VERSION"),
"Version": crate::version::full_version(),
"Memory": {
"Mallocs": 0,
"Frees": 0,
@ -2195,7 +2229,7 @@ pub async fn stats_disk_handler(State(state): State<Arc<VolumeServerState>>) ->
}));
}
let info = serde_json::json!({
"Version": env!("CARGO_PKG_VERSION"),
"Version": crate::version::full_version(),
"DiskStatuses": ds,
});
(

4
seaweed-volume/src/server/volume_server.rs

@ -109,7 +109,9 @@ async fn common_headers_middleware(request: Request, next: Next) -> Response {
let mut response = next.run(request).await;
let headers = response.headers_mut();
headers.insert("Server", HeaderValue::from_static("SeaweedFS Volume 0.1.0"));
if let Ok(val) = HeaderValue::from_str(crate::version::server_header()) {
headers.insert("Server", val);
}
if let Some(rid) = request_id {
headers.insert("x-amz-request-id", rid);

70
seaweed-volume/src/version.rs

@ -0,0 +1,70 @@
//! Version helpers aligned with Go's util/version package.
use std::sync::OnceLock;
const SIZE_LIMIT: &str = "30GB"; // Go default build (!5BytesOffset)
pub fn size_limit() -> &'static str {
SIZE_LIMIT
}
pub fn commit() -> &'static str {
option_env!("SEAWEEDFS_COMMIT")
.or(option_env!("GIT_COMMIT"))
.or(option_env!("GIT_SHA"))
.unwrap_or("")
}
pub fn version_number() -> &'static str {
static VERSION_NUMBER: OnceLock<String> = OnceLock::new();
VERSION_NUMBER
.get_or_init(|| parse_go_version_number().unwrap_or_else(|| env!("CARGO_PKG_VERSION").to_string()))
.as_str()
}
pub fn version() -> &'static str {
static VERSION: OnceLock<String> = OnceLock::new();
VERSION
.get_or_init(|| format!("{} {}", size_limit(), version_number()))
.as_str()
}
pub fn full_version() -> &'static str {
static FULL: OnceLock<String> = OnceLock::new();
FULL.get_or_init(|| format!("{} {}", version(), commit())).as_str()
}
pub fn server_header() -> &'static str {
static HEADER: OnceLock<String> = OnceLock::new();
HEADER
.get_or_init(|| format!("SeaweedFS Volume {}", version()))
.as_str()
}
fn parse_go_version_number() -> Option<String> {
let src = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../weed/util/version/constants.go"));
let mut major: Option<u32> = None;
let mut minor: Option<u32> = None;
for line in src.lines() {
let l = line.trim();
if l.starts_with("MAJOR_VERSION") {
major = parse_int32_line(l);
} else if l.starts_with("MINOR_VERSION") {
minor = parse_int32_line(l);
}
if major.is_some() && minor.is_some() {
break;
}
}
match (major, minor) {
(Some(maj), Some(min)) => Some(format!("{}.{}", maj, format!("{:02}", min))),
_ => None,
}
}
fn parse_int32_line(line: &str) -> Option<u32> {
let start = line.find("int32(")? + "int32(".len();
let rest = &line[start..];
let end = rest.find(')')?;
rest[..end].trim().parse::<u32>().ok()
}

4
seaweed-volume/tests/http_integration.rs

@ -465,7 +465,7 @@ async fn public_router_does_not_expose_healthz() {
}
#[tokio::test]
async fn admin_router_does_not_expose_stats_routes() {
async fn admin_router_exposes_stats_routes() {
let (state, _tmp) = test_state();
let app = build_admin_router(state);
@ -479,7 +479,7 @@ async fn admin_router_does_not_expose_stats_routes() {
.await
.unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
assert_eq!(response.status(), StatusCode::OK);
}
#[tokio::test]

Loading…
Cancel
Save