From 11c8dc344708c815a32b9dff8d21a8ccf148f98c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 16 Mar 2026 20:41:38 -0700 Subject: [PATCH] Add read_ec_shard_needle for full needle reconstruction from local EC shards --- .../src/storage/erasure_coding/ec_volume.rs | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/seaweed-volume/src/storage/erasure_coding/ec_volume.rs b/seaweed-volume/src/storage/erasure_coding/ec_volume.rs index 1a752eb21..cf24ab10c 100644 --- a/seaweed-volume/src/storage/erasure_coding/ec_volume.rs +++ b/seaweed-volume/src/storage/erasure_coding/ec_volume.rs @@ -9,6 +9,7 @@ use std::io::{self, Write}; use crate::storage::erasure_coding::ec_locate; use crate::storage::erasure_coding::ec_shard::*; +use crate::storage::needle::needle::{get_actual_size, Needle}; use crate::storage::types::*; /// An erasure-coded volume managing its local shards and index. @@ -289,6 +290,71 @@ impl EcVolume { Ok(Some((offset, size, intervals))) } + /// Read a full needle from locally available EC shards. + /// + /// Locates the needle in the .ecx index, determines which shard intervals + /// contain its data, reads from local shards, and parses the result into + /// a fully populated Needle (including last_modified, checksum, ttl). + /// + /// Returns `Ok(None)` if the needle is not found or is deleted. + /// Returns an error if a required shard is not available locally. + pub fn read_ec_shard_needle(&self, needle_id: NeedleId) -> io::Result> { + let (offset, size, intervals) = match self.locate_needle(needle_id)? { + Some(v) => v, + None => return Ok(None), + }; + + if intervals.is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "no intervals for needle", + )); + } + + // Compute the total bytes we need to read (full needle on disk) + let actual_size = get_actual_size(size, self.version) as usize; + let mut bytes = Vec::with_capacity(actual_size); + + for interval in &intervals { + let (shard_id, shard_offset) = interval.to_shard_id_and_offset(self.data_shards); + let shard = self + .shards + .get(shard_id as usize) + .and_then(|s| s.as_ref()) + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::NotFound, + format!("ec shard {} not available locally", shard_id), + ) + })?; + + let mut buf = vec![0u8; interval.size as usize]; + shard.read_at(&mut buf, shard_offset as u64)?; + bytes.extend_from_slice(&buf); + } + + // Truncate to exact actual_size (intervals may span more than needed) + bytes.truncate(actual_size); + + if bytes.len() < actual_size { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + format!( + "read {} bytes but need {} for needle {}", + bytes.len(), + actual_size, + needle_id + ), + )); + } + + let mut n = Needle::default(); + n.read_bytes(&bytes, offset.to_actual_offset(), size, self.version) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("{}", e)))?; + + Ok(Some(n)) + } + /// Get the size of a single shard (all shards are the same size). fn shard_file_size(&self) -> i64 { for shard in &self.shards {