diff --git a/seaweed-volume/Cargo.lock b/seaweed-volume/Cargo.lock index 6bd1e6e96..df5fe6c84 100644 --- a/seaweed-volume/Cargo.lock +++ b/seaweed-volume/Cargo.lock @@ -2971,6 +2971,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redb" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae323eb086579a3769daa2c753bb96deb95993c534711e0dbe881b5192906a06" +dependencies = [ + "libc", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -3349,6 +3358,7 @@ dependencies = [ "prost", "prost-types", "rand 0.8.5", + "redb", "reed-solomon-erasure", "reqwest", "rustls 0.23.37", diff --git a/seaweed-volume/Cargo.toml b/seaweed-volume/Cargo.toml index 94dd917f2..e255286df 100644 --- a/seaweed-volume/Cargo.toml +++ b/seaweed-volume/Cargo.toml @@ -41,6 +41,9 @@ rustls-pemfile = "2" # Using rusty-leveldb for pure Rust LevelDB rusty-leveldb = "3" +# Disk-backed needle map (alternative to in-memory HashMap) +redb = "3" + # Reed-Solomon erasure coding reed-solomon-erasure = "6" @@ -65,7 +68,7 @@ memmap2 = "0.9" uuid = { version = "1", features = ["v4"] } # HTTP client (for proxying, remote fetch) -reqwest = { version = "0.12", features = ["rustls-tls", "stream", "multipart"] } +reqwest = { version = "0.12", features = ["rustls-tls", "stream", "multipart", "json"] } # Content hashing md-5 = "0.10" diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs index c523ae898..b52fe5bcd 100644 --- a/seaweed-volume/src/main.rs +++ b/seaweed-volume/src/main.rs @@ -98,6 +98,9 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box Result<(), Box { + // All entries are before since_ns — nothing to send + drop(store); + let stream = tokio_stream::iter(Vec::new()); + return Ok(Response::new(Box::pin(stream))); + } + Ok((offset, false)) => { + let actual = offset.to_actual_offset(); + if actual <= 0 { + super_block_size + } else { + actual as u64 + } + } + Err(_e) => { + // On error, fall back to streaming from 
superblock end
+                        super_block_size
+                    }
+                }
+            }
+        };
+        drop(store);
 
         // Read the .dat file
         let file = std::fs::File::open(&dat_path)
diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs
index 97e7d305f..b7c884d15 100644
--- a/seaweed-volume/src/server/handlers.rs
+++ b/seaweed-volume/src/server/handlers.rs
@@ -14,6 +14,7 @@
 use axum::http::{header, HeaderMap, Method, Request, StatusCode};
 use axum::response::{IntoResponse, Response};
 use serde::{Deserialize, Serialize};
 
+use crate::config::ReadMode;
 use crate::metrics;
 use crate::storage::needle::needle::Needle;
 use crate::storage::types::*;
@@ -226,6 +227,193 @@ fn parse_url_path(path: &str) -> Option<(VolumeId, NeedleId, Cookie)> {
     Some((vid, needle_id, cookie))
 }
 
+// ============================================================================
+// Volume Lookup + Proxy/Redirect
+// ============================================================================
+
+/// A volume location returned by master lookup.
+#[derive(Debug, Deserialize)]
+struct VolumeLocation {
+    url: String,
+    #[serde(rename = "publicUrl")]
+    public_url: String,
+}
+
+/// Master /dir/lookup response.
+#[derive(Debug, Deserialize)]
+struct LookupResult {
+    #[serde(default)]
+    locations: Option<Vec<VolumeLocation>>,
+    #[serde(default)]
+    error: Option<String>,
+}
+
+/// Look up volume locations from the master via HTTP /dir/lookup.
+///
+/// Returns the (possibly empty) location list on success, or the master's
+/// error string / a transport error description on failure.
+async fn lookup_volume(
+    client: &reqwest::Client,
+    master_url: &str,
+    volume_id: u32,
+) -> Result<Vec<VolumeLocation>, String> {
+    let url = format!("http://{}/dir/lookup?volumeId={}", master_url, volume_id);
+    let resp = client.get(&url).send().await.map_err(|e| format!("lookup request failed: {}", e))?;
+    let result: LookupResult = resp.json().await.map_err(|e| format!("lookup parse failed: {}", e))?;
+    // The master reports failures in-band via the `error` field.
+    if let Some(err) = result.error {
+        if !err.is_empty() {
+            return Err(err);
+        }
+    }
+    Ok(result.locations.unwrap_or_default())
+}
+
+/// Extracted request info needed for proxy/redirect (avoids borrowing Request across await).
+struct ProxyRequestInfo {
+    original_headers: HeaderMap,
+    original_query: String,
+    path: String,
+    vid_str: String,
+    fid_str: String,
+}
+
+/// Handle proxy or redirect for a non-local volume read.
+///
+/// Looks the volume up on the master, picks a random non-self replica, and
+/// either proxies the request to it or returns a redirect, per `read_mode`.
+async fn proxy_or_redirect_to_target(
+    state: &VolumeServerState,
+    info: ProxyRequestInfo,
+    vid: VolumeId,
+) -> Response {
+    // Look up volume locations from master
+    let locations = match lookup_volume(&state.http_client, &state.master_url, vid.0).await {
+        Ok(locs) => locs,
+        Err(e) => {
+            tracing::warn!("volume lookup failed for {}: {}", vid.0, e);
+            return StatusCode::NOT_FOUND.into_response();
+        }
+    };
+
+    if locations.is_empty() {
+        return StatusCode::NOT_FOUND.into_response();
+    }
+
+    // Filter out self (substring match on host:port), then shuffle remaining
+    let mut candidates: Vec<&VolumeLocation> = locations
+        .iter()
+        .filter(|loc| !loc.url.contains(&state.self_url))
+        .collect();
+
+    if candidates.is_empty() {
+        return StatusCode::NOT_FOUND.into_response();
+    }
+
+    // Shuffle for load balancing
+    if candidates.len() >= 2 {
+        use rand::seq::SliceRandom;
+        let mut rng = rand::thread_rng();
+        candidates.shuffle(&mut rng);
+    }
+
+    let target = candidates[0];
+
+    match state.read_mode {
+        ReadMode::Proxy => {
+            proxy_request(state, &info, target).await
+        }
+        ReadMode::Redirect => {
+            redirect_request(&info, target)
+        }
+        ReadMode::Local => {
+            // Callers guard against Local before calling here, but a panic in the
+            // request path would abort the whole task — fail soft with 404 instead.
+            StatusCode::NOT_FOUND.into_response()
+        }
+    }
+}
+
+/// Proxy the request to the target volume server.
+///
+/// Forwards the original headers, tags the URL with `proxied=true` for loop
+/// prevention, and relays the remote status, headers, and body.
+async fn proxy_request(
+    state: &VolumeServerState,
+    info: &ProxyRequestInfo,
+    target: &VolumeLocation,
+) -> Response {
+    // Build target URL, adding proxied=true query param
+    let scheme = "http";
+    let target_host = &target.url;
+    let path = info.path.trim_start_matches('/');
+
+    let target_url = if info.original_query.is_empty() {
+        format!("{}://{}/{}?proxied=true", scheme, target_host, path)
+    } else {
+        format!("{}://{}/{}?{}&proxied=true", scheme, target_host, path, info.original_query)
+    };
+
+    // Build the proxy request
+    let mut req_builder = state.http_client.get(&target_url);
+
+    // Forward all original headers
+    for (name, value) in &info.original_headers {
+        if let Ok(v) = value.to_str() {
+            req_builder = req_builder.header(name.as_str(), v);
+        }
+    }
+
+    let resp = match req_builder.send().await {
+        Ok(r) => r,
+        Err(e) => {
+            tracing::warn!("proxy request to {} failed: {}", target_url, e);
+            return StatusCode::INTERNAL_SERVER_ERROR.into_response();
+        }
+    };
+
+    // Build response, copying headers and body from remote
+    let status = StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
+    let mut response_headers = HeaderMap::new();
+    for (name, value) in resp.headers() {
+        if name.as_str().eq_ignore_ascii_case("server") {
+            continue;
+        }
+        // Use append, not insert: HeaderMap::iter yields one (name, value) pair per
+        // value, so insert would silently drop duplicates (e.g. multiple Set-Cookie).
+        response_headers.append(name.clone(), value.clone());
+    }
+
+    let body_bytes = match resp.bytes().await {
+        Ok(b) => b,
+        Err(e) => {
+            tracing::warn!("proxy response read failed: {}", e);
+            return StatusCode::INTERNAL_SERVER_ERROR.into_response();
+        }
+    };
+
+    let mut response = Response::new(Body::from(body_bytes));
+    *response.status_mut() = status;
+    *response.headers_mut() = response_headers;
+    response
+}
+
+/// Return a redirect response to the target volume server.
+fn redirect_request(
+    info: &ProxyRequestInfo,
+    target: &VolumeLocation,
+) -> Response {
+    let scheme = "http";
+    let target_host = &target.public_url;
+
+    // Build query string: preserve collection, add proxied=true, drop readDeleted (Go parity)
+    let mut query_params = Vec::new();
+    if !info.original_query.is_empty() {
+        for param in info.original_query.split('&') {
+            if let Some((key, value)) = param.split_once('=') {
+                if key == "collection" {
+                    query_params.push(format!("collection={}", value));
+                }
+                // Intentionally drop readDeleted and other params (Go parity)
+            }
+        }
+    }
+    query_params.push("proxied=true".to_string());
+    let query = query_params.join("&");
+
+    let location = format!("{}://{}/{},{}?{}", scheme, target_host, &info.vid_str, &info.fid_str, query);
+
+    // Body mirrors Go's http.Redirect: an HTML anchor to the new location.
+    // (The format string must actually consume `location`; an unused format
+    // argument is a compile error.)
+    Response::builder()
+        .status(StatusCode::MOVED_PERMANENTLY)
+        .header("Location", &location)
+        .body(Body::from(format!("<a href=\"{}\">Moved Permanently</a>.\n\n", location)))
+        .unwrap_or_else(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response())
+}
+
 // ============================================================================
 // Query parameters
 // ============================================================================
