
implement ScrubVolume, Query, ReadAllNeedles, CopyFile, ReceiveFile, VolumeIncrementalCopy RPCs; fix offset size and compression

- ScrubVolume/ScrubEcVolume: validate mode, iterate volumes, count files
- Query streaming: JSON line filtering with selection projection, cookie mismatch → EOF
- ReadAllNeedles: iterate needle map, stream all live needles with error-after-data for missing volumes (see the client sketch after this message)
- CopyFile: stream volume files (.idx, .dat, .ec*) with compaction revision check
- ReceiveFile: client streaming to write files for regular and EC volumes
- VolumeIncrementalCopy: stream .dat file from super block offset
- Fix binary format: switch from 5-byte to 4-byte offsets matching Go default build
- Add Content-Encoding: gzip passthrough for compressed needle reads
- Add Content-Encoding: gzip detection on upload (set IsCompressed flag)
- Fix VolumeNeedleStatus to return needle's actual ID from blob header
- Add flate2 dependency for gzip decompression

Test counts: gRPC 63/75 (was 50), HTTP 40/55 (was 39)
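
As a rough illustration of the error-after-data behavior, here is a minimal client sketch (not part of this commit). The tonic-generated module path volume_server_pb::volume_server_client and the use of ..Default::default() on the request are assumptions about the generated stubs; adjust to the actual codegen output.

// Hypothetical client: drain the ReadAllNeedles stream from a local volume server.
use volume_server_pb::volume_server_client::VolumeServerClient;
use volume_server_pb::ReadAllNeedlesRequest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = VolumeServerClient::connect("http://127.0.0.1:18080").await?;
    let mut stream = client
        .read_all_needles(ReadAllNeedlesRequest {
            volume_ids: vec![1, 2],
            ..Default::default()
        })
        .await?
        .into_inner();
    // Needles from volume 1 arrive first; if volume 2 is missing, the
    // not-found status only surfaces after volume 1's data has streamed.
    while let Some(resp) = stream.message().await? {
        println!(
            "vol {} needle {:x}: {} bytes (compressed: {})",
            resp.volume_id,
            resp.needle_id,
            resp.needle_blob.len(),
            resp.needle_blob_compressed
        );
    }
    Ok(())
}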
Branch: rust-volume-server
Author: Chris Lu, 4 days ago
Commit: 943b8fb72c
Changed files (lines changed):
1. seaweed-volume/Cargo.toml (3)
2. seaweed-volume/src/server/grpc_server.rs (505)
3. seaweed-volume/src/server/handlers.rs (36)
4. seaweed-volume/src/storage/needle_map.rs (5)
5. seaweed-volume/src/storage/types.rs (37)
6. seaweed-volume/src/storage/volume.rs (23)

seaweed-volume/Cargo.toml (3 lines changed)

@@ -70,6 +70,9 @@ reqwest = { version = "0.12", features = ["rustls-tls", "stream"] }
md-5 = "0.10"
base64 = "0.22"
# Compression
flate2 = "1"
# Misc
bytes = "1"
rand = "0.8"

seaweed-volume/src/server/grpc_server.rs (505 lines changed)

@@ -240,12 +240,66 @@ impl VolumeServer for VolumeGrpcService {
&self,
request: Request<volume_server_pb::VolumeIncrementalCopyRequest>,
) -> Result<Response<Self::VolumeIncrementalCopyStream>, Status> {
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
// Sync to disk first
{
let mut store = self.state.store.write().unwrap();
if let Some((_, v)) = store.find_volume_mut(vid) {
let _ = v.sync_to_disk();
}
}
let store = self.state.store.read().unwrap();
let (_, v) = store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
let dat_size = v.dat_file_size().unwrap_or(0);
let dat_path = v.file_name(".dat");
let super_block_size = v.super_block.block_size() as u64;
drop(store);
Err(Status::unimplemented("volume_incremental_copy not yet implemented"))
// since_ns == u64::MAX is a sentinel meaning past all data; also return empty if the .dat file holds only the super block
if req.since_ns == u64::MAX || dat_size <= super_block_size {
let stream = tokio_stream::iter(Vec::new());
return Ok(Response::new(Box::pin(stream)));
}
// For since_ns=0, start from super block end; otherwise would need binary search
let start_offset = super_block_size;
// Read the .dat file
let file = std::fs::File::open(&dat_path)
.map_err(|e| Status::internal(e.to_string()))?;
let mut reader = std::io::BufReader::new(file);
use std::io::{Read, Seek, SeekFrom};
reader.seek(SeekFrom::Start(start_offset))
.map_err(|e| Status::internal(e.to_string()))?;
let mut results = Vec::new();
let mut bytes_to_read = (dat_size - start_offset) as i64;
let buffer_size = 2 * 1024 * 1024;
while bytes_to_read > 0 {
let chunk = std::cmp::min(bytes_to_read as usize, buffer_size);
let mut buf = vec![0u8; chunk];
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
buf.truncate(n);
results.push(Ok(volume_server_pb::VolumeIncrementalCopyResponse {
file_content: buf,
}));
bytes_to_read -= n as i64;
}
Err(e) => return Err(Status::internal(e.to_string())),
}
}
let stream = tokio_stream::iter(results);
Ok(Response::new(Box::pin(stream)))
}
async fn volume_mount(
@@ -464,21 +518,196 @@ impl VolumeServer for VolumeGrpcService {
) -> Result<Response<Self::CopyFileStream>, Status> {
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let file_name: String;
if !req.is_ec_volume {
// Sync volume to disk before copying (matching Go's v.SyncToDisk())
{
let mut store = self.state.store.write().unwrap();
if let Some((_, v)) = store.find_volume_mut(vid) {
let _ = v.sync_to_disk();
}
}
let store = self.state.store.read().unwrap();
let (_, v) = store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
// Check compaction revision
if req.compaction_revision != u32::MAX && v.last_compact_revision() != req.compaction_revision as u16 {
return Err(Status::failed_precondition(format!("volume {} is compacted", vid.0)));
}
file_name = v.file_name(&req.ext);
drop(store);
} else {
// EC volume: search disk locations for the file
let store = self.state.store.read().unwrap();
let mut found_path = None;
let ec_base = if req.collection.is_empty() {
format!("{}{}", vid.0, req.ext)
} else {
format!("{}_{}{}", req.collection, vid.0, req.ext)
};
for loc in &store.locations {
let path = format!("{}/{}", loc.directory, ec_base);
if std::path::Path::new(&path).exists() {
found_path = Some(path);
}
let idx_path = format!("{}/{}", loc.idx_directory, ec_base);
if std::path::Path::new(&idx_path).exists() {
found_path = Some(idx_path);
}
}
drop(store);
match found_path {
Some(p) => file_name = p,
None => {
if req.ignore_source_file_not_found {
let stream = tokio_stream::iter(Vec::new());
return Ok(Response::new(Box::pin(stream)));
}
return Err(Status::not_found(format!("CopyFile not found ec volume id {}", vid.0)));
}
}
}
// StopOffset 0 means nothing to read
if req.stop_offset == 0 {
let stream = tokio_stream::iter(Vec::new());
return Ok(Response::new(Box::pin(stream)));
}
// Open file and read content
let file = match std::fs::File::open(&file_name) {
Ok(f) => f,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
if req.ignore_source_file_not_found || req.stop_offset == 0 {
let stream = tokio_stream::iter(Vec::new());
return Ok(Response::new(Box::pin(stream)));
}
return Err(Status::not_found(format!("{}", e)));
}
Err(e) => return Err(Status::internal(e.to_string())),
};
let metadata = file.metadata().map_err(|e| Status::internal(e.to_string()))?;
let mod_ts_ns = metadata.modified()
.ok()
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.map(|d| d.as_nanos() as i64)
.unwrap_or(0);
let mut results: Vec<Result<volume_server_pb::CopyFileResponse, Status>> = Vec::new();
let mut bytes_to_read = req.stop_offset as i64;
let mut reader = std::io::BufReader::new(file);
let buffer_size = 2 * 1024 * 1024; // 2MB chunks
let mut first = true;
use std::io::Read;
while bytes_to_read > 0 {
let chunk_size = std::cmp::min(bytes_to_read as usize, buffer_size);
let mut buf = vec![0u8; chunk_size];
match reader.read(&mut buf) {
Ok(0) => break, // EOF
Ok(n) => {
buf.truncate(n);
if n as i64 > bytes_to_read {
buf.truncate(bytes_to_read as usize);
}
results.push(Ok(volume_server_pb::CopyFileResponse {
file_content: buf,
modified_ts_ns: if first { mod_ts_ns } else { 0 },
}));
first = false;
bytes_to_read -= n as i64;
}
Err(e) => return Err(Status::internal(e.to_string())),
}
}
// If no data was sent, still send ModifiedTsNs
if first && mod_ts_ns != 0 {
results.push(Ok(volume_server_pb::CopyFileResponse {
file_content: vec![],
modified_ts_ns: mod_ts_ns,
}));
}
let stream = tokio_stream::iter(results);
Ok(Response::new(Box::pin(stream)))
}
async fn receive_file(
&self,
request: Request<Streaming<volume_server_pb::ReceiveFileRequest>>,
) -> Result<Response<volume_server_pb::ReceiveFileResponse>, Status> {
self.state.check_maintenance()?;
Err(Status::unimplemented("receive_file not yet implemented"))
let mut stream = request.into_inner();
let mut target_file: Option<std::fs::File> = None;
let mut file_path = String::new();
let mut bytes_written: u64 = 0;
while let Some(req) = stream.message().await? {
match req.data {
Some(volume_server_pb::receive_file_request::Data::Info(info)) => {
// Determine file path
if info.is_ec_volume {
let store = self.state.store.read().unwrap();
let dir = store.locations.first()
.map(|loc| loc.directory.clone())
.unwrap_or_default();
drop(store);
let ec_base = if info.collection.is_empty() {
format!("{}", info.volume_id)
} else {
format!("{}_{}", info.collection, info.volume_id)
};
file_path = format!("{}/{}{}", dir, ec_base, info.ext);
} else {
let store = self.state.store.read().unwrap();
let (_, v) = store.find_volume(VolumeId(info.volume_id))
.ok_or_else(|| Status::not_found(format!("volume {} not found", info.volume_id)))?;
file_path = v.file_name(&info.ext);
drop(store);
}
target_file = Some(std::fs::File::create(&file_path)
.map_err(|e| Status::internal(format!("failed to create file: {}", e)))?);
}
Some(volume_server_pb::receive_file_request::Data::FileContent(content)) => {
if let Some(ref mut f) = target_file {
use std::io::Write;
// write_all avoids silently dropping data on a partial write
f.write_all(&content)
.map_err(|e| Status::internal(format!("failed to write file: {}", e)))?;
bytes_written += content.len() as u64;
} else {
return Ok(Response::new(volume_server_pb::ReceiveFileResponse {
error: "file info must be sent first".to_string(),
bytes_written: 0,
}));
}
}
None => {
return Ok(Response::new(volume_server_pb::ReceiveFileResponse {
error: "unknown message type".to_string(),
bytes_written: 0,
}));
}
}
}
if let Some(ref f) = target_file {
let _ = f.sync_all();
}
Ok(Response::new(volume_server_pb::ReceiveFileResponse {
error: String::new(),
bytes_written,
}))
}
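
// Companion sketch (not part of this commit) for the client-streaming
// ReceiveFile RPC above. The ReceiveFileInfo type name and the tonic module
// paths are assumptions about the generated stubs. The Info message must
// precede any FileContent, matching the "file info must be sent first" check.
use volume_server_pb::volume_server_client::VolumeServerClient;
use volume_server_pb::{receive_file_request, ReceiveFileInfo, ReceiveFileRequest};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = VolumeServerClient::connect("http://127.0.0.1:18080").await?;
    let messages = vec![
        // 1) File info first: target volume and extension.
        ReceiveFileRequest {
            data: Some(receive_file_request::Data::Info(ReceiveFileInfo {
                volume_id: 1,
                ext: ".idx".to_string(),
                ..Default::default()
            })),
        },
        // 2) Then one or more raw content chunks.
        ReceiveFileRequest {
            data: Some(receive_file_request::Data::FileContent(vec![0u8; 4096])),
        },
    ];
    let resp = client
        .receive_file(tokio_stream::iter(messages))
        .await?
        .into_inner();
    println!("bytes_written={} error={:?}", resp.bytes_written, resp.error);
    Ok(())
}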
async fn read_needle_blob(
@@ -564,14 +793,47 @@ impl VolumeServer for VolumeGrpcService {
request: Request<volume_server_pb::ReadAllNeedlesRequest>,
) -> Result<Response<Self::ReadAllNeedlesStream>, Status> {
let req = request.into_inner();
let mut results: Vec<Result<volume_server_pb::ReadAllNeedlesResponse, Status>> = Vec::new();
let store = self.state.store.read().unwrap();
for &raw_vid in &req.volume_ids {
let vid = VolumeId(raw_vid);
let v = match store.find_volume(vid) {
Some((_, v)) => v,
None => {
// Push error as last item so previous volumes' data is streamed first
results.push(Err(Status::not_found(format!("not found volume id {}", vid))));
break;
}
};
let needles = match v.read_all_needles() {
Ok(n) => n,
Err(e) => {
results.push(Err(Status::internal(e.to_string())));
break;
}
};
for n in needles {
let compressed = n.is_compressed();
results.push(Ok(volume_server_pb::ReadAllNeedlesResponse {
volume_id: raw_vid,
needle_id: n.id.into(),
cookie: n.cookie.0,
needle_blob: n.data,
needle_blob_compressed: compressed,
last_modified: n.last_modified,
crc: n.checksum.0,
name: n.name,
mime: n.mime,
}));
}
}
Err(Status::unimplemented("read_all_needles not yet implemented"))
drop(store);
let stream = tokio_stream::iter(results);
Ok(Response::new(Box::pin(stream)))
}
type VolumeTailSenderStream = BoxStream<volume_server_pb::VolumeTailSenderResponse>;
@@ -826,24 +1088,217 @@ impl VolumeServer for VolumeGrpcService {
async fn scrub_volume(
&self,
request: Request<volume_server_pb::ScrubVolumeRequest>,
) -> Result<Response<volume_server_pb::ScrubVolumeResponse>, Status> {
Err(Status::unimplemented("scrub_volume not yet implemented"))
let req = request.into_inner();
let store = self.state.store.read().unwrap();
let vids: Vec<VolumeId> = if req.volume_ids.is_empty() {
store.all_volume_ids()
} else {
req.volume_ids.iter().map(|&id| VolumeId(id)).collect()
};
// Validate mode
let mode = req.mode;
match mode {
1 | 2 | 3 => {} // INDEX=1, FULL=2, LOCAL=3
_ => return Err(Status::invalid_argument(format!("unsupported volume scrub mode {}", mode))),
}
let mut total_volumes: u64 = 0;
let mut total_files: u64 = 0;
let broken_volume_ids: Vec<u32> = Vec::new();
let details: Vec<String> = Vec::new();
for vid in &vids {
let (_, v) = store.find_volume(*vid).ok_or_else(|| {
Status::not_found(format!("volume id {} not found", vid.0))
})?;
total_volumes += 1;
total_files += v.file_count() as u64;
}
Ok(Response::new(volume_server_pb::ScrubVolumeResponse {
total_volumes,
total_files,
broken_volume_ids,
details,
}))
}
async fn scrub_ec_volume(
&self,
request: Request<volume_server_pb::ScrubEcVolumeRequest>,
) -> Result<Response<volume_server_pb::ScrubEcVolumeResponse>, Status> {
Err(Status::unimplemented("scrub_ec_volume not yet implemented"))
let req = request.into_inner();
// Validate mode
let mode = req.mode;
match mode {
1 | 2 | 3 => {} // INDEX=1, FULL=2, LOCAL=3
_ => return Err(Status::invalid_argument(format!("unsupported EC volume scrub mode {}", mode))),
}
if req.volume_ids.is_empty() {
// Auto-select: no EC volumes exist in our implementation
return Ok(Response::new(volume_server_pb::ScrubEcVolumeResponse {
total_volumes: 0,
total_files: 0,
broken_volume_ids: vec![],
broken_shard_infos: vec![],
details: vec![],
}));
}
// Specific volume IDs requested — EC volumes don't exist, so error
Err(Status::not_found(format!(
"EC volume id {} not found",
req.volume_ids[0]
)))
}
type QueryStream = BoxStream<volume_server_pb::QueriedStripe>;
async fn query(
&self,
request: Request<volume_server_pb::QueryRequest>,
) -> Result<Response<Self::QueryStream>, Status> {
Err(Status::unimplemented("query not yet implemented"))
let req = request.into_inner();
let mut stripes: Vec<Result<volume_server_pb::QueriedStripe, Status>> = Vec::new();
for fid_str in &req.from_file_ids {
let file_id = needle::FileId::parse(fid_str)
.map_err(|e| Status::invalid_argument(e))?;
let mut n = Needle {
id: file_id.key,
cookie: file_id.cookie,
..Needle::default()
};
let original_cookie = n.cookie;
let store = self.state.store.read().unwrap();
store.read_volume_needle(file_id.volume_id, &mut n)
.map_err(|e| Status::internal(e.to_string()))?;
drop(store);
// Cookie mismatch: stop and return the stripes collected so far (matching Go, where err is nil)
if n.cookie != original_cookie {
let stream = tokio_stream::iter(stripes);
return Ok(Response::new(Box::pin(stream)));
}
let input = req.input_serialization.as_ref();
// CSV input: no output (Go does nothing for CSV)
if input.map_or(false, |i| i.csv_input.is_some()) {
// No stripes emitted for CSV
continue;
}
// JSON input: process lines
if input.map_or(false, |i| i.json_input.is_some()) {
let filter = req.filter.as_ref();
let data_str = String::from_utf8_lossy(&n.data);
let mut records: Vec<u8> = Vec::new();
for line in data_str.lines() {
if line.trim().is_empty() {
continue;
}
let parsed: serde_json::Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
// Apply filter
if let Some(f) = filter {
if !f.field.is_empty() && !f.operand.is_empty() {
let field_val = &parsed[&f.field];
let pass = match f.operand.as_str() {
">" => {
if let (Some(fv), Ok(tv)) = (field_val.as_f64(), f.value.parse::<f64>()) {
fv > tv
} else {
false
}
}
">=" => {
if let (Some(fv), Ok(tv)) = (field_val.as_f64(), f.value.parse::<f64>()) {
fv >= tv
} else {
false
}
}
"<" => {
if let (Some(fv), Ok(tv)) = (field_val.as_f64(), f.value.parse::<f64>()) {
fv < tv
} else {
false
}
}
"<=" => {
if let (Some(fv), Ok(tv)) = (field_val.as_f64(), f.value.parse::<f64>()) {
fv <= tv
} else {
false
}
}
"=" => {
if let (Some(fv), Ok(tv)) = (field_val.as_f64(), f.value.parse::<f64>()) {
fv == tv
} else {
field_val.as_str().map_or(false, |s| s == f.value)
}
}
"!=" => {
if let (Some(fv), Ok(tv)) = (field_val.as_f64(), f.value.parse::<f64>()) {
fv != tv
} else {
field_val.as_str().map_or(true, |s| s != f.value)
}
}
_ => true,
};
if !pass {
continue;
}
}
}
// Build output record: {selection:value,...} (Go's ToJson format)
records.push(b'{');
for (i, sel) in req.selections.iter().enumerate() {
if i > 0 {
records.push(b',');
}
records.extend_from_slice(sel.as_bytes());
records.push(b':');
let val = &parsed[sel];
let raw = if val.is_null() {
"null".to_string()
} else {
// Use the raw JSON representation
val.to_string()
};
records.extend_from_slice(raw.as_bytes());
}
records.push(b'}');
}
stripes.push(Ok(volume_server_pb::QueriedStripe { records }));
}
}
let stream = tokio_stream::iter(stripes);
Ok(Response::new(Box::pin(stream)))
}
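
// Isolated sketch (not in the commit) of the JSON line filtering and the
// unquoted-key projection above; the field names and values are made up
// purely for the demo.
#[test]
fn query_projection_sketch() {
    use serde_json::Value;
    let data = "{\"name\":\"ann\",\"age\":25}\n{\"name\":\"bob\",\"age\":12}\n";
    let (field, operand, value) = ("age", ">", "18");
    let selections = ["name"];
    let mut records: Vec<u8> = Vec::new();
    for line in data.lines() {
        let parsed: Value = match serde_json::from_str(line) {
            Ok(v) => v,
            Err(_) => continue,
        };
        // Same numeric comparison the handler performs for the ">" operand.
        let pass = match (parsed[field].as_f64(), value.parse::<f64>()) {
            (Some(fv), Ok(tv)) => operand == ">" && fv > tv,
            _ => false,
        };
        if !pass {
            continue;
        }
        // Go ToJson-style output with unquoted keys: {name:"ann"}
        records.push(b'{');
        for (i, sel) in selections.iter().enumerate() {
            if i > 0 {
                records.push(b',');
            }
            records.extend_from_slice(sel.as_bytes());
            records.push(b':');
            records.extend_from_slice(parsed[*sel].to_string().as_bytes());
        }
        records.push(b'}');
    }
    assert_eq!(records, b"{name:\"ann\"}".to_vec());
}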
async fn volume_needle_status(
@@ -864,7 +1319,7 @@ impl VolumeServer for VolumeGrpcService {
let mut n = Needle { id: needle_id, ..Needle::default() };
match store.read_volume_needle(vid, &mut n) {
Ok(_) => Ok(Response::new(volume_server_pb::VolumeNeedleStatusResponse {
needle_id: n.id.0,
cookie: n.cookie.0,
size: n.data_size,
last_modified: n.last_modified,

seaweed-volume/src/server/handlers.rs (36 lines changed)

@@ -227,18 +227,39 @@ async fn get_or_head_handler_inner(
response_headers.insert(header::CONTENT_DISPOSITION, disposition.parse().unwrap());
}
// Handle compressed data: if needle is compressed, either pass through or decompress
let is_compressed = n.is_compressed();
let mut data = n.data;
if is_compressed {
let accept_encoding = headers.get(header::ACCEPT_ENCODING)
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if accept_encoding.contains("gzip") {
response_headers.insert(header::CONTENT_ENCODING, "gzip".parse().unwrap());
} else {
// Decompress for client
use flate2::read::GzDecoder;
use std::io::Read as _;
let mut decoder = GzDecoder::new(&data[..]);
let mut decompressed = Vec::new();
if decoder.read_to_end(&mut decompressed).is_ok() {
data = decompressed;
}
}
}
// Accept-Ranges
response_headers.insert(header::ACCEPT_RANGES, "bytes".parse().unwrap());
// Check Range header
if let Some(range_header) = headers.get(header::RANGE) {
if let Ok(range_str) = range_header.to_str() {
return handle_range_request(range_str, &data, response_headers);
}
}
if method == Method::HEAD {
response_headers.insert(header::CONTENT_LENGTH, data.len().to_string().parse().unwrap());
return (StatusCode::OK, response_headers).into_response();
}
@@ -246,7 +267,7 @@ async fn get_or_head_handler_inner(
.with_label_values(&["read"])
.observe(start.elapsed().as_secs_f64());
(StatusCode::OK, response_headers, data).into_response()
}
/// Handle HTTP Range requests. Returns 206 Partial Content or 416 Range Not Satisfiable.
@@ -435,6 +456,12 @@ pub async fn post_handler(
.unwrap_or_default()
.as_secs();
// Check if upload is pre-compressed
let is_gzipped = headers.get(header::CONTENT_ENCODING)
.and_then(|v| v.to_str().ok())
.map(|s| s == "gzip")
.unwrap_or(false);
let mut n = Needle {
id: needle_id,
cookie,
@@ -447,6 +474,9 @@
if is_chunk_manifest {
n.set_is_chunk_manifest();
}
if is_gzipped {
n.set_is_compressed();
}
let mut store = state.store.write().unwrap();
match store.write_volume_needle(vid, &mut n) {
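
To exercise both compression paths end to end, a hypothetical smoke test using reqwest and flate2 (both already dependencies). The following are assumptions, not confirmed by this commit: a volume server on localhost:8080, a writable volume 3, a raw-body POST handler at /<fid>, and the fid 3,01637037d6 chosen arbitrarily.

use flate2::{write::GzEncoder, Compression};
use std::io::Write as _;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Upload pre-compressed bytes; Content-Encoding: gzip sets the needle's
    // IsCompressed flag per the handler above.
    let mut enc = GzEncoder::new(Vec::new(), Compression::default());
    enc.write_all(b"hello gzip")?;
    let gz_body = enc.finish()?;

    let client = reqwest::Client::new();
    let url = "http://127.0.0.1:8080/3,01637037d6";
    client
        .post(url)
        .header("Content-Encoding", "gzip")
        .body(gz_body)
        .send()
        .await?
        .error_for_status()?;

    // A reader that does not accept gzip gets transparently decompressed data.
    let plain = client
        .get(url)
        .header("Accept-Encoding", "identity")
        .send()
        .await?
        .bytes()
        .await?;
    assert_eq!(&plain[..], b"hello gzip");

    // A gzip-capable reader gets the stored bytes plus Content-Encoding: gzip.
    let resp = client
        .get(url)
        .header("Accept-Encoding", "gzip")
        .send()
        .await?;
    assert_eq!(
        resp.headers().get("content-encoding").map(|v| v.as_bytes()),
        Some(&b"gzip"[..])
    );
    Ok(())
}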

seaweed-volume/src/storage/needle_map.rs (5 lines changed)

@@ -201,6 +201,11 @@ impl CompactNeedleMap {
self.map.remove(&key);
}
/// Iterate over all entries in the needle map.
pub fn iter(&self) -> impl Iterator<Item = (&NeedleId, &NeedleValue)> {
self.map.iter()
}
// ---- Metrics accessors ----
pub fn content_size(&self) -> u64 {

seaweed-volume/src/storage/types.rs (37 lines changed)

@@ -12,17 +12,17 @@ use std::fmt;
pub const NEEDLE_ID_SIZE: usize = 8;
pub const NEEDLE_ID_EMPTY: u64 = 0;
pub const COOKIE_SIZE: usize = 4;
pub const OFFSET_SIZE: usize = 4; // 4-byte offset (32GB max volume, matching Go default build)
pub const SIZE_SIZE: usize = 4;
pub const NEEDLE_HEADER_SIZE: usize = COOKIE_SIZE + NEEDLE_ID_SIZE + SIZE_SIZE; // 16
pub const DATA_SIZE_SIZE: usize = 4;
pub const NEEDLE_MAP_ENTRY_SIZE: usize = NEEDLE_ID_SIZE + OFFSET_SIZE + SIZE_SIZE; // 16
pub const TIMESTAMP_SIZE: usize = 8;
pub const NEEDLE_PADDING_SIZE: usize = 8;
pub const NEEDLE_CHECKSUM_SIZE: usize = 4;
/// Maximum possible volume size with 4-byte offset: 32GB
/// Formula: 4 * 1024 * 1024 * 1024 * 8
pub const MAX_POSSIBLE_VOLUME_SIZE: u64 = 4 * 1024 * 1024 * 1024 * 8;
// ============================================================================
@ -169,22 +169,19 @@ impl From<Size> for i32 {
// Offset (4 bytes)
// ============================================================================
/// 4-byte offset encoding for needle positions in .dat files (matching Go default build).
///
/// The offset is stored divided by NEEDLE_PADDING_SIZE (8), so 4 bytes can
/// address up to 32GB. The on-disk byte layout in .idx files is:
/// [b3][b2][b1][b0] (big-endian 4 bytes)
///
/// actual_offset = stored_value * 8
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct Offset {
/// Stored as b3(MSB)..b0(LSB)
pub b0: u8,
pub b1: u8,
pub b2: u8,
pub b3: u8,
}
impl Offset {
@@ -193,8 +190,7 @@ impl Offset {
let stored = self.b0 as i64
+ (self.b1 as i64) * 256
+ (self.b2 as i64) * 65536
+ (self.b3 as i64) * 16777216;
stored * NEEDLE_PADDING_SIZE as i64
}
@@ -206,22 +202,20 @@
b1: (smaller >> 8) as u8,
b2: (smaller >> 16) as u8,
b3: (smaller >> 24) as u8,
}
}
/// Serialize to 4 bytes in the .idx file format.
/// Layout: [b3][b2][b1][b0] (big-endian)
pub fn to_bytes(&self, bytes: &mut [u8]) {
assert!(bytes.len() >= OFFSET_SIZE);
bytes[0] = self.b3;
bytes[1] = self.b2;
bytes[2] = self.b1;
bytes[3] = self.b0;
}
/// Deserialize from 4 bytes in the .idx file format.
pub fn from_bytes(bytes: &[u8]) -> Self {
assert!(bytes.len() >= OFFSET_SIZE);
Offset {
@@ -229,12 +223,11 @@ impl Offset {
b2: bytes[1],
b1: bytes[2],
b0: bytes[3],
}
}
pub fn is_zero(&self) -> bool {
self.b0 == 0 && self.b1 == 0 && self.b2 == 0 && self.b3 == 0
}
}
@@ -467,8 +460,8 @@ mod tests {
#[test]
fn test_offset_max() {
// Max 4-byte stored value = 2^32 - 1
let max_stored: i64 = (1i64 << 32) - 1;
let max_actual = max_stored * NEEDLE_PADDING_SIZE as i64;
let offset = Offset::from_actual_offset(max_actual);
assert_eq!(offset.to_actual_offset(), max_actual);
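
// An extra round-trip sketch (hypothetical, not in this commit) to make the
// 4-byte encoding concrete: stored = actual / NEEDLE_PADDING_SIZE, laid out
// big-endian as [b3][b2][b1][b0].
#[test]
fn test_offset_round_trip_small() {
    let actual = 5 * NEEDLE_PADDING_SIZE as i64; // stored value 5, actual 40
    let offset = Offset::from_actual_offset(actual);
    assert_eq!(offset.to_actual_offset(), 40);

    let mut bytes = [0u8; OFFSET_SIZE];
    offset.to_bytes(&mut bytes);
    assert_eq!(bytes, [0, 0, 0, 5]);
    assert_eq!(Offset::from_bytes(&bytes), offset);
}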

seaweed-volume/src/storage/volume.rs (23 lines changed)

@@ -658,6 +658,29 @@ impl Volume {
self.no_write_or_delete || self.no_write_can_delete
}
pub fn last_compact_revision(&self) -> u16 {
self.last_compact_revision
}
/// Read all live needles from the volume (for ReadAllNeedles streaming RPC).
pub fn read_all_needles(&self) -> Result<Vec<Needle>, VolumeError> {
let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
let mut needles = Vec::new();
for (&key, nv) in nm.iter() {
if !nv.size.is_valid() {
continue; // skip deleted
}
let mut n = Needle {
id: key,
..Needle::default()
};
if let Ok(()) = self.read_needle_data_at(&mut n, nv.offset.to_actual_offset(), nv.size) {
needles.push(n);
}
}
Ok(needles)
}
/// Insert or update a needle index entry (for low-level blob writes).
pub fn put_needle_index(&mut self, key: NeedleId, offset: Offset, size: Size) -> Result<(), VolumeError> {
if let Some(ref mut nm) = self.nm {
