From 459abdf0c64a20e5350c123138ce7c11cbce05d2 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 6 Mar 2026 21:10:59 -0800 Subject: [PATCH] volume: fix BatchDelete, readDeleted, multipart/MD5, needle blob RPCs - BatchDelete: use proper HTTP status codes (400/404/202/304/406/500), cookie validation with break on mismatch, chunk manifest rejection - Remove cookie validation from do_delete_request (match Go behavior), add it to HTTP delete handler and BatchDelete handler instead - Fix needle_map delete to keep original offset (not tombstone offset) so readDeleted can find original data - Add readDeleted query param support for reading deleted needles - Add multipart boundary validation and Content-MD5 check on uploads - Add ?cm=true upload param for chunk manifest flag - Fix ReadNeedleMeta to read at offset/size (not by needle ID) - Fix ReadNeedleBlob error message format - Fix ping target type: "volumeServer" not "volume" - Populate idx_file_size in ReadVolumeFileStatus - Add md-5 and base64 crate dependencies Test results: gRPC 35/75, HTTP 39/55 (was 27/75, 36/55) --- seaweed-volume/Cargo.lock | 12 ++ seaweed-volume/Cargo.toml | 4 + seaweed-volume/src/server/grpc_server.rs | 154 ++++++++++++++++------- seaweed-volume/src/server/handlers.rs | 56 ++++++++- seaweed-volume/src/storage/needle_map.rs | 3 +- seaweed-volume/src/storage/store.rs | 8 +- seaweed-volume/src/storage/volume.rs | 32 ++--- 7 files changed, 207 insertions(+), 62 deletions(-) diff --git a/seaweed-volume/Cargo.lock b/seaweed-volume/Cargo.lock index d47debca0..7dcb9c0df 100644 --- a/seaweed-volume/Cargo.lock +++ b/seaweed-volume/Cargo.lock @@ -1468,6 +1468,16 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.8.0" @@ -2427,6 +2437,7 @@ dependencies = [ "anyhow", "async-trait", "axum", + "base64", "bytes", "chrono", "clap", @@ -2439,6 +2450,7 @@ dependencies = [ "hyper-util", "jsonwebtoken", "lazy_static", + "md-5", "memmap2", "parking_lot 0.12.5", "prometheus", diff --git a/seaweed-volume/Cargo.toml b/seaweed-volume/Cargo.toml index a2594620a..ea631f6d4 100644 --- a/seaweed-volume/Cargo.toml +++ b/seaweed-volume/Cargo.toml @@ -66,6 +66,10 @@ uuid = { version = "1", features = ["v4"] } # HTTP client (for proxying, remote fetch) reqwest = { version = "0.12", features = ["rustls-tls", "stream"] } +# Content hashing +md-5 = "0.10" +base64 = "0.22" + # Misc bytes = "1" rand = "0.8" diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 6a16659c3..dff7219fb 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -36,41 +36,107 @@ impl VolumeServer for VolumeGrpcService { let req = request.into_inner(); let mut results = Vec::new(); + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + for fid_str in &req.file_ids { - let result = match needle::FileId::parse(fid_str) { - Ok(file_id) => { - let mut n = Needle { - id: file_id.key, - cookie: file_id.cookie, - ..Needle::default() - }; - let mut store = self.state.store.write().unwrap(); - match store.delete_volume_needle(file_id.volume_id, &mut n) { - Ok(size) => volume_server_pb::DeleteResult { - file_id: fid_str.clone(), - status: 0, - error: String::new(), - size: size.0 as u32, - version: 0, - }, - Err(e) => volume_server_pb::DeleteResult { + let file_id = match needle::FileId::parse(fid_str) { + Ok(fid) => fid, + Err(e) => { + results.push(volume_server_pb::DeleteResult { + file_id: fid_str.clone(), + status: 400, // Bad Request + error: e, + size: 0, + version: 0, + }); + continue; + } + }; + + let mut n = Needle { + id: file_id.key, + cookie: file_id.cookie, + ..Needle::default() + }; + + // Cookie validation (unless skip_cookie_check) + if !req.skip_cookie_check { + let original_cookie = n.cookie; + let store = self.state.store.read().unwrap(); + match store.read_volume_needle(file_id.volume_id, &mut n) { + Ok(_) => {} + Err(e) => { + results.push(volume_server_pb::DeleteResult { file_id: fid_str.clone(), - status: 1, + status: 404, // Not Found error: e.to_string(), size: 0, version: 0, - }, + }); + continue; } } - Err(e) => volume_server_pb::DeleteResult { + if n.cookie != original_cookie { + results.push(volume_server_pb::DeleteResult { + file_id: fid_str.clone(), + status: 400, // Bad Request + error: "File Random Cookie does not match.".to_string(), + size: 0, + version: 0, + }); + break; // Stop processing on cookie mismatch + } + } + + // Reject chunk manifest needles + if n.is_chunk_manifest() { + results.push(volume_server_pb::DeleteResult { file_id: fid_str.clone(), - status: 1, - error: e, + status: 406, // Not Acceptable + error: "ChunkManifest: not allowed in batch delete mode.".to_string(), size: 0, version: 0, - }, - }; - results.push(result); + }); + continue; + } + + n.last_modified = now; + n.set_has_last_modified_date(); + + let mut store = self.state.store.write().unwrap(); + match store.delete_volume_needle(file_id.volume_id, &mut n) { + Ok(size) => { + if size.0 == 0 { + results.push(volume_server_pb::DeleteResult { + file_id: fid_str.clone(), + status: 304, // Not Modified + error: String::new(), + size: 0, + version: 0, + }); + } else { + results.push(volume_server_pb::DeleteResult { + file_id: fid_str.clone(), + status: 202, // Accepted + error: String::new(), + size: size.0 as u32, + version: 0, + }); + } + } + Err(e) => { + results.push(volume_server_pb::DeleteResult { + file_id: fid_str.clone(), + status: 500, // Internal Server Error + error: e.to_string(), + size: 0, + version: 0, + }); + } + } } Ok(Response::new(volume_server_pb::BatchDeleteResponse { results })) @@ -370,7 +436,7 @@ impl VolumeServer for VolumeGrpcService { Ok(Response::new(volume_server_pb::ReadVolumeFileStatusResponse { volume_id: vid.0, idx_file_timestamp_seconds: 0, - idx_file_size: 0, + idx_file_size: vol.idx_file_size(), dat_file_timestamp_seconds: 0, dat_file_size: vol.dat_file_size().unwrap_or(0), file_count: vol.file_count() as u64, @@ -411,7 +477,7 @@ impl VolumeServer for VolumeGrpcService { .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?; let blob = vol.read_needle_blob(offset, size) - .map_err(|e| Status::internal(e.to_string()))?; + .map_err(|e| Status::internal(format!("read needle blob: {}", e)))?; Ok(Response::new(volume_server_pb::ReadNeedleBlobResponse { needle_blob: blob, @@ -427,20 +493,24 @@ impl VolumeServer for VolumeGrpcService { let needle_id = NeedleId(req.needle_id); let store = self.state.store.read().unwrap(); - let mut n = Needle { id: needle_id, ..Needle::default() }; - match store.read_volume_needle(vid, &mut n) { - Ok(_) => { - let ttl_str = n.ttl.as_ref().map_or(String::new(), |t| t.to_string()); - Ok(Response::new(volume_server_pb::ReadNeedleMetaResponse { - cookie: n.cookie.0, - last_modified: n.last_modified, - crc: n.checksum.0, - ttl: ttl_str, - append_at_ns: n.append_at_ns, - })) - } - Err(e) => Err(Status::not_found(e.to_string())), - } + let (_, vol) = store.find_volume(vid) + .ok_or_else(|| Status::not_found(format!("not found volume id {} and read needle metadata at ec shards is not supported", vid)))?; + + let offset = req.offset; + let size = crate::storage::types::Size(req.size); + + let mut n = Needle { id: needle_id, flags: 0x08, ..Needle::default() }; + vol.read_needle_data_at(&mut n, offset, size) + .map_err(|e| Status::internal(format!("read needle meta: {}", e)))?; + + let ttl_str = n.ttl.as_ref().map_or(String::new(), |t| t.to_string()); + Ok(Response::new(volume_server_pb::ReadNeedleMetaResponse { + cookie: n.cookie.0, + last_modified: n.last_modified, + crc: n.checksum.0, + ttl: ttl_str, + append_at_ns: n.append_at_ns, + })) } async fn write_needle_blob( @@ -724,7 +794,7 @@ impl VolumeServer for VolumeGrpcService { let start = now_ns(); // Route ping based on target type - let remote_time_ns = if req.target.is_empty() || req.target_type == "volume" { + let remote_time_ns = if req.target.is_empty() || req.target_type == "volumeServer" { // Volume self-ping: return our own time now_ns() } else if req.target_type == "master" { diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs index 207e27048..4e5821f57 100644 --- a/seaweed-volume/src/server/handlers.rs +++ b/seaweed-volume/src/server/handlers.rs @@ -66,6 +66,8 @@ pub struct ReadQueryParams { #[serde(rename = "response-cache-control")] pub response_cache_control: Option, pub dl: Option, + #[serde(rename = "readDeleted")] + pub read_deleted: Option, } // ============================================================================ @@ -127,8 +129,10 @@ async fn get_or_head_handler_inner( ..Needle::default() }; + let read_deleted = query.read_deleted.as_deref() == Some("true"); + let store = state.store.read().unwrap(); - match store.read_volume_needle(vid, &mut n) { + match store.read_volume_needle_opt(vid, &mut n, read_deleted) { Ok(count) => { if count <= 0 { return StatusCode::NOT_FOUND.into_response(); @@ -379,6 +383,7 @@ pub async fn post_handler( metrics::REQUEST_COUNTER.with_label_values(&["write"]).inc(); let path = request.uri().path().to_string(); + let query = request.uri().query().unwrap_or("").to_string(); let headers = request.headers().clone(); let (vid, needle_id, cookie) = match parse_url_path(&path) { @@ -392,12 +397,39 @@ pub async fn post_handler( return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response(); } + // Check for chunk manifest flag + let is_chunk_manifest = query.split('&') + .any(|p| p == "cm=true" || p == "cm=1"); + + // Validate multipart/form-data has a boundary + if let Some(ct) = headers.get(header::CONTENT_TYPE) { + if let Ok(ct_str) = ct.to_str() { + if ct_str.starts_with("multipart/form-data") && !ct_str.contains("boundary=") { + return (StatusCode::BAD_REQUEST, "no multipart boundary param in Content-Type").into_response(); + } + } + } + + let content_md5 = headers.get("Content-MD5").and_then(|v| v.to_str().ok()).map(|s| s.to_string()); + // Read body let body = match axum::body::to_bytes(request.into_body(), usize::MAX).await { Ok(b) => b, Err(e) => return (StatusCode::BAD_REQUEST, format!("read body: {}", e)).into_response(), }; + // Validate Content-MD5 if provided + if let Some(ref expected_md5) = content_md5 { + use md5::{Md5, Digest}; + use base64::Engine; + let mut hasher = Md5::new(); + hasher.update(&body); + let actual = base64::engine::general_purpose::STANDARD.encode(hasher.finalize()); + if actual != *expected_md5 { + return (StatusCode::BAD_REQUEST, format!("Content-MD5 mismatch: expected {} got {}", expected_md5, actual)).into_response(); + } + } + let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() @@ -412,6 +444,9 @@ pub async fn post_handler( ..Needle::default() }; n.set_has_last_modified_date(); + if is_chunk_manifest { + n.set_is_chunk_manifest(); + } let mut store = state.store.write().unwrap(); match store.write_volume_needle(vid, &mut n) { @@ -479,6 +514,22 @@ pub async fn delete_handler( ..Needle::default() }; + // Read needle first to validate cookie (matching Go behavior) + let original_cookie = cookie; + { + let store = state.store.read().unwrap(); + match store.read_volume_needle(vid, &mut n) { + Ok(_) => {} + Err(_) => { + let result = DeleteResult { size: 0 }; + return (StatusCode::NOT_FOUND, axum::Json(result)).into_response(); + } + } + } + if n.cookie != original_cookie { + return (StatusCode::BAD_REQUEST, "File Random Cookie does not match.").into_response(); + } + let mut store = state.store.write().unwrap(); match store.delete_volume_needle(vid, &mut n) { Ok(size) => { @@ -496,9 +547,6 @@ pub async fn delete_handler( let result = DeleteResult { size: 0 }; (StatusCode::NOT_FOUND, axum::Json(result)).into_response() } - Err(crate::storage::volume::VolumeError::CookieMismatch(_)) => { - (StatusCode::BAD_REQUEST, "cookie mismatch").into_response() - } Err(e) => { (StatusCode::INTERNAL_SERVER_ERROR, format!("delete error: {}", e)).into_response() } diff --git a/seaweed-volume/src/storage/needle_map.rs b/seaweed-volume/src/storage/needle_map.rs index ff0acb450..a63211dac 100644 --- a/seaweed-volume/src/storage/needle_map.rs +++ b/seaweed-volume/src/storage/needle_map.rs @@ -174,7 +174,8 @@ impl CompactNeedleMap { self.metric.on_delete(&old); let deleted_size = Size(-(old.size.0)); - self.map.insert(key, NeedleValue { offset, size: deleted_size }); + // Keep original offset so readDeleted can find original data (matching Go behavior) + self.map.insert(key, NeedleValue { offset: old.offset, size: deleted_size }); return Ok(Some(old.size)); } } diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs index 57f8bfe3c..54c6b1872 100644 --- a/seaweed-volume/src/storage/store.rs +++ b/seaweed-volume/src/storage/store.rs @@ -191,7 +191,7 @@ impl Store { } Err(VolumeError::Io(io::Error::new( io::ErrorKind::NotFound, - format!("volume {} not found on any disk", vid), + format!("volume {} not found on disk", vid), ))) } @@ -203,6 +203,12 @@ impl Store { vol.read_needle(n) } + /// Read a needle from a volume, optionally reading deleted needles. + pub fn read_volume_needle_opt(&self, vid: VolumeId, n: &mut Needle, read_deleted: bool) -> Result { + let (_, vol) = self.find_volume(vid).ok_or(VolumeError::NotFound)?; + vol.read_needle_opt(n, read_deleted) + } + /// Write a needle to a volume. pub fn write_volume_needle( &mut self, vid: VolumeId, n: &mut Needle, diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 562133c5f..5e74c4a5c 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -361,6 +361,10 @@ impl Volume { /// Read a needle by its ID from the volume. pub fn read_needle(&self, n: &mut Needle) -> Result { + self.read_needle_opt(n, false) + } + + pub fn read_needle_opt(&self, n: &mut Needle, read_deleted: bool) -> Result { let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?; let nv = nm.get(n.id).ok_or(VolumeError::NotFound)?; @@ -368,15 +372,20 @@ impl Volume { return Err(VolumeError::NotFound); } - let read_size = nv.size; + let mut read_size = nv.size; if read_size.is_deleted() { - return Err(VolumeError::Deleted); + if read_deleted && !read_size.is_tombstone() { + // Negate to get original size + read_size = Size(-read_size.0); + } else { + return Err(VolumeError::Deleted); + } } if read_size.0 == 0 { return Ok(0); } - self.read_needle_data(n, nv.offset.to_actual_offset(), read_size)?; + self.read_needle_data_at(n, nv.offset.to_actual_offset(), read_size)?; // TTL expiry check if n.has_ttl() { @@ -399,7 +408,7 @@ impl Volume { } /// Read needle data from .dat file at given offset. - fn read_needle_data(&self, n: &mut Needle, offset: i64, size: Size) -> Result<(), VolumeError> { + pub fn read_needle_data_at(&self, n: &mut Needle, offset: i64, size: Size) -> Result<(), VolumeError> { let dat_file = self.dat_file.as_ref().ok_or_else(|| { VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open")) })?; @@ -550,7 +559,7 @@ impl Volume { if let Some(nv) = nm.get(n.id) { if !nv.offset.is_zero() && nv.size.is_valid() { let mut old = Needle::default(); - if self.read_needle_data(&mut old, nv.offset.to_actual_offset(), nv.size).is_ok() { + if self.read_needle_data_at(&mut old, nv.offset.to_actual_offset(), nv.size).is_ok() { if old.cookie == n.cookie && old.checksum == n.checksum && old.data == n.data @@ -609,15 +618,6 @@ impl Volume { return Ok(Size(0)); } - // Cookie validation: read stored needle header and verify cookie matches - { - let mut existing = Needle::default(); - self.read_needle_header(&mut existing, stored_offset.to_actual_offset())?; - if existing.cookie != n.cookie { - return Err(VolumeError::CookieMismatch(n.cookie.0)); - } - } - // Write tombstone: append needle with empty data n.data = vec![]; n.append_at_ns = get_append_at_ns(self.last_append_at_ns); @@ -727,6 +727,10 @@ impl Volume { } } + pub fn idx_file_size(&self) -> u64 { + self.nm.as_ref().map_or(0, |nm| nm.index_file_size()) + } + // ---- Sync / Close ---- pub fn sync_to_disk(&mut self) -> io::Result<()> {