|
|
|
@ -9,6 +9,7 @@ use std::io::{self, Write}; |
|
|
|
|
|
|
|
use crate::storage::erasure_coding::ec_locate;
|
|
|
|
use crate::storage::erasure_coding::ec_shard::*;
|
|
|
|
use crate::storage::needle::needle::{get_actual_size, Needle};
|
|
|
|
use crate::storage::types::*;
|
|
|
|
|
|
|
|
/// An erasure-coded volume managing its local shards and index.
|
|
|
|
@ -289,6 +290,71 @@ impl EcVolume { |
|
|
|
Ok(Some((offset, size, intervals)))
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Read a full needle from locally available EC shards.
|
|
|
|
///
|
|
|
|
/// Locates the needle in the .ecx index, determines which shard intervals
|
|
|
|
/// contain its data, reads from local shards, and parses the result into
|
|
|
|
/// a fully populated Needle (including last_modified, checksum, ttl).
|
|
|
|
///
|
|
|
|
/// Returns `Ok(None)` if the needle is not found or is deleted.
|
|
|
|
/// Returns an error if a required shard is not available locally.
|
|
|
|
pub fn read_ec_shard_needle(&self, needle_id: NeedleId) -> io::Result<Option<Needle>> {
|
|
|
|
let (offset, size, intervals) = match self.locate_needle(needle_id)? {
|
|
|
|
Some(v) => v,
|
|
|
|
None => return Ok(None),
|
|
|
|
};
|
|
|
|
|
|
|
|
if intervals.is_empty() {
|
|
|
|
return Err(io::Error::new(
|
|
|
|
io::ErrorKind::InvalidData,
|
|
|
|
"no intervals for needle",
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
|
|
|
// Compute the total bytes we need to read (full needle on disk)
|
|
|
|
let actual_size = get_actual_size(size, self.version) as usize;
|
|
|
|
let mut bytes = Vec::with_capacity(actual_size);
|
|
|
|
|
|
|
|
for interval in &intervals {
|
|
|
|
let (shard_id, shard_offset) = interval.to_shard_id_and_offset(self.data_shards);
|
|
|
|
let shard = self
|
|
|
|
.shards
|
|
|
|
.get(shard_id as usize)
|
|
|
|
.and_then(|s| s.as_ref())
|
|
|
|
.ok_or_else(|| {
|
|
|
|
io::Error::new(
|
|
|
|
io::ErrorKind::NotFound,
|
|
|
|
format!("ec shard {} not available locally", shard_id),
|
|
|
|
)
|
|
|
|
})?;
|
|
|
|
|
|
|
|
let mut buf = vec![0u8; interval.size as usize];
|
|
|
|
shard.read_at(&mut buf, shard_offset as u64)?;
|
|
|
|
bytes.extend_from_slice(&buf);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Truncate to exact actual_size (intervals may span more than needed)
|
|
|
|
bytes.truncate(actual_size);
|
|
|
|
|
|
|
|
if bytes.len() < actual_size {
|
|
|
|
return Err(io::Error::new(
|
|
|
|
io::ErrorKind::UnexpectedEof,
|
|
|
|
format!(
|
|
|
|
"read {} bytes but need {} for needle {}",
|
|
|
|
bytes.len(),
|
|
|
|
actual_size,
|
|
|
|
needle_id
|
|
|
|
),
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut n = Needle::default();
|
|
|
|
n.read_bytes(&bytes, offset.to_actual_offset(), size, self.version)
|
|
|
|
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("{}", e)))?;
|
|
|
|
|
|
|
|
Ok(Some(n))
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Get the size of a single shard (all shards are the same size).
|
|
|
|
fn shard_file_size(&self) -> i64 {
|
|
|
|
for shard in &self.shards {
|
|
|
|
|