@@ -320,6 +508,58 @@ async fn get_or_head_handler_inner(
         return (StatusCode::UNAUTHORIZED, format!("JWT error: {}", e)).into_response();
     }
 
+    // Check if volume exists locally; if not, proxy/redirect based on read_mode.
+    // This mirrors Go's hasVolume check in GetOrHeadHandler.
+    // NOTE: The RwLockReadGuard must be dropped before any .await to keep the future Send.
+    let has_volume = state.store.read().unwrap().has_volume(vid);
+
+    if !has_volume {
+        // Check if already proxied (loop prevention)
+        let query_string = request.uri().query().unwrap_or("").to_string();
+        let is_proxied = query_string.contains("proxied=true");
+
+        if is_proxied || state.read_mode == ReadMode::Local || state.master_url.is_empty() {
+            return StatusCode::NOT_FOUND.into_response();
+        }
+
+        // Extract vid_str and fid_str from path for redirect URL construction.
+        // For redirect, fid must be stripped of extension (Go parity: parseURLPath returns raw fid).
+        let trimmed = path.trim_start_matches('/');
+        let (vid_str, fid_str) = if let Some(pos) = trimmed.find(',') {
+            // "vid,fid..." form
+            let raw_fid = &trimmed[pos + 1..];
+            // Strip filename after slash: "fid/filename.ext" -> "fid"
+            let fid = if let Some(slash) = raw_fid.find('/') {
+                &raw_fid[..slash]
+            } else if let Some(dot) = raw_fid.rfind('.') {
+                // Strip extension: "fid.ext" -> "fid"
+                &raw_fid[..dot]
+            } else {
+                raw_fid
+            };
+            (trimmed[..pos].to_string(), fid.to_string())
+        } else if let Some(pos) = trimmed.find('/') {
+            // "vid/fid[/filename]" form
+            let after = &trimmed[pos + 1..];
+            let fid_part = if let Some(slash) = after.find('/') {
+                &after[..slash]
+            } else {
+                after
+            };
+            (trimmed[..pos].to_string(), fid_part.to_string())
+        } else {
+            // Path has neither "," nor "/" separator — not a valid file URL.
+            return StatusCode::NOT_FOUND.into_response();
+        };
+
+        let info = ProxyRequestInfo {
+            original_headers: request.headers().clone(),
+            original_query: query_string,
+            path: path.clone(),
+            vid_str,
+            fid_str,
+        };
+
+        return proxy_or_redirect_to_target(&state, info, vid).await;
+    }
+
     // Download throttling
     let download_guard = if state.concurrent_download_limit > 0 {
         let timeout = if state.inflight_download_data_timeout.is_zero() {
diff --git a/seaweed-volume/src/server/volume_server.rs b/seaweed-volume/src/server/volume_server.rs
index ff5c71f05..e8eed2f52 100644
--- a/seaweed-volume/src/server/volume_server.rs
+++ b/seaweed-volume/src/server/volume_server.rs
@@ -21,6 +21,7 @@ use axum::{
http::{StatusCode, HeaderValue, Method}, }; +use crate::config::ReadMode; use crate::security::Guard; use crate::storage::store::Store; @@ -65,6 +66,14 @@ pub struct VolumeServerState { pub write_queue: std::sync::OnceLock, /// Registry of S3 tier backends for tiered storage operations. pub s3_tier_registry: std::sync::RwLock, + /// Read mode: local, proxy, or redirect for non-local volumes. + pub read_mode: ReadMode, + /// First master address for volume lookups (e.g., "localhost:9333"). + pub master_url: String, + /// This server's own address (ip:port) for filtering self from lookup results. + pub self_url: String, + /// HTTP client for proxy requests and master lookups. + pub http_client: reqwest::Client, } impl VolumeServerState { diff --git a/seaweed-volume/src/server/write_queue.rs b/seaweed-volume/src/server/write_queue.rs index bb2d51e8c..6fd67eb07 100644 --- a/seaweed-volume/src/server/write_queue.rs +++ b/seaweed-volume/src/server/write_queue.rs @@ -204,6 +204,10 @@ mod tests { volume_state_notify: tokio::sync::Notify::new(), write_queue: std::sync::OnceLock::new(), s3_tier_registry: std::sync::RwLock::new(crate::remote_storage::s3_tier::S3TierRegistry::new()), + read_mode: crate::config::ReadMode::Local, + master_url: String::new(), + self_url: String::new(), + http_client: reqwest::Client::new(), }) } diff --git a/seaweed-volume/src/storage/needle_map.rs b/seaweed-volume/src/storage/needle_map.rs index 124067900..ed1f0f47d 100644 --- a/seaweed-volume/src/storage/needle_map.rs +++ b/seaweed-volume/src/storage/needle_map.rs @@ -1,5 +1,10 @@ -//! NeedleMapper: in-memory index mapping NeedleId → (Offset, Size). +//! NeedleMapper: index mapping NeedleId -> (Offset, Size). //! +//! Two implementations: +//! - `CompactNeedleMap`: in-memory HashMap (fast, uses more RAM) +//! - `RedbNeedleMap`: disk-backed via redb (low RAM, slightly slower) +//! +//! The `NeedleMap` enum wraps both and provides a uniform interface. //! Loaded from .idx file on volume mount. 
Supports Get, Put, Delete with //! metrics tracking (file count, byte count, deleted count, deleted bytes). @@ -7,6 +12,8 @@ use std::collections::HashMap; use std::io::{self, Read, Seek, Write}; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; +use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition}; + use crate::storage::idx; use crate::storage::types::*; @@ -20,6 +27,23 @@ pub struct NeedleValue { pub size: Size, } +/// Pack an (Offset, Size) pair into 8 bytes for redb storage. +/// Layout: [offset 4 bytes big-endian] [size 4 bytes big-endian] +fn pack_needle_value(nv: &NeedleValue) -> [u8; 8] { + let mut buf = [0u8; 8]; + nv.offset.to_bytes(&mut buf[..4]); + nv.size.to_bytes(&mut buf[4..8]); + buf +} + +/// Unpack 8 bytes from redb storage into (Offset, Size). +fn unpack_needle_value(bytes: &[u8; 8]) -> NeedleValue { + NeedleValue { + offset: Offset::from_bytes(&bytes[..4]), + size: Size::from_bytes(&bytes[4..8]), + } +} + // ============================================================================ // NeedleMapMetric // ============================================================================ @@ -86,18 +110,9 @@ pub enum NeedleMapKind { } // ============================================================================ -// CompactNeedleMap (in-memory) +// IdxFileWriter trait // ============================================================================ -/// In-memory needle map backed by a HashMap. -/// The .idx file is kept open for append-only writes. -pub struct CompactNeedleMap { - map: HashMap, - metric: NeedleMapMetric, - idx_file: Option>, - idx_file_offset: u64, -} - /// Trait for appending to an index file. 
pub trait IdxFileWriter: Write + Send + Sync { fn sync_all(&self) -> io::Result<()>; @@ -109,6 +124,19 @@ impl IdxFileWriter for std::fs::File { } } +// ============================================================================ +// CompactNeedleMap (in-memory) +// ============================================================================ + +/// In-memory needle map backed by a HashMap. +/// The .idx file is kept open for append-only writes. +pub struct CompactNeedleMap { + map: HashMap, + metric: NeedleMapMetric, + idx_file: Option>, + idx_file_offset: u64, +} + impl CompactNeedleMap { /// Create a new empty in-memory map. pub fn new() -> Self { @@ -158,8 +186,8 @@ impl CompactNeedleMap { } /// Look up a needle. - pub fn get(&self, key: NeedleId) -> Option<&NeedleValue> { - self.map.get(&key) + pub fn get(&self, key: NeedleId) -> Option { + self.map.get(&key).cloned() } /// Mark a needle as deleted. Appends tombstone to .idx file. @@ -280,6 +308,482 @@ impl CompactNeedleMap { } } +// ============================================================================ +// RedbNeedleMap (disk-backed via redb) +// ============================================================================ + +/// redb table: NeedleId (u64) -> packed [offset(4) + size(4)] +const NEEDLE_TABLE: TableDefinition = TableDefinition::new("needles"); + +/// Disk-backed needle map using redb. +/// Low memory usage — data lives on disk with redb's page cache. +pub struct RedbNeedleMap { + db: Database, + metric: NeedleMapMetric, + idx_file: Option>, + idx_file_offset: u64, +} + +impl RedbNeedleMap { + /// Create a new redb-backed needle map at the given path. + /// The database file will be created if it does not exist. 
+ pub fn new(db_path: &str) -> io::Result { + let db = Database::create(db_path).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb create error: {}", e)) + })?; + + // Ensure the table exists + let txn = db.begin_write().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb begin_write: {}", e)) + })?; + { + let _table = txn.open_table(NEEDLE_TABLE).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e)) + })?; + } + txn.commit().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb commit: {}", e)) + })?; + + Ok(RedbNeedleMap { + db, + metric: NeedleMapMetric::default(), + idx_file: None, + idx_file_offset: 0, + }) + } + + /// Load from an .idx file, populating the redb database. + pub fn load_from_idx(db_path: &str, reader: &mut R) -> io::Result { + let nm = RedbNeedleMap::new(db_path)?; + + // Collect entries from idx file, resolving duplicates/deletions + let mut entries: HashMap> = HashMap::new(); + idx::walk_index_file(reader, 0, |key, offset, size| { + if offset.is_zero() || size.is_deleted() { + entries.insert(key, None); + } else { + entries.insert(key, Some(NeedleValue { offset, size })); + } + Ok(()) + })?; + + // Write all live entries to redb in a single transaction + let txn = nm.db.begin_write().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb begin_write: {}", e)) + })?; + { + let mut table = txn.open_table(NEEDLE_TABLE).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e)) + })?; + + for (key, maybe_nv) in &entries { + let key_u64: u64 = (*key).into(); + if let Some(nv) = maybe_nv { + let packed = pack_needle_value(nv); + table.insert(key_u64, packed.as_slice()).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb insert: {}", e)) + })?; + nm.metric.on_put(*key, None, nv.size); + } else { + // Entry was deleted — remove from redb if present + table.remove(key_u64).map_err(|e| { + 
io::Error::new(io::ErrorKind::Other, format!("redb remove: {}", e)) + })?; + } + } + } + txn.commit().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb commit: {}", e)) + })?; + + Ok(nm) + } + + /// Set the index file for append-only writes. + pub fn set_idx_file(&mut self, file: Box, offset: u64) { + self.idx_file = Some(file); + self.idx_file_offset = offset; + } + + // ---- Map operations ---- + + /// Insert or update an entry. Writes to idx file first, then redb. + pub fn put(&mut self, key: NeedleId, offset: Offset, size: Size) -> io::Result<()> { + // Persist to idx file BEFORE mutating redb state for crash consistency + if let Some(ref mut idx_file) = self.idx_file { + idx::write_index_entry(idx_file, key, offset, size)?; + self.idx_file_offset += NEEDLE_MAP_ENTRY_SIZE as u64; + } + + let key_u64: u64 = key.into(); + let nv = NeedleValue { offset, size }; + let packed = pack_needle_value(&nv); + + // Read old value for metric update + let old = self.get_internal(key_u64)?; + + let txn = self.db.begin_write().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb begin_write: {}", e)) + })?; + { + let mut table = txn.open_table(NEEDLE_TABLE).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e)) + })?; + table.insert(key_u64, packed.as_slice()).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb insert: {}", e)) + })?; + } + txn.commit().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb commit: {}", e)) + })?; + + self.metric.on_put(key, old.as_ref(), size); + Ok(()) + } + + /// Look up a needle. + pub fn get(&self, key: NeedleId) -> Option { + let key_u64: u64 = key.into(); + self.get_internal(key_u64).ok().flatten() + } + + /// Internal get that returns io::Result for error propagation. 
+ fn get_internal(&self, key_u64: u64) -> io::Result> { + let txn = self.db.begin_read().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb begin_read: {}", e)) + })?; + let table = txn.open_table(NEEDLE_TABLE).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e)) + })?; + match table.get(key_u64) { + Ok(Some(guard)) => { + let bytes: &[u8] = guard.value(); + if bytes.len() == 8 { + let mut arr = [0u8; 8]; + arr.copy_from_slice(bytes); + Ok(Some(unpack_needle_value(&arr))) + } else { + Ok(None) + } + } + Ok(None) => Ok(None), + Err(e) => Err(io::Error::new(io::ErrorKind::Other, format!("redb get: {}", e))), + } + } + + /// Mark a needle as deleted. Appends tombstone to .idx file, negates size in redb. + pub fn delete(&mut self, key: NeedleId, offset: Offset) -> io::Result> { + let key_u64: u64 = key.into(); + + if let Some(old) = self.get_internal(key_u64)? { + if old.size.is_valid() { + // Persist tombstone to idx file BEFORE mutating redb + if let Some(ref mut idx_file) = self.idx_file { + idx::write_index_entry(idx_file, key, offset, TOMBSTONE_FILE_SIZE)?; + self.idx_file_offset += NEEDLE_MAP_ENTRY_SIZE as u64; + } + + self.metric.on_delete(&old); + let deleted_size = Size(-(old.size.0)); + // Keep original offset so readDeleted can find original data (matching Go behavior) + let deleted_nv = NeedleValue { offset: old.offset, size: deleted_size }; + let packed = pack_needle_value(&deleted_nv); + + let txn = self.db.begin_write().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb begin_write: {}", e)) + })?; + { + let mut table = txn.open_table(NEEDLE_TABLE).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e)) + })?; + table.insert(key_u64, packed.as_slice()).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb insert: {}", e)) + })?; + } + txn.commit().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb commit: {}", e)) + })?; + + 
return Ok(Some(old.size)); + } + } + Ok(None) + } + + // ---- Metrics accessors ---- + + pub fn content_size(&self) -> u64 { + self.metric.file_byte_count.load(Ordering::Relaxed) + } + + pub fn deleted_size(&self) -> u64 { + self.metric.deletion_byte_count.load(Ordering::Relaxed) + } + + pub fn file_count(&self) -> i64 { + self.metric.file_count.load(Ordering::Relaxed) + } + + pub fn deleted_count(&self) -> i64 { + self.metric.deletion_count.load(Ordering::Relaxed) + } + + pub fn max_file_key(&self) -> NeedleId { + NeedleId(self.metric.max_file_key.load(Ordering::Relaxed)) + } + + pub fn index_file_size(&self) -> u64 { + self.idx_file_offset + } + + /// Sync index file to disk. + pub fn sync(&self) -> io::Result<()> { + if let Some(ref idx_file) = self.idx_file { + idx_file.sync_all()?; + } + Ok(()) + } + + /// Close index file. + pub fn close(&mut self) { + let _ = self.sync(); + self.idx_file = None; + } + + /// Save the redb contents to an index file, sorted by needle ID ascending. + pub fn save_to_idx(&self, path: &str) -> io::Result<()> { + let txn = self.db.begin_read().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb begin_read: {}", e)) + })?; + let table = txn.open_table(NEEDLE_TABLE).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e)) + })?; + + let mut file = std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path)?; + + // redb iterates in key order (u64 ascending) + let iter = table.iter().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb iter: {}", e)) + })?; + + for entry in iter { + let (key_guard, val_guard) = entry.map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb iter next: {}", e)) + })?; + let key_u64: u64 = key_guard.value(); + let bytes: &[u8] = val_guard.value(); + if bytes.len() == 8 { + let mut arr = [0u8; 8]; + arr.copy_from_slice(bytes); + let nv = unpack_needle_value(&arr); + if nv.size.is_valid() { + 
+                    idx::write_index_entry(&mut file, NeedleId(key_u64), nv.offset, nv.size)?;
+                }
+            }
+        }
+        file.sync_all()?;
+        Ok(())
+    }
+
+    /// Visit all entries in ascending order by needle ID.
+    pub fn ascending_visit<F>(&self, mut f: F) -> Result<(), String>
+    where
+        F: FnMut(NeedleId, &NeedleValue) -> Result<(), String>,
+    {
+        let txn = self.db.begin_read().map_err(|e| format!("redb begin_read: {}", e))?;
+        let table = txn.open_table(NEEDLE_TABLE).map_err(|e| format!("redb open_table: {}", e))?;
+        let iter = table.iter().map_err(|e| format!("redb iter: {}", e))?;
+
+        for entry in iter {
+            let (key_guard, val_guard) = entry.map_err(|e| format!("redb iter next: {}", e))?;
+            let key_u64: u64 = key_guard.value();
+            let bytes: &[u8] = val_guard.value();
+            if bytes.len() == 8 {
+                let mut arr = [0u8; 8];
+                arr.copy_from_slice(bytes);
+                let nv = unpack_needle_value(&arr);
+                f(NeedleId(key_u64), &nv)?;
+            }
+        }
+        Ok(())
+    }
+
+    /// Collect all entries as a Vec for iteration (used by volume.rs iter patterns).
+    pub fn collect_entries(&self) -> Vec<(NeedleId, NeedleValue)> {
+        let mut result = Vec::new();
+        let txn: redb::ReadTransaction = match self.db.begin_read() {
+            Ok(t) => t,
+            Err(_) => return result,
+        };
+        let table = match txn.open_table(NEEDLE_TABLE) {
+            Ok(t) => t,
+            Err(_) => return result,
+        };
+        let iter = match table.iter() {
+            Ok(i) => i,
+            Err(_) => return result,
+        };
+        for entry in iter {
+            if let Ok((key_guard, val_guard)) = entry {
+                let key_u64: u64 = key_guard.value();
+                let bytes: &[u8] = val_guard.value();
+                if bytes.len() == 8 {
+                    let mut arr = [0u8; 8];
+                    arr.copy_from_slice(bytes);
+                    let nv = unpack_needle_value(&arr);
+                    result.push((NeedleId(key_u64), nv));
+                }
+            }
+        }
+        result
+    }
+}
+
+// ============================================================================
+// NeedleMap enum — unified interface over both implementations
+// ============================================================================
+
+/// Unified needle map wrapping either in-memory or redb-backed storage.
+pub enum NeedleMap {
+    InMemory(CompactNeedleMap),
+    Redb(RedbNeedleMap),
+}
+
+impl NeedleMap {
+    /// Insert or update an entry.
+    pub fn put(&mut self, key: NeedleId, offset: Offset, size: Size) -> io::Result<()> {
+        match self {
+            NeedleMap::InMemory(nm) => nm.put(key, offset, size),
+            NeedleMap::Redb(nm) => nm.put(key, offset, size),
+        }
+    }
+
+    /// Look up a needle.
+    pub fn get(&self, key: NeedleId) -> Option<NeedleValue> {
+        match self {
+            NeedleMap::InMemory(nm) => nm.get(key),
+            NeedleMap::Redb(nm) => nm.get(key),
+        }
+    }
+
+    /// Mark a needle as deleted.
+    pub fn delete(&mut self, key: NeedleId, offset: Offset) -> io::Result<Option<Size>> {
+        match self {
+            NeedleMap::InMemory(nm) => nm.delete(key, offset),
+            NeedleMap::Redb(nm) => nm.delete(key, offset),
+        }
+    }
+
+    /// Set the index file for append-only writes.
+    pub fn set_idx_file(&mut self, file: Box<dyn IdxFileWriter>, offset: u64) {
+        match self {
+            NeedleMap::InMemory(nm) => nm.set_idx_file(file, offset),
+            NeedleMap::Redb(nm) => nm.set_idx_file(file, offset),
+        }
+    }
+
+    /// Content byte count.
+    pub fn content_size(&self) -> u64 {
+        match self {
+            NeedleMap::InMemory(nm) => nm.content_size(),
+            NeedleMap::Redb(nm) => nm.content_size(),
+        }
+    }
+
+    /// Deleted byte count.
+    pub fn deleted_size(&self) -> u64 {
+        match self {
+            NeedleMap::InMemory(nm) => nm.deleted_size(),
+            NeedleMap::Redb(nm) => nm.deleted_size(),
+        }
+    }
+
+    /// Live file count.
+    pub fn file_count(&self) -> i64 {
+        match self {
+            NeedleMap::InMemory(nm) => nm.file_count(),
+            NeedleMap::Redb(nm) => nm.file_count(),
+        }
+    }
+
+    /// Deleted file count.
+    pub fn deleted_count(&self) -> i64 {
+        match self {
+            NeedleMap::InMemory(nm) => nm.deleted_count(),
+            NeedleMap::Redb(nm) => nm.deleted_count(),
+        }
+    }
+
+    /// Maximum needle ID seen.
+    pub fn max_file_key(&self) -> NeedleId {
+        match self {
+            NeedleMap::InMemory(nm) => nm.max_file_key(),
+            NeedleMap::Redb(nm) => nm.max_file_key(),
+        }
+    }
+
+    /// Index file size in bytes.
+    pub fn index_file_size(&self) -> u64 {
+        match self {
+            NeedleMap::InMemory(nm) => nm.index_file_size(),
+            NeedleMap::Redb(nm) => nm.index_file_size(),
+        }
+    }
+
+    /// Sync index file to disk.
+    pub fn sync(&self) -> io::Result<()> {
+        match self {
+            NeedleMap::InMemory(nm) => nm.sync(),
+            NeedleMap::Redb(nm) => nm.sync(),
+        }
+    }
+
+    /// Close index file.
+    pub fn close(&mut self) {
+        match self {
+            NeedleMap::InMemory(nm) => nm.close(),
+            NeedleMap::Redb(nm) => nm.close(),
+        }
+    }
+
+    /// Save to an index file.
+    pub fn save_to_idx(&self, path: &str) -> io::Result<()> {
+        match self {
+            NeedleMap::InMemory(nm) => nm.save_to_idx(path),
+            NeedleMap::Redb(nm) => nm.save_to_idx(path),
+        }
+    }
+
+    /// Visit all entries in ascending order by needle ID.
+    pub fn ascending_visit<F>(&self, f: F) -> Result<(), String>
+    where
+        F: FnMut(NeedleId, &NeedleValue) -> Result<(), String>,
+    {
+        match self {
+            NeedleMap::InMemory(nm) => nm.ascending_visit(f),
+            NeedleMap::Redb(nm) => nm.ascending_visit(f),
+        }
+    }
+
+    /// Iterate all entries. Returns a Vec of (NeedleId, NeedleValue) pairs.
+    /// For InMemory this collects from the HashMap; for Redb it reads from disk.
+ pub fn iter_entries(&self) -> Vec<(NeedleId, NeedleValue)> { + match self { + NeedleMap::InMemory(nm) => nm.iter().map(|(&id, &nv)| (id, nv)).collect(), + NeedleMap::Redb(nm) => nm.collect_entries(), + } + } +} + +// ============================================================================ +// Tests +// ============================================================================ + #[cfg(test)] mod tests { use super::*; @@ -371,4 +875,199 @@ mod tests { assert_eq!(r2, None); assert_eq!(nm.deleted_count(), 1); // not double counted } + + // ---- RedbNeedleMap tests ---- + + #[test] + fn test_redb_needle_map_put_get() { + let dir = tempfile::tempdir().unwrap(); + let db_path = dir.path().join("test.rdb"); + let mut nm = RedbNeedleMap::new(db_path.to_str().unwrap()).unwrap(); + + nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap(); + nm.put(NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap(); + + let v1 = nm.get(NeedleId(1)).unwrap(); + assert_eq!(v1.size, Size(100)); + + let v2 = nm.get(NeedleId(2)).unwrap(); + assert_eq!(v2.size, Size(200)); + + assert!(nm.get(NeedleId(99)).is_none()); + } + + #[test] + fn test_redb_needle_map_delete() { + let dir = tempfile::tempdir().unwrap(); + let db_path = dir.path().join("test.rdb"); + let mut nm = RedbNeedleMap::new(db_path.to_str().unwrap()).unwrap(); + + nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap(); + assert_eq!(nm.file_count(), 1); + assert_eq!(nm.content_size(), 100); + + let deleted = nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap(); + assert_eq!(deleted, Some(Size(100))); + + assert_eq!(nm.file_count(), 0); + assert_eq!(nm.deleted_count(), 1); + assert_eq!(nm.deleted_size(), 100); + + // Deleted entry should have negated size + let nv = nm.get(NeedleId(1)).unwrap(); + assert_eq!(nv.size, Size(-100)); + } + + #[test] + fn test_redb_needle_map_metrics() { + let dir = tempfile::tempdir().unwrap(); + let db_path = dir.path().join("test.rdb"); + let 
mut nm = RedbNeedleMap::new(db_path.to_str().unwrap()).unwrap(); + + nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap(); + nm.put(NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap(); + nm.put(NeedleId(3), Offset::from_actual_offset(384), Size(300)).unwrap(); + + assert_eq!(nm.file_count(), 3); + assert_eq!(nm.content_size(), 600); + assert_eq!(nm.max_file_key(), NeedleId(3)); + + // Update existing + nm.put(NeedleId(2), Offset::from_actual_offset(700), Size(250)).unwrap(); + assert_eq!(nm.file_count(), 3); + assert_eq!(nm.content_size(), 650); // 100 + 250 + 300 + + // Delete + nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap(); + assert_eq!(nm.file_count(), 2); + assert_eq!(nm.deleted_count(), 1); + } + + #[test] + fn test_redb_needle_map_load_from_idx() { + let dir = tempfile::tempdir().unwrap(); + let db_path = dir.path().join("test.rdb"); + + let mut idx_data = Vec::new(); + idx::write_index_entry(&mut idx_data, NeedleId(1), Offset::from_actual_offset(8), Size(100)).unwrap(); + idx::write_index_entry(&mut idx_data, NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap(); + idx::write_index_entry(&mut idx_data, NeedleId(3), Offset::from_actual_offset(384), Size(300)).unwrap(); + // Delete needle 2 + idx::write_index_entry(&mut idx_data, NeedleId(2), Offset::default(), TOMBSTONE_FILE_SIZE).unwrap(); + + let mut cursor = Cursor::new(idx_data); + let nm = RedbNeedleMap::load_from_idx(db_path.to_str().unwrap(), &mut cursor).unwrap(); + + assert!(nm.get(NeedleId(1)).is_some()); + assert!(nm.get(NeedleId(2)).is_none()); // deleted and removed + assert!(nm.get(NeedleId(3)).is_some()); + assert_eq!(nm.file_count(), 2); + } + + #[test] + fn test_redb_needle_map_double_delete() { + let dir = tempfile::tempdir().unwrap(); + let db_path = dir.path().join("test.rdb"); + let mut nm = RedbNeedleMap::new(db_path.to_str().unwrap()).unwrap(); + + nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap(); + + let 
r1 = nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap(); + assert_eq!(r1, Some(Size(100))); + + // Second delete should return None (already deleted) + let r2 = nm.delete(NeedleId(1), Offset::from_actual_offset(0)).unwrap(); + assert_eq!(r2, None); + assert_eq!(nm.deleted_count(), 1); // not double counted + } + + #[test] + fn test_redb_needle_map_ascending_visit() { + let dir = tempfile::tempdir().unwrap(); + let db_path = dir.path().join("test.rdb"); + let mut nm = RedbNeedleMap::new(db_path.to_str().unwrap()).unwrap(); + + nm.put(NeedleId(3), Offset::from_actual_offset(384), Size(300)).unwrap(); + nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap(); + nm.put(NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap(); + + let mut visited = Vec::new(); + nm.ascending_visit(|id, nv| { + visited.push((id, nv.size)); + Ok(()) + }).unwrap(); + + assert_eq!(visited.len(), 3); + assert_eq!(visited[0], (NeedleId(1), Size(100))); + assert_eq!(visited[1], (NeedleId(2), Size(200))); + assert_eq!(visited[2], (NeedleId(3), Size(300))); + } + + #[test] + fn test_redb_needle_map_save_to_idx() { + let dir = tempfile::tempdir().unwrap(); + let db_path = dir.path().join("test.rdb"); + let idx_path = dir.path().join("test.idx"); + + let mut nm = RedbNeedleMap::new(db_path.to_str().unwrap()).unwrap(); + nm.put(NeedleId(1), Offset::from_actual_offset(8), Size(100)).unwrap(); + nm.put(NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap(); + nm.put(NeedleId(3), Offset::from_actual_offset(384), Size(300)).unwrap(); + // Delete needle 2 + nm.delete(NeedleId(2), Offset::from_actual_offset(128)).unwrap(); + + nm.save_to_idx(idx_path.to_str().unwrap()).unwrap(); + + // Load back with CompactNeedleMap to verify + let mut idx_file = std::fs::File::open(&idx_path).unwrap(); + let loaded = CompactNeedleMap::load_from_idx(&mut idx_file).unwrap(); + assert_eq!(loaded.file_count(), 2); // only live entries + 
assert!(loaded.get(NeedleId(1)).is_some()); + assert!(loaded.get(NeedleId(2)).is_none()); // deleted, not saved + assert!(loaded.get(NeedleId(3)).is_some()); + } + + #[test] + fn test_pack_unpack_needle_value() { + let nv = NeedleValue { + offset: Offset::from_actual_offset(8 * 1000), + size: Size(4096), + }; + let packed = pack_needle_value(&nv); + let unpacked = unpack_needle_value(&packed); + assert_eq!(nv.offset.to_actual_offset(), unpacked.offset.to_actual_offset()); + assert_eq!(nv.size, unpacked.size); + } + + #[test] + fn test_pack_unpack_negative_size() { + let nv = NeedleValue { + offset: Offset::from_actual_offset(8 * 500), + size: Size(-100), + }; + let packed = pack_needle_value(&nv); + let unpacked = unpack_needle_value(&packed); + assert_eq!(nv.offset.to_actual_offset(), unpacked.offset.to_actual_offset()); + assert_eq!(nv.size, unpacked.size); + } + + // ---- NeedleMap enum tests ---- + + #[test] + fn test_needle_map_enum_inmemory() { + let mut nm = NeedleMap::InMemory(CompactNeedleMap::new()); + nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap(); + assert_eq!(nm.get(NeedleId(1)).unwrap().size, Size(100)); + assert_eq!(nm.file_count(), 1); + } + + #[test] + fn test_needle_map_enum_redb() { + let dir = tempfile::tempdir().unwrap(); + let db_path = dir.path().join("test.rdb"); + let mut nm = NeedleMap::Redb(RedbNeedleMap::new(db_path.to_str().unwrap()).unwrap()); + nm.put(NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap(); + assert_eq!(nm.get(NeedleId(1)).unwrap().size, Size(100)); + assert_eq!(nm.file_count(), 1); + } } diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 5a2d1e2ee..855c3df8b 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -17,7 +17,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use tracing::warn; use crate::storage::needle::needle::{self, Needle, NeedleError, get_actual_size}; -use 
crate::storage::needle_map::{CompactNeedleMap, NeedleMapKind}; +use crate::storage::needle_map::{CompactNeedleMap, NeedleMap, NeedleMapKind, RedbNeedleMap}; use crate::storage::super_block::{SuperBlock, ReplicaPlacement, SUPER_BLOCK_SIZE}; use crate::storage::types::*; @@ -240,7 +240,7 @@ pub struct Volume { pub collection: String, dat_file: Option, - nm: Option, + nm: Option, needle_map_kind: NeedleMapKind, pub super_block: SuperBlock, @@ -340,7 +340,7 @@ impl Volume { pub fn file_name(&self, ext: &str) -> String { match ext { - ".idx" | ".cpx" | ".ldb" | ".cpldb" => { + ".idx" | ".cpx" | ".ldb" | ".cpldb" | ".rdb" => { format!("{}{}", self.index_file_name(), ext) } _ => { @@ -431,13 +431,10 @@ impl Volume { } fn load_index(&mut self) -> Result<(), VolumeError> { - if self.needle_map_kind != NeedleMapKind::InMemory { - warn!( - volume_id = self.id.0, - kind = ?self.needle_map_kind, - "only InMemory needle map is currently supported, falling back to InMemory" - ); - } + let use_redb = matches!( + self.needle_map_kind, + NeedleMapKind::LevelDb | NeedleMapKind::LevelDbMedium | NeedleMapKind::LevelDbLarge + ); let idx_path = self.file_name(".idx"); @@ -446,12 +443,23 @@ impl Volume { fs::create_dir_all(parent)?; } + if use_redb { + self.load_index_redb(&idx_path)?; + } else { + self.load_index_inmemory(&idx_path)?; + } + + Ok(()) + } + + /// Load index using in-memory CompactNeedleMap. 
+ fn load_index_inmemory(&mut self, idx_path: &str) -> Result<(), VolumeError> { if self.no_write_or_delete { // Open read-only if Path::new(&idx_path).exists() { let mut idx_file = File::open(&idx_path)?; let nm = CompactNeedleMap::load_from_idx(&mut idx_file)?; - self.nm = Some(nm); + self.nm = Some(NeedleMap::InMemory(nm)); } else { // Missing .idx with existing .dat could orphan needles let dat_path = self.file_name(".dat"); @@ -464,7 +472,7 @@ impl Volume { ); } } - self.nm = Some(CompactNeedleMap::new()); + self.nm = Some(NeedleMap::InMemory(CompactNeedleMap::new())); } } else { // Open read-write (create if missing) @@ -484,7 +492,56 @@ impl Volume { .append(true) .open(&idx_path)?; nm.set_idx_file(Box::new(write_file), idx_size); - self.nm = Some(nm); + self.nm = Some(NeedleMap::InMemory(nm)); + } + + Ok(()) + } + + /// Load index using disk-backed RedbNeedleMap. + fn load_index_redb(&mut self, idx_path: &str) -> Result<(), VolumeError> { + // The redb database file is stored alongside the volume files + let rdb_path = self.file_name(".rdb"); + + if self.no_write_or_delete { + // Open read-only + if Path::new(&idx_path).exists() { + let mut idx_file = File::open(&idx_path)?; + let nm = RedbNeedleMap::load_from_idx(&rdb_path, &mut idx_file)?; + self.nm = Some(NeedleMap::Redb(nm)); + } else { + // Missing .idx with existing .dat could orphan needles + let dat_path = self.file_name(".dat"); + if Path::new(&dat_path).exists() { + let dat_size = fs::metadata(&dat_path).map(|m| m.len()).unwrap_or(0); + if dat_size > SUPER_BLOCK_SIZE as u64 { + warn!( + volume_id = self.id.0, + ".idx file missing but .dat exists with data; needles may be orphaned" + ); + } + } + self.nm = Some(NeedleMap::Redb(RedbNeedleMap::new(&rdb_path)?)); + } + } else { + // Open read-write (create if missing) + let idx_file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&idx_path)?; + + let idx_size = idx_file.metadata()?.len(); + let mut idx_reader = 
io::BufReader::new(&idx_file); + let mut nm = RedbNeedleMap::load_from_idx(&rdb_path, &mut idx_reader)?; + + // Re-open for append-only writes + let write_file = OpenOptions::new() + .write(true) + .append(true) + .open(&idx_path)?; + nm.set_idx_file(Box::new(write_file), idx_size); + self.nm = Some(NeedleMap::Redb(nm)); } Ok(()) @@ -927,7 +984,7 @@ impl Volume { pub fn read_all_needles(&self) -> Result, VolumeError> { let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?; let mut needles = Vec::new(); - for (&key, nv) in nm.iter() { + for (key, nv) in nm.iter_entries() { if !nv.size.is_valid() { continue; // skip deleted } @@ -954,7 +1011,7 @@ impl Volume { let mut files_checked: u64 = 0; let mut broken = Vec::new(); - for (needle_id, nv) in nm.iter() { + for (needle_id, nv) in nm.iter_entries() { if nv.offset.is_zero() || nv.size.is_deleted() { continue; } @@ -970,7 +1027,7 @@ impl Volume { // Read and verify the needle (read_needle_data_at checks CRC via read_bytes/read_tail) let mut n = Needle { - id: *needle_id, + id: needle_id, ..Needle::default() }; match self.read_needle_data_at(&mut n, offset, nv.size) { @@ -1165,6 +1222,163 @@ impl Volume { Ok(()) } + // ---- Binary search for incremental copy ---- + + /// Read a single index entry's offset from the .idx file by entry index. + fn read_offset_from_index(&self, m: i64) -> Result { + let idx_path = self.file_name(".idx"); + let idx_file = File::open(&idx_path)?; + let mut buf = [0u8; NEEDLE_MAP_ENTRY_SIZE]; + let file_offset = m as u64 * NEEDLE_MAP_ENTRY_SIZE as u64; + #[cfg(unix)] + { + use std::os::unix::fs::FileExt; + idx_file.read_exact_at(&mut buf, file_offset)?; + } + #[cfg(not(unix))] + { + let mut f = idx_file; + f.seek(SeekFrom::Start(file_offset))?; + std::io::Read::read_exact(&mut f, &mut buf)?; + } + let (_key, offset, _size) = idx_entry_from_bytes(&buf); + Ok(offset) + } + + /// Read the append_at_ns timestamp from a needle at the given offset in the .dat file. 
+ fn read_append_at_ns(&self, offset: Offset) -> Result { + let dat_file = self.dat_file.as_ref().ok_or_else(|| { + VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open")) + })?; + let actual_offset = offset.to_actual_offset() as u64; + let version = self.version(); + + let mut header_buf = [0u8; NEEDLE_HEADER_SIZE]; + #[cfg(unix)] + { + use std::os::unix::fs::FileExt; + dat_file.read_exact_at(&mut header_buf, actual_offset)?; + } + #[cfg(not(unix))] + { + read_exact_at(dat_file, &mut header_buf, actual_offset)?; + } + + let (_cookie, _id, size) = Needle::parse_header(&header_buf); + if size.0 <= 0 { + return Ok(0); + } + + let actual_size = get_actual_size(size, version); + let mut buf = vec![0u8; actual_size as usize]; + #[cfg(unix)] + { + use std::os::unix::fs::FileExt; + dat_file.read_exact_at(&mut buf, actual_offset)?; + } + #[cfg(not(unix))] + { + read_exact_at(dat_file, &mut buf, actual_offset)?; + } + + let mut n = Needle::default(); + n.read_bytes_meta_only(&mut buf, offset.to_actual_offset(), size, version)?; + Ok(n.append_at_ns) + } + + /// Search right from position m to find the first non-deleted entry. + fn read_right_ns(&self, m: i64, max: i64) -> Result<(i64, Offset, u64), VolumeError> { + let mut index = m; + loop { + index += 1; + if index >= max { + return Ok((index, Offset::default(), 0)); + } + let offset = self.read_offset_from_index(index)?; + if !offset.is_zero() { + let ts = self.read_append_at_ns(offset)?; + return Ok((index, offset, ts)); + } + } + } + + /// Search left from position m to find the first non-deleted entry. 
+ fn read_left_ns(&self, m: i64) -> Result<(i64, Offset, u64), VolumeError> { + let mut index = m; + loop { + index -= 1; + if index < 0 { + return Ok((index, Offset::default(), 0)); + } + let offset = self.read_offset_from_index(index)?; + if !offset.is_zero() { + let ts = self.read_append_at_ns(offset)?; + return Ok((index, offset, ts)); + } + } + } + + /// Binary search through the .idx file to find the first needle + /// with append_at_ns > since_ns. Returns (offset, is_last). + /// Matches Go's BinarySearchByAppendAtNs in volume_backup.go. + pub fn binary_search_by_append_at_ns(&self, since_ns: u64) -> Result<(Offset, bool), VolumeError> { + let file_size = self.idx_file_size() as i64; + if file_size % NEEDLE_MAP_ENTRY_SIZE as i64 != 0 { + return Err(VolumeError::Io(io::Error::new( + io::ErrorKind::InvalidData, + format!("unexpected idx file size: {}", file_size), + ))); + } + + let entry_count = file_size / NEEDLE_MAP_ENTRY_SIZE as i64; + let mut l: i64 = 0; + let mut h: i64 = entry_count; + + while l < h { + let m = (l + h) / 2; + + if m == entry_count { + return Ok((Offset::default(), true)); + } + + let offset = self.read_offset_from_index(m)?; + + if offset.is_zero() { + let (left_index, _left_offset, left_ns) = self.read_left_ns(m)?; + let (right_index, right_offset, right_ns) = self.read_right_ns(m, entry_count)?; + + if right_ns <= since_ns { + l = right_index; + if l == entry_count { + return Ok((Offset::default(), true)); + } else { + continue; + } + } + if since_ns < left_ns { + h = left_index + 1; + continue; + } + return Ok((right_offset, false)); + } + + let m_ns = self.read_append_at_ns(offset)?; + + if m_ns <= since_ns { + l = m + 1; + } else { + h = m; + } + } + + if l == entry_count { + return Ok((Offset::default(), true)); + } + + let offset = self.read_offset_from_index(l)?; + Ok((offset, false)) + } + /// Write a raw needle blob at a specific offset in the .dat file. 
pub fn write_needle_blob(&mut self, offset: i64, needle_blob: &[u8]) -> Result<(), VolumeError> { if self.no_write_or_delete { @@ -1279,7 +1493,7 @@ impl Volume { // Collect live entries from needle map (sorted ascending) let nm = self.nm.as_ref().ok_or(VolumeError::NotInitialized)?; let mut entries: Vec<(NeedleId, Offset, Size)> = Vec::new(); - for (&id, nv) in nm.iter() { + for (id, nv) in nm.iter_entries() { if nv.offset.is_zero() || nv.size.is_deleted() { continue; } diff --git a/seaweed-volume/tests/http_integration.rs b/seaweed-volume/tests/http_integration.rs index b81c92bac..0bc1cae8f 100644 --- a/seaweed-volume/tests/http_integration.rs +++ b/seaweed-volume/tests/http_integration.rs @@ -57,6 +57,10 @@ fn test_state() -> (Arc, TempDir) { s3_tier_registry: std::sync::RwLock::new( seaweed_volume::remote_storage::s3_tier::S3TierRegistry::new(), ), + read_mode: seaweed_volume::config::ReadMode::Local, + master_url: String::new(), + self_url: String::new(), + http_client: reqwest::Client::new(), }); (state, tmp) }