diff --git a/seaweed-volume/src/storage/needle_map.rs b/seaweed-volume/src/storage/needle_map.rs index e8942ea1e..414dc4020 100644 --- a/seaweed-volume/src/storage/needle_map.rs +++ b/seaweed-volume/src/storage/needle_map.rs @@ -10,9 +10,10 @@ use std::collections::HashMap; use std::io::{self, Read, Seek, Write}; +use std::path::Path; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; -use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition}; +use redb::{Database, Durability, ReadableDatabase, ReadableTable, TableDefinition}; use crate::storage::idx; use crate::storage::types::*; @@ -330,6 +331,12 @@ impl CompactNeedleMap { /// redb table: NeedleId (u64) -> packed [offset(4) + size(4)] const NEEDLE_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("needles"); +/// Metadata table: stores the .idx file size that was used to build this redb. +/// Key "idx_size" -> u64 byte offset. Used to detect whether the .rdb can be +/// reused on restart or needs a full/incremental rebuild. +const META_TABLE: TableDefinition<&str, u64> = TableDefinition::new("meta"); +const META_IDX_SIZE: &str = "idx_size"; + /// Disk-backed needle map using redb. /// Low memory usage — data lives on disk with redb's page cache. pub struct RedbNeedleMap { @@ -340,6 +347,17 @@ pub struct RedbNeedleMap { } impl RedbNeedleMap { + /// Begin a write transaction with `Durability::None` (no fsync). + /// The .idx file is the source of truth for crash recovery, so redb + /// is always rebuilt from .idx on startup — fsync is unnecessary. + fn begin_write_no_fsync(db: &Database) -> io::Result<redb::WriteTransaction> { + let mut txn = db.begin_write().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb begin_write: {}", e)) + })?; + let _ = txn.set_durability(Durability::None); + Ok(txn) + } + /// Create a new redb-backed needle map at the given path. /// The database file will be created if it does not exist.
pub fn new(db_path: &str) -> io::Result<Self> { @@ -347,14 +365,15 @@ impl RedbNeedleMap { io::Error::new(io::ErrorKind::Other, format!("redb create error: {}", e)) })?; - // Ensure the table exists - let txn = db.begin_write().map_err(|e| { - io::Error::new(io::ErrorKind::Other, format!("redb begin_write: {}", e)) - })?; + // Ensure tables exist + let txn = Self::begin_write_no_fsync(&db)?; { let _table = txn.open_table(NEEDLE_TABLE).map_err(|e| { io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e)) })?; + let _meta = txn.open_table(META_TABLE).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb open_table meta: {}", e)) + })?; } txn.commit() .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb commit: {}", e)))?; @@ -367,8 +386,229 @@ impl RedbNeedleMap { }) } - /// Load from an .idx file, populating the redb database. + /// Save the .idx file size into redb metadata so we can detect whether + /// the .rdb is up-to-date on the next startup. + fn save_idx_size_meta(&self, idx_size: u64) -> io::Result<()> { + let txn = Self::begin_write_no_fsync(&self.db)?; + { + let mut meta = txn.open_table(META_TABLE).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb open meta: {}", e)) + })?; + meta.insert(META_IDX_SIZE, idx_size).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb insert meta: {}", e)) + })?; + } + txn.commit() + .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb commit meta: {}", e)))?; + Ok(()) + } + + /// Read the stored .idx file size from redb metadata.
+ fn read_idx_size_meta(&self) -> io::Result<Option<u64>> { + let txn = self.db.begin_read().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb begin_read: {}", e)) + })?; + let meta = txn.open_table(META_TABLE).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb open meta: {}", e)) + })?; + match meta.get(META_IDX_SIZE) { + Ok(Some(guard)) => Ok(Some(guard.value())), + Ok(None) => Ok(None), + Err(e) => Err(io::Error::new( + io::ErrorKind::Other, + format!("redb get meta: {}", e), + )), + } + } + + /// Rebuild metrics by scanning all entries in the redb table. + /// Called when reusing an existing .rdb without a full rebuild. + fn rebuild_metrics_from_db(&self) -> io::Result<()> { + let txn = self.db.begin_read().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb begin_read: {}", e)) + })?; + let table = txn.open_table(NEEDLE_TABLE).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e)) + })?; + let iter = table.iter().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb iter: {}", e)) + })?; + for entry in iter { + let (key_guard, val_guard) = entry.map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb iter next: {}", e)) + })?; + let key = NeedleId(key_guard.value()); + let bytes: &[u8] = val_guard.value(); + if bytes.len() == 8 { + let mut arr = [0u8; 8]; + arr.copy_from_slice(bytes); + let nv = unpack_needle_value(&arr); + self.metric.maybe_set_max_file_key(key); + if nv.size.is_valid() { + self.metric.file_count.fetch_add(1, Ordering::Relaxed); + self.metric + .file_byte_count + .fetch_add(nv.size.0 as u64, Ordering::Relaxed); + } else { + // Deleted entry (negative size) + self.metric.deletion_count.fetch_add(1, Ordering::Relaxed); + self.metric + .deletion_byte_count + .fetch_add((-nv.size.0) as u64, Ordering::Relaxed); + } + } + } + Ok(()) + } + + /// Load from an .idx file, reusing an existing .rdb if it is consistent. + /// + /// Strategy: + /// 1.
Try to open existing .rdb and read its stored .idx size + /// 2. If .idx size matches → reuse .rdb, rebuild metrics from scan + /// 3. If .idx is larger → replay new entries incrementally + /// 4. Otherwise (missing, corrupted, .idx smaller) → full rebuild pub fn load_from_idx<R: Read + Seek>(db_path: &str, reader: &mut R) -> io::Result<Self> { + let idx_size = reader.seek(io::SeekFrom::End(0))?; + reader.seek(io::SeekFrom::Start(0))?; + + // Try to reuse existing .rdb + if Path::new(db_path).exists() { + if let Ok(nm) = Self::try_reuse_rdb(db_path, reader, idx_size) { + return Ok(nm); + } + // Reuse failed — fall through to full rebuild + reader.seek(io::SeekFrom::Start(0))?; + } + + Self::full_rebuild(db_path, reader, idx_size) + } + + /// Try to reuse an existing .rdb file. Returns Ok if successful, + /// Err if a full rebuild is needed. + fn try_reuse_rdb<R: Read + Seek>( + db_path: &str, + reader: &mut R, + idx_size: u64, + ) -> io::Result<Self> { + let db = Database::open(db_path).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb open: {}", e)) + })?; + + let nm = RedbNeedleMap { + db, + metric: NeedleMapMetric::default(), + idx_file: None, + idx_file_offset: 0, + }; + + let stored_idx_size = nm.read_idx_size_meta()?.ok_or_else(|| { + io::Error::new(io::ErrorKind::Other, "no idx_size in redb meta") + })?; + + if stored_idx_size > idx_size { + // .idx shrank — corrupted or truncated, need full rebuild + return Err(io::Error::new( + io::ErrorKind::Other, + "idx file smaller than stored size", + )); + } + + // Rebuild metrics from existing data + nm.rebuild_metrics_from_db()?; + + if stored_idx_size < idx_size { + // .idx grew — replay new entries incrementally + reader.seek(io::SeekFrom::Start(stored_idx_size))?; + let txn = Self::begin_write_no_fsync(&nm.db)?; + { + let mut table = txn.open_table(NEEDLE_TABLE).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e)) + })?; + idx::walk_index_file(reader, stored_idx_size, |key, offset, size| { + let key_u64:
u64 = key.into(); + if offset.is_zero() || size.is_deleted() { + // Delete: look up old value for metric update, then + // store tombstone (negative size with original offset) + if let Ok(Some(old)) = nm.get_via_table(&table, key_u64) { + if old.size.is_valid() { + nm.metric.on_delete(&old); + let deleted_nv = NeedleValue { + offset: old.offset, + size: Size(-(old.size.0)), + }; + let packed = pack_needle_value(&deleted_nv); + table + .insert(key_u64, packed.as_slice()) + .map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("redb insert: {}", e), + ) + })?; + } + } + } else { + // Put: look up old value for metric update + let old = nm.get_via_table(&table, key_u64).ok().flatten(); + let nv = NeedleValue { offset, size }; + let packed = pack_needle_value(&nv); + table + .insert(key_u64, packed.as_slice()) + .map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("redb insert: {}", e), + ) + })?; + nm.metric.on_put(key, old.as_ref(), size); + } + Ok(()) + })?; + } + txn.commit().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("redb commit: {}", e)) + })?; + + nm.save_idx_size_meta(idx_size)?; + } + + Ok(nm) + } + + /// Look up a needle value using an already-open table reference. + /// Used during incremental replay to avoid opening separate read transactions. + fn get_via_table( + &self, + table: &redb::Table<u64, &[u8]>, + key_u64: u64, + ) -> io::Result<Option<NeedleValue>> { + match table.get(key_u64) { + Ok(Some(guard)) => { + let bytes: &[u8] = guard.value(); + if bytes.len() == 8 { + let mut arr = [0u8; 8]; + arr.copy_from_slice(bytes); + Ok(Some(unpack_needle_value(&arr))) + } else { + Ok(None) + } + } + Ok(None) => Ok(None), + Err(e) => Err(io::Error::new( + io::ErrorKind::Other, + format!("redb get: {}", e), + )), + } + } + + /// Full rebuild: delete existing .rdb and rebuild from entire .idx file.
+ fn full_rebuild<R: Read + Seek>( + db_path: &str, + reader: &mut R, + idx_size: u64, + ) -> io::Result<Self> { + let _ = std::fs::remove_file(db_path); let nm = RedbNeedleMap::new(db_path)?; // Collect entries from idx file, resolving duplicates/deletions @@ -383,9 +623,7 @@ impl RedbNeedleMap { })?; // Write all live entries to redb in a single transaction - let txn = nm.db.begin_write().map_err(|e| { - io::Error::new(io::ErrorKind::Other, format!("redb begin_write: {}", e)) - })?; + let txn = Self::begin_write_no_fsync(&nm.db)?; { let mut table = txn.open_table(NEEDLE_TABLE).map_err(|e| { io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e)) @@ -410,6 +648,8 @@ impl RedbNeedleMap { txn.commit() .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb commit: {}", e)))?; + nm.save_idx_size_meta(idx_size)?; + Ok(nm) } @@ -436,9 +676,7 @@ impl RedbNeedleMap { // Read old value for metric update let old = self.get_internal(key_u64)?; - let txn = self.db.begin_write().map_err(|e| { - io::Error::new(io::ErrorKind::Other, format!("redb begin_write: {}", e)) - })?; + let txn = Self::begin_write_no_fsync(&self.db)?; { let mut table = txn.open_table(NEEDLE_TABLE).map_err(|e| { io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e)) @@ -509,9 +747,7 @@ impl RedbNeedleMap { }; let packed = pack_needle_value(&deleted_nv); - let txn = self.db.begin_write().map_err(|e| { - io::Error::new(io::ErrorKind::Other, format!("redb begin_write: {}", e)) - })?; + let txn = Self::begin_write_no_fsync(&self.db)?; { let mut table = txn.open_table(NEEDLE_TABLE).map_err(|e| { io::Error::new(io::ErrorKind::Other, format!("redb open_table: {}", e))