Browse Source

volume: fix BatchDelete, readDeleted, multipart/MD5, needle blob RPCs

- BatchDelete: use proper HTTP status codes (400/404/202/304/406/500),
  cookie validation with break on mismatch, chunk manifest rejection
- Remove cookie validation from do_delete_request (match Go behavior),
  add it to HTTP delete handler and BatchDelete handler instead
- Fix needle_map delete to keep original offset (not tombstone offset)
  so readDeleted can find original data
- Add readDeleted query param support for reading deleted needles
- Add multipart boundary validation and Content-MD5 check on uploads
- Add ?cm=true upload param for chunk manifest flag
- Fix ReadNeedleMeta to read at offset/size (not by needle ID)
- Fix ReadNeedleBlob error message format
- Fix ping target type: "volumeServer" not "volume"
- Populate idx_file_size in ReadVolumeFileStatus
- Add md-5 and base64 crate dependencies

Test results: gRPC 35/75, HTTP 39/55 (was 27/75, 36/55)
rust-volume-server
Chris Lu 3 days ago
parent
commit
459abdf0c6
  1. 12
      seaweed-volume/Cargo.lock
  2. 4
      seaweed-volume/Cargo.toml
  3. 154
      seaweed-volume/src/server/grpc_server.rs
  4. 56
      seaweed-volume/src/server/handlers.rs
  5. 3
      seaweed-volume/src/storage/needle_map.rs
  6. 8
      seaweed-volume/src/storage/store.rs
  7. 32
      seaweed-volume/src/storage/volume.rs

12
seaweed-volume/Cargo.lock

