Browse Source

add redb disk-backed needle maps, binary search for incremental copy, and proxy/redirect read modes

- RedbNeedleMap: pure-Rust disk-backed needle map using redb, with NeedleMap
  enum wrapping both in-memory and redb variants
- binary_search_by_append_at_ns: port of Go's BinarySearchByAppendAtNs for
  VolumeIncrementalCopy with since_ns > 0
- Proxy/redirect: master volume lookup, HTTP proxy forwarding with ?proxied=true,
  and 301 redirects for non-local volumes based on ReadMode config
- Wire new VolumeServerState fields: read_mode, master_url, self_url, http_client
rust-volume-server
Chris Lu 3 days ago
parent
commit
a184711b3a
  1. 10
      seaweed-volume/Cargo.lock
  2. 5
      seaweed-volume/Cargo.toml
  3. 7
      seaweed-volume/src/main.rs
  4. 30
      seaweed-volume/src/server/grpc_server.rs
  5. 240
      seaweed-volume/src/server/handlers.rs
  6. 9
      seaweed-volume/src/server/volume_server.rs
  7. 4
      seaweed-volume/src/server/write_queue.rs
  8. 725
      seaweed-volume/src/storage/needle_map.rs
  9. 248
      seaweed-volume/src/storage/volume.rs
  10. 4
      seaweed-volume/tests/http_integration.rs

10
seaweed-volume/Cargo.lock

