@ -334,6 +334,14 @@ pub struct NeedleStreamInfo {
pub data_size : u32 ,
pub data_size : u32 ,
/// Per-volume file access lock used to match Go's slow-read behavior.
/// Per-volume file access lock used to match Go's slow-read behavior.
pub data_file_access_control : Arc < DataFileAccessControl > ,
pub data_file_access_control : Arc < DataFileAccessControl > ,
/// Volume ID — used to re-lookup needle offset if compaction occurs during streaming.
pub volume_id : VolumeId ,
/// Needle ID — used to re-lookup needle offset if compaction occurs during streaming.
pub needle_id : NeedleId ,
/// Compaction revision at the time of the initial read. If this changes during
/// streaming, the needle's disk offset must be re-read from the needle map because
/// compaction may have moved the needle to a different location.
pub compaction_revision : u16 ,
}
}
// ============================================================================
// ============================================================================
@ -705,10 +713,19 @@ impl Volume {
/// Read a needle by its ID from the volume.
/// Read a needle by its ID from the volume.
pub fn read_needle ( & self , n : & mut Needle ) -> Result < i32 , VolumeError > {
pub fn read_needle ( & self , n : & mut Needle ) -> Result < i32 , VolumeError > {
self . read_needle_opt ( n , false )
let mut read_option = ReadOption ::default ( ) ;
self . read_needle_with_option ( n , & mut read_option )
}
}
pub fn read_needle_opt ( & self , n : & mut Needle , read_deleted : bool ) -> Result < i32 , VolumeError > {
pub fn read_needle_opt ( & self , n : & mut Needle , read_deleted : bool ) -> Result < i32 , VolumeError > {
let mut read_option = ReadOption {
read_deleted ,
. . ReadOption ::default ( )
} ;
self . read_needle_with_option ( n , & mut read_option )
}
pub fn read_needle_with_option ( & self , n : & mut Needle , read_option : & mut ReadOption ) -> Result < i32 , VolumeError > {
let _guard = self . data_file_access_control . read_lock ( ) ;
let _guard = self . data_file_access_control . read_lock ( ) ;
let nm = self . nm . as_ref ( ) . ok_or ( VolumeError ::NotFound ) ? ;
let nm = self . nm . as_ref ( ) . ok_or ( VolumeError ::NotFound ) ? ;
let nv = nm . get ( n . id ) . ok_or ( VolumeError ::NotFound ) ? ;
let nv = nm . get ( n . id ) . ok_or ( VolumeError ::NotFound ) ? ;
@ -719,7 +736,7 @@ impl Volume {
let mut read_size = nv . size ;
let mut read_size = nv . size ;
if read_size . is_deleted ( ) {
if read_size . is_deleted ( ) {
if read_deleted & & ! read_size . is_tombstone ( ) {
if read_option . read_ deleted & & ! read_size . is_tombstone ( ) {
// Negate to get original size
// Negate to get original size
read_size = Size ( - read_size . 0 ) ;
read_size = Size ( - read_size . 0 ) ;
} else {
} else {
@ -730,7 +747,7 @@ impl Volume {
return Ok ( 0 ) ;
return Ok ( 0 ) ;
}
}
self . read_needle_data_at_unlocked ( n , nv . offset . to_actual_offset ( ) , read_size ) ? ;
self . read_needle_data_at_unlocked ( n , nv . offset . to_actual_offset ( ) , read_size , read_option ) ? ;
// TTL expiry check
// TTL expiry check
if n . has_ttl ( ) {
if n . has_ttl ( ) {
@ -760,7 +777,8 @@ impl Volume {
size : Size ,
size : Size ,
) -> Result < ( ) , VolumeError > {
) -> Result < ( ) , VolumeError > {
let _guard = self . data_file_access_control . read_lock ( ) ;
let _guard = self . data_file_access_control . read_lock ( ) ;
self . read_needle_data_at_unlocked ( n , offset , size )
let mut read_option = ReadOption ::default ( ) ;
self . read_needle_data_at_unlocked ( n , offset , size , & mut read_option )
}
}
fn read_needle_data_at_unlocked (
fn read_needle_data_at_unlocked (
@ -768,6 +786,7 @@ impl Volume {
n : & mut Needle ,
n : & mut Needle ,
offset : i64 ,
offset : i64 ,
size : Size ,
size : Size ,
_read_option : & mut ReadOption ,
) -> Result < ( ) , VolumeError > {
) -> Result < ( ) , VolumeError > {
match self . read_needle_blob_and_parse ( n , offset , size ) {
match self . read_needle_blob_and_parse ( n , offset , size ) {
Ok ( ( ) ) = > Ok ( ( ) ) ,
Ok ( ( ) ) = > Ok ( ( ) ) ,
@ -948,9 +967,45 @@ impl Volume {
data_file_offset ,
data_file_offset ,
data_size : n . data_size ,
data_size : n . data_size ,
data_file_access_control : self . data_file_access_control . clone ( ) ,
data_file_access_control : self . data_file_access_control . clone ( ) ,
volume_id : self . id ,
needle_id : n . id ,
compaction_revision : self . super_block . compaction_revision ,
} )
} )
}
}
/// Re-lookup a needle's data-file offset after compaction may have moved it.
///
/// Returns `(new_data_file_offset, current_compaction_revision)` or an error
/// if the needle is no longer present / has been deleted.
///
/// This matches Go's `readNeedleDataInto` behaviour: when the volume's
/// `CompactionRevision` changes between streaming chunks, the needle offset
/// is re-read from the needle map because compaction may have relocated it.
pub fn re_lookup_needle_data_offset (
& self ,
needle_id : NeedleId ,
) -> Result < ( u64 , u16 ) , VolumeError > {
let nm = self . nm . as_ref ( ) . ok_or ( VolumeError ::NotFound ) ? ;
let nv = nm . get ( needle_id ) . ok_or ( VolumeError ::NotFound ) ? ;
if nv . offset . is_zero ( ) {
return Err ( VolumeError ::NotFound ) ;
}
if nv . size . is_deleted ( ) {
return Err ( VolumeError ::Deleted ) ;
}
let offset = nv . offset . to_actual_offset ( ) ;
let version = self . version ( ) ;
let data_file_offset = if version = = VERSION_1 {
offset as u64 + NEEDLE_HEADER_SIZE as u64
} else {
offset as u64 + NEEDLE_HEADER_SIZE as u64 + 4 // skip DataSize (4 bytes)
} ;
Ok ( ( data_file_offset , self . super_block . compaction_revision ) )
}
// ---- Write ----
// ---- Write ----
/// Write a needle to the volume (synchronous path).
/// Write a needle to the volume (synchronous path).
@ -1070,11 +1125,13 @@ impl Volume {
if let Some ( nv ) = nm . get ( n . id ) {
if let Some ( nv ) = nm . get ( n . id ) {
if ! nv . offset . is_zero ( ) & & nv . size . is_valid ( ) {
if ! nv . offset . is_zero ( ) & & nv . size . is_valid ( ) {
let mut old = Needle ::default ( ) ;
let mut old = Needle ::default ( ) ;
let mut ro = ReadOption ::default ( ) ;
if self
if self
. read_needle_data_at_unlocked (
. read_needle_data_at_unlocked (
& mut old ,
& mut old ,
nv . offset . to_actual_offset ( ) ,
nv . offset . to_actual_offset ( ) ,
nv . size ,
nv . size ,
& mut ro ,
)
)
. is_ok ( )
. is_ok ( )
{
{
@ -2598,4 +2655,105 @@ mod tests {
// Cleanup should be a no-op
// Cleanup should be a no-op
v . cleanup_compact ( ) . unwrap ( ) ;
v . cleanup_compact ( ) . unwrap ( ) ;
}
}
#[ test ]
fn test_compaction_revision_relookup ( ) {
// Verifies that re_lookup_needle_data_offset returns the correct data offset
// and compaction revision, and that after compaction the offset changes.
let tmp = TempDir ::new ( ) . unwrap ( ) ;
let dir = tmp . path ( ) . to_str ( ) . unwrap ( ) ;
let mut v = make_test_volume ( dir ) ;
// Write two needles
let mut n1 = Needle {
id : NeedleId ( 1 ) ,
cookie : Cookie ( 0xAABBCCDD ) ,
data : b "first-needle-data" . to_vec ( ) ,
data_size : 17 ,
. . Needle ::default ( )
} ;
v . write_needle ( & mut n1 , true ) . unwrap ( ) ;
let mut n2 = Needle {
id : NeedleId ( 2 ) ,
cookie : Cookie ( 0x11223344 ) ,
data : b "second-needle-data" . to_vec ( ) ,
data_size : 18 ,
. . Needle ::default ( )
} ;
v . write_needle ( & mut n2 , true ) . unwrap ( ) ;
// Get initial revision and offset for needle 1
let initial_rev = v . super_block . compaction_revision ;
let ( initial_offset , rev ) = v . re_lookup_needle_data_offset ( NeedleId ( 1 ) ) . unwrap ( ) ;
assert_eq ! ( rev , initial_rev ) ;
assert ! ( initial_offset > 0 , "data offset should be positive" ) ;
// Delete needle 2 so compaction removes it
let mut del_n2 = Needle {
id : NeedleId ( 2 ) ,
cookie : Cookie ( 0x11223344 ) ,
. . Needle ::default ( )
} ;
v . delete_needle ( & mut del_n2 ) . unwrap ( ) ;
// Compact the volume — this increments compaction_revision and may move needles
v . compact_by_index ( 0 , 0 , | _ | true ) . unwrap ( ) ;
v . commit_compact ( ) . unwrap ( ) ;
// After compaction, the revision should have changed
let new_rev = v . super_block . compaction_revision ;
assert_eq ! ( new_rev , initial_rev + 1 , "compaction should increment revision" ) ;
// Re-lookup needle 1 — should still be found with the new revision
let ( new_offset , relookup_rev ) = v . re_lookup_needle_data_offset ( NeedleId ( 1 ) ) . unwrap ( ) ;
assert_eq ! ( relookup_rev , new_rev ) ;
assert ! ( new_offset > 0 , "data offset should still be positive" ) ;
// The data should still be readable correctly after compaction
let mut read_n1 = Needle {
id : NeedleId ( 1 ) ,
. . Needle ::default ( )
} ;
v . read_needle ( & mut read_n1 ) . unwrap ( ) ;
assert_eq ! ( read_n1 . data , b" first-needle-data " ) ;
// Deleted needle should not be found
let result = v . re_lookup_needle_data_offset ( NeedleId ( 2 ) ) ;
assert ! ( result . is_err ( ) , "deleted needle should not be found after compaction" ) ;
}
#[ test ]
fn test_stream_info_includes_compaction_revision ( ) {
// Verifies that NeedleStreamInfo carries the volume's compaction revision
// so that StreamingBody can detect when compaction has occurred.
let tmp = TempDir ::new ( ) . unwrap ( ) ;
let dir = tmp . path ( ) . to_str ( ) . unwrap ( ) ;
let mut v = make_test_volume ( dir ) ;
// Write a needle large enough to have meaningful data
let data = vec ! [ 0xAB u8 ; 2048 ] ;
let mut n = Needle {
id : NeedleId ( 42 ) ,
cookie : Cookie ( 0xDEADBEEF ) ,
data : data . clone ( ) ,
data_size : data . len ( ) as u32 ,
. . Needle ::default ( )
} ;
v . write_needle ( & mut n , true ) . unwrap ( ) ;
// Read stream info
let mut read_n = Needle {
id : NeedleId ( 42 ) ,
cookie : Cookie ( 0xDEADBEEF ) ,
. . Needle ::default ( )
} ;
let info = v . read_needle_stream_info ( & mut read_n , false ) . unwrap ( ) ;
assert_eq ! ( info . volume_id , VolumeId ( 1 ) ) ;
assert_eq ! ( info . needle_id , NeedleId ( 42 ) ) ;
assert_eq ! ( info . compaction_revision , v . super_block . compaction_revision ) ;
assert_eq ! ( info . data_size , data . len ( ) as u32 ) ;
assert ! ( info . data_file_offset > 0 ) ;
}
}
}