@ -1468,6 +1468,16 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "md-5"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
dependencies = [
"cfg-if",
"digest",
]
[[package]]
name = "memchr"
version = "2.8.0"
@ -2427,6 +2437,7 @@ dependencies = [
"anyhow",
"async-trait",
"axum",
"base64",
"bytes",
"chrono",
"clap",
@ -2439,6 +2450,7 @@ dependencies = [
"hyper-util",
"jsonwebtoken",
"lazy_static",
"md-5",
"memmap2",
"parking_lot 0.12.5",
"prometheus",

4
seaweed-volume/Cargo.toml

@ -66,6 +66,10 @@ uuid = { version = "1", features = ["v4"] }
# HTTP client (for proxying, remote fetch)
reqwest = { version = "0.12", features = ["rustls-tls", "stream"] }
# Content hashing
md-5 = "0.10"
base64 = "0.22"
# Misc
bytes = "1"
rand = "0.8"

154
seaweed-volume/src/server/grpc_server.rs

@ -36,41 +36,107 @@ impl VolumeServer for VolumeGrpcService {
let req = request.into_inner();
let mut results = Vec::new();
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
for fid_str in &req.file_ids {
let result = match needle::FileId::parse(fid_str) {
Ok(file_id) => {
let mut n = Needle {
id: file_id.key,
cookie: file_id.cookie,
..Needle::default()
};
let mut store = self.state.store.write().unwrap();
match store.delete_volume_needle(file_id.volume_id, &mut n) {
Ok(size) => volume_server_pb::DeleteResult {
file_id: fid_str.clone(),
status: 0,
error: String::new(),
size: size.0 as u32,
version: 0,
},
Err(e) => volume_server_pb::DeleteResult {
let file_id = match needle::FileId::parse(fid_str) {
Ok(fid) => fid,
Err(e) => {
results.push(volume_server_pb::DeleteResult {
file_id: fid_str.clone(),
status: 400, // Bad Request
error: e,
size: 0,
version: 0,
});
continue;
}
};
let mut n = Needle {
id: file_id.key,
cookie: file_id.cookie,
..Needle::default()
};
// Cookie validation (unless skip_cookie_check)
if !req.skip_cookie_check {
let original_cookie = n.cookie;
let store = self.state.store.read().unwrap();
match store.read_volume_needle(file_id.volume_id, &mut n) {
Ok(_) => {}
Err(e) => {
results.push(volume_server_pb::DeleteResult {
file_id: fid_str.clone(),
status: 1,
status: 404, // Not Found
error: e.to_string(),
size: 0,
version: 0,
},
});
continue;
}
}
Err(e) => volume_server_pb::DeleteResult {
if n.cookie != original_cookie {
results.push(volume_server_pb::DeleteResult {
file_id: fid_str.clone(),
status: 400, // Bad Request
error: "File Random Cookie does not match.".to_string(),
size: 0,
version: 0,
});
break; // Stop processing on cookie mismatch
}
}
// Reject chunk manifest needles
if n.is_chunk_manifest() {
results.push(volume_server_pb::DeleteResult {
file_id: fid_str.clone(),
status: 1,
error: e,
status: 406, // Not Acceptable
error: "ChunkManifest: not allowed in batch delete mode.".to_string(),
size: 0,
version: 0,
},
};
results.push(result);
});
continue;
}
n.last_modified = now;
n.set_has_last_modified_date();
let mut store = self.state.store.write().unwrap();
match store.delete_volume_needle(file_id.volume_id, &mut n) {
Ok(size) => {
if size.0 == 0 {
results.push(volume_server_pb::DeleteResult {
file_id: fid_str.clone(),
status: 304, // Not Modified
error: String::new(),
size: 0,
version: 0,
});
} else {
results.push(volume_server_pb::DeleteResult {
file_id: fid_str.clone(),
status: 202, // Accepted
error: String::new(),
size: size.0 as u32,
version: 0,
});
}
}
Err(e) => {
results.push(volume_server_pb::DeleteResult {
file_id: fid_str.clone(),
status: 500, // Internal Server Error
error: e.to_string(),
size: 0,
version: 0,
});
}
}
}
Ok(Response::new(volume_server_pb::BatchDeleteResponse { results }))
@ -370,7 +436,7 @@ impl VolumeServer for VolumeGrpcService {
Ok(Response::new(volume_server_pb::ReadVolumeFileStatusResponse {
volume_id: vid.0,
idx_file_timestamp_seconds: 0,
idx_file_size: 0,
idx_file_size: vol.idx_file_size(),
dat_file_timestamp_seconds: 0,
dat_file_size: vol.dat_file_size().unwrap_or(0),
file_count: vol.file_count() as u64,
@ -411,7 +477,7 @@ impl VolumeServer for VolumeGrpcService {
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
let blob = vol.read_needle_blob(offset, size)
.map_err(|e| Status::internal(e.to_string()))?;
.map_err(|e| Status::internal(format!("read needle blob: {}", e)))?;
Ok(Response::new(volume_server_pb::ReadNeedleBlobResponse {
needle_blob: blob,
@ -427,20 +493,24 @@ impl VolumeServer for VolumeGrpcService {
let needle_id = NeedleId(req.needle_id);
let store = self.state.store.read().unwrap();
let mut n = Needle { id: needle_id, ..Needle::default() };
match store.read_volume_needle(vid, &mut n) {
Ok(_) => {
let ttl_str = n.ttl.as_ref().map_or(String::new(), |t| t.to_string());
Ok(Response::new(volume_server_pb::ReadNeedleMetaResponse {
cookie: n.cookie.0,
last_modified: n.last_modified,
crc: n.checksum.0,
ttl: ttl_str,
append_at_ns: n.append_at_ns,
}))
}
Err(e) => Err(Status::not_found(e.to_string())),
}
let (_, vol) = store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("not found volume id {} and read needle metadata at ec shards is not supported", vid)))?;
let offset = req.offset;
let size = crate::storage::types::Size(req.size);
let mut n = Needle { id: needle_id, flags: 0x08, ..Needle::default() };
vol.read_needle_data_at(&mut n, offset, size)
.map_err(|e| Status::internal(format!("read needle meta: {}", e)))?;
let ttl_str = n.ttl.as_ref().map_or(String::new(), |t| t.to_string());
Ok(Response::new(volume_server_pb::ReadNeedleMetaResponse {
cookie: n.cookie.0,
last_modified: n.last_modified,
crc: n.checksum.0,
ttl: ttl_str,
append_at_ns: n.append_at_ns,
}))
}
async fn write_needle_blob(
@ -724,7 +794,7 @@ impl VolumeServer for VolumeGrpcService {
let start = now_ns();
// Route ping based on target type
let remote_time_ns = if req.target.is_empty() || req.target_type == "volume" {
let remote_time_ns = if req.target.is_empty() || req.target_type == "volumeServer" {
// Volume self-ping: return our own time
now_ns()
} else if req.target_type == "master" {

56
seaweed-volume/src/server/handlers.rs

@ -66,6 +66,8 @@ pub struct ReadQueryParams {
#[serde(rename = "response-cache-control")]
pub response_cache_control: Option<String>,
pub dl: Option<String>,
#[serde(rename = "readDeleted")]
pub read_deleted: Option<String>,
}
// ============================================================================
@ -127,8 +129,10 @@ async fn get_or_head_handler_inner(
..Needle::default()
};
let read_deleted = query.read_deleted.as_deref() == Some("true");
let store = state.store.read().unwrap();
match store.read_volume_needle(vid, &mut n) {
match store.read_volume_needle_opt(vid, &mut n, read_deleted) {
Ok(count) => {
if count <= 0 {
return StatusCode::NOT_FOUND.into_response();
@ -379,6 +383,7 @@ pub async fn post_handler(
metrics::REQUEST_COUNTER.with_label_values(&["write"]).inc();
let path = request.uri().path().to_string();
let query = request.uri().query().unwrap_or("").to_string();
let headers = request.headers().clone();
let (vid, needle_id, cookie) = match parse_url_path(&path) {
@ -392,12 +397,39 @@ pub async fn post_handler(
return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
}
// Check for chunk manifest flag
let is_chunk_manifest = query.split('&')
.any(|p| p == "cm=true" || p == "cm=1");
// Validate multipart/form-data has a boundary
if let Some(ct) = headers.get(header::CONTENT_TYPE) {
if let Ok(ct_str) = ct.to_str() {
if ct_str.starts_with("multipart/form-data") && !ct_str.contains("boundary=") {
return (StatusCode::BAD_REQUEST, "no multipart boundary param in Content-Type").into_response();
}
}
}
let content_md5 = headers.get("Content-MD5").and_then(|v| v.to_str().ok()).map(|s| s.to_string());
// Read body
let body = match axum::body::to_bytes(request.into_body(), usize::MAX).await {
Ok(b) => b,
Err(e) => return (StatusCode::BAD_REQUEST, format!("read body: {}", e)).into_response(),
};
// Validate Content-MD5 if provided
if let Some(ref expected_md5) = content_md5 {
use md5::{Md5, Digest};
use base64::Engine;
let mut hasher = Md5::new();
hasher.update(&body);
let actual = base64::engine::general_purpose::STANDARD.encode(hasher.finalize());
if actual != *expected_md5 {
return (StatusCode::BAD_REQUEST, format!("Content-MD5 mismatch: expected {} got {}", expected_md5, actual)).into_response();
}
}
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
@ -412,6 +444,9 @@ pub async fn post_handler(
..Needle::default()
};
n.set_has_last_modified_date();
if is_chunk_manifest {
n.set_is_chunk_manifest();
}
let mut store = state.store.write().unwrap();
match store.write_volume_needle(vid, &mut n) {
@ -479,6 +514,22 @@ pub async fn delete_handler(
..Needle::default()
};
// Read needle first to validate cookie (matching Go behavior)
let original_cookie = cookie;
{
let store = state.store.read().unwrap();
match store.read_volume_needle(vid, &mut n) {
Ok(_) => {}
Err(_) => {
let result = DeleteResult { size: 0 };
return (StatusCode::NOT_FOUND, axum::Json(result)).into_response();
}
}
}
if n.cookie != original_cookie {
return (StatusCode::BAD_REQUEST, "File Random Cookie does not match.").into_response();
}
let mut store = state.store.write().unwrap();
match store.delete_volume_needle(vid, &mut n) {
Ok(size) => {
@ -496,9 +547,6 @@ pub async fn delete_handler(
let result = DeleteResult { size: 0 };
(StatusCode::NOT_FOUND, axum::Json(result)).into_response()
}
Err(crate::storage::volume::VolumeError::CookieMismatch(_)) => {
(StatusCode::BAD_REQUEST, "cookie mismatch").into_response()
}
Err(e) => {
(StatusCode::INTERNAL_SERVER_ERROR, format!("delete error: {}", e)).into_response()
}

3
seaweed-volume/src/storage/needle_map.rs

@ -174,7 +174,8 @@ impl CompactNeedleMap {
self.metric.on_delete(&old);
let deleted_size = Size(-(old.size.0));
self.map.insert(key, NeedleValue { offset, size: deleted_size });
// Keep original offset so readDeleted can find original data (matching Go behavior)
self.map.insert(key, NeedleValue { offset: old.offset, size: deleted_size });
return Ok(Some(old.size));
}
}

8
seaweed-volume/src/storage/store.rs

@ -191,7 +191,7 @@ impl Store {
}
Err(VolumeError::Io(io::Error::new(
io::ErrorKind::NotFound,
format!("volume {} not found on any disk", vid),
format!("volume {} not found on disk", vid),
)))
}
@ -203,6 +203,12 @@ impl Store {
vol.read_needle(n)
}
/// Read a needle from a volume, optionally reading deleted needles.
pub fn read_volume_needle_opt(&self, vid: VolumeId, n: &mut Needle, read_deleted: bool) -> Result<i32, VolumeError> {
let (_, vol) = self.find_volume(vid).ok_or(VolumeError::NotFound)?;
vol.read_needle_opt(n, read_deleted)
}
/// Write a needle to a volume.
pub fn write_volume_needle(
&mut self, vid: VolumeId, n: &mut Needle,

32
seaweed-volume/src/storage/volume.rs

@ -361,6 +361,10 @@ impl Volume {
/// Read a needle by its ID from the volume.
pub fn read_needle(&self, n: &mut Needle) -> Result<i32, VolumeError> {
self.read_needle_opt(n, false)
}
pub fn read_needle_opt(&self, n: &mut Needle, read_deleted: bool) -> Result<i32, VolumeError> {
let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
let nv = nm.get(n.id).ok_or(VolumeError::NotFound)?;
@ -368,15 +372,20 @@ impl Volume {
return Err(VolumeError::NotFound);
}
let read_size = nv.size;
let mut read_size = nv.size;
if read_size.is_deleted() {
return Err(VolumeError::Deleted);
if read_deleted && !read_size.is_tombstone() {
// Negate to get original size
read_size = Size(-read_size.0);
} else {
return Err(VolumeError::Deleted);
}
}
if read_size.0 == 0 {
return Ok(0);
}
self.read_needle_data(n, nv.offset.to_actual_offset(), read_size)?;
self.read_needle_data_at(n, nv.offset.to_actual_offset(), read_size)?;
// TTL expiry check
if n.has_ttl() {
@ -399,7 +408,7 @@ impl Volume {
}
/// Read needle data from .dat file at given offset.
fn read_needle_data(&self, n: &mut Needle, offset: i64, size: Size) -> Result<(), VolumeError> {
pub fn read_needle_data_at(&self, n: &mut Needle, offset: i64, size: Size) -> Result<(), VolumeError> {
let dat_file = self.dat_file.as_ref().ok_or_else(|| {
VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
})?;
@ -550,7 +559,7 @@ impl Volume {
if let Some(nv) = nm.get(n.id) {
if !nv.offset.is_zero() && nv.size.is_valid() {
let mut old = Needle::default();
if self.read_needle_data(&mut old, nv.offset.to_actual_offset(), nv.size).is_ok() {
if self.read_needle_data_at(&mut old, nv.offset.to_actual_offset(), nv.size).is_ok() {
if old.cookie == n.cookie
&& old.checksum == n.checksum
&& old.data == n.data
@ -609,15 +618,6 @@ impl Volume {
return Ok(Size(0));
}
// Cookie validation: read stored needle header and verify cookie matches
{
let mut existing = Needle::default();
self.read_needle_header(&mut existing, stored_offset.to_actual_offset())?;
if existing.cookie != n.cookie {
return Err(VolumeError::CookieMismatch(n.cookie.0));
}
}
// Write tombstone: append needle with empty data
n.data = vec![];
n.append_at_ns = get_append_at_ns(self.last_append_at_ns);
@ -727,6 +727,10 @@ impl Volume {
}
}
pub fn idx_file_size(&self) -> u64 {
self.nm.as_ref().map_or(0, |nm| nm.index_file_size())
}
// ---- Sync / Close ----
pub fn sync_to_disk(&mut self) -> io::Result<()> {

Loading…
Cancel
Save