diff --git a/seaweed-volume/src/storage/needle_map.rs b/seaweed-volume/src/storage/needle_map.rs index 414dc4020..3ab48106c 100644 --- a/seaweed-volume/src/storage/needle_map.rs +++ b/seaweed-volume/src/storage/needle_map.rs @@ -1,7 +1,7 @@ //! NeedleMapper: index mapping NeedleId -> (Offset, Size). //! //! Two implementations: -//! - `CompactNeedleMap`: in-memory HashMap (fast, uses more RAM) +//! - `CompactNeedleMap`: in-memory segmented sorted arrays (~10 bytes/entry) //! - `RedbNeedleMap`: disk-backed via redb (low RAM, slightly slower) //! //! The `NeedleMap` enum wraps both and provides a uniform interface. @@ -13,6 +13,9 @@ use std::io::{self, Read, Seek, Write}; use std::path::Path; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; +mod compact_map; +use compact_map::CompactMap; + use redb::{Database, Durability, ReadableDatabase, ReadableTable, TableDefinition}; use crate::storage::idx; @@ -135,10 +138,11 @@ impl IdxFileWriter for std::fs::File { // CompactNeedleMap (in-memory) // ============================================================================ -/// In-memory needle map backed by a HashMap. +/// In-memory needle map backed by a CompactMap (segmented sorted arrays). +/// Uses ~10 bytes per entry instead of ~40-48 bytes with HashMap. /// The .idx file is kept open for append-only writes. pub struct CompactNeedleMap { - map: HashMap, + map: CompactMap, metric: NeedleMapMetric, idx_file: Option>, idx_file_offset: u64, @@ -148,7 +152,7 @@ impl CompactNeedleMap { /// Create a new empty in-memory map. pub fn new() -> Self { CompactNeedleMap { - map: HashMap::new(), + map: CompactMap::new(), metric: NeedleMapMetric::default(), idx_file: None, idx_file_offset: 0, @@ -162,7 +166,7 @@ impl CompactNeedleMap { if offset.is_zero() || size.is_deleted() { nm.delete_from_map(key); } else { - nm.set(key, NeedleValue { offset, size }); + nm.set_internal(key, NeedleValue { offset, size }); } Ok(()) })?; @@ -185,21 +189,20 @@ impl CompactNeedleMap { self.idx_file_offset += NEEDLE_MAP_ENTRY_SIZE as u64; } - let old = self.map.get(&key).cloned(); - let nv = NeedleValue { offset, size }; + let old = self.map.get(key); self.metric.on_put(key, old.as_ref(), size); - self.map.insert(key, nv); + self.map.set(key, offset, size); Ok(()) } /// Look up a needle. pub fn get(&self, key: NeedleId) -> Option { - self.map.get(&key).cloned() + self.map.get(key) } /// Mark a needle as deleted. Appends tombstone to .idx file. pub fn delete(&mut self, key: NeedleId, offset: Offset) -> io::Result> { - if let Some(old) = self.map.get(&key).cloned() { + if let Some(old) = self.map.get(key) { if old.size.is_valid() { // Persist tombstone to idx file BEFORE mutating in-memory state for crash consistency if let Some(ref mut idx_file) = self.idx_file { @@ -208,15 +211,8 @@ impl CompactNeedleMap { } self.metric.on_delete(&old); - let deleted_size = Size(-(old.size.0)); - // Keep original offset so readDeleted can find original data (matching Go behavior) - self.map.insert( - key, - NeedleValue { - offset: old.offset, - size: deleted_size, - }, - ); + // Mark as deleted in compact map (negates size in-place) + self.map.delete(key); return Ok(Some(old.size)); } } @@ -226,26 +222,21 @@ impl CompactNeedleMap { // ---- Internal helpers ---- /// Insert into map during loading (no idx file write). - fn set(&mut self, key: NeedleId, nv: NeedleValue) { - let old = self.map.get(&key).cloned(); + fn set_internal(&mut self, key: NeedleId, nv: NeedleValue) { + let old = self.map.get(key); self.metric.on_put(key, old.as_ref(), nv.size); - self.map.insert(key, nv); + self.map.set(key, nv.offset, nv.size); } /// Remove from map during loading (handle deletions in idx walk). fn delete_from_map(&mut self, key: NeedleId) { self.metric.maybe_set_max_file_key(key); - if let Some(old) = self.map.get(&key).cloned() { + if let Some(old) = self.map.get(key) { if old.size.is_valid() { self.metric.on_delete(&old); } } - self.map.remove(&key); - } - - /// Iterate over all entries in the needle map. - pub fn iter(&self) -> impl Iterator { - self.map.iter() + self.map.remove(key); } // ---- Metrics accessors ---- @@ -290,37 +281,29 @@ impl CompactNeedleMap { /// Save the in-memory map to an index file, sorted by needle ID ascending. pub fn save_to_idx(&self, path: &str) -> io::Result<()> { - let mut entries: Vec<_> = self - .map - .iter() - .filter(|(_, nv)| nv.size.is_valid()) - .collect(); - entries.sort_by_key(|(id, _)| **id); - let mut file = std::fs::OpenOptions::new() .write(true) .create(true) .truncate(true) .open(path)?; - for (id, nv) in entries { - idx::write_index_entry(&mut file, *id, nv.offset, nv.size)?; - } + self.map.ascending_visit(|id, nv| { + if nv.size.is_valid() { + idx::write_index_entry(&mut file, id, nv.offset, nv.size) + } else { + Ok(()) + } + })?; file.sync_all()?; Ok(()) } - /// Visit all live entries in ascending order by needle ID. - pub fn ascending_visit(&self, mut f: F) -> Result<(), String> + /// Visit all entries in ascending order by needle ID. + pub fn ascending_visit(&self, f: F) -> Result<(), String> where F: FnMut(NeedleId, &NeedleValue) -> Result<(), String>, { - let mut entries: Vec<_> = self.map.iter().collect(); - entries.sort_by_key(|(id, _)| **id); - for (&id, nv) in entries { - f(id, nv)?; - } - Ok(()) + self.map.ascending_visit(f) } } @@ -1032,10 +1015,17 @@ impl NeedleMap { } /// Iterate all entries. Returns a Vec of (NeedleId, NeedleValue) pairs. - /// For InMemory this collects from the HashMap; for Redb it reads from disk. + /// For InMemory this collects via ascending visit; for Redb it reads from disk. pub fn iter_entries(&self) -> Vec<(NeedleId, NeedleValue)> { match self { - NeedleMap::InMemory(nm) => nm.iter().map(|(&id, &nv)| (id, nv)).collect(), + NeedleMap::InMemory(nm) => { + let mut entries = Vec::new(); + let _ = nm.ascending_visit(|id, nv| { + entries.push((id, *nv)); + Ok(()) + }); + entries + } NeedleMap::Redb(nm) => nm.collect_entries(), } } diff --git a/seaweed-volume/src/storage/needle_map/compact_map.rs b/seaweed-volume/src/storage/needle_map/compact_map.rs new file mode 100644 index 000000000..dc4552df7 --- /dev/null +++ b/seaweed-volume/src/storage/needle_map/compact_map.rs @@ -0,0 +1,367 @@ +//! CompactMap: memory-efficient in-memory map of NeedleId -> (Offset, Size). +//! +//! Port of Go's CompactMap from weed/storage/needle_map/compact_map.go. +//! Uses segmented sorted arrays with compressed keys (u16 instead of u64) +//! to achieve ~10 bytes per entry instead of ~40-48 bytes with HashMap. +//! +//! NeedleId is split into: chunk = id / SEGMENT_CHUNK_SIZE, compact_key = id % SEGMENT_CHUNK_SIZE. +//! Each segment stores up to SEGMENT_CHUNK_SIZE entries in a sorted Vec, searched via binary search. +//! Best case (ordered inserts): O(1). Worst case: O(log n) per segment. + +use std::collections::HashMap; + +use crate::storage::types::*; +use super::NeedleValue; + +/// Maximum entries per segment. Must be <= u16::MAX (65535). +const SEGMENT_CHUNK_SIZE: u64 = 50_000; + +/// Compact key: only the low bits of NeedleId within a segment. +type CompactKey = u16; + +/// Segment chunk identifier: NeedleId / SEGMENT_CHUNK_SIZE. +type Chunk = u64; + +/// Compact entry: 10 bytes (2 + 4 + 4) vs 16 bytes for full NeedleId + NeedleValue. +#[derive(Clone, Copy)] +struct CompactEntry { + key: CompactKey, // 2 bytes + offset: [u8; OFFSET_SIZE], // 4 bytes + size: Size, // 4 bytes +} + +impl CompactEntry { + fn to_needle_value(&self) -> NeedleValue { + NeedleValue { + offset: Offset::from_bytes(&self.offset), + size: self.size, + } + } +} + +/// A sorted segment of compact entries for a given chunk. +struct Segment { + list: Vec, + chunk: Chunk, + first_key: CompactKey, + last_key: CompactKey, +} + +impl Segment { + fn new(chunk: Chunk) -> Self { + Segment { + list: Vec::new(), + chunk, + first_key: u16::MAX, + last_key: 0, + } + } + + fn compact_key(&self, id: NeedleId) -> CompactKey { + (id.0 - SEGMENT_CHUNK_SIZE * self.chunk) as CompactKey + } + + /// Binary search for a compact key. Returns (index, found). + /// If not found, index is the insertion point. + fn bsearch(&self, id: NeedleId) -> (usize, bool) { + let ck = self.compact_key(id); + + if self.list.is_empty() { + return (0, false); + } + if ck == self.first_key { + return (0, true); + } + if ck < self.first_key { + return (0, false); + } + if ck == self.last_key { + return (self.list.len() - 1, true); + } + if ck > self.last_key { + return (self.list.len(), false); + } + + let i = self.list.partition_point(|e| e.key < ck); + if i < self.list.len() && self.list[i].key == ck { + (i, true) + } else { + (i, false) + } + } + + /// Insert or update. Returns old NeedleValue if updating. + fn set(&mut self, id: NeedleId, offset: Offset, size: Size) -> Option { + let (i, found) = self.bsearch(id); + + if found { + let old = self.list[i].to_needle_value(); + let mut offset_bytes = [0u8; OFFSET_SIZE]; + offset.to_bytes(&mut offset_bytes); + self.list[i].offset = offset_bytes; + self.list[i].size = size; + return Some(old); + } + + // Insert at sorted position + let ck = self.compact_key(id); + let mut offset_bytes = [0u8; OFFSET_SIZE]; + offset.to_bytes(&mut offset_bytes); + + let entry = CompactEntry { + key: ck, + offset: offset_bytes, + size, + }; + + if self.list.len() == SEGMENT_CHUNK_SIZE as usize - 1 { + // Pin capacity to exact size when maxing out + let mut new_list = Vec::with_capacity(SEGMENT_CHUNK_SIZE as usize); + new_list.extend_from_slice(&self.list[..i]); + new_list.push(entry); + new_list.extend_from_slice(&self.list[i..]); + self.list = new_list; + } else { + self.list.insert(i, entry); + } + + if ck < self.first_key { + self.first_key = ck; + } + if ck > self.last_key { + self.last_key = ck; + } + + None + } + + fn get(&self, id: NeedleId) -> Option { + let (i, found) = self.bsearch(id); + if found { + Some(self.list[i].to_needle_value()) + } else { + None + } + } + + /// Mark as deleted by negating size. Returns previous size if not already deleted. + /// Matches Go behavior: checks !IsDeleted() (i.e., size >= 0). + fn delete(&mut self, id: NeedleId) -> Option { + let (i, found) = self.bsearch(id); + if found && !self.list[i].size.is_deleted() { + let old_size = self.list[i].size; + if self.list[i].size.0 == 0 { + self.list[i].size = TOMBSTONE_FILE_SIZE; + } else { + self.list[i].size = Size(-self.list[i].size.0); + } + Some(old_size) + } else { + None + } + } +} + +/// Memory-efficient map of NeedleId -> (Offset, Size). +/// Segments NeedleIds into chunks of 50,000 and stores compact 10-byte entries +/// in sorted arrays, using only 2 bytes for the key within each segment. +pub struct CompactMap { + segments: HashMap, +} + +impl CompactMap { + pub fn new() -> Self { + CompactMap { + segments: HashMap::new(), + } + } + + fn segment_for_key(&mut self, id: NeedleId) -> &mut Segment { + let chunk = id.0 / SEGMENT_CHUNK_SIZE; + self.segments + .entry(chunk) + .or_insert_with(|| Segment::new(chunk)) + } + + /// Insert or update. Returns old NeedleValue if updating. + pub fn set(&mut self, id: NeedleId, offset: Offset, size: Size) -> Option { + let chunk = id.0 / SEGMENT_CHUNK_SIZE; + let segment = self.segments + .entry(chunk) + .or_insert_with(|| Segment::new(chunk)); + segment.set(id, offset, size) + } + + pub fn get(&self, id: NeedleId) -> Option { + let chunk = id.0 / SEGMENT_CHUNK_SIZE; + self.segments.get(&chunk)?.get(id) + } + + /// Mark as deleted. Returns previous size if was valid. + pub fn delete(&mut self, id: NeedleId) -> Option { + let chunk = id.0 / SEGMENT_CHUNK_SIZE; + self.segments.get_mut(&chunk)?.delete(id) + } + + /// Remove entry entirely (used during idx loading). + pub fn remove(&mut self, id: NeedleId) -> Option { + let chunk = id.0 / SEGMENT_CHUNK_SIZE; + let segment = self.segments.get_mut(&chunk)?; + let (i, found) = segment.bsearch(id); + if found { + let entry = segment.list.remove(i); + // Update first/last keys + if segment.list.is_empty() { + segment.first_key = u16::MAX; + segment.last_key = 0; + } else { + segment.first_key = segment.list[0].key; + segment.last_key = segment.list[segment.list.len() - 1].key; + } + Some(entry.to_needle_value()) + } else { + None + } + } + + /// Iterate all entries in ascending NeedleId order. + pub fn ascending_visit(&self, mut f: F) -> Result<(), E> + where + F: FnMut(NeedleId, &NeedleValue) -> Result<(), E>, + { + let mut chunks: Vec = self.segments.keys().copied().collect(); + chunks.sort_unstable(); + + for chunk in chunks { + let segment = &self.segments[&chunk]; + for entry in &segment.list { + let id = NeedleId(SEGMENT_CHUNK_SIZE * segment.chunk + entry.key as u64); + let nv = entry.to_needle_value(); + f(id, &nv)?; + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn offset(v: u32) -> Offset { + let bytes = v.to_be_bytes(); + Offset::from_bytes(&bytes) + } + + #[test] + fn test_compact_map_basic() { + let mut m = CompactMap::new(); + + // Insert + assert!(m.set(NeedleId(1), offset(100), Size(50)).is_none()); + assert!(m.set(NeedleId(2), offset(200), Size(60)).is_none()); + + // Get + let nv = m.get(NeedleId(1)).unwrap(); + assert_eq!(nv.size, Size(50)); + + // Update returns old value + let old = m.set(NeedleId(1), offset(300), Size(70)).unwrap(); + assert_eq!(old.size, Size(50)); + + // Get updated value + let nv = m.get(NeedleId(1)).unwrap(); + assert_eq!(nv.size, Size(70)); + + // Miss + assert!(m.get(NeedleId(999)).is_none()); + } + + #[test] + fn test_compact_map_delete() { + let mut m = CompactMap::new(); + m.set(NeedleId(1), offset(100), Size(50)); + + // Delete returns old size + let old = m.delete(NeedleId(1)).unwrap(); + assert_eq!(old, Size(50)); + + // Get returns deleted (negative size) + let nv = m.get(NeedleId(1)).unwrap(); + assert!(nv.size.is_deleted()); + + // Delete again returns None (already deleted) + assert!(m.delete(NeedleId(1)).is_none()); + } + + #[test] + fn test_compact_map_zero_size_delete() { + let mut m = CompactMap::new(); + m.set(NeedleId(1), offset(100), Size(0)); + + let old = m.delete(NeedleId(1)).unwrap(); + assert_eq!(old, Size(0)); + + let nv = m.get(NeedleId(1)).unwrap(); + assert_eq!(nv.size, TOMBSTONE_FILE_SIZE); + } + + #[test] + fn test_compact_map_cross_segment() { + let mut m = CompactMap::new(); + + // Insert across multiple segments + m.set(NeedleId(1), offset(1), Size(1)); + m.set(NeedleId(50_000), offset(2), Size(2)); + m.set(NeedleId(100_000), offset(3), Size(3)); + + assert_eq!(m.get(NeedleId(1)).unwrap().size, Size(1)); + assert_eq!(m.get(NeedleId(50_000)).unwrap().size, Size(2)); + assert_eq!(m.get(NeedleId(100_000)).unwrap().size, Size(3)); + } + + #[test] + fn test_compact_map_ascending_visit() { + let mut m = CompactMap::new(); + m.set(NeedleId(100_005), offset(3), Size(3)); + m.set(NeedleId(5), offset(1), Size(1)); + m.set(NeedleId(50_005), offset(2), Size(2)); + + let mut visited = Vec::new(); + m.ascending_visit(|id, nv| { + visited.push((id, nv.size)); + Ok::<_, String>(()) + }) + .unwrap(); + + assert_eq!(visited.len(), 3); + assert_eq!(visited[0].0, NeedleId(5)); + assert_eq!(visited[1].0, NeedleId(50_005)); + assert_eq!(visited[2].0, NeedleId(100_005)); + } + + #[test] + fn test_compact_map_remove() { + let mut m = CompactMap::new(); + m.set(NeedleId(1), offset(100), Size(50)); + m.set(NeedleId(2), offset(200), Size(60)); + + let removed = m.remove(NeedleId(1)).unwrap(); + assert_eq!(removed.size, Size(50)); + + assert!(m.get(NeedleId(1)).is_none()); + assert_eq!(m.get(NeedleId(2)).unwrap().size, Size(60)); + } + + #[test] + fn test_compact_map_reverse_insert_order() { + let mut m = CompactMap::new(); + // Insert in reverse order to test sorted insert + for i in (0..100).rev() { + m.set(NeedleId(i), offset(i as u32), Size(i as i32)); + } + for i in 0..100 { + assert_eq!(m.get(NeedleId(i)).unwrap().size, Size(i as i32)); + } + } +}