@ -2971,6 +2971,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "redb"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae323eb086579a3769daa2c753bb96deb95993c534711e0dbe881b5192906a06"
dependencies = [
"libc",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
@ -3349,6 +3358,7 @@ dependencies = [
"prost",
"prost-types",
"rand 0.8.5",
"redb",
"reed-solomon-erasure",
"reqwest",
"rustls 0.23.37",

5
seaweed-volume/Cargo.toml

@ -41,6 +41,9 @@ rustls-pemfile = "2"
# Using rusty-leveldb for pure Rust LevelDB
rusty-leveldb = "3"
# Disk-backed needle map (alternative to in-memory HashMap)
redb = "3"
# Reed-Solomon erasure coding
reed-solomon-erasure = "6"
@ -65,7 +68,7 @@ memmap2 = "0.9"
uuid = { version = "1", features = ["v4"] }
# HTTP client (for proxying, remote fetch)
reqwest = { version = "0.12", features = ["rustls-tls", "stream", "multipart"] }
reqwest = { version = "0.12", features = ["rustls-tls", "stream", "multipart", "json"] }
# Content hashing
md-5 = "0.10"

7
seaweed-volume/src/main.rs

@ -98,6 +98,9 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
SigningKey(config.jwt_read_signing_key.clone()),
config.jwt_read_signing_expires_seconds,
);
let master_url = config.masters.first().cloned().unwrap_or_default();
let self_url = format!("{}:{}", config.ip, config.port);
let state = Arc::new(VolumeServerState {
store: RwLock::new(store),
guard,
@ -123,6 +126,10 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error
s3_tier_registry: std::sync::RwLock::new(
seaweed_volume::remote_storage::s3_tier::S3TierRegistry::new(),
),
read_mode: config.read_mode,
master_url,
self_url,
http_client: reqwest::Client::new(),
});
// Initialize the batched write queue if enabled

30
seaweed-volume/src/server/grpc_server.rs

@ -394,16 +394,40 @@ impl VolumeServer for VolumeGrpcService {
let dat_size = v.dat_file_size().unwrap_or(0);
let dat_path = v.file_name(".dat");
let super_block_size = v.super_block.block_size() as u64;
drop(store);
// If since_ns is very large (after all data), return empty
if req.since_ns == u64::MAX || dat_size <= super_block_size {
drop(store);
let stream = tokio_stream::iter(Vec::new());
return Ok(Response::new(Box::pin(stream)));
}
// For since_ns=0, start from super block end; otherwise would need binary search
let start_offset = super_block_size;
// Use binary search to find the starting offset
let start_offset = if req.since_ns == 0 {
super_block_size
} else {
match v.binary_search_by_append_at_ns(req.since_ns) {
Ok((_offset, true)) => {
// All entries are before since_ns — nothing to send
drop(store);
let stream = tokio_stream::iter(Vec::new());
return Ok(Response::new(Box::pin(stream)));
}
Ok((offset, false)) => {
let actual = offset.to_actual_offset();
if actual <= 0 {
super_block_size
} else {
actual as u64
}
}
Err(_e) => {
// On error, fall back to streaming from superblock end
super_block_size
}
}
};
drop(store);
// Read the .dat file
let file = std::fs::File::open(&dat_path)

240
seaweed-volume/src/server/handlers.rs

@ -14,6 +14,7 @@ use axum::http::{header, HeaderMap, Method, Request, StatusCode};
use axum::response::{IntoResponse, Response};
use serde::{Deserialize, Serialize};
use crate::config::ReadMode;
use crate::metrics;
use crate::storage::needle::needle::Needle;
use crate::storage::types::*;
@ -226,6 +227,193 @@ fn parse_url_path(path: &str) -> Option<(VolumeId, NeedleId, Cookie)> {
Some((vid, needle_id, cookie))
}
// ============================================================================
// Volume Lookup + Proxy/Redirect
// ============================================================================
/// A volume location returned by master lookup.
#[derive(Debug, Deserialize)]
struct VolumeLocation {
    /// Internal address (ip:port) of the replica; used as the proxy target.
    url: String,
    /// Externally reachable address; used when building client-facing redirects.
    #[serde(rename = "publicUrl")]
    public_url: String,
}
/// Master /dir/lookup response.
#[derive(Debug, Deserialize)]
struct LookupResult {
    /// Known replica locations for the requested volume; may be absent or empty.
    #[serde(default)]
    locations: Option<Vec<VolumeLocation>>,
    /// Error message from the master; a non-empty value means the lookup failed.
    #[serde(default)]
    error: Option<String>,
}
/// Look up volume locations from the master via HTTP /dir/lookup.
///
/// `master_url` may be given with or without an explicit scheme
/// (e.g. "localhost:9333" or "http://localhost:9333"); a bare host:port
/// gets "http://" prepended. Returns the (possibly empty) location list,
/// or an error string on transport/parse failure or a master-reported error.
async fn lookup_volume(
    client: &reqwest::Client,
    master_url: &str,
    volume_id: u32,
) -> Result<Vec<VolumeLocation>, String> {
    // Bug fix: previously "http://" was unconditionally prepended, producing
    // "http://http://host" when the configured master already had a scheme.
    let base = if master_url.starts_with("http://") || master_url.starts_with("https://") {
        master_url.to_string()
    } else {
        format!("http://{}", master_url)
    };
    let url = format!("{}/dir/lookup?volumeId={}", base, volume_id);
    let resp = client
        .get(&url)
        .send()
        .await
        .map_err(|e| format!("lookup request failed: {}", e))?;
    let result: LookupResult = resp
        .json()
        .await
        .map_err(|e| format!("lookup parse failed: {}", e))?;
    // The master signals failure via a non-empty "error" field in the JSON body.
    if let Some(err) = result.error {
        if !err.is_empty() {
            return Err(err);
        }
    }
    Ok(result.locations.unwrap_or_default())
}
/// Extracted request info needed for proxy/redirect (avoids borrowing Request across await).
struct ProxyRequestInfo {
    /// Headers of the incoming request, forwarded when proxying.
    original_headers: HeaderMap,
    /// Raw query string of the incoming request (as produced by `Uri::query()`, no leading '?').
    original_query: String,
    /// Original request path, reused verbatim to build the proxy target URL.
    path: String,
    /// Volume id portion of the path, used to build the redirect Location URL.
    vid_str: String,
    /// File id portion of the path (filename/extension stripped), used in the redirect Location URL.
    fid_str: String,
}
/// Handle proxy or redirect for a non-local volume read.
async fn proxy_or_redirect_to_target(
state: &VolumeServerState,
info: ProxyRequestInfo,
vid: VolumeId,
) -> Response {
// Look up volume locations from master
let locations = match lookup_volume(&state.http_client, &state.master_url, vid.0).await {
Ok(locs) => locs,
Err(e) => {
tracing::warn!("volume lookup failed for {}: {}", vid.0, e);
return StatusCode::NOT_FOUND.into_response();
}
};
if locations.is_empty() {
return StatusCode::NOT_FOUND.into_response();
}
// Filter out self, then shuffle remaining
let mut candidates: Vec<&VolumeLocation> = locations
.iter()
.filter(|loc| !loc.url.contains(&state.self_url))
.collect();
if candidates.is_empty() {
return StatusCode::NOT_FOUND.into_response();
}
// Shuffle for load balancing
if candidates.len() >= 2 {
use rand::seq::SliceRandom;
let mut rng = rand::thread_rng();
candidates.shuffle(&mut rng);
}
let target = candidates[0];
match state.read_mode {
ReadMode::Proxy => {
proxy_request(state, &info, target).await
}
ReadMode::Redirect => {
redirect_request(&info, target)
}
ReadMode::Local => unreachable!(),
}
}
/// Proxy the request to the target volume server.
///
/// The remote response is fully buffered in memory before being returned
/// to the client. Returns 500 on any transport failure.
async fn proxy_request(
    state: &VolumeServerState,
    info: &ProxyRequestInfo,
    target: &VolumeLocation,
) -> Response {
    // Build target URL, adding proxied=true so the target never proxies
    // again (loop prevention — the read handler checks this flag).
    let scheme = "http";
    let target_host = &target.url;
    let path = info.path.trim_start_matches('/');
    let target_url = if info.original_query.is_empty() {
        format!("{}://{}/{}?proxied=true", scheme, target_host, path)
    } else {
        format!("{}://{}/{}?{}&proxied=true", scheme, target_host, path, info.original_query)
    };
    // Build the proxy request, forwarding the original headers EXCEPT those
    // that must not be copied between hops:
    // - host: reqwest derives the correct Host from the target URL; forwarding
    //   the client's Host would name this server, not the target
    // - content-length / transfer-encoding / connection: per-hop framing,
    //   managed by the HTTP client itself
    let mut req_builder = state.http_client.get(&target_url);
    for (name, value) in &info.original_headers {
        let n = name.as_str();
        if n.eq_ignore_ascii_case("host")
            || n.eq_ignore_ascii_case("content-length")
            || n.eq_ignore_ascii_case("transfer-encoding")
            || n.eq_ignore_ascii_case("connection")
        {
            continue;
        }
        if let Ok(v) = value.to_str() {
            req_builder = req_builder.header(n, v);
        }
    }
    let resp = match req_builder.send().await {
        Ok(r) => r,
        Err(e) => {
            tracing::warn!("proxy request to {} failed: {}", target_url, e);
            return StatusCode::INTERNAL_SERVER_ERROR.into_response();
        }
    };
    // Copy status and headers from the remote response. Skip `server` (this
    // server is the one answering) and per-hop framing headers: the body is
    // re-materialized below, so a copied `transfer-encoding: chunked` would
    // describe framing we no longer use.
    let status = StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
    let mut response_headers = HeaderMap::new();
    for (name, value) in resp.headers() {
        let n = name.as_str();
        if n.eq_ignore_ascii_case("server")
            || n.eq_ignore_ascii_case("transfer-encoding")
            || n.eq_ignore_ascii_case("connection")
        {
            continue;
        }
        response_headers.insert(name.clone(), value.clone());
    }
    let body_bytes = match resp.bytes().await {
        Ok(b) => b,
        Err(e) => {
            tracing::warn!("proxy response read failed: {}", e);
            return StatusCode::INTERNAL_SERVER_ERROR.into_response();
        }
    };
    let mut response = Response::new(Body::from(body_bytes));
    *response.status_mut() = status;
    *response.headers_mut() = response_headers;
    response
}
/// Return a 301 redirect pointing the client at the target volume server.
///
/// The Location URL is built from the target's public URL plus the original
/// vid/fid. Of the original query parameters only `collection` survives
/// (Go parity: readDeleted and everything else is dropped), and
/// `proxied=true` is always appended for loop prevention.
fn redirect_request(
    info: &ProxyRequestInfo,
    target: &VolumeLocation,
) -> Response {
    let scheme = "http";
    let host = &target.public_url;
    // Rebuild the query: keep only key=value pairs whose key is "collection",
    // then tag with proxied=true.
    let mut params: Vec<String> = info
        .original_query
        .split('&')
        .filter_map(|p| p.split_once('='))
        .filter(|(key, _)| *key == "collection")
        .map(|(_, value)| format!("collection={}", value))
        .collect();
    params.push("proxied=true".to_string());
    let query = params.join("&");
    let location = format!("{}://{}/{},{}?{}", scheme, host, &info.vid_str, &info.fid_str, query);
    Response::builder()
        .status(StatusCode::MOVED_PERMANENTLY)
        .header("Location", &location)
        .body(Body::from(format!("<a href=\"{}\">Moved Permanently</a>.\n\n", location)))
        .unwrap_or_else(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response())
}
// ============================================================================
// Query parameters
// ============================================================================
@ -320,6 +508,58 @@ async fn get_or_head_handler_inner(
return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
}
// Check if volume exists locally; if not, proxy/redirect based on read_mode.
// This mirrors Go's hasVolume check in GetOrHeadHandler.
// NOTE: The RwLockReadGuard must be dropped before any .await to keep the future Send.
let has_volume = state.store.read().unwrap().has_volume(vid);
if !has_volume {
// Check if already proxied (loop prevention)
let query_string = request.uri().query().unwrap_or("").to_string();
let is_proxied = query_string.contains("proxied=true");
if is_proxied || state.read_mode == ReadMode::Local || state.master_url.is_empty() {
return StatusCode::NOT_FOUND.into_response();
}
// Extract vid_str and fid_str from path for redirect URL construction.
// For redirect, fid must be stripped of extension (Go parity: parseURLPath returns raw fid).
let trimmed = path.trim_start_matches('/');
let (vid_str, fid_str) = if let Some(pos) = trimmed.find(',') {
let raw_fid = &trimmed[pos + 1..];
// Strip filename after slash: "fid/filename.ext" -> "fid"
let fid = if let Some(slash) = raw_fid.find('/') {
&raw_fid[..slash]
} else if let Some(dot) = raw_fid.rfind('.') {
// Strip extension: "fid.ext" -> "fid"
&raw_fid[..dot]
} else {
raw_fid
};
(trimmed[..pos].to_string(), fid.to_string())
} else if let Some(pos) = trimmed.find('/') {
let after = &trimmed[pos + 1..];
let fid_part = if let Some(slash) = after.find('/') {
&after[..slash]
} else {
after
};
(trimmed[..pos].to_string(), fid_part.to_string())
} else {
return StatusCode::NOT_FOUND.into_response();
};
let info = ProxyRequestInfo {
original_headers: request.headers().clone(),
original_query: query_string,
path: path.clone(),
vid_str,
fid_str,
};
return proxy_or_redirect_to_target(&state, info, vid).await;
}
// Download throttling
let download_guard = if state.concurrent_download_limit > 0 {
let timeout = if state.inflight_download_data_timeout.is_zero() {

9
seaweed-volume/src/server/volume_server.rs

@ -21,6 +21,7 @@ use axum::{
http::{StatusCode, HeaderValue, Method},
};
use crate::config::ReadMode;
use crate::security::Guard;
use crate::storage::store::Store;
@ -65,6 +66,14 @@ pub struct VolumeServerState {
pub write_queue: std::sync::OnceLock<WriteQueue>,
/// Registry of S3 tier backends for tiered storage operations.
pub s3_tier_registry: std::sync::RwLock<crate::remote_storage::s3_tier::S3TierRegistry>,
/// Read mode: local, proxy, or redirect for non-local volumes.
pub read_mode: ReadMode,
/// First master address for volume lookups (e.g., "localhost:9333").
pub master_url: String,
/// This server's own address (ip:port) for filtering self from lookup results.
pub self_url: String,
/// HTTP client for proxy requests and master lookups.
pub http_client: reqwest::Client,
}
impl VolumeServerState {

4
seaweed-volume/src/server/write_queue.rs

@ -204,6 +204,10 @@ mod tests {
volume_state_notify: tokio::sync::Notify::new(),
write_queue: std::sync::OnceLock::new(),
s3_tier_registry: std::sync::RwLock::new(crate::remote_storage::s3_tier::S3TierRegistry::new()),
read_mode: crate::config::ReadMode::Local,
master_url: String::new(),
self_url: String::new(),
http_client: reqwest::Client::new(),
})
}

725
seaweed-volume/src/storage/needle_map.rs

@ -1,5 +1,10 @@
//! NeedleMapper: in-memory index mapping NeedleId → (Offset, Size).
//! NeedleMapper: index mapping NeedleId -> (Offset, Size).
//!
//! Two implementations:
//! - `CompactNeedleMap`: in-memory HashMap (fast, uses more RAM)
//! - `RedbNeedleMap`: disk-backed via redb (low RAM, slightly slower)
//!
//! The `NeedleMap` enum wraps both and provides a uniform interface.
//! Loaded from .idx file on volume mount. Supports Get, Put, Delete with
//! metrics tracking (file count, byte count, deleted count, deleted bytes).
@ -7,6 +12,8 @@ use std::collections::HashMap;
use std::io::{self, Read, Seek, Write};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
use crate::storage::idx;
use crate::storage::types::*;
@ -20,6 +27,23 @@ pub struct NeedleValue {
pub size: Size,
}
/// Pack an (Offset, Size) pair into 8 bytes for redb storage.
/// Layout: [offset 4 bytes big-endian] [size 4 bytes big-endian]
fn pack_needle_value(nv: &NeedleValue) -> [u8; 8] {
    let mut packed = [0u8; 8];
    // Write each half into its own 4-byte slice of the output buffer.
    let (offset_bytes, size_bytes) = packed.split_at_mut(4);
    nv.offset.to_bytes(offset_bytes);
    nv.size.to_bytes(size_bytes);
    packed
}
/// Unpack 8 bytes from redb storage into (Offset, Size).
fn unpack_needle_value(bytes: &[u8; 8]) -> NeedleValue {
    // First 4 bytes are the offset, last 4 the size (see pack_needle_value).
    let (offset_bytes, size_bytes) = bytes.split_at(4);
    NeedleValue {
        offset: Offset::from_bytes(offset_bytes),
        size: Size::from_bytes(size_bytes),
    }
}
// ============================================================================
// NeedleMapMetric
// ============================================================================
@ -86,18 +110,9 @@ pub enum NeedleMapKind {
}
// ============================================================================
// CompactNeedleMap (in-memory)
// IdxFileWriter trait
// ============================================================================
/// In-memory needle map backed by a HashMap.
/// The .idx file is kept open for append-only writes.
pub struct CompactNeedleMap {
map: HashMap<NeedleId, NeedleValue>,
metric: NeedleMapMetric,
idx_file: Option<Box<dyn IdxFileWriter>>,
idx_file_offset: u64,
}
/// Trait for appending to an index file.
pub trait IdxFileWriter: Write + Send + Sync {
fn sync_all(&self) -> io::Result<()>;
@ -109,6 +124,19 @@ impl IdxFileWriter for std::fs::File {
}
}
// ============================================================================
// CompactNeedleMap (in-memory)
// ============================================================================
/// In-memory needle map backed by a HashMap.
/// The .idx file is kept open for append-only writes.
pub struct CompactNeedleMap {
    // NeedleId -> (offset, size) for every known needle, all held in RAM.
    map: HashMap<NeedleId, NeedleValue>,
    // Live/deleted file counts and byte totals.
    metric: NeedleMapMetric,
    // Append-only .idx file; None until a file is attached.
    idx_file: Option<Box<dyn IdxFileWriter>>,
    // Current byte length of the .idx file, advanced on every append.
    idx_file_offset: u64,
}
impl CompactNeedleMap {
/// Create a new empty in-memory map.
pub fn new() -> Self {
@ -158,8 +186,8 @@ impl CompactNeedleMap {
}
/// Look up a needle.
pub fn get(&self, key: NeedleId) -> Option<&NeedleValue> {
self.map.get(&key)
pub fn get(&self, key: NeedleId) -> Option<NeedleValue> {
self.map.get(&key).cloned()
}
/// Mark a needle as deleted. Appends tombstone to .idx file.
@ -280,6 +308,482 @@ impl CompactNeedleMap {
}
}
// ============================================================================
// RedbNeedleMap (disk-backed via redb)
// ============================================================================
/// redb table: NeedleId (u64) -> packed [offset(4) + size(4)]
const NEEDLE_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("needles");
/// Disk-backed needle map using redb.
/// Low memory usage — data lives on disk with redb's page cache.
pub struct RedbNeedleMap {
    // The redb database holding the "needles" table.
    db: Database,
    // Live/deleted file counts and byte totals, kept in memory.
    metric: NeedleMapMetric,
    // Append-only .idx file mirror; None until set_idx_file is called.
    idx_file: Option<Box<dyn IdxFileWriter>>,
    // Current byte length of the .idx file, advanced on every append.
    idx_file_offset: u64,
}
impl RedbNeedleMap {
    /// Wrap any displayable error into an `io::Error`, preserving the exact
    /// "context: detail" message format used throughout this type.
    /// (DRYs the ~20 identical map_err closures the methods previously repeated.)
    fn io_err(context: &str, e: impl std::fmt::Display) -> io::Error {
        io::Error::new(io::ErrorKind::Other, format!("{}: {}", context, e))
    }

    /// Create a new redb-backed needle map at the given path.
    /// The database file will be created if it does not exist.
    pub fn new(db_path: &str) -> io::Result<Self> {
        let db = Database::create(db_path).map_err(|e| Self::io_err("redb create error", e))?;
        // Open the table once inside a committed write transaction so it
        // exists even before the first put.
        let txn = db.begin_write().map_err(|e| Self::io_err("redb begin_write", e))?;
        {
            let _table = txn
                .open_table(NEEDLE_TABLE)
                .map_err(|e| Self::io_err("redb open_table", e))?;
        }
        txn.commit().map_err(|e| Self::io_err("redb commit", e))?;
        Ok(RedbNeedleMap {
            db,
            metric: NeedleMapMetric::default(),
            idx_file: None,
            idx_file_offset: 0,
        })
    }

    /// Load from an .idx file, populating the redb database.
    ///
    /// The idx file is replayed in order; later entries win, and tombstones
    /// (zero offset or deleted size) remove the key. Only the resolved final
    /// state is written to redb, in a single transaction.
    pub fn load_from_idx<R: Read + Seek>(db_path: &str, reader: &mut R) -> io::Result<Self> {
        let nm = RedbNeedleMap::new(db_path)?;
        // Collect entries from idx file, resolving duplicates/deletions:
        // the HashMap keeps only the last state seen per key.
        let mut entries: HashMap<NeedleId, Option<NeedleValue>> = HashMap::new();
        idx::walk_index_file(reader, 0, |key, offset, size| {
            if offset.is_zero() || size.is_deleted() {
                entries.insert(key, None);
            } else {
                entries.insert(key, Some(NeedleValue { offset, size }));
            }
            Ok(())
        })?;
        // Write all live entries to redb in a single transaction.
        let txn = nm.db.begin_write().map_err(|e| Self::io_err("redb begin_write", e))?;
        {
            let mut table = txn
                .open_table(NEEDLE_TABLE)
                .map_err(|e| Self::io_err("redb open_table", e))?;
            for (key, maybe_nv) in &entries {
                let key_u64: u64 = (*key).into();
                if let Some(nv) = maybe_nv {
                    let packed = pack_needle_value(nv);
                    table
                        .insert(key_u64, packed.as_slice())
                        .map_err(|e| Self::io_err("redb insert", e))?;
                    // No prior value: each key appears at most once here.
                    nm.metric.on_put(*key, None, nv.size);
                } else {
                    // Entry was deleted — remove from redb if present.
                    table
                        .remove(key_u64)
                        .map_err(|e| Self::io_err("redb remove", e))?;
                }
            }
        }
        txn.commit().map_err(|e| Self::io_err("redb commit", e))?;
        Ok(nm)
    }

    /// Set the index file for append-only writes, with its current length.
    pub fn set_idx_file(&mut self, file: Box<dyn IdxFileWriter>, offset: u64) {
        self.idx_file = Some(file);
        self.idx_file_offset = offset;
    }

    // ---- Map operations ----

    /// Insert or update an entry. Writes to idx file first, then redb.
    pub fn put(&mut self, key: NeedleId, offset: Offset, size: Size) -> io::Result<()> {
        // Persist to idx file BEFORE mutating redb state for crash consistency.
        if let Some(ref mut idx_file) = self.idx_file {
            idx::write_index_entry(idx_file, key, offset, size)?;
            self.idx_file_offset += NEEDLE_MAP_ENTRY_SIZE as u64;
        }
        let key_u64: u64 = key.into();
        let nv = NeedleValue { offset, size };
        let packed = pack_needle_value(&nv);
        // Read the old value first so metrics can distinguish a fresh insert
        // from an overwrite.
        let old = self.get_internal(key_u64)?;
        let txn = self.db.begin_write().map_err(|e| Self::io_err("redb begin_write", e))?;
        {
            let mut table = txn
                .open_table(NEEDLE_TABLE)
                .map_err(|e| Self::io_err("redb open_table", e))?;
            table
                .insert(key_u64, packed.as_slice())
                .map_err(|e| Self::io_err("redb insert", e))?;
        }
        txn.commit().map_err(|e| Self::io_err("redb commit", e))?;
        self.metric.on_put(key, old.as_ref(), size);
        Ok(())
    }

    /// Look up a needle. Errors from redb are treated as "not found".
    pub fn get(&self, key: NeedleId) -> Option<NeedleValue> {
        let key_u64: u64 = key.into();
        self.get_internal(key_u64).ok().flatten()
    }

    /// Internal get that returns io::Result for error propagation.
    fn get_internal(&self, key_u64: u64) -> io::Result<Option<NeedleValue>> {
        let txn = self.db.begin_read().map_err(|e| Self::io_err("redb begin_read", e))?;
        let table = txn
            .open_table(NEEDLE_TABLE)
            .map_err(|e| Self::io_err("redb open_table", e))?;
        match table.get(key_u64) {
            Ok(Some(guard)) => {
                let bytes: &[u8] = guard.value();
                if bytes.len() == 8 {
                    let mut arr = [0u8; 8];
                    arr.copy_from_slice(bytes);
                    Ok(Some(unpack_needle_value(&arr)))
                } else {
                    // Unexpected value length — treat as missing rather than
                    // returning garbage.
                    Ok(None)
                }
            }
            Ok(None) => Ok(None),
            Err(e) => Err(Self::io_err("redb get", e)),
        }
    }

    /// Mark a needle as deleted. Appends tombstone to .idx file, negates size in redb.
    ///
    /// Returns the previously live size, or `None` when the key is absent or
    /// already deleted (so double deletes are not double counted).
    pub fn delete(&mut self, key: NeedleId, offset: Offset) -> io::Result<Option<Size>> {
        let key_u64: u64 = key.into();
        if let Some(old) = self.get_internal(key_u64)? {
            if old.size.is_valid() {
                // Persist tombstone to idx file BEFORE mutating redb.
                if let Some(ref mut idx_file) = self.idx_file {
                    idx::write_index_entry(idx_file, key, offset, TOMBSTONE_FILE_SIZE)?;
                    self.idx_file_offset += NEEDLE_MAP_ENTRY_SIZE as u64;
                }
                self.metric.on_delete(&old);
                let deleted_size = Size(-(old.size.0));
                // Keep original offset so readDeleted can find original data (matching Go behavior)
                let deleted_nv = NeedleValue { offset: old.offset, size: deleted_size };
                let packed = pack_needle_value(&deleted_nv);
                let txn = self.db.begin_write().map_err(|e| Self::io_err("redb begin_write", e))?;
                {
                    let mut table = txn
                        .open_table(NEEDLE_TABLE)
                        .map_err(|e| Self::io_err("redb open_table", e))?;
                    table
                        .insert(key_u64, packed.as_slice())
                        .map_err(|e| Self::io_err("redb insert", e))?;
                }
                txn.commit().map_err(|e| Self::io_err("redb commit", e))?;
                return Ok(Some(old.size));
            }
        }
        Ok(None)
    }

    // ---- Metrics accessors ----

    /// Total bytes of live needles.
    pub fn content_size(&self) -> u64 {
        self.metric.file_byte_count.load(Ordering::Relaxed)
    }

    /// Total bytes of deleted needles.
    pub fn deleted_size(&self) -> u64 {
        self.metric.deletion_byte_count.load(Ordering::Relaxed)
    }

    /// Number of live needles.
    pub fn file_count(&self) -> i64 {
        self.metric.file_count.load(Ordering::Relaxed)
    }

    /// Number of deleted needles.
    pub fn deleted_count(&self) -> i64 {
        self.metric.deletion_count.load(Ordering::Relaxed)
    }

    /// Largest needle id ever inserted.
    pub fn max_file_key(&self) -> NeedleId {
        NeedleId(self.metric.max_file_key.load(Ordering::Relaxed))
    }

    /// Current byte length of the append-only .idx file.
    pub fn index_file_size(&self) -> u64 {
        self.idx_file_offset
    }

    /// Sync index file to disk.
    pub fn sync(&self) -> io::Result<()> {
        if let Some(ref idx_file) = self.idx_file {
            idx_file.sync_all()?;
        }
        Ok(())
    }

    /// Close index file (best-effort sync first; sync errors are ignored).
    pub fn close(&mut self) {
        let _ = self.sync();
        self.idx_file = None;
    }

    /// Save the redb contents to an index file, sorted by needle ID ascending.
    /// Deleted (non-valid size) entries are skipped.
    pub fn save_to_idx(&self, path: &str) -> io::Result<()> {
        let txn = self.db.begin_read().map_err(|e| Self::io_err("redb begin_read", e))?;
        let table = txn
            .open_table(NEEDLE_TABLE)
            .map_err(|e| Self::io_err("redb open_table", e))?;
        let mut file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(path)?;
        // redb iterates in key order (u64 ascending)
        let iter = table.iter().map_err(|e| Self::io_err("redb iter", e))?;
        for entry in iter {
            let (key_guard, val_guard) = entry.map_err(|e| Self::io_err("redb iter next", e))?;
            let key_u64: u64 = key_guard.value();
            let bytes: &[u8] = val_guard.value();
            if bytes.len() == 8 {
                let mut arr = [0u8; 8];
                arr.copy_from_slice(bytes);
                let nv = unpack_needle_value(&arr);
                if nv.size.is_valid() {
                    idx::write_index_entry(&mut file, NeedleId(key_u64), nv.offset, nv.size)?;
                }
            }
        }
        file.sync_all()?;
        Ok(())
    }

    /// Visit all entries in ascending order by needle ID.
    /// The visitor also sees deleted (negative-size) entries still in redb.
    pub fn ascending_visit<F>(&self, mut f: F) -> Result<(), String>
    where
        F: FnMut(NeedleId, &NeedleValue) -> Result<(), String>,
    {
        let txn = self.db.begin_read().map_err(|e| format!("redb begin_read: {}", e))?;
        let table = txn.open_table(NEEDLE_TABLE).map_err(|e| format!("redb open_table: {}", e))?;
        let iter = table.iter().map_err(|e| format!("redb iter: {}", e))?;
        for entry in iter {
            let (key_guard, val_guard) = entry.map_err(|e| format!("redb iter next: {}", e))?;
            let key_u64: u64 = key_guard.value();
            let bytes: &[u8] = val_guard.value();
            if bytes.len() == 8 {
                let mut arr = [0u8; 8];
                arr.copy_from_slice(bytes);
                let nv = unpack_needle_value(&arr);
                f(NeedleId(key_u64), &nv)?;
            }
        }
        Ok(())
    }

    /// Collect all entries as a Vec for iteration (used by volume.rs iter patterns).
    /// redb errors are swallowed: the result may be empty or partial on failure.
    pub fn collect_entries(&self) -> Vec<(NeedleId, NeedleValue)> {
        let mut result = Vec::new();
        let txn: redb::ReadTransaction = match self.db.begin_read() {
            Ok(t) => t,
            Err(_) => return result,
        };
        let table = match txn.open_table(NEEDLE_TABLE) {
            Ok(t) => t,
            Err(_) => return result,
        };
        let iter = match table.iter() {
            Ok(i) => i,
            Err(_) => return result,
        };
        for entry in iter {
            if let Ok((key_guard, val_guard)) = entry {
                let key_u64: u64 = key_guard.value();
                let bytes: &[u8] = val_guard.value();
                if bytes.len() == 8 {
                    let mut arr = [0u8; 8];
                    arr.copy_from_slice(bytes);
                    let nv = unpack_needle_value(&arr);
                    result.push((NeedleId(key_u64), nv));
                }
            }
        }
        result
    }
}
// ============================================================================
// NeedleMap enum — unified interface over both implementations
// ============================================================================
/// Unified needle map wrapping either in-memory or redb-backed storage.
pub enum NeedleMap {
    /// HashMap-backed `CompactNeedleMap`: all entries kept in RAM.
    InMemory(CompactNeedleMap),
    /// `RedbNeedleMap`: entries stored on disk via redb.
    Redb(RedbNeedleMap),
}
impl NeedleMap {
/// Insert or update an entry.
pub fn put(&mut self, key: NeedleId, offset: Offset, size: Size) -> io::Result<()> {
match self {
NeedleMap::InMemory(nm) => nm.put(key, offset, size),
NeedleMap::Redb(nm) => nm.put(key, offset, size),
}
}
/// Look up a needle.
pub fn get(&self, key: NeedleId) -> Option<NeedleValue> {
match self {
NeedleMap::InMemory(nm) => nm.get(key),
NeedleMap::Redb(nm) => nm.get(key),
}
}
/// Mark a needle as deleted.
pub fn delete(&mut self, key: NeedleId, offset: Offset) -> io::Result<Option<Size>> {
match self {
NeedleMap::InMemory(nm) => nm.delete(key, offset),
NeedleMap::Redb(nm) => nm.delete(key, offset),
}
}
/// Set the index file for append-only writes.
pub fn set_idx_file(&mut self, file: Box<dyn IdxFileWriter>, offset: u64) {
match self {
NeedleMap::InMemory(nm) => nm.set_idx_file(file, offset),
NeedleMap::Redb(nm) => nm.set_idx_file(file, offset),
}
}
/// Content byte count.
pub fn content_size(&self) -> u64 {
match self {
NeedleMap::InMemory(nm) => nm.content_size(),
NeedleMap::Redb(nm) => nm.content_size(),
}
}
/// Deleted byte count.
pub fn deleted_size(&self) -> u64 {
match self {
NeedleMap::InMemory(nm) => nm.deleted_size(),
NeedleMap::Redb(nm) => nm.deleted_size(),
}
}
/// Live file count.
pub fn file_count(&self) -> i64 {
match self {
NeedleMap::InMemory(nm) => nm.file_count(),
NeedleMap::Redb(nm) => nm.file_count(),
}
}
/// Deleted file count.
pub fn deleted_count(&self) -> i64 {
match self {
NeedleMap::InMemory(nm) => nm.deleted_count(),
NeedleMap::Redb(nm) => nm.deleted_count(),
}
}
/// Maximum needle ID seen.
pub fn max_file_key(&self) -> NeedleId {
match self {
NeedleMap::InMemory(nm) => nm.max_file_key(),
NeedleMap::Redb(nm) => nm.max_file_key(),
}
}
/// Index file size in bytes.
pub fn index_file_size(&self) -> u64 {
match self {
NeedleMap::InMemory(nm) => nm.index_file_size(),
NeedleMap::Redb(nm) => nm.index_file_size(),
}
}
/// Sync index file to disk.
pub fn sync(&self) -> io::Result<()> {
match self {
NeedleMap::InMemory(nm) => nm.sync(),
NeedleMap::Redb(nm) => nm.sync(),
}
}
/// Close index file.
pub fn close(&mut self) {
match self {
NeedleMap::InMemory(nm) => nm.close(),
NeedleMap::Redb(nm) => nm.close(),
}
}
/// Save to an index file.
pub fn save_to_idx(&self, path: &str) -> io::Result<()> {
match self {
NeedleMap::InMemory(nm) => nm.save_to_idx(path),
NeedleMap::Redb(nm) => nm.save_to_idx(path),
}
}
/// Visit all entries in ascending order by needle ID.
pub fn ascending_visit<F>(&self, f: F) -> Result<(), String>
where
F: FnMut(NeedleId, &NeedleValue) -> Result<(), String>,
{
match self {
NeedleMap::InMemory(nm) => nm.ascending_visit(f),
NeedleMap::Redb(nm) => nm.ascending_visit(f),
}
}
/// Iterate all entries. Returns a Vec of (NeedleId, NeedleValue) pairs.
/// For InMemory this collects from the HashMap; for Redb it reads from disk.
pub fn iter_entries(&self) -> Vec<(NeedleId, NeedleValue)> {
match self {
NeedleMap::InMemory(nm) => nm.iter().map(|(&id, &nv)| (id, nv)).collect(),
NeedleMap::Redb(nm) => nm.collect_entries(),
}
}
}
// ============================================================================
// Tests
// ============================================================================
#[cfg(test)]
mod tests {
use super::*;
@ -371,4 +875,199 @@ mod tests {
assert_eq!(r2, None);
assert_eq!(nm.deleted_count(), 1); // not double counted
}
// ---- RedbNeedleMap tests ----
#[test]
fn test_redb_needle_map_put_get() {
    // Round-trip: two puts, then get returns the stored sizes;
    // a never-inserted key returns None.
    let dir = tempfile::tempdir().unwrap();
    let db_path = dir.path().join("test.rdb");
    let mut nm = RedbNeedleMap::new(db_path.to_str().unwrap()).unwrap();
    nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
    nm.put(NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap();
    let v1 = nm.get(NeedleId(1)).unwrap();
    assert_eq!(v1.size, Size(100));
    let v2 = nm.get(NeedleId(2)).unwrap();
    assert_eq!(v2.size, Size(200));
    assert!(nm.get(NeedleId(99)).is_none());
}
#[test]
fn test_redb_needle_map_delete() {
    // Delete moves the entry's bytes from live metrics to deleted metrics and
    // keeps the entry in redb with a negated size (so readDeleted still works).
    let dir = tempfile::tempdir().unwrap();
    let db_path = dir.path().join("test.rdb");
    let mut nm = RedbNeedleMap::new(db_path.to_str().unwrap()).unwrap();
    nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
    assert_eq!(nm.file_count(), 1);
    assert_eq!(nm.content_size(), 100);
    let deleted = nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap();
    assert_eq!(deleted, Some(Size(100)));
    assert_eq!(nm.file_count(), 0);
    assert_eq!(nm.deleted_count(), 1);
    assert_eq!(nm.deleted_size(), 100);
    // Deleted entry should have negated size
    let nv = nm.get(NeedleId(1)).unwrap();
    assert_eq!(nv.size, Size(-100));
}
#[test]
fn test_redb_needle_map_metrics() {
    // Metrics across inserts, an overwrite, and a delete:
    // overwrite keeps file_count but adjusts content_size by the size delta.
    let dir = tempfile::tempdir().unwrap();
    let db_path = dir.path().join("test.rdb");
    let mut nm = RedbNeedleMap::new(db_path.to_str().unwrap()).unwrap();
    nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
    nm.put(NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap();
    nm.put(NeedleId(3), Offset::from_actual_offset(384), Size(300)).unwrap();
    assert_eq!(nm.file_count(), 3);
    assert_eq!(nm.content_size(), 600);
    assert_eq!(nm.max_file_key(), NeedleId(3));
    // Update existing
    nm.put(NeedleId(2), Offset::from_actual_offset(700), Size(250)).unwrap();
    assert_eq!(nm.file_count(), 3);
    assert_eq!(nm.content_size(), 650); // 100 + 250 + 300
    // Delete
    nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap();
    assert_eq!(nm.file_count(), 2);
    assert_eq!(nm.deleted_count(), 1);
}
#[test]
fn test_redb_needle_map_load_from_idx() {
    // Replaying an .idx stream that ends with a tombstone must yield only
    // the live entries.
    let tmp = tempfile::tempdir().unwrap();
    let path = tmp.path().join("test.rdb");
    let mut raw = Vec::new();
    idx::write_index_entry(&mut raw, NeedleId(1), Offset::from_actual_offset(8), Size(100)).unwrap();
    idx::write_index_entry(&mut raw, NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap();
    idx::write_index_entry(&mut raw, NeedleId(3), Offset::from_actual_offset(384), Size(300)).unwrap();
    // Tombstone for needle 2.
    idx::write_index_entry(&mut raw, NeedleId(2), Offset::default(), TOMBSTONE_FILE_SIZE).unwrap();
    let map = RedbNeedleMap::load_from_idx(path.to_str().unwrap(), &mut Cursor::new(raw)).unwrap();
    assert!(map.get(NeedleId(1)).is_some());
    assert!(map.get(NeedleId(2)).is_none()); // removed by the tombstone
    assert!(map.get(NeedleId(3)).is_some());
    assert_eq!(map.file_count(), 2);
}
#[test]
fn test_redb_needle_map_double_delete() {
    // Deleting the same key twice is a no-op the second time and must not
    // inflate the deleted counter.
    let tmp = tempfile::tempdir().unwrap();
    let path = tmp.path().join("test.rdb");
    let mut map = RedbNeedleMap::new(path.to_str().unwrap()).unwrap();
    map.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
    let first = map.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap();
    assert_eq!(first, Some(Size(100)));
    // Already deleted: nothing is returned or counted.
    let second = map.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap();
    assert_eq!(second, None);
    assert_eq!(map.deleted_count(), 1);
}
#[test]
fn test_redb_needle_map_ascending_visit() {
    // Entries inserted out of order must be visited in ascending key order.
    let tmp = tempfile::tempdir().unwrap();
    let path = tmp.path().join("test.rdb");
    let mut map = RedbNeedleMap::new(path.to_str().unwrap()).unwrap();
    map.put(NeedleId(3), Offset::from_actual_offset(384), Size(300)).unwrap();
    map.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
    map.put(NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap();
    let mut seen = Vec::new();
    map.ascending_visit(|id, nv| {
        seen.push((id, nv.size));
        Ok(())
    })
    .unwrap();
    assert_eq!(
        seen,
        vec![
            (NeedleId(1), Size(100)),
            (NeedleId(2), Size(200)),
            (NeedleId(3), Size(300)),
        ]
    );
}
#[test]
fn test_redb_needle_map_save_to_idx() {
    // save_to_idx exports only live entries; verify by reloading the written
    // .idx with the in-memory CompactNeedleMap.
    let tmp = tempfile::tempdir().unwrap();
    let db_path = tmp.path().join("test.rdb");
    let idx_path = tmp.path().join("test.idx");
    let mut map = RedbNeedleMap::new(db_path.to_str().unwrap()).unwrap();
    map.put(NeedleId(1), Offset::from_actual_offset(8), Size(100)).unwrap();
    map.put(NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap();
    map.put(NeedleId(3), Offset::from_actual_offset(384), Size(300)).unwrap();
    // Needle 2 becomes a tombstone and must not be exported.
    map.delete(NeedleId(2), Offset::from_actual_offset(128)).unwrap();
    map.save_to_idx(idx_path.to_str().unwrap()).unwrap();
    let mut idx_file = std::fs::File::open(&idx_path).unwrap();
    let reloaded = CompactNeedleMap::load_from_idx(&mut idx_file).unwrap();
    assert_eq!(reloaded.file_count(), 2); // only live entries survive
    assert!(reloaded.get(NeedleId(1)).is_some());
    assert!(reloaded.get(NeedleId(2)).is_none()); // deleted, not saved
    assert!(reloaded.get(NeedleId(3)).is_some());
}
#[test]
fn test_pack_unpack_needle_value() {
    // A pack/unpack round trip must preserve offset and size exactly.
    let original = NeedleValue {
        offset: Offset::from_actual_offset(8 * 1000),
        size: Size(4096),
    };
    let restored = unpack_needle_value(&pack_needle_value(&original));
    assert_eq!(restored.offset.to_actual_offset(), original.offset.to_actual_offset());
    assert_eq!(restored.size, original.size);
}
#[test]
fn test_pack_unpack_negative_size() {
    // Negative (tombstone) sizes must survive the pack/unpack round trip.
    let original = NeedleValue {
        offset: Offset::from_actual_offset(8 * 500),
        size: Size(-100),
    };
    let restored = unpack_needle_value(&pack_needle_value(&original));
    assert_eq!(restored.offset.to_actual_offset(), original.offset.to_actual_offset());
    assert_eq!(restored.size, original.size);
}
// ---- NeedleMap enum tests ----
#[test]
fn test_needle_map_enum_inmemory() {
    // The enum wrapper must forward put/get/file_count to the in-memory variant.
    let mut map = NeedleMap::InMemory(CompactNeedleMap::new());
    map.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
    assert_eq!(map.get(NeedleId(1)).unwrap().size, Size(100));
    assert_eq!(map.file_count(), 1);
}
#[test]
fn test_needle_map_enum_redb() {
    // The enum wrapper must forward put/get/file_count to the redb variant.
    let tmp = tempfile::tempdir().unwrap();
    let path = tmp.path().join("test.rdb");
    let mut map = NeedleMap::Redb(RedbNeedleMap::new(path.to_str().unwrap()).unwrap());
    map.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
    assert_eq!(map.get(NeedleId(1)).unwrap().size, Size(100));
    assert_eq!(map.file_count(), 1);
}
}

248
seaweed-volume/src/storage/volume.rs

@ -17,7 +17,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use tracing::warn;
use crate::storage::needle::needle::{self, Needle, NeedleError, get_actual_size};
use crate::storage::needle_map::{CompactNeedleMap, NeedleMapKind};
use crate::storage::needle_map::{CompactNeedleMap, NeedleMap, NeedleMapKind, RedbNeedleMap};
use crate::storage::super_block::{SuperBlock, ReplicaPlacement, SUPER_BLOCK_SIZE};
use crate::storage::types::*;
@ -240,7 +240,7 @@ pub struct Volume {
pub collection: String,
dat_file: Option<File>,
nm: Option<CompactNeedleMap>,
nm: Option<NeedleMap>,
needle_map_kind: NeedleMapKind,
pub super_block: SuperBlock,
@ -340,7 +340,7 @@ impl Volume {
pub fn file_name(&self, ext: &str) -> String {
match ext {
".idx" | ".cpx" | ".ldb" | ".cpldb" => {
".idx" | ".cpx" | ".ldb" | ".cpldb" | ".rdb" => {
format!("{}{}", self.index_file_name(), ext)
}
_ => {
@ -431,13 +431,10 @@ impl Volume {
}
fn load_index(&mut self) -> Result<(), VolumeError> {
if self.needle_map_kind != NeedleMapKind::InMemory {
warn!(
volume_id = self.id.0,
kind = ?self.needle_map_kind,
"only InMemory needle map is currently supported, falling back to InMemory"
);
}
let use_redb = matches!(
self.needle_map_kind,
NeedleMapKind::LevelDb | NeedleMapKind::LevelDbMedium | NeedleMapKind::LevelDbLarge
);
let idx_path = self.file_name(".idx");
@ -446,12 +443,23 @@ impl Volume {
fs::create_dir_all(parent)?;
}
if use_redb {
self.load_index_redb(&idx_path)?;
} else {
self.load_index_inmemory(&idx_path)?;
}
Ok(())
}
/// Load index using in-memory CompactNeedleMap.
fn load_index_inmemory(&mut self, idx_path: &str) -> Result<(), VolumeError> {
if self.no_write_or_delete {
// Open read-only
if Path::new(&idx_path).exists() {
let mut idx_file = File::open(&idx_path)?;
let nm = CompactNeedleMap::load_from_idx(&mut idx_file)?;
self.nm = Some(nm);
self.nm = Some(NeedleMap::InMemory(nm));
} else {
// Missing .idx with existing .dat could orphan needles
let dat_path = self.file_name(".dat");
@ -464,7 +472,7 @@ impl Volume {
);
}
}
self.nm = Some(CompactNeedleMap::new());
self.nm = Some(NeedleMap::InMemory(CompactNeedleMap::new()));
}
} else {
// Open read-write (create if missing)
@ -484,7 +492,56 @@ impl Volume {
.append(true)
.open(&idx_path)?;
nm.set_idx_file(Box::new(write_file), idx_size);
self.nm = Some(nm);
self.nm = Some(NeedleMap::InMemory(nm));
}
Ok(())
}
/// Load index using disk-backed RedbNeedleMap.
fn load_index_redb(&mut self, idx_path: &str) -> Result<(), VolumeError> {
// The redb database file is stored alongside the volume files
let rdb_path = self.file_name(".rdb");
if self.no_write_or_delete {
// Open read-only
if Path::new(&idx_path).exists() {
let mut idx_file = File::open(&idx_path)?;
let nm = RedbNeedleMap::load_from_idx(&rdb_path, &mut idx_file)?;
self.nm = Some(NeedleMap::Redb(nm));
} else {
// Missing .idx with existing .dat could orphan needles
let dat_path = self.file_name(".dat");
if Path::new(&dat_path).exists() {
let dat_size = fs::metadata(&dat_path).map(|m| m.len()).unwrap_or(0);
if dat_size > SUPER_BLOCK_SIZE as u64 {
warn!(
volume_id = self.id.0,
".idx file missing but .dat exists with data; needles may be orphaned"
);
}
}
self.nm = Some(NeedleMap::Redb(RedbNeedleMap::new(&rdb_path)?));
}
} else {
// Open read-write (create if missing)
let idx_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&idx_path)?;
let idx_size = idx_file.metadata()?.len();
let mut idx_reader = io::BufReader::new(&idx_file);
let mut nm = RedbNeedleMap::load_from_idx(&rdb_path, &mut idx_reader)?;
// Re-open for append-only writes
let write_file = OpenOptions::new()
.write(true)
.append(true)
.open(&idx_path)?;
nm.set_idx_file(Box::new(write_file), idx_size);
self.nm = Some(NeedleMap::Redb(nm));
}
Ok(())
@ -927,7 +984,7 @@ impl Volume {
pub fn read_all_needles(&self) -> Result<Vec<Needle>, VolumeError> {
let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
let mut needles = Vec::new();
for (&key, nv) in nm.iter() {
for (key, nv) in nm.iter_entries() {
if !nv.size.is_valid() {
continue; // skip deleted
}
@ -954,7 +1011,7 @@ impl Volume {
let mut files_checked: u64 = 0;
let mut broken = Vec::new();
for (needle_id, nv) in nm.iter() {
for (needle_id, nv) in nm.iter_entries() {
if nv.offset.is_zero() || nv.size.is_deleted() {
continue;
}
@ -970,7 +1027,7 @@ impl Volume {
// Read and verify the needle (read_needle_data_at checks CRC via read_bytes/read_tail)
let mut n = Needle {
id: *needle_id,
id: needle_id,
..Needle::default()
};
match self.read_needle_data_at(&mut n, offset, nv.size) {
@ -1165,6 +1222,163 @@ impl Volume {
Ok(())
}
// ---- Binary search for incremental copy ----
/// Read a single index entry's offset from the .idx file by entry index.
///
/// `m` is a zero-based entry index; each entry occupies
/// `NEEDLE_MAP_ENTRY_SIZE` bytes, so the byte position in the file is
/// `m * NEEDLE_MAP_ENTRY_SIZE`.
fn read_offset_from_index(&self, m: i64) -> Result<Offset, VolumeError> {
    // Opens the index file fresh on each probe; used for sparse random reads
    // during binary search, not bulk scans.
    let idx_path = self.file_name(".idx");
    let idx_file = File::open(&idx_path)?;
    let mut buf = [0u8; NEEDLE_MAP_ENTRY_SIZE];
    let file_offset = m as u64 * NEEDLE_MAP_ENTRY_SIZE as u64;
    #[cfg(unix)]
    {
        // pread-style positioned read: no seek, does not move a file cursor.
        use std::os::unix::fs::FileExt;
        idx_file.read_exact_at(&mut buf, file_offset)?;
    }
    #[cfg(not(unix))]
    {
        // Portable fallback: take ownership of the handle, seek, then read.
        let mut f = idx_file;
        f.seek(SeekFrom::Start(file_offset))?;
        std::io::Read::read_exact(&mut f, &mut buf)?;
    }
    // Only the offset field of the (key, offset, size) triple is needed here.
    let (_key, offset, _size) = idx_entry_from_bytes(&buf);
    Ok(offset)
}
/// Read the append_at_ns timestamp from a needle at the given offset in the .dat file.
///
/// Returns `Ok(0)` when the header reports a non-positive size (deleted or
/// empty needle), and an I/O error if the .dat file is not open.
fn read_append_at_ns(&self, offset: Offset) -> Result<u64, VolumeError> {
    let dat_file = self.dat_file.as_ref().ok_or_else(|| {
        VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
    })?;
    let actual_offset = offset.to_actual_offset() as u64;
    let version = self.version();
    // First read just the fixed-size header to learn the needle's data size.
    let mut header_buf = [0u8; NEEDLE_HEADER_SIZE];
    #[cfg(unix)]
    {
        use std::os::unix::fs::FileExt;
        dat_file.read_exact_at(&mut header_buf, actual_offset)?;
    }
    #[cfg(not(unix))]
    {
        read_exact_at(dat_file, &mut header_buf, actual_offset)?;
    }
    let (_cookie, _id, size) = Needle::parse_header(&header_buf);
    if size.0 <= 0 {
        // Deleted or empty needle: no timestamp to report.
        return Ok(0);
    }
    // Re-read the full on-disk record (header included) so the trailing
    // metadata, which carries append_at_ns, can be parsed.
    let actual_size = get_actual_size(size, version);
    let mut buf = vec![0u8; actual_size as usize];
    #[cfg(unix)]
    {
        use std::os::unix::fs::FileExt;
        dat_file.read_exact_at(&mut buf, actual_offset)?;
    }
    #[cfg(not(unix))]
    {
        read_exact_at(dat_file, &mut buf, actual_offset)?;
    }
    // Metadata-only parse: skips payload validation, extracts append_at_ns.
    let mut n = Needle::default();
    n.read_bytes_meta_only(&mut buf, offset.to_actual_offset(), size, version)?;
    Ok(n.append_at_ns)
}
/// Search right from position m to find the first non-deleted entry.
///
/// Returns `(index, offset, append_at_ns)` for the first live entry after
/// `m`; when every entry up to `max` is deleted, returns
/// `(first index >= max, zero offset, 0)`.
fn read_right_ns(&self, m: i64, max: i64) -> Result<(i64, Offset, u64), VolumeError> {
    let mut idx = m + 1;
    while idx < max {
        let candidate = self.read_offset_from_index(idx)?;
        if !candidate.is_zero() {
            // Live entry: fetch its timestamp from the .dat file.
            let ns = self.read_append_at_ns(candidate)?;
            return Ok((idx, candidate, ns));
        }
        idx += 1;
    }
    // Exhausted the range without finding a live entry.
    Ok((idx, Offset::default(), 0))
}
/// Search left from position m to find the first non-deleted entry.
///
/// Returns `(index, offset, append_at_ns)` for the nearest live entry before
/// `m`; when none exists, returns `(-1, zero offset, 0)`.
fn read_left_ns(&self, m: i64) -> Result<(i64, Offset, u64), VolumeError> {
    let mut idx = m - 1;
    while idx >= 0 {
        let candidate = self.read_offset_from_index(idx)?;
        if !candidate.is_zero() {
            // Live entry: fetch its timestamp from the .dat file.
            let ns = self.read_append_at_ns(candidate)?;
            return Ok((idx, candidate, ns));
        }
        idx -= 1;
    }
    // Walked off the left edge: -1 sentinel (caller uses left_index + 1).
    Ok((idx, Offset::default(), 0))
}
/// Binary search through the .idx file to find the first needle
/// with append_at_ns > since_ns. Returns (offset, is_last).
/// Matches Go's BinarySearchByAppendAtNs in volume_backup.go.
///
/// Deleted entries have a zero offset in the index and carry no usable
/// timestamp, so when a probe lands on one the search consults the nearest
/// live neighbors on both sides (read_left_ns / read_right_ns) and narrows
/// the range using their timestamps instead.
pub fn binary_search_by_append_at_ns(&self, since_ns: u64) -> Result<(Offset, bool), VolumeError> {
    let file_size = self.idx_file_size() as i64;
    // The index must be a whole number of fixed-size entries.
    if file_size % NEEDLE_MAP_ENTRY_SIZE as i64 != 0 {
        return Err(VolumeError::Io(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("unexpected idx file size: {}", file_size),
        )));
    }
    let entry_count = file_size / NEEDLE_MAP_ENTRY_SIZE as i64;
    // Invariant: entries before l are at or before since_ns; the first entry
    // newer than since_ns (if any) lies in [l, h).
    let mut l: i64 = 0;
    let mut h: i64 = entry_count;
    while l < h {
        let m = (l + h) / 2;
        if m == entry_count {
            // Probe past the end: every needle is at or before since_ns.
            return Ok((Offset::default(), true));
        }
        let offset = self.read_offset_from_index(m)?;
        if offset.is_zero() {
            // Probe hit a deleted entry; look at live neighbors instead.
            let (left_index, _left_offset, left_ns) = self.read_left_ns(m)?;
            let (right_index, right_offset, right_ns) = self.read_right_ns(m, entry_count)?;
            if right_ns <= since_ns {
                // Everything through the right neighbor is too old.
                l = right_index;
                if l == entry_count {
                    return Ok((Offset::default(), true));
                } else {
                    continue;
                }
            }
            if since_ns < left_ns {
                // The answer lies at or before the left neighbor.
                h = left_index + 1;
                continue;
            }
            // left_ns <= since_ns < right_ns: right neighbor is the answer.
            return Ok((right_offset, false));
        }
        // Live entry: standard binary-search step on its timestamp.
        let m_ns = self.read_append_at_ns(offset)?;
        if m_ns <= since_ns {
            l = m + 1;
        } else {
            h = m;
        }
    }
    if l == entry_count {
        // since_ns is newer than every needle in the volume.
        return Ok((Offset::default(), true));
    }
    let offset = self.read_offset_from_index(l)?;
    Ok((offset, false))
}
/// Write a raw needle blob at a specific offset in the .dat file.
pub fn write_needle_blob(&mut self, offset: i64, needle_blob: &[u8]) -> Result<(), VolumeError> {
if self.no_write_or_delete {
@ -1279,7 +1493,7 @@ impl Volume {
// Collect live entries from needle map (sorted ascending)
let nm = self.nm.as_ref().ok_or(VolumeError::NotInitialized)?;
let mut entries: Vec<(NeedleId, Offset, Size)> = Vec::new();
for (&id, nv) in nm.iter() {
for (id, nv) in nm.iter_entries() {
if nv.offset.is_zero() || nv.size.is_deleted() {
continue;
}

4
seaweed-volume/tests/http_integration.rs

@ -57,6 +57,10 @@ fn test_state() -> (Arc<VolumeServerState>, TempDir) {
s3_tier_registry: std::sync::RwLock::new(
seaweed_volume::remote_storage::s3_tier::S3TierRegistry::new(),
),
read_mode: seaweed_volume::config::ReadMode::Local,
master_url: String::new(),
self_url: String::new(),
http_client: reqwest::Client::new(),
});
(state, tmp)
}

Loading…
Cancel
Save