Browse Source

http: honor multipart upload metadata fields

rust-volume-server
Chris Lu 4 days ago
parent
commit
f3d4a39b27
  1. 117
      seaweed-volume/src/server/handlers.rs
  2. 76
      test/volume_server/http/production_features_test.go

117
seaweed-volume/src/server/handlers.rs

@ -1425,6 +1425,7 @@ pub async fn post_handler(
let path = request.uri().path().to_string();
let query = request.uri().query().unwrap_or("").to_string();
let headers = request.headers().clone();
let query_fields: Vec<(String, String)> = serde_urlencoded::from_str(&query).unwrap_or_default();
let (vid, needle_id, cookie) = match parse_url_path(&path) {
Some(parsed) => parsed,
@ -1505,9 +1506,6 @@ pub async fn post_handler(
None
};
// Check for chunk manifest flag
let is_chunk_manifest = query.split('&').any(|p| p == "cm=true" || p == "cm=1");
// Validate multipart/form-data has a boundary
if let Some(ct) = headers.get(header::CONTENT_TYPE) {
if let Ok(ct_str) = ct.to_str() {
@ -1545,7 +1543,14 @@ pub async fn post_handler(
.unwrap_or("")
.to_string();
let (body_data_raw, parsed_filename, parsed_content_type, _parsed_content_encoding) =
let (
body_data_raw,
parsed_filename,
parsed_content_type,
parsed_content_encoding,
parsed_content_md5,
multipart_form_fields,
) =
if content_type_str.starts_with("multipart/form-data") {
// Extract boundary from Content-Type
let boundary = content_type_str
@ -1569,6 +1574,8 @@ pub async fn post_handler(
let mut file_name: Option<String> = None;
let mut file_content_type: Option<String> = None;
let mut file_content_encoding: Option<String> = None;
let mut file_content_md5: Option<String> = None;
let mut form_fields = std::collections::HashMap::new();
while let Ok(Some(field)) = multipart.next_field().await {
let field_name = field.name().map(|s| s.to_string());
@ -1578,17 +1585,29 @@ pub async fn post_handler(
cleaned.rsplit('/').next().unwrap_or(&cleaned).to_string()
});
let fct = field.content_type().map(|m| m.to_string());
let field_headers = field.headers().clone();
let fce = field_headers
.get(header::CONTENT_ENCODING)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
let fmd5 = field_headers
.get("Content-MD5")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
if let Ok(data) = field.bytes().await {
if file_data.is_none() {
if file_data.is_none() && fname.is_some() {
// First file field
file_data = Some(data.to_vec());
file_name = fname;
file_content_type = fct;
// Content-Encoding comes from headers, not multipart field
file_content_encoding = None;
file_content_encoding = fce;
file_content_md5 = fmd5;
} else if let Some(name) = field_name {
form_fields
.entry(name)
.or_insert_with(|| String::from_utf8_lossy(&data).to_string());
}
let _ = field_name; // suppress unused warning
}
}
@ -1598,15 +1617,38 @@ pub async fn post_handler(
file_name.unwrap_or_default(),
file_content_type,
file_content_encoding,
file_content_md5,
form_fields,
)
} else {
// No file field found, use raw body
(body.to_vec(), String::new(), None, None)
(body.to_vec(), String::new(), None, None, None, form_fields)
}
} else {
(body.to_vec(), String::new(), None, None)
(
body.to_vec(),
String::new(),
None,
None,
None,
std::collections::HashMap::new(),
)
};
let form_value = |name: &str| {
query_fields
.iter()
.find_map(|(k, v)| if k == name { Some(v.clone()) } else { None })
.or_else(|| multipart_form_fields.get(name).cloned())
};
// Check for chunk manifest flag.
// Go uses r.FormValue("cm"), which falls back to multipart fields when present.
let is_chunk_manifest = matches!(
form_value("cm").as_deref(),
Some("1" | "t" | "T" | "TRUE" | "True" | "true")
);
// Check file size limit
if state.file_size_limit_bytes > 0 && body_data_raw.len() as i64 > state.file_size_limit_bytes {
return json_error_with_query(
@ -1617,6 +1659,7 @@ pub async fn post_handler(
}
// Validate Content-MD5 if provided
let content_md5 = content_md5.or(parsed_content_md5);
if let Some(ref expected_md5) = content_md5 {
use base64::Engine;
use md5::{Digest, Md5};
@ -1641,10 +1684,7 @@ pub async fn post_handler(
.as_secs();
// Parse custom timestamp from query param
let ts_str = query
.split('&')
.find_map(|p| p.strip_prefix("ts="))
.unwrap_or("");
let ts_str = form_value("ts").unwrap_or_default();
let last_modified = if !ts_str.is_empty() {
ts_str.parse::<u64>().unwrap_or(now)
} else {
@ -1652,36 +1692,55 @@ pub async fn post_handler(
};
// Check if upload is pre-compressed
let is_gzipped = headers
.get(header::CONTENT_ENCODING)
.and_then(|v| v.to_str().ok())
.map(|s| s == "gzip")
.unwrap_or(false);
let is_gzipped = if content_type_str.starts_with("multipart/form-data") {
parsed_content_encoding.as_deref() == Some("gzip")
} else {
headers
.get(header::CONTENT_ENCODING)
.and_then(|v| v.to_str().ok())
.map(|s| s == "gzip")
.unwrap_or(false)
};
// Prefer the multipart filename before deriving MIME and other metadata.
let filename = if !parsed_filename.is_empty() {
parsed_filename
} else {
extract_filename_from_path(&path)
};
// Extract MIME type: prefer multipart-parsed content type, else from Content-Type header
let mime_type = if let Some(ref pct) = parsed_content_type {
pct.clone()
} else {
let multipart_fallback = if content_type_str.starts_with("multipart/form-data")
&& !filename.is_empty()
&& !is_chunk_manifest
{
mime_guess::from_path(&filename)
.first()
.map(|m| m.to_string())
.unwrap_or_default()
} else {
String::new()
};
headers
.get(header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.map(|ct| {
if ct.starts_with("multipart/") {
"application/octet-stream".to_string()
multipart_fallback.clone()
} else {
ct.to_string()
}
})
.unwrap_or_else(|| "application/octet-stream".to_string())
.unwrap_or(multipart_fallback)
};
// Parse TTL from query param (matches Go's r.FormValue("ttl"))
let ttl_str = query
.split('&')
.find_map(|p| p.strip_prefix("ttl="))
.unwrap_or("");
let ttl_str = form_value("ttl").unwrap_or_default();
let ttl = if !ttl_str.is_empty() {
crate::storage::needle::TTL::read(ttl_str).ok()
crate::storage::needle::TTL::read(&ttl_str).ok()
} else {
None
};
@ -1791,12 +1850,6 @@ pub async fn post_handler(
}
// Set filename on needle (matches Go: n.Name = []byte(pu.FileName))
// Prefer multipart-parsed filename, else extract from URL path
let filename = if !parsed_filename.is_empty() {
parsed_filename
} else {
extract_filename_from_path(&path)
};
if !filename.is_empty() && filename.len() < 256 {
n.name = filename.as_bytes().to_vec();
n.name_size = filename.len() as u8;

76
test/volume_server/http/production_features_test.go

@ -2,8 +2,10 @@ package volume_server_http_test
import (
"bytes"
"context"
"encoding/json"
"fmt"
"mime/multipart"
"net/http"
"strings"
"testing"
@ -11,6 +13,7 @@ import (
"github.com/seaweedfs/seaweedfs/test/volume_server/framework"
"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)
func TestStatsEndpoints(t *testing.T) {
@ -159,6 +162,79 @@ func TestUploadWithCustomTimestamp(t *testing.T) {
}
}
// TestMultipartUploadUsesFormFieldsForTimestampAndTTL verifies that a POST with a
// multipart/form-data body honors the "ts" (custom timestamp) and "ttl" form
// fields — mirroring Go's r.FormValue fallback to multipart fields. The custom
// timestamp must surface as the Last-Modified header on a subsequent read, and
// the TTL must be recorded on the stored needle.
func TestMultipartUploadUsesFormFieldsForTimestampAndTTL(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}

	cluster := framework.StartVolumeCluster(t, matrix.P1())
	conn, grpcClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress())
	defer conn.Close()

	const volumeID = uint32(94)
	const needleID = uint64(940001)
	const cookie = uint32(0xAABBCC04)
	framework.AllocateVolume(t, grpcClient, volumeID, "")
	fid := framework.NewFileID(volumeID, needleID, cookie)

	// Build the multipart body: metadata form fields first, then the file part.
	fileContents := []byte("multipart-form-fields-data")
	var formBuf bytes.Buffer
	mpWriter := multipart.NewWriter(&formBuf)
	if err := mpWriter.WriteField("ts", "1700000000"); err != nil {
		t.Fatalf("write multipart ts field: %v", err)
	}
	if err := mpWriter.WriteField("ttl", "7d"); err != nil {
		t.Fatalf("write multipart ttl field: %v", err)
	}
	part, err := mpWriter.CreateFormFile("file", "multipart.txt")
	if err != nil {
		t.Fatalf("create multipart file field: %v", err)
	}
	if _, err := part.Write(fileContents); err != nil {
		t.Fatalf("write multipart file payload: %v", err)
	}
	if err := mpWriter.Close(); err != nil {
		t.Fatalf("close multipart writer: %v", err)
	}

	req, err := http.NewRequest(http.MethodPost, cluster.VolumeAdminURL()+"/"+fid, &formBuf)
	if err != nil {
		t.Fatalf("create multipart upload request: %v", err)
	}
	req.Header.Set("Content-Type", mpWriter.FormDataContentType())

	client := framework.NewHTTPClient()
	postResp := framework.DoRequest(t, client, req)
	postBody := framework.ReadAllAndClose(t, postResp)
	if postResp.StatusCode != http.StatusCreated {
		t.Fatalf("multipart upload expected 201, got %d, body: %s", postResp.StatusCode, string(postBody))
	}

	// The "ts" form field should drive the Last-Modified header on read.
	getResp := framework.ReadBytes(t, client, cluster.VolumeAdminURL(), fid)
	_ = framework.ReadAllAndClose(t, getResp)
	if getResp.StatusCode != http.StatusOK {
		t.Fatalf("multipart upload read expected 200, got %d", getResp.StatusCode)
	}
	wantLastModified := time.Unix(1700000000, 0).UTC().Format(http.TimeFormat)
	if got := getResp.Header.Get("Last-Modified"); got != wantLastModified {
		t.Fatalf("multipart upload Last-Modified mismatch: got %q want %q", got, wantLastModified)
	}

	// The "ttl" form field should be persisted on the needle itself.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	needleStatus, err := grpcClient.VolumeNeedleStatus(ctx, &volume_server_pb.VolumeNeedleStatusRequest{
		VolumeId: volumeID,
		NeedleId: needleID,
	})
	if err != nil {
		t.Fatalf("VolumeNeedleStatus after multipart upload failed: %v", err)
	}
	if got := needleStatus.GetTtl(); got != "7d" {
		t.Fatalf("multipart upload TTL mismatch: got %q want %q", got, "7d")
	}
}
func TestRequestIdGeneration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")

Loading…
Cancel
Save