From 5174c8dbddc54307e23d09179cc4a54304d4a57a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 9 Mar 2026 02:09:53 -0700 Subject: [PATCH] feat: add compaction revision re-read during streaming and ReadOption threading During streaming reads, checks if the volume's compaction revision changed between chunks and re-looks up the needle offset from the needle map, matching Go's readNeedleDataInto behavior. Also threads ReadOption through the volume read path. --- seaweed-volume/src/storage/store.rs | 11 ++ seaweed-volume/src/storage/volume.rs | 166 ++++++++++++++++++++++++++- 2 files changed, 173 insertions(+), 4 deletions(-) diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs index 5997f7f57..0bf91bc46 100644 --- a/seaweed-volume/src/storage/store.rs +++ b/seaweed-volume/src/storage/store.rs @@ -313,6 +313,17 @@ impl Store { vol.read_needle_stream_info(n, read_deleted) } + /// Re-lookup a needle's data-file offset after compaction may have moved it. + /// Returns `(new_data_file_offset, current_compaction_revision)`. + pub fn re_lookup_needle_data_offset( + &self, + vid: VolumeId, + needle_id: NeedleId, + ) -> Result<(u64, u16), VolumeError> { + let (_, vol) = self.find_volume(vid).ok_or(VolumeError::NotFound)?; + vol.re_lookup_needle_data_offset(needle_id) + } + /// Write a needle to a volume. pub fn write_volume_needle( &mut self, diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 814ad0577..b06a1d6ce 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -334,6 +334,14 @@ pub struct NeedleStreamInfo { pub data_size: u32, /// Per-volume file access lock used to match Go's slow-read behavior. pub data_file_access_control: Arc, + /// Volume ID — used to re-lookup needle offset if compaction occurs during streaming. + pub volume_id: VolumeId, + /// Needle ID — used to re-lookup needle offset if compaction occurs during streaming. + pub needle_id: NeedleId, + /// Compaction revision at the time of the initial read. If this changes during + /// streaming, the needle's disk offset must be re-read from the needle map because + /// compaction may have moved the needle to a different location. + pub compaction_revision: u16, } // ============================================================================ @@ -705,10 +713,19 @@ impl Volume { /// Read a needle by its ID from the volume. pub fn read_needle(&self, n: &mut Needle) -> Result { - self.read_needle_opt(n, false) + let mut read_option = ReadOption::default(); + self.read_needle_with_option(n, &mut read_option) } pub fn read_needle_opt(&self, n: &mut Needle, read_deleted: bool) -> Result { + let mut read_option = ReadOption { + read_deleted, + ..ReadOption::default() + }; + self.read_needle_with_option(n, &mut read_option) + } + + pub fn read_needle_with_option(&self, n: &mut Needle, read_option: &mut ReadOption) -> Result { let _guard = self.data_file_access_control.read_lock(); let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?; let nv = nm.get(n.id).ok_or(VolumeError::NotFound)?; @@ -719,7 +736,7 @@ impl Volume { let mut read_size = nv.size; if read_size.is_deleted() { - if read_deleted && !read_size.is_tombstone() { + if read_option.read_deleted && !read_size.is_tombstone() { // Negate to get original size read_size = Size(-read_size.0); } else { @@ -730,7 +747,7 @@ impl Volume { return Ok(0); } - self.read_needle_data_at_unlocked(n, nv.offset.to_actual_offset(), read_size)?; + self.read_needle_data_at_unlocked(n, nv.offset.to_actual_offset(), read_size, read_option)?; // TTL expiry check if n.has_ttl() { @@ -760,7 +777,8 @@ impl Volume { size: Size, ) -> Result<(), VolumeError> { let _guard = self.data_file_access_control.read_lock(); - self.read_needle_data_at_unlocked(n, offset, size) + let mut read_option = ReadOption::default(); + self.read_needle_data_at_unlocked(n, offset, size, &mut read_option) } fn read_needle_data_at_unlocked( @@ -768,6 +786,7 @@ impl Volume { n: &mut Needle, offset: i64, size: Size, + _read_option: &mut ReadOption, ) -> Result<(), VolumeError> { match self.read_needle_blob_and_parse(n, offset, size) { Ok(()) => Ok(()), @@ -948,9 +967,45 @@ impl Volume { data_file_offset, data_size: n.data_size, data_file_access_control: self.data_file_access_control.clone(), + volume_id: self.id, + needle_id: n.id, + compaction_revision: self.super_block.compaction_revision, }) } + /// Re-lookup a needle's data-file offset after compaction may have moved it. + /// + /// Returns `(new_data_file_offset, current_compaction_revision)` or an error + /// if the needle is no longer present / has been deleted. + /// + /// This matches Go's `readNeedleDataInto` behaviour: when the volume's + /// `CompactionRevision` changes between streaming chunks, the needle offset + /// is re-read from the needle map because compaction may have relocated it. + pub fn re_lookup_needle_data_offset( + &self, + needle_id: NeedleId, + ) -> Result<(u64, u16), VolumeError> { + let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?; + let nv = nm.get(needle_id).ok_or(VolumeError::NotFound)?; + if nv.offset.is_zero() { + return Err(VolumeError::NotFound); + } + if nv.size.is_deleted() { + return Err(VolumeError::Deleted); + } + + let offset = nv.offset.to_actual_offset(); + let version = self.version(); + + let data_file_offset = if version == VERSION_1 { + offset as u64 + NEEDLE_HEADER_SIZE as u64 + } else { + offset as u64 + NEEDLE_HEADER_SIZE as u64 + 4 // skip DataSize (4 bytes) + }; + + Ok((data_file_offset, self.super_block.compaction_revision)) + } + // ---- Write ---- /// Write a needle to the volume (synchronous path). @@ -1070,11 +1125,13 @@ impl Volume { if let Some(nv) = nm.get(n.id) { if !nv.offset.is_zero() && nv.size.is_valid() { let mut old = Needle::default(); + let mut ro = ReadOption::default(); if self .read_needle_data_at_unlocked( &mut old, nv.offset.to_actual_offset(), nv.size, + &mut ro, ) .is_ok() { @@ -2598,4 +2655,105 @@ mod tests { // Cleanup should be a no-op v.cleanup_compact().unwrap(); } + + #[test] + fn test_compaction_revision_relookup() { + // Verifies that re_lookup_needle_data_offset returns the correct data offset + // and compaction revision, and that after compaction the offset changes. + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + let mut v = make_test_volume(dir); + + // Write two needles + let mut n1 = Needle { + id: NeedleId(1), + cookie: Cookie(0xAABBCCDD), + data: b"first-needle-data".to_vec(), + data_size: 17, + ..Needle::default() + }; + v.write_needle(&mut n1, true).unwrap(); + + let mut n2 = Needle { + id: NeedleId(2), + cookie: Cookie(0x11223344), + data: b"second-needle-data".to_vec(), + data_size: 18, + ..Needle::default() + }; + v.write_needle(&mut n2, true).unwrap(); + + // Get initial revision and offset for needle 1 + let initial_rev = v.super_block.compaction_revision; + let (initial_offset, rev) = v.re_lookup_needle_data_offset(NeedleId(1)).unwrap(); + assert_eq!(rev, initial_rev); + assert!(initial_offset > 0, "data offset should be positive"); + + // Delete needle 2 so compaction removes it + let mut del_n2 = Needle { + id: NeedleId(2), + cookie: Cookie(0x11223344), + ..Needle::default() + }; + v.delete_needle(&mut del_n2).unwrap(); + + // Compact the volume — this increments compaction_revision and may move needles + v.compact_by_index(0, 0, |_| true).unwrap(); + v.commit_compact().unwrap(); + + // After compaction, the revision should have changed + let new_rev = v.super_block.compaction_revision; + assert_eq!(new_rev, initial_rev + 1, "compaction should increment revision"); + + // Re-lookup needle 1 — should still be found with the new revision + let (new_offset, relookup_rev) = v.re_lookup_needle_data_offset(NeedleId(1)).unwrap(); + assert_eq!(relookup_rev, new_rev); + assert!(new_offset > 0, "data offset should still be positive"); + + // The data should still be readable correctly after compaction + let mut read_n1 = Needle { + id: NeedleId(1), + ..Needle::default() + }; + v.read_needle(&mut read_n1).unwrap(); + assert_eq!(read_n1.data, b"first-needle-data"); + + // Deleted needle should not be found + let result = v.re_lookup_needle_data_offset(NeedleId(2)); + assert!(result.is_err(), "deleted needle should not be found after compaction"); + } + + #[test] + fn test_stream_info_includes_compaction_revision() { + // Verifies that NeedleStreamInfo carries the volume's compaction revision + // so that StreamingBody can detect when compaction has occurred. + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + let mut v = make_test_volume(dir); + + // Write a needle large enough to have meaningful data + let data = vec![0xABu8; 2048]; + let mut n = Needle { + id: NeedleId(42), + cookie: Cookie(0xDEADBEEF), + data: data.clone(), + data_size: data.len() as u32, + ..Needle::default() + }; + v.write_needle(&mut n, true).unwrap(); + + // Read stream info + let mut read_n = Needle { + id: NeedleId(42), + cookie: Cookie(0xDEADBEEF), + ..Needle::default() + }; + let info = v.read_needle_stream_info(&mut read_n, false).unwrap(); + + assert_eq!(info.volume_id, VolumeId(1)); + assert_eq!(info.needle_id, NeedleId(42)); + assert_eq!(info.compaction_revision, v.super_block.compaction_revision); + assert_eq!(info.data_size, data.len() as u32); + assert!(info.data_file_offset > 0); + } }