From 089d94e5ffcf588fc766224092dd29fe77f41ff8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 6 Mar 2026 18:19:43 -0800 Subject: [PATCH] Address remaining review comments from PR #8539 Security: - Remove unused is_write_active field from Guard gRPC: - Update needle index after write_needle_blob via new put_needle_index() - Return unimplemented for set_state (no state persistence yet) - Add TODO for DiskStatus total/used metrics Storage: - Track overwritten needle bytes in deletion_byte_count for garbage_level - Fix sync order: flush dat file before idx for crash safety - Reject duplicate volume IDs in mount_volume Erasure coding: - Add OOM guard (1 GiB limit) for encode_one_batch allocation - Add non-Unix read fallback in encode_one_batch - Validate shard ID bounds in add/remove/has_shard_id --- seaweed-volume/src/security.rs | 2 -- seaweed-volume/src/server/grpc_server.rs | 12 +++++++++- .../src/storage/erasure_coding/ec_encoder.rs | 23 +++++++++++++++++++ .../src/storage/erasure_coding/ec_shard.rs | 15 ++++++++++++ seaweed-volume/src/storage/needle_map.rs | 2 ++ seaweed-volume/src/storage/store.rs | 3 +++ seaweed-volume/src/storage/volume.rs | 23 +++++++++++++------ 7 files changed, 70 insertions(+), 10 deletions(-) diff --git a/seaweed-volume/src/security.rs b/seaweed-volume/src/security.rs index 654538c40..352bbf03e 100644 --- a/seaweed-volume/src/security.rs +++ b/seaweed-volume/src/security.rs @@ -113,7 +113,6 @@ pub struct Guard { pub expires_after_sec: i64, pub read_signing_key: SigningKey, pub read_expires_after_sec: i64, - is_write_active: bool, } impl Guard { @@ -127,7 +126,6 @@ impl Guard { let mut guard = Guard { whitelist_ips: HashSet::new(), whitelist_cidrs: Vec::new(), - is_write_active: !signing_key.is_empty(), signing_key, expires_after_sec, read_signing_key, diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 4b7c7aa2e..e223753e6 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -298,6 +298,7 @@ impl VolumeServer for VolumeGrpcService { &self, request: Request, ) -> Result, Status> { + // TODO: Persist state changes. Currently echoes back the request state. let req = request.into_inner(); Ok(Response::new(volume_server_pb::SetStateResponse { state: req.state, @@ -414,6 +415,12 @@ impl VolumeServer for VolumeGrpcService { vol.write_needle_blob(dat_size, &req.needle_blob) .map_err(|e| Status::internal(e.to_string()))?; + // Update the needle index so the written blob is discoverable + let needle_id = NeedleId(req.needle_id); + let size = Size(req.size); + vol.put_needle_index(needle_id, Offset::from_actual_offset(dat_size), size) + .map_err(|e| Status::internal(e.to_string()))?; + Ok(Response::new(volume_server_pb::WriteNeedleBlobResponse {})) } @@ -556,11 +563,14 @@ impl VolumeServer for VolumeGrpcService { let mut disk_statuses = Vec::new(); for loc in &store.locations { + let free = loc.available_space.load(std::sync::atomic::Ordering::Relaxed); + // TODO: DiskLocation does not yet track total disk size. + // Once implemented, compute all/used/percent from real values. disk_statuses.push(volume_server_pb::DiskStatus { dir: loc.directory.clone(), all: 0, used: 0, - free: loc.available_space.load(std::sync::atomic::Ordering::Relaxed), + free, percent_free: 0.0, percent_used: 0.0, disk_type: loc.disk_type.to_string(), diff --git a/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs b/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs index 3283bda0d..34ae875f6 100644 --- a/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs +++ b/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs @@ -132,6 +132,22 @@ fn encode_one_batch( rs: &ReedSolomon, shards: &mut [EcVolumeShard], ) -> io::Result<()> { + // Each batch allocates block_size * TOTAL_SHARDS_COUNT bytes. + // With large blocks (1 GiB) this is 14 GiB -- guard against OOM. + let total_alloc = block_size.checked_mul(TOTAL_SHARDS_COUNT).ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "block_size * shard count overflows usize") + })?; + const MAX_BATCH_ALLOC: usize = 1024 * 1024 * 1024; // 1 GiB safety limit + if total_alloc > MAX_BATCH_ALLOC { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "batch allocation too large ({} bytes, limit {} bytes); block_size={} shards={}", + total_alloc, MAX_BATCH_ALLOC, block_size, TOTAL_SHARDS_COUNT, + ), + )); + } + // Allocate buffers for all shards let mut buffers: Vec> = (0..TOTAL_SHARDS_COUNT) .map(|_| vec![0u8; block_size]) @@ -146,6 +162,13 @@ fn encode_one_batch( use std::os::unix::fs::FileExt; dat_file.read_at(&mut buffers[i], read_offset)?; } + + #[cfg(not(unix))] + { + let mut f = dat_file.try_clone()?; + f.seek(SeekFrom::Start(read_offset))?; + f.read(&mut buffers[i])?; + } } // Encode parity shards diff --git a/seaweed-volume/src/storage/erasure_coding/ec_shard.rs b/seaweed-volume/src/storage/erasure_coding/ec_shard.rs index 5bd410606..bccfb96b8 100644 --- a/seaweed-volume/src/storage/erasure_coding/ec_shard.rs +++ b/seaweed-volume/src/storage/erasure_coding/ec_shard.rs @@ -126,14 +126,29 @@ pub struct ShardBits(pub u32); impl ShardBits { pub fn add_shard_id(&mut self, id: ShardId) { + assert!( + (id as usize) < TOTAL_SHARDS_COUNT, + "shard id {} out of bounds (max {})", + id, + TOTAL_SHARDS_COUNT - 1, + ); self.0 |= 1 << id; } pub fn remove_shard_id(&mut self, id: ShardId) { + assert!( + (id as usize) < TOTAL_SHARDS_COUNT, + "shard id {} out of bounds (max {})", + id, + TOTAL_SHARDS_COUNT - 1, + ); self.0 &= !(1 << id); } pub fn has_shard_id(&self, id: ShardId) -> bool { + if (id as usize) >= TOTAL_SHARDS_COUNT { + return false; + } self.0 & (1 << id) != 0 } diff --git a/seaweed-volume/src/storage/needle_map.rs b/seaweed-volume/src/storage/needle_map.rs index 284610042..ff0acb450 100644 --- a/seaweed-volume/src/storage/needle_map.rs +++ b/seaweed-volume/src/storage/needle_map.rs @@ -45,6 +45,8 @@ impl NeedleMapMetric { if let Some(old_val) = old { if old_val.size.is_valid() { self.file_byte_count.fetch_sub(old_val.size.0 as u64, Ordering::Relaxed); + // Track overwritten bytes as garbage for compaction (garbage_level) + self.deletion_byte_count.fetch_add(old_val.size.0 as u64, Ordering::Relaxed); } } } diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs index 03e0058b9..58da4815d 100644 --- a/seaweed-volume/src/storage/store.rs +++ b/seaweed-volume/src/storage/store.rs @@ -159,6 +159,9 @@ impl Store { collection: &str, disk_type: DiskType, ) -> Result<(), VolumeError> { + if self.find_volume(vid).is_some() { + return Err(VolumeError::AlreadyExists); + } // Find the location where the .dat file exists for loc in &mut self.locations { if &loc.disk_type != &disk_type { diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 12ffb4371..884a46103 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -621,6 +621,15 @@ impl Volume { self.no_write_or_delete || self.no_write_can_delete } + /// Insert or update a needle index entry (for low-level blob writes). + pub fn put_needle_index(&mut self, key: NeedleId, offset: Offset, size: Size) -> Result<(), VolumeError> { + if let Some(ref mut nm) = self.nm { + nm.put(key, offset, size) + .map_err(VolumeError::Io)?; + } + Ok(()) + } + /// Mark this volume as read-only (no writes or deletes). pub fn set_read_only(&mut self) { self.no_write_or_delete = true; @@ -684,24 +693,24 @@ impl Volume { // ---- Sync / Close ---- pub fn sync_to_disk(&mut self) -> io::Result<()> { - if let Some(ref nm) = self.nm { - nm.sync()?; - } if let Some(ref dat_file) = self.dat_file { dat_file.sync_all()?; } + if let Some(ref nm) = self.nm { + nm.sync()?; + } Ok(()) } pub fn close(&mut self) { - if let Some(ref nm) = self.nm { - let _ = nm.sync(); - } - self.nm = None; if let Some(ref dat_file) = self.dat_file { let _ = dat_file.sync_all(); } self.dat_file = None; + if let Some(ref nm) = self.nm { + let _ = nm.sync(); + } + self.nm = None; } /// Remove all volume files from disk.