From f3d4a39b2741699113972dd6034df3501e5e971f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 16 Mar 2026 09:05:26 -0700 Subject: [PATCH] http: honor multipart upload metadata fields --- seaweed-volume/src/server/handlers.rs | 117 +++++++++++++----- .../http/production_features_test.go | 76 ++++++++++++ 2 files changed, 161 insertions(+), 32 deletions(-) diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs index 434ec9ff1..8ee0b5e3e 100644 --- a/seaweed-volume/src/server/handlers.rs +++ b/seaweed-volume/src/server/handlers.rs @@ -1425,6 +1425,7 @@ pub async fn post_handler( let path = request.uri().path().to_string(); let query = request.uri().query().unwrap_or("").to_string(); let headers = request.headers().clone(); + let query_fields: Vec<(String, String)> = serde_urlencoded::from_str(&query).unwrap_or_default(); let (vid, needle_id, cookie) = match parse_url_path(&path) { Some(parsed) => parsed, @@ -1505,9 +1506,6 @@ pub async fn post_handler( None }; - // Check for chunk manifest flag - let is_chunk_manifest = query.split('&').any(|p| p == "cm=true" || p == "cm=1"); - // Validate multipart/form-data has a boundary if let Some(ct) = headers.get(header::CONTENT_TYPE) { if let Ok(ct_str) = ct.to_str() { @@ -1545,7 +1543,14 @@ pub async fn post_handler( .unwrap_or("") .to_string(); - let (body_data_raw, parsed_filename, parsed_content_type, _parsed_content_encoding) = + let ( + body_data_raw, + parsed_filename, + parsed_content_type, + parsed_content_encoding, + parsed_content_md5, + multipart_form_fields, + ) = if content_type_str.starts_with("multipart/form-data") { // Extract boundary from Content-Type let boundary = content_type_str @@ -1569,6 +1574,8 @@ pub async fn post_handler( let mut file_name: Option = None; let mut file_content_type: Option = None; let mut file_content_encoding: Option = None; + let mut file_content_md5: Option = None; + let mut form_fields = std::collections::HashMap::new(); while let Ok(Some(field)) = multipart.next_field().await { let field_name = field.name().map(|s| s.to_string()); @@ -1578,17 +1585,29 @@ pub async fn post_handler( cleaned.rsplit('/').next().unwrap_or(&cleaned).to_string() }); let fct = field.content_type().map(|m| m.to_string()); + let field_headers = field.headers().clone(); + let fce = field_headers + .get(header::CONTENT_ENCODING) + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + let fmd5 = field_headers + .get("Content-MD5") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); if let Ok(data) = field.bytes().await { - if file_data.is_none() { + if file_data.is_none() && fname.is_some() { // First file field file_data = Some(data.to_vec()); file_name = fname; file_content_type = fct; - // Content-Encoding comes from headers, not multipart field - file_content_encoding = None; + file_content_encoding = fce; + file_content_md5 = fmd5; + } else if let Some(name) = field_name { + form_fields + .entry(name) + .or_insert_with(|| String::from_utf8_lossy(&data).to_string()); } - let _ = field_name; // suppress unused warning } } @@ -1598,15 +1617,38 @@ pub async fn post_handler( file_name.unwrap_or_default(), file_content_type, file_content_encoding, + file_content_md5, + form_fields, ) } else { // No file field found, use raw body - (body.to_vec(), String::new(), None, None) + (body.to_vec(), String::new(), None, None, None, form_fields) } } else { - (body.to_vec(), String::new(), None, None) + ( + body.to_vec(), + String::new(), + None, + None, + None, + std::collections::HashMap::new(), + ) }; + let form_value = |name: &str| { + query_fields + .iter() + .find_map(|(k, v)| if k == name { Some(v.clone()) } else { None }) + .or_else(|| multipart_form_fields.get(name).cloned()) + }; + + // Check for chunk manifest flag. + // Go uses r.FormValue("cm"), which falls back to multipart fields when present. + let is_chunk_manifest = matches!( + form_value("cm").as_deref(), + Some("1" | "t" | "T" | "TRUE" | "True" | "true") + ); + // Check file size limit if state.file_size_limit_bytes > 0 && body_data_raw.len() as i64 > state.file_size_limit_bytes { return json_error_with_query( @@ -1617,6 +1659,7 @@ pub async fn post_handler( } // Validate Content-MD5 if provided + let content_md5 = content_md5.or(parsed_content_md5); if let Some(ref expected_md5) = content_md5 { use base64::Engine; use md5::{Digest, Md5}; @@ -1641,10 +1684,7 @@ pub async fn post_handler( .as_secs(); // Parse custom timestamp from query param - let ts_str = query - .split('&') - .find_map(|p| p.strip_prefix("ts=")) - .unwrap_or(""); + let ts_str = form_value("ts").unwrap_or_default(); let last_modified = if !ts_str.is_empty() { ts_str.parse::().unwrap_or(now) } else { @@ -1652,36 +1692,55 @@ pub async fn post_handler( }; // Check if upload is pre-compressed - let is_gzipped = headers - .get(header::CONTENT_ENCODING) - .and_then(|v| v.to_str().ok()) - .map(|s| s == "gzip") - .unwrap_or(false); + let is_gzipped = if content_type_str.starts_with("multipart/form-data") { + parsed_content_encoding.as_deref() == Some("gzip") + } else { + headers + .get(header::CONTENT_ENCODING) + .and_then(|v| v.to_str().ok()) + .map(|s| s == "gzip") + .unwrap_or(false) + }; + + // Prefer the multipart filename before deriving MIME and other metadata. + let filename = if !parsed_filename.is_empty() { + parsed_filename + } else { + extract_filename_from_path(&path) + }; // Extract MIME type: prefer multipart-parsed content type, else from Content-Type header let mime_type = if let Some(ref pct) = parsed_content_type { pct.clone() } else { + let multipart_fallback = if content_type_str.starts_with("multipart/form-data") + && !filename.is_empty() + && !is_chunk_manifest + { + mime_guess::from_path(&filename) + .first() + .map(|m| m.to_string()) + .unwrap_or_default() + } else { + String::new() + }; headers .get(header::CONTENT_TYPE) .and_then(|v| v.to_str().ok()) .map(|ct| { if ct.starts_with("multipart/") { - "application/octet-stream".to_string() + multipart_fallback.clone() } else { ct.to_string() } }) - .unwrap_or_else(|| "application/octet-stream".to_string()) + .unwrap_or(multipart_fallback) }; // Parse TTL from query param (matches Go's r.FormValue("ttl")) - let ttl_str = query - .split('&') - .find_map(|p| p.strip_prefix("ttl=")) - .unwrap_or(""); + let ttl_str = form_value("ttl").unwrap_or_default(); let ttl = if !ttl_str.is_empty() { - crate::storage::needle::TTL::read(ttl_str).ok() + crate::storage::needle::TTL::read(&ttl_str).ok() } else { None }; @@ -1791,12 +1850,6 @@ pub async fn post_handler( } // Set filename on needle (matches Go: n.Name = []byte(pu.FileName)) - // Prefer multipart-parsed filename, else extract from URL path - let filename = if !parsed_filename.is_empty() { - parsed_filename - } else { - extract_filename_from_path(&path) - }; if !filename.is_empty() && filename.len() < 256 { n.name = filename.as_bytes().to_vec(); n.name_size = filename.len() as u8; diff --git a/test/volume_server/http/production_features_test.go b/test/volume_server/http/production_features_test.go index da6711556..cfb692646 100644 --- a/test/volume_server/http/production_features_test.go +++ b/test/volume_server/http/production_features_test.go @@ -2,8 +2,10 @@ package volume_server_http_test import ( "bytes" + "context" "encoding/json" "fmt" + "mime/multipart" "net/http" "strings" "testing" @@ -11,6 +13,7 @@ import ( "github.com/seaweedfs/seaweedfs/test/volume_server/framework" "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" ) func TestStatsEndpoints(t *testing.T) { @@ -159,6 +162,79 @@ func TestUploadWithCustomTimestamp(t *testing.T) { } } +func TestMultipartUploadUsesFormFieldsForTimestampAndTTL(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + cluster := framework.StartVolumeCluster(t, matrix.P1()) + conn, grpcClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress()) + defer conn.Close() + + const volumeID = uint32(94) + const needleID = uint64(940001) + const cookie = uint32(0xAABBCC04) + framework.AllocateVolume(t, grpcClient, volumeID, "") + + fid := framework.NewFileID(volumeID, needleID, cookie) + payload := []byte("multipart-form-fields-data") + + var body bytes.Buffer + writer := multipart.NewWriter(&body) + if err := writer.WriteField("ts", "1700000000"); err != nil { + t.Fatalf("write multipart ts field: %v", err) + } + if err := writer.WriteField("ttl", "7d"); err != nil { + t.Fatalf("write multipart ttl field: %v", err) + } + filePart, err := writer.CreateFormFile("file", "multipart.txt") + if err != nil { + t.Fatalf("create multipart file field: %v", err) + } + if _, err := filePart.Write(payload); err != nil { + t.Fatalf("write multipart file payload: %v", err) + } + if err := writer.Close(); err != nil { + t.Fatalf("close multipart writer: %v", err) + } + + req, err := http.NewRequest(http.MethodPost, cluster.VolumeAdminURL()+"/"+fid, &body) + if err != nil { + t.Fatalf("create multipart upload request: %v", err) + } + req.Header.Set("Content-Type", writer.FormDataContentType()) + + client := framework.NewHTTPClient() + uploadResp := framework.DoRequest(t, client, req) + uploadBody := framework.ReadAllAndClose(t, uploadResp) + if uploadResp.StatusCode != http.StatusCreated { + t.Fatalf("multipart upload expected 201, got %d, body: %s", uploadResp.StatusCode, string(uploadBody)) + } + + readResp := framework.ReadBytes(t, client, cluster.VolumeAdminURL(), fid) + _ = framework.ReadAllAndClose(t, readResp) + if readResp.StatusCode != http.StatusOK { + t.Fatalf("multipart upload read expected 200, got %d", readResp.StatusCode) + } + expectedLastModified := time.Unix(1700000000, 0).UTC().Format(http.TimeFormat) + if got := readResp.Header.Get("Last-Modified"); got != expectedLastModified { + t.Fatalf("multipart upload Last-Modified mismatch: got %q want %q", got, expectedLastModified) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + statusResp, err := grpcClient.VolumeNeedleStatus(ctx, &volume_server_pb.VolumeNeedleStatusRequest{ + VolumeId: volumeID, + NeedleId: needleID, + }) + if err != nil { + t.Fatalf("VolumeNeedleStatus after multipart upload failed: %v", err) + } + if got := statusResp.GetTtl(); got != "7d" { + t.Fatalf("multipart upload TTL mismatch: got %q want %q", got, "7d") + } +} + func TestRequestIdGeneration(t *testing.T) { if testing.Short() { t.Skip("skipping integration test in short mode")