Browse Source

Fix additional review findings: Windows read_exact_at, ip_in_cidr overflow, ec_encoder simplification

- Add Windows read_exact_at helper that loops seek_read to handle short reads
- Update all Windows read paths (read_needle_data, read_needle_blob, read_needle_header)
- Fix ip_in_cidr panic on prefix_len == 0 (shift overflow)
- Remove unused large-block EC encoding path (always use small blocks)
- Add duplicate volume ID check in Store::add_location
- Enhance is_file_unchanged to compare metadata fields
- Add warning for missing .idx with existing .dat
- Document JWT exp validation matching Go behavior
pull/8539/head
Chris Lu 3 weeks ago
parent
commit
22f87b6b9c
  1. 5
      seaweed-volume/src/security.rs
  2. 17
      seaweed-volume/src/storage/erasure_coding/ec_encoder.rs
  3. 11
      seaweed-volume/src/storage/store.rs
  4. 47
      seaweed-volume/src/storage/volume.rs

5
seaweed-volume/src/security.rs

@ -90,6 +90,7 @@ pub fn decode_jwt(
}
let mut validation = Validation::new(Algorithm::HS256);
// Match Go behavior: tokens without exp are accepted (Go's jwt-go does not require exp)
validation.required_spec_claims.clear();
let data = decode::<FileIdClaims>(
@ -275,13 +276,13 @@ fn ip_in_cidr(ip: &IpAddr, network: &IpAddr, prefix_len: u8) -> bool {
(IpAddr::V4(ip), IpAddr::V4(net)) => {
let ip_bits = u32::from(*ip);
let net_bits = u32::from(*net);
let mask = if prefix_len >= 32 { u32::MAX } else { u32::MAX << (32 - prefix_len) };
let mask = if prefix_len == 0 { 0 } else if prefix_len >= 32 { u32::MAX } else { u32::MAX << (32 - prefix_len) };
(ip_bits & mask) == (net_bits & mask)
}
(IpAddr::V6(ip), IpAddr::V6(net)) => {
let ip_bits = u128::from(*ip);
let net_bits = u128::from(*net);
let mask = if prefix_len >= 128 { u128::MAX } else { u128::MAX << (128 - prefix_len) };
let mask = if prefix_len == 0 { 0 } else if prefix_len >= 128 { u128::MAX } else { u128::MAX << (128 - prefix_len) };
(ip_bits & mask) == (net_bits & mask)
}
_ => false, // V4/V6 mismatch

17
seaweed-volume/src/storage/erasure_coding/ec_encoder.rs

@ -98,25 +98,16 @@ fn encode_dat_file(
rs: &ReedSolomon,
shards: &mut [EcVolumeShard],
) -> io::Result<()> {
let large_block_size = ERASURE_CODING_LARGE_BLOCK_SIZE;
let small_block_size = ERASURE_CODING_SMALL_BLOCK_SIZE;
let large_row_size = large_block_size * DATA_SHARDS_COUNT;
let block_size = ERASURE_CODING_SMALL_BLOCK_SIZE;
let row_size = block_size * DATA_SHARDS_COUNT;
let mut remaining = dat_size;
let mut offset: u64 = 0;
// Process large blocks
while remaining >= large_row_size as i64 {
encode_one_batch(dat_file, offset, large_block_size, rs, shards)?;
offset += large_row_size as u64;
remaining -= large_row_size as i64;
}
// Process remaining data in small blocks
// Process all data in small blocks to avoid large memory allocations
while remaining > 0 {
let row_size = small_block_size * DATA_SHARDS_COUNT;
let to_process = remaining.min(row_size as i64);
encode_one_batch(dat_file, offset, small_block_size, rs, shards)?;
encode_one_batch(dat_file, offset, block_size, rs, shards)?;
offset += to_process as u64;
remaining -= to_process;
}

11
seaweed-volume/src/storage/store.rs

@ -52,6 +52,17 @@ impl Store {
) -> io::Result<()> {
let mut loc = DiskLocation::new(directory, idx_directory, max_volume_count, disk_type);
loc.load_existing_volumes(self.needle_map_kind)?;
// Check for duplicate volume IDs across existing locations
for vid in loc.volume_ids() {
if self.find_volume(vid).is_some() {
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
format!("volume {} already exists in another location, conflicting dir: {}", vid, directory),
));
}
}
self.locations.push(loc);
Ok(())
}

47
seaweed-volume/src/storage/volume.rs

@ -97,6 +97,22 @@ pub struct Volume {
last_io_error: Option<io::Error>,
}
/// Windows helper: fill `buf` completely from `file`, starting at byte `offset`.
///
/// `seek_read` is allowed to return fewer bytes than requested (a short read),
/// so this keeps calling it, advancing `offset` by the number of bytes just
/// read, until the whole buffer has been filled. An empty `buf` succeeds
/// without touching the file.
///
/// Note that on Windows `seek_read` also moves the file cursor to the end of
/// each read, so callers must not rely on the cursor position afterwards.
///
/// # Errors
///
/// * `io::ErrorKind::UnexpectedEof` if `seek_read` reports zero bytes before
///   the buffer is full.
/// * Any other error from `seek_read` is returned unchanged, except
///   `io::ErrorKind::Interrupted`, which is retried.
#[cfg(windows)]
fn read_exact_at(file: &File, buf: &mut [u8], mut offset: u64) -> io::Result<()> {
    use std::os::windows::fs::FileExt;
    let mut filled = 0;
    while filled < buf.len() {
        match file.seek_read(&mut buf[filled..], offset) {
            // Zero bytes with room still left in the buffer means the file ended early.
            Ok(0) => {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "unexpected EOF in seek_read",
                ));
            }
            Ok(n) => {
                filled += n;
                offset += n as u64;
            }
            // Retry interrupted reads so this helper matches the semantics of
            // std's `read_exact_at`, which the Unix read paths rely on.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    Ok(())
}
impl Volume {
/// Create and load a volume from disk.
pub fn new(
@ -261,6 +277,17 @@ impl Volume {
let nm = CompactNeedleMap::load_from_idx(&mut idx_file)?;
self.nm = Some(nm);
} else {
// Missing .idx with existing .dat could orphan needles
let dat_path = self.file_name(".dat");
if Path::new(&dat_path).exists() {
let dat_size = fs::metadata(&dat_path).map(|m| m.len()).unwrap_or(0);
if dat_size > SUPER_BLOCK_SIZE as u64 {
warn!(
volume_id = self.id.0,
".idx file missing but .dat exists with data; needles may be orphaned"
);
}
}
self.nm = Some(CompactNeedleMap::new());
}
} else {
@ -389,8 +416,7 @@ impl Volume {
}
#[cfg(windows)]
{
use std::os::windows::fs::FileExt;
dat_file.seek_read(&mut buf, offset as u64)?;
read_exact_at(dat_file, &mut buf, offset as u64)?;
}
#[cfg(not(any(unix, windows)))]
{
@ -418,8 +444,7 @@ impl Volume {
}
#[cfg(windows)]
{
use std::os::windows::fs::FileExt;
dat_file.seek_read(&mut buf, offset as u64)?;
read_exact_at(dat_file, &mut buf, offset as u64)?;
}
Ok(buf)
@ -501,6 +526,10 @@ impl Volume {
use std::os::unix::fs::FileExt;
dat_file.read_exact_at(&mut header, offset as u64)?;
}
#[cfg(windows)]
{
read_exact_at(dat_file, &mut header, offset as u64)?;
}
n.read_header(&header);
Ok(())
@ -517,7 +546,15 @@ impl Volume {
if !nv.offset.is_zero() && nv.size.is_valid() {
let mut old = Needle::default();
if self.read_needle_data(&mut old, nv.offset.to_actual_offset(), nv.size).is_ok() {
if old.cookie == n.cookie && old.checksum == n.checksum && old.data == n.data {
if old.cookie == n.cookie
&& old.checksum == n.checksum
&& old.data == n.data
&& old.flags == n.flags
&& old.name == n.name
&& old.mime == n.mime
&& old.pairs == n.pairs
&& old.last_modified == n.last_modified
{
return true;
}
}

Loading…
Cancel
Save