@ -10,9 +10,10 @@
use std ::collections ::HashMap ;
use std ::io ::{ self , Read , Seek , Write } ;
use std ::path ::Path ;
use std ::sync ::atomic ::{ AtomicI64 , AtomicU64 , Ordering } ;
use redb ::{ Database , ReadableDatabase , ReadableTable , TableDefinition } ;
use redb ::{ Database , Durability , ReadableDatabase , ReadableTable , TableDefinition } ;
use crate ::storage ::idx ;
use crate ::storage ::types ::* ;
@ -330,6 +331,12 @@ impl CompactNeedleMap {
/// redb table: NeedleId (u64) -> packed [offset(4) + size(4)].
const NEEDLE_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("needles");

/// Metadata table. Key "idx_size" -> u64 byte size of the .idx file this
/// redb was built from; consulted on restart to decide whether the .rdb
/// can be reused or must be rebuilt (fully or incrementally).
const META_TABLE: TableDefinition<&str, u64> = TableDefinition::new("meta");

/// Key under which the source .idx file size is stored in the meta table.
const META_IDX_SIZE: &str = "idx_size";
/// Disk-backed needle map using redb.
/// Low memory usage — data lives on disk with redb's page cache.
pub struct RedbNeedleMap {
@ -340,6 +347,17 @@ pub struct RedbNeedleMap {
}
impl RedbNeedleMap {
/// Begin a write transaction with `Durability::None` (no fsync).
///
/// The .idx file is the source of truth for crash recovery and redb is
/// rebuilt from it on startup, so fsyncing redb buys nothing here.
fn begin_write_no_fsync(db: &Database) -> io::Result<redb::WriteTransaction> {
    match db.begin_write() {
        Ok(mut txn) => {
            // Best-effort: ignore the result of lowering durability.
            let _ = txn.set_durability(Durability::None);
            Ok(txn)
        }
        Err(e) => Err(io::Error::new(
            io::ErrorKind::Other,
            format!("redb begin_write: {}", e),
        )),
    }
}
/// Create a new redb-backed needle map at the given path.
/// The database file will be created if it does not exist.
pub fn new ( db_path : & str ) -> io ::Result < Self > {
@ -347,14 +365,15 @@ impl RedbNeedleMap {
io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb create error: {}" , e ) )
} ) ? ;
// Ensure the table exists
let txn = db . begin_write ( ) . map_err ( | e | {
io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb begin_write: {}" , e ) )
} ) ? ;
// Ensure tables exist
let txn = Self ::begin_write_no_fsync ( & db ) ? ;
{
let _table = txn . open_table ( NEEDLE_TABLE ) . map_err ( | e | {
io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb open_table: {}" , e ) )
} ) ? ;
let _meta = txn . open_table ( META_TABLE ) . map_err ( | e | {
io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb open_table meta: {}" , e ) )
} ) ? ;
}
txn . commit ( )
. map_err ( | e | io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb commit: {}" , e ) ) ) ? ;
@ -367,8 +386,229 @@ impl RedbNeedleMap {
} )
}
/// Load from an .idx file, populating the redb database.
/// Persist the .idx file size into redb metadata so the next startup can
/// detect whether this .rdb is still in sync with the .idx file.
fn save_idx_size_meta(&self, idx_size: u64) -> io::Result<()> {
    let txn = Self::begin_write_no_fsync(&self.db)?;
    {
        let mut meta = txn.open_table(META_TABLE).map_err(|e| {
            io::Error::new(io::ErrorKind::Other, format!("redb open meta: {}", e))
        })?;
        meta.insert(META_IDX_SIZE, idx_size).map_err(|e| {
            io::Error::new(io::ErrorKind::Other, format!("redb insert meta: {}", e))
        })?;
    }
    txn.commit()
        .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb commit meta: {}", e)))
}
/// Read the stored .idx file size from redb metadata.
/// Returns `None` when the meta table has no "idx_size" entry.
fn read_idx_size_meta(&self) -> io::Result<Option<u64>> {
    let txn = self.db.begin_read().map_err(|e| {
        io::Error::new(io::ErrorKind::Other, format!("redb begin_read: {}", e))
    })?;
    let meta = txn.open_table(META_TABLE).map_err(|e| {
        io::Error::new(io::ErrorKind::Other, format!("redb open meta: {}", e))
    })?;
    let stored = meta
        .get(META_IDX_SIZE)
        .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("redb get meta: {}", e)))?;
    Ok(stored.map(|guard| guard.value()))
}
/// Rebuild metrics by scanning all entries in the redb table.
/// Called when reusing an existing .rdb without a full rebuild.
fn rebuild_metrics_from_db ( & self ) -> io ::Result < ( ) > {
let txn = self . db . begin_read ( ) . map_err ( | e | {
io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb begin_read: {}" , e ) )
} ) ? ;
let table = txn . open_table ( NEEDLE_TABLE ) . map_err ( | e | {
io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb open_table: {}" , e ) )
} ) ? ;
let iter = table . iter ( ) . map_err ( | e | {
io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb iter: {}" , e ) )
} ) ? ;
for entry in iter {
let ( key_guard , val_guard ) = entry . map_err ( | e | {
io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb iter next: {}" , e ) )
} ) ? ;
let key = NeedleId ( key_guard . value ( ) ) ;
let bytes : & [ u8 ] = val_guard . value ( ) ;
if bytes . len ( ) = = 8 {
let mut arr = [ 0 u8 ; 8 ] ;
arr . copy_from_slice ( bytes ) ;
let nv = unpack_needle_value ( & arr ) ;
self . metric . maybe_set_max_file_key ( key ) ;
if nv . size . is_valid ( ) {
self . metric . file_count . fetch_add ( 1 , Ordering ::Relaxed ) ;
self . metric
. file_byte_count
. fetch_add ( nv . size . 0 as u64 , Ordering ::Relaxed ) ;
} else {
// Deleted entry (negative size)
self . metric . deletion_count . fetch_add ( 1 , Ordering ::Relaxed ) ;
self . metric
. deletion_byte_count
. fetch_add ( ( - nv . size . 0 ) as u64 , Ordering ::Relaxed ) ;
}
}
}
Ok ( ( ) )
}
/// Load from an .idx file, reusing an existing .rdb when it is consistent.
///
/// Strategy:
/// 1. Try to open an existing .rdb and read its stored .idx size.
/// 2. Stored size == current size -> reuse the .rdb, rescan for metrics.
/// 3. .idx grew -> replay only the appended entries.
/// 4. Anything else (missing, corrupted, .idx shrank) -> full rebuild.
pub fn load_from_idx<R: Read + Seek>(db_path: &str, reader: &mut R) -> io::Result<Self> {
    let idx_size = reader.seek(io::SeekFrom::End(0))?;
    reader.seek(io::SeekFrom::Start(0))?;
    if Path::new(db_path).exists() {
        match Self::try_reuse_rdb(db_path, reader, idx_size) {
            Ok(map) => return Ok(map),
            Err(_) => {
                // Reuse failed — rewind and fall back to a full rebuild.
                reader.seek(io::SeekFrom::Start(0))?;
            }
        }
    }
    Self::full_rebuild(db_path, reader, idx_size)
}
/// Try to reuse an existing .rdb file. Returns Ok if successful,
/// Err if a full rebuild is needed.
fn try_reuse_rdb < R : Read + Seek > (
db_path : & str ,
reader : & mut R ,
idx_size : u64 ,
) -> io ::Result < Self > {
let db = Database ::open ( db_path ) . map_err ( | e | {
io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb open: {}" , e ) )
} ) ? ;
let nm = RedbNeedleMap {
db ,
metric : NeedleMapMetric ::default ( ) ,
idx_file : None ,
idx_file_offset : 0 ,
} ;
let stored_idx_size = nm . read_idx_size_meta ( ) ? . ok_or_else ( | | {
io ::Error ::new ( io ::ErrorKind ::Other , "no idx_size in redb meta" )
} ) ? ;
if stored_idx_size > idx_size {
// .idx shrank — corrupted or truncated, need full rebuild
return Err ( io ::Error ::new (
io ::ErrorKind ::Other ,
"idx file smaller than stored size" ,
) ) ;
}
// Rebuild metrics from existing data
nm . rebuild_metrics_from_db ( ) ? ;
if stored_idx_size < idx_size {
// .idx grew — replay new entries incrementally
reader . seek ( io ::SeekFrom ::Start ( stored_idx_size ) ) ? ;
let txn = Self ::begin_write_no_fsync ( & nm . db ) ? ;
{
let mut table = txn . open_table ( NEEDLE_TABLE ) . map_err ( | e | {
io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb open_table: {}" , e ) )
} ) ? ;
idx ::walk_index_file ( reader , stored_idx_size , | key , offset , size | {
let key_u64 : u64 = key . into ( ) ;
if offset . is_zero ( ) | | size . is_deleted ( ) {
// Delete: look up old value for metric update, then
// store tombstone (negative size with original offset)
if let Ok ( Some ( old ) ) = nm . get_via_table ( & table , key_u64 ) {
if old . size . is_valid ( ) {
nm . metric . on_delete ( & old ) ;
let deleted_nv = NeedleValue {
offset : old . offset ,
size : Size ( - ( old . size . 0 ) ) ,
} ;
let packed = pack_needle_value ( & deleted_nv ) ;
table
. insert ( key_u64 , packed . as_slice ( ) )
. map_err ( | e | {
io ::Error ::new (
io ::ErrorKind ::Other ,
format ! ( "redb insert: {}" , e ) ,
)
} ) ? ;
}
}
} else {
// Put: look up old value for metric update
let old = nm . get_via_table ( & table , key_u64 ) . ok ( ) . flatten ( ) ;
let nv = NeedleValue { offset , size } ;
let packed = pack_needle_value ( & nv ) ;
table
. insert ( key_u64 , packed . as_slice ( ) )
. map_err ( | e | {
io ::Error ::new (
io ::ErrorKind ::Other ,
format ! ( "redb insert: {}" , e ) ,
)
} ) ? ;
nm . metric . on_put ( key , old . as_ref ( ) , size ) ;
}
Ok ( ( ) )
} ) ? ;
}
txn . commit ( ) . map_err ( | e | {
io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb commit: {}" , e ) )
} ) ? ;
nm . save_idx_size_meta ( idx_size ) ? ;
}
Ok ( nm )
}
/// Look up a needle value using an already-open table reference.
/// Used during incremental replay to avoid opening separate read transactions.
fn get_via_table (
& self ,
table : & redb ::Table < u64 , & [ u8 ] > ,
key_u64 : u64 ,
) -> io ::Result < Option < NeedleValue > > {
match table . get ( key_u64 ) {
Ok ( Some ( guard ) ) = > {
let bytes : & [ u8 ] = guard . value ( ) ;
if bytes . len ( ) = = 8 {
let mut arr = [ 0 u8 ; 8 ] ;
arr . copy_from_slice ( bytes ) ;
Ok ( Some ( unpack_needle_value ( & arr ) ) )
} else {
Ok ( None )
}
}
Ok ( None ) = > Ok ( None ) ,
Err ( e ) = > Err ( io ::Error ::new (
io ::ErrorKind ::Other ,
format ! ( "redb get: {}" , e ) ,
) ) ,
}
}
/// Full rebuild: delete existing .rdb and rebuild from entire .idx file.
fn full_rebuild < R : Read + Seek > (
db_path : & str ,
reader : & mut R ,
idx_size : u64 ,
) -> io ::Result < Self > {
let _ = std ::fs ::remove_file ( db_path ) ;
let nm = RedbNeedleMap ::new ( db_path ) ? ;
// Collect entries from idx file, resolving duplicates/deletions
@ -383,9 +623,7 @@ impl RedbNeedleMap {
} ) ? ;
// Write all live entries to redb in a single transaction
let txn = nm . db . begin_write ( ) . map_err ( | e | {
io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb begin_write: {}" , e ) )
} ) ? ;
let txn = Self ::begin_write_no_fsync ( & nm . db ) ? ;
{
let mut table = txn . open_table ( NEEDLE_TABLE ) . map_err ( | e | {
io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb open_table: {}" , e ) )
@ -410,6 +648,8 @@ impl RedbNeedleMap {
txn . commit ( )
. map_err ( | e | io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb commit: {}" , e ) ) ) ? ;
nm . save_idx_size_meta ( idx_size ) ? ;
Ok ( nm )
}
@ -436,9 +676,7 @@ impl RedbNeedleMap {
// Read old value for metric update
let old = self . get_internal ( key_u64 ) ? ;
let txn = self . db . begin_write ( ) . map_err ( | e | {
io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb begin_write: {}" , e ) )
} ) ? ;
let txn = Self ::begin_write_no_fsync ( & self . db ) ? ;
{
let mut table = txn . open_table ( NEEDLE_TABLE ) . map_err ( | e | {
io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb open_table: {}" , e ) )
@ -509,9 +747,7 @@ impl RedbNeedleMap {
} ;
let packed = pack_needle_value ( & deleted_nv ) ;
let txn = self . db . begin_write ( ) . map_err ( | e | {
io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb begin_write: {}" , e ) )
} ) ? ;
let txn = Self ::begin_write_no_fsync ( & self . db ) ? ;
{
let mut table = txn . open_table ( NEEDLE_TABLE ) . map_err ( | e | {
io ::Error ::new ( io ::ErrorKind ::Other , format ! ( "redb open_table: {}" , e ) )