From fc2ab297ba285d128528c711513512850cdfd39f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 8 Mar 2026 15:31:20 -0700 Subject: [PATCH] =?UTF-8?q?fix:=20round=204=20Go=20parity=20=E2=80=94=20Vo?= =?UTF-8?q?lumeConfigure=20unmount,=20304=20headers,=20EC=20encoding,=20co?= =?UTF-8?q?mpaction=20TTL?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - VolumeConfigure: don't fail on unmount of non-existent volume (Go returns nil) - 304 responses include ETag/Last-Modified headers per HTTP spec - Conditional header checks run before chunk manifest expansion - EC encoder uses two-phase approach: 1GB large blocks then 1MB small blocks - Compaction uses volume-level TTL (not per-needle TTL) for filtering - VolumeConfigure does unmount/modify-disk/remount cycle matching Go - VolumeMarkReadonly persists flag to .vif when persist=true - AllocateVolume accepts version parameter - Multipart boundary omits the leading CRLF before the first part per RFC 2046 - MIME type override skipped for chunk manifests --- seaweed-volume/src/server/grpc_server.rs | 49 +++++++---- seaweed-volume/src/server/handlers.rs | 51 +++++++++--- seaweed-volume/src/server/heartbeat.rs | 4 +- seaweed-volume/src/storage/disk_location.rs | 19 +++-- .../src/storage/erasure_coding/ec_encoder.rs | 37 +++++++-- seaweed-volume/src/storage/store.rs | 82 +++++++++++++++++-- seaweed-volume/src/storage/volume.rs | 23 ++++-- seaweed-volume/tests/http_integration.rs | 4 +- 8 files changed, 204 insertions(+), 65 deletions(-) diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 1e1b95391..52039139d 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -353,6 +353,12 @@ impl VolumeServer for VolumeGrpcService { }; let disk_type = DiskType::from_string(&req.disk_type); + let version = if req.version > 0 { + crate::storage::types::Version(req.version as u8) + } else { + 
crate::storage::types::Version::current() + }; + let mut store = self.state.store.write().unwrap(); store .add_volume( @@ -362,6 +368,7 @@ impl VolumeServer for VolumeGrpcService { ttl, req.preallocate as u64, disk_type, + version, ) .map_err(|e| Status::internal(e.to_string()))?; @@ -541,7 +548,7 @@ impl VolumeServer for VolumeGrpcService { let (_, vol) = store .find_volume_mut(vid) .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?; - vol.set_read_only(); + vol.set_read_only_persist(req.persist); drop(store); self.state.volume_state_notify.notify_one(); Ok(Response::new( @@ -582,29 +589,39 @@ impl VolumeServer for VolumeGrpcService { Ok(rp) => rp, Err(e) => { return Ok(Response::new(volume_server_pb::VolumeConfigureResponse { - error: format!("invalid replica placement: {}", e), + error: format!("volume configure replication {}: {}", req.replication, e), })); } }; let mut store = self.state.store.write().unwrap(); - let (_, vol) = match store.find_volume_mut(vid) { - Some(v) => v, - None => { - return Ok(Response::new(volume_server_pb::VolumeConfigureResponse { - error: format!("volume {} not found on disk, failed to restore mount", vid), - })); + + // Unmount the volume (Go returns nil for non-existent volumes, so we don't + // treat a missing volume as an error here — configure_volume will catch it) + store.unmount_volume(vid); + + // Modify the super block on disk (replica_placement byte) + if let Err(e) = store.configure_volume(vid, rp) { + let mut error = format!("volume configure {}: {}", vid, e); + // Error recovery: try to re-mount anyway + if let Err(mount_err) = store.mount_volume(vid, "", DiskType::HardDrive) { + error += &format!(". 
Also failed to restore mount: {}", mount_err); } - }; + return Ok(Response::new(volume_server_pb::VolumeConfigureResponse { + error, + })); + } - match vol.set_replica_placement(rp) { - Ok(()) => Ok(Response::new(volume_server_pb::VolumeConfigureResponse { - error: String::new(), - })), - Err(e) => Ok(Response::new(volume_server_pb::VolumeConfigureResponse { - error: e.to_string(), - })), + // Re-mount the volume + if let Err(e) = store.mount_volume(vid, "", DiskType::HardDrive) { + return Ok(Response::new(volume_server_pb::VolumeConfigureResponse { + error: format!("volume configure mount {}: {}", vid, e), + })); } + + Ok(Response::new(volume_server_pb::VolumeConfigureResponse { + error: String::new(), + })) } async fn volume_status( diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs index ed0f7d11f..83754a885 100644 --- a/seaweed-volume/src/server/handlers.rs +++ b/seaweed-volume/src/server/handlers.rs @@ -795,15 +795,8 @@ async fn get_or_head_handler_inner( n = n_full; } - // Chunk manifest expansion (needs full data) - if n.is_chunk_manifest() && !bypass_cm { - if let Some(resp) = try_expand_chunk_manifest(&state, &n, &headers, &method) { - return resp; - } - // If manifest expansion fails (invalid JSON etc.), fall through to raw data - } - - // Build ETag + // Build ETag and Last-Modified BEFORE conditional checks and chunk manifest expansion + // (matches Go order: conditional checks first, then chunk manifest) let etag = format!("\"{}\"", n.etag()); // Build Last-Modified header (RFC 1123 format) — must be done before conditional checks @@ -827,7 +820,13 @@ async fn get_or_head_handler_inner( chrono::NaiveDateTime::parse_from_str(ims_str, "%a, %d %b %Y %H:%M:%S GMT") { if (n.last_modified as i64) <= ims_time.and_utc().timestamp() { - return StatusCode::NOT_MODIFIED.into_response(); + let mut not_modified_headers = HeaderMap::new(); + not_modified_headers.insert(header::ETAG, etag.parse().unwrap()); + if let Some(ref 
lm) = last_modified_str { + not_modified_headers + .insert(header::LAST_MODIFIED, lm.parse().unwrap()); + } + return (StatusCode::NOT_MODIFIED, not_modified_headers).into_response(); } } } @@ -838,11 +837,24 @@ async fn get_or_head_handler_inner( if let Some(if_none_match) = headers.get(header::IF_NONE_MATCH) { if let Ok(inm) = if_none_match.to_str() { if inm == etag { - return StatusCode::NOT_MODIFIED.into_response(); + let mut not_modified_headers = HeaderMap::new(); + not_modified_headers.insert(header::ETAG, etag.parse().unwrap()); + if let Some(ref lm) = last_modified_str { + not_modified_headers.insert(header::LAST_MODIFIED, lm.parse().unwrap()); + } + return (StatusCode::NOT_MODIFIED, not_modified_headers).into_response(); } } } + // Chunk manifest expansion (needs full data) — after conditional checks, before response + if n.is_chunk_manifest() && !bypass_cm { + if let Some(resp) = try_expand_chunk_manifest(&state, &n, &headers, &method) { + return resp; + } + // If manifest expansion fails (invalid JSON etc.), fall through to raw data + } + let mut response_headers = HeaderMap::new(); response_headers.insert(header::ETAG, etag.parse().unwrap()); @@ -875,8 +887,16 @@ async fn get_or_head_handler_inner( } // H6: Determine Content-Type: filter application/octet-stream, use mime_guess + // For chunk manifests, skip extension-based MIME override — use stored MIME as-is (Go parity) let content_type = if let Some(ref ct) = query.response_content_type { Some(ct.clone()) + } else if n.is_chunk_manifest() { + // Chunk manifests: use the stored MIME directly without filtering or extension detection + if !n.mime.is_empty() { + Some(String::from_utf8_lossy(&n.mime).to_string()) + } else { + None + } } else { // Get MIME from needle, but filter out application/octet-stream let needle_mime = if !n.mime.is_empty() { @@ -1172,8 +1192,13 @@ fn handle_range_request(range_str: &str, data: &[u8], mut headers: HeaderMap) -> .to_string(); let mut body = Vec::new(); - for 
&(start, end) in &ranges { - body.extend_from_slice(format!("\r\n--{}\r\n", boundary).as_bytes()); + for (i, &(start, end)) in ranges.iter().enumerate() { + // First boundary has no leading CRLF per RFC 2046 + if i == 0 { + body.extend_from_slice(format!("--{}\r\n", boundary).as_bytes()); + } else { + body.extend_from_slice(format!("\r\n--{}\r\n", boundary).as_bytes()); + } body.extend_from_slice(format!("Content-Type: {}\r\n", content_type).as_bytes()); body.extend_from_slice( format!("Content-Range: bytes {}-{}/{}\r\n\r\n", start, end, total).as_bytes(), diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index 17a9cb6bf..dd589e768 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -484,7 +484,7 @@ mod tests { use crate::remote_storage::s3_tier::S3TierRegistry; use crate::security::{Guard, SigningKey}; use crate::storage::needle_map::NeedleMapKind; - use crate::storage::types::{DiskType, VolumeId}; + use crate::storage::types::{DiskType, Version, VolumeId}; use std::sync::RwLock; fn test_config() -> HeartbeatConfig { @@ -553,7 +553,7 @@ mod tests { ) .unwrap(); store - .add_volume(VolumeId(7), "pics", None, None, 0, DiskType::HardDrive) + .add_volume(VolumeId(7), "pics", None, None, 0, DiskType::HardDrive, Version::current()) .unwrap(); let heartbeat = build_heartbeat(&test_config(), &store); diff --git a/seaweed-volume/src/storage/disk_location.rs b/seaweed-volume/src/storage/disk_location.rs index 2515ff0c4..0ed307781 100644 --- a/seaweed-volume/src/storage/disk_location.rs +++ b/seaweed-volume/src/storage/disk_location.rs @@ -152,6 +152,7 @@ impl DiskLocation { replica_placement: Option, ttl: Option, preallocate: u64, + version: Version, ) -> Result<(), VolumeError> { let v = Volume::new( &self.directory, @@ -162,7 +163,7 @@ impl DiskLocation { replica_placement, ttl, preallocate, - Version::current(), + version, )?; self.volumes.insert(vid, v); Ok(()) @@ -348,7 +349,7 @@ 
mod tests { ) .unwrap(); - loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0) + loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0, Version::current()) .unwrap(); assert_eq!(loc.volumes_len(), 1); @@ -373,9 +374,9 @@ mod tests { Vec::new(), ) .unwrap(); - loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0) + loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0, Version::current()) .unwrap(); - loc.create_volume(VolumeId(2), "test", NeedleMapKind::InMemory, None, None, 0) + loc.create_volume(VolumeId(2), "test", NeedleMapKind::InMemory, None, None, 0, Version::current()) .unwrap(); loc.close(); } @@ -412,9 +413,9 @@ mod tests { ) .unwrap(); - loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0) + loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0, Version::current()) .unwrap(); - loc.create_volume(VolumeId(2), "", NeedleMapKind::InMemory, None, None, 0) + loc.create_volume(VolumeId(2), "", NeedleMapKind::InMemory, None, None, 0, Version::current()) .unwrap(); assert_eq!(loc.volumes_len(), 2); @@ -437,11 +438,11 @@ mod tests { ) .unwrap(); - loc.create_volume(VolumeId(1), "pics", NeedleMapKind::InMemory, None, None, 0) + loc.create_volume(VolumeId(1), "pics", NeedleMapKind::InMemory, None, None, 0, Version::current()) .unwrap(); - loc.create_volume(VolumeId(2), "pics", NeedleMapKind::InMemory, None, None, 0) + loc.create_volume(VolumeId(2), "pics", NeedleMapKind::InMemory, None, None, 0, Version::current()) .unwrap(); - loc.create_volume(VolumeId(3), "docs", NeedleMapKind::InMemory, None, None, 0) + loc.create_volume(VolumeId(3), "docs", NeedleMapKind::InMemory, None, None, 0, Version::current()) .unwrap(); assert_eq!(loc.volumes_len(), 3); diff --git a/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs b/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs index 44b1fa5bb..ef765475b 100644 --- 
a/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs +++ b/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs @@ -313,6 +313,10 @@ fn write_sorted_ecx_from_idx(idx_path: &str, ecx_path: &str) -> io::Result<()> { } /// Encode the .dat file data into shard files. +/// +/// Uses a two-phase approach matching Go's ec_encoder.go: +/// 1. Process as many large blocks (1GB) as possible +/// 2. Process remaining data with small blocks (1MB) fn encode_dat_file( dat_file: &File, dat_size: i64, @@ -321,19 +325,37 @@ fn encode_dat_file( data_shards: usize, parity_shards: usize, ) -> io::Result<()> { - let block_size = ERASURE_CODING_SMALL_BLOCK_SIZE; - let row_size = block_size * data_shards; - let mut remaining = dat_size; let mut offset: u64 = 0; - // Process all data in small blocks to avoid large memory allocations + // Phase 1: Process large blocks (1GB each) while enough data remains + let large_block_size = ERASURE_CODING_LARGE_BLOCK_SIZE; + let large_row_size = large_block_size * data_shards; + + while remaining >= large_row_size as i64 { + encode_one_batch( + dat_file, + offset, + large_block_size, + rs, + shards, + data_shards, + parity_shards, + )?; + offset += large_row_size as u64; + remaining -= large_row_size as i64; + } + + // Phase 2: Process remaining data with small blocks (1MB each) + let small_block_size = ERASURE_CODING_SMALL_BLOCK_SIZE; + let small_row_size = small_block_size * data_shards; + while remaining > 0 { - let to_process = remaining.min(row_size as i64); + let to_process = remaining.min(small_row_size as i64); encode_one_batch( dat_file, offset, - block_size, + small_block_size, rs, shards, data_shards, @@ -365,7 +387,8 @@ fn encode_one_batch( "block_size * shard count overflows usize", ) })?; - const MAX_BATCH_ALLOC: usize = 1024 * 1024 * 1024; // 1 GiB safety limit + // Large-block encoding uses 1 GiB * 14 shards = 14 GiB; allow up to 16 GiB. 
+ const MAX_BATCH_ALLOC: usize = 16 * 1024 * 1024 * 1024; // 16 GiB safety limit if total_alloc > MAX_BATCH_ALLOC { return Err(io::Error::new( io::ErrorKind::InvalidInput, diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs index fe1c251d8..9dd5e0053 100644 --- a/seaweed-volume/src/storage/store.rs +++ b/seaweed-volume/src/storage/store.rs @@ -149,6 +149,7 @@ impl Store { ttl: Option, preallocate: u64, disk_type: DiskType, + version: Version, ) -> Result<(), VolumeError> { if self.find_volume(vid).is_some() { return Err(VolumeError::AlreadyExists); @@ -167,6 +168,7 @@ impl Store { replica_placement, ttl, preallocate, + version, ) } @@ -208,7 +210,71 @@ impl Store { let base = crate::storage::volume::volume_file_name(&loc.directory, collection, vid); let dat_path = format!("{}.dat", base); if std::path::Path::new(&dat_path).exists() { - return loc.create_volume(vid, collection, self.needle_map_kind, None, None, 0); + return loc.create_volume(vid, collection, self.needle_map_kind, None, None, 0, Version::current()); + } + } + Err(VolumeError::Io(io::Error::new( + io::ErrorKind::NotFound, + format!("volume {} not found on disk", vid), + ))) + } + + /// Configure a volume's replica placement on disk. + /// The volume must already be unmounted. This opens the .dat file directly, + /// modifies the replica_placement byte (offset 1), and writes it back. 
+ pub fn configure_volume( + &self, + vid: VolumeId, + rp: ReplicaPlacement, + ) -> Result<(), VolumeError> { + use std::io::{Read, Seek, SeekFrom, Write}; + // Find the .dat file across all locations + for loc in &self.locations { + // Try both empty and all known collection prefixes + // We scan the directory for matching .dat files + let dir = &loc.directory; + let patterns = [ + format!("{}/{}.dat", dir, vid.0), + ]; + for dat_path in &patterns { + if std::path::Path::new(dat_path).exists() { + let mut file = std::fs::OpenOptions::new() + .read(true) + .write(true) + .open(dat_path) + .map_err(VolumeError::Io)?; + // Read the super block header (at least 8 bytes) + let mut header = [0u8; 8]; + file.read_exact(&mut header).map_err(VolumeError::Io)?; + // Byte 1 is the replica_placement + header[1] = rp.to_byte(); + file.seek(SeekFrom::Start(0)).map_err(VolumeError::Io)?; + file.write_all(&header).map_err(VolumeError::Io)?; + file.sync_all().map_err(VolumeError::Io)?; + return Ok(()); + } + } + // Also check collection-prefixed files + if let Ok(entries) = std::fs::read_dir(dir) { + for entry in entries.flatten() { + let name = entry.file_name(); + let name = name.to_string_lossy(); + if name.ends_with(&format!("_{}.dat", vid.0)) { + let dat_path = entry.path(); + let mut file = std::fs::OpenOptions::new() + .read(true) + .write(true) + .open(&dat_path) + .map_err(VolumeError::Io)?; + let mut header = [0u8; 8]; + file.read_exact(&mut header).map_err(VolumeError::Io)?; + header[1] = rp.to_byte(); + file.seek(SeekFrom::Start(0)).map_err(VolumeError::Io)?; + file.write_all(&header).map_err(VolumeError::Io)?; + file.sync_all().map_err(VolumeError::Io)?; + return Ok(()); + } + } } } Err(VolumeError::Io(io::Error::new( @@ -582,7 +648,7 @@ mod tests { let mut store = make_test_store(&[dir]); store - .add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive) + .add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive, Version::current()) .unwrap(); 
assert!(store.has_volume(VolumeId(1))); assert!(!store.has_volume(VolumeId(2))); @@ -595,7 +661,7 @@ mod tests { let dir = tmp.path().to_str().unwrap(); let mut store = make_test_store(&[dir]); store - .add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive) + .add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive, Version::current()) .unwrap(); // Write @@ -661,10 +727,10 @@ mod tests { // Add volumes — should go to location with fewest volumes store - .add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive) + .add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive, Version::current()) .unwrap(); store - .add_volume(VolumeId(2), "", None, None, 0, DiskType::HardDrive) + .add_volume(VolumeId(2), "", None, None, 0, DiskType::HardDrive, Version::current()) .unwrap(); assert_eq!(store.total_volume_count(), 2); @@ -680,13 +746,13 @@ mod tests { let mut store = make_test_store(&[dir]); store - .add_volume(VolumeId(1), "pics", None, None, 0, DiskType::HardDrive) + .add_volume(VolumeId(1), "pics", None, None, 0, DiskType::HardDrive, Version::current()) .unwrap(); store - .add_volume(VolumeId(2), "pics", None, None, 0, DiskType::HardDrive) + .add_volume(VolumeId(2), "pics", None, None, 0, DiskType::HardDrive, Version::current()) .unwrap(); store - .add_volume(VolumeId(3), "docs", None, None, 0, DiskType::HardDrive) + .add_volume(VolumeId(3), "docs", None, None, 0, DiskType::HardDrive, Version::current()) .unwrap(); assert_eq!(store.total_volume_count(), 3); diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 33ab0f801..7bb985437 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -1305,11 +1305,20 @@ impl Volume { } /// Mark this volume as read-only (no writes or deletes). + /// If `persist` is true, the readonly state is saved to the .vif file. 
pub fn set_read_only(&mut self) { self.no_write_or_delete = true; self.save_vif(); } + /// Mark this volume as read-only, optionally persisting to .vif. + pub fn set_read_only_persist(&mut self, persist: bool) { + self.no_write_or_delete = true; + if persist { + self.save_vif(); + } + } + /// Mark this volume as writable (allow writes and deletes). pub fn set_writable(&mut self) { self.no_write_or_delete = false; @@ -1756,15 +1765,13 @@ impl Volume { }; self.read_needle_data_at(&mut n, offset.to_actual_offset(), size)?; - // Skip TTL-expired needles + // Skip TTL-expired needles using the volume's TTL (matches Go's volume_vacuum.go) if n.has_ttl() { - if let Some(ref ttl) = n.ttl { - let ttl_minutes = ttl.minutes(); - if ttl_minutes > 0 && n.last_modified > 0 { - let expire_at = n.last_modified + (ttl_minutes as u64) * 60; - if now >= expire_at { - continue; - } + let ttl_minutes = self.super_block.ttl.minutes(); + if ttl_minutes > 0 && n.last_modified > 0 { + let expire_at = n.last_modified + (ttl_minutes as u64) * 60; + if now >= expire_at { + continue; } } } diff --git a/seaweed-volume/tests/http_integration.rs b/seaweed-volume/tests/http_integration.rs index a3703fb66..db506c534 100644 --- a/seaweed-volume/tests/http_integration.rs +++ b/seaweed-volume/tests/http_integration.rs @@ -16,7 +16,7 @@ use seaweed_volume::server::volume_server::{ }; use seaweed_volume::storage::needle_map::NeedleMapKind; use seaweed_volume::storage::store::Store; -use seaweed_volume::storage::types::{DiskType, VolumeId}; +use seaweed_volume::storage::types::{DiskType, Version, VolumeId}; use tempfile::TempDir; @@ -42,7 +42,7 @@ fn test_state_with_signing_key(signing_key: Vec) -> (Arc, ) .expect("failed to add location"); store - .add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive) + .add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive, Version::current()) .expect("failed to create volume"); let guard = Guard::new(&[], SigningKey(signing_key), 0, 
SigningKey(vec![]), 0);