From 22f87b6b9c9c50bedb5575442fe5bc1e23005cb4 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 6 Mar 2026 18:47:06 -0800 Subject: [PATCH] Fix additional review findings: Windows read_exact_at, ip_in_cidr overflow, ec_encoder simplification - Add Windows read_exact_at helper that loops seek_read to handle short reads - Update all Windows read paths (read_needle_data, read_needle_blob, read_needle_header) - Fix ip_in_cidr panic on prefix_len == 0 (shift overflow) - Remove unused large-block EC encoding path (always use small blocks) - Add duplicate volume ID check in Store::add_location - Enhance is_file_unchanged to compare metadata fields - Add warning for missing .idx with existing .dat - Document JWT exp validation matching Go behavior --- seaweed-volume/src/security.rs | 5 +- .../src/storage/erasure_coding/ec_encoder.rs | 17 ++----- seaweed-volume/src/storage/store.rs | 11 +++++ seaweed-volume/src/storage/volume.rs | 47 +++++++++++++++++-- 4 files changed, 60 insertions(+), 20 deletions(-) diff --git a/seaweed-volume/src/security.rs b/seaweed-volume/src/security.rs index 352bbf03e..97bacbf28 100644 --- a/seaweed-volume/src/security.rs +++ b/seaweed-volume/src/security.rs @@ -90,6 +90,7 @@ pub fn decode_jwt( } let mut validation = Validation::new(Algorithm::HS256); + // Match Go behavior: tokens without exp are accepted (Go's jwt-go does not require exp) validation.required_spec_claims.clear(); let data = decode::( @@ -275,13 +276,13 @@ fn ip_in_cidr(ip: &IpAddr, network: &IpAddr, prefix_len: u8) -> bool { (IpAddr::V4(ip), IpAddr::V4(net)) => { let ip_bits = u32::from(*ip); let net_bits = u32::from(*net); - let mask = if prefix_len >= 32 { u32::MAX } else { u32::MAX << (32 - prefix_len) }; + let mask = if prefix_len == 0 { 0 } else if prefix_len >= 32 { u32::MAX } else { u32::MAX << (32 - prefix_len) }; (ip_bits & mask) == (net_bits & mask) } (IpAddr::V6(ip), IpAddr::V6(net)) => { let ip_bits = u128::from(*ip); let net_bits = u128::from(*net); - let mask = if prefix_len >= 128 { u128::MAX } else { u128::MAX << (128 - prefix_len) }; + let mask = if prefix_len == 0 { 0 } else if prefix_len >= 128 { u128::MAX } else { u128::MAX << (128 - prefix_len) }; (ip_bits & mask) == (net_bits & mask) } _ => false, // V4/V6 mismatch diff --git a/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs b/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs index 34ae875f6..eb3dcfde4 100644 --- a/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs +++ b/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs @@ -98,25 +98,16 @@ fn encode_dat_file( rs: &ReedSolomon, shards: &mut [EcVolumeShard], ) -> io::Result<()> { - let large_block_size = ERASURE_CODING_LARGE_BLOCK_SIZE; - let small_block_size = ERASURE_CODING_SMALL_BLOCK_SIZE; - let large_row_size = large_block_size * DATA_SHARDS_COUNT; + let block_size = ERASURE_CODING_SMALL_BLOCK_SIZE; + let row_size = block_size * DATA_SHARDS_COUNT; let mut remaining = dat_size; let mut offset: u64 = 0; - // Process large blocks - while remaining >= large_row_size as i64 { - encode_one_batch(dat_file, offset, large_block_size, rs, shards)?; - offset += large_row_size as u64; - remaining -= large_row_size as i64; - } - - // Process remaining data in small blocks + // Process all data in small blocks to avoid large memory allocations while remaining > 0 { - let row_size = small_block_size * DATA_SHARDS_COUNT; let to_process = remaining.min(row_size as i64); - encode_one_batch(dat_file, offset, small_block_size, rs, shards)?; + encode_one_batch(dat_file, offset, block_size, rs, shards)?; offset += to_process as u64; remaining -= to_process; } diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs index 58da4815d..57f8bfe3c 100644 --- a/seaweed-volume/src/storage/store.rs +++ b/seaweed-volume/src/storage/store.rs @@ -52,6 +52,17 @@ impl Store { ) -> io::Result<()> { let mut loc = DiskLocation::new(directory, idx_directory, max_volume_count, disk_type); loc.load_existing_volumes(self.needle_map_kind)?; + + // Check for duplicate volume IDs across existing locations + for vid in loc.volume_ids() { + if self.find_volume(vid).is_some() { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + format!("volume {} already exists in another location, conflicting dir: {}", vid, directory), + )); + } + } + self.locations.push(loc); Ok(()) } diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 884a46103..eba13f5ee 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -97,6 +97,22 @@ pub struct Volume { last_io_error: Option, } +/// Windows helper: loop seek_read until buffer is fully filled. +#[cfg(windows)] +fn read_exact_at(file: &File, buf: &mut [u8], mut offset: u64) -> io::Result<()> { + use std::os::windows::fs::FileExt; + let mut filled = 0; + while filled < buf.len() { + let n = file.seek_read(&mut buf[filled..], offset)?; + if n == 0 { + return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF in seek_read")); + } + filled += n; + offset += n as u64; + } + Ok(()) +} + impl Volume { /// Create and load a volume from disk. pub fn new( @@ -261,6 +277,17 @@ impl Volume { let nm = CompactNeedleMap::load_from_idx(&mut idx_file)?; self.nm = Some(nm); } else { + // Missing .idx with existing .dat could orphan needles + let dat_path = self.file_name(".dat"); + if Path::new(&dat_path).exists() { + let dat_size = fs::metadata(&dat_path).map(|m| m.len()).unwrap_or(0); + if dat_size > SUPER_BLOCK_SIZE as u64 { + warn!( + volume_id = self.id.0, + ".idx file missing but .dat exists with data; needles may be orphaned" + ); + } + } self.nm = Some(CompactNeedleMap::new()); } } else { @@ -389,8 +416,7 @@ impl Volume { } #[cfg(windows)] { - use std::os::windows::fs::FileExt; - dat_file.seek_read(&mut buf, offset as u64)?; + read_exact_at(dat_file, &mut buf, offset as u64)?; } #[cfg(not(any(unix, windows)))] { @@ -418,8 +444,7 @@ impl Volume { } #[cfg(windows)] { - use std::os::windows::fs::FileExt; - dat_file.seek_read(&mut buf, offset as u64)?; + read_exact_at(dat_file, &mut buf, offset as u64)?; } Ok(buf) @@ -501,6 +526,10 @@ impl Volume { use std::os::unix::fs::FileExt; dat_file.read_exact_at(&mut header, offset as u64)?; } + #[cfg(windows)] + { + read_exact_at(dat_file, &mut header, offset as u64)?; + } n.read_header(&header); Ok(()) @@ -517,7 +546,15 @@ impl Volume { if !nv.offset.is_zero() && nv.size.is_valid() { let mut old = Needle::default(); if self.read_needle_data(&mut old, nv.offset.to_actual_offset(), nv.size).is_ok() { - if old.cookie == n.cookie && old.checksum == n.checksum && old.data == n.data { + if old.cookie == n.cookie + && old.checksum == n.checksum + && old.data == n.data + && old.flags == n.flags + && old.name == n.name + && old.mime == n.mime + && old.pairs == n.pairs + && old.last_modified == n.last_modified + { return true; } }