You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

2303 lines
74 KiB

//! Volume: the core storage unit — a .dat file + .idx index.
//!
//! Each volume contains many needles (files). It manages:
//! - Reading/writing/deleting needles from the .dat file
//! - Maintaining the in-memory NeedleMap (NeedleId → Offset+Size)
//! - SuperBlock at offset 0 of the .dat file
//! - Metrics (file count, content size, deleted count)
//!
//! Matches Go's storage/volume.go, volume_loading.go, volume_read.go,
//! volume_write.go, volume_super_block.go.
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::Arc;
use std::sync::{Condvar, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::warn;
use crate::storage::needle::needle::{self, get_actual_size, Needle, NeedleError};
use crate::storage::needle_map::{CompactNeedleMap, NeedleMap, NeedleMapKind, RedbNeedleMap};
use crate::storage::super_block::{ReplicaPlacement, SuperBlock, SUPER_BLOCK_SIZE};
use crate::storage::types::*;
// ============================================================================
// Errors
// ============================================================================
/// Errors produced by volume load, read, write, and delete operations.
#[derive(Debug, thiserror::Error)]
pub enum VolumeError {
    /// Needle id is not in the index (or the entry is TTL-expired).
    #[error("not found")]
    NotFound,
    /// Needle exists in the index but is marked deleted.
    #[error("already deleted")]
    Deleted,
    #[error("needle size mismatch")]
    SizeMismatch,
    /// On-disk format version this build does not understand.
    #[error("unsupported version: {0}")]
    UnsupportedVersion(u8),
    /// Caller-supplied cookie does not match the stored needle's cookie.
    #[error("cookie mismatch: {0:#x}")]
    CookieMismatch(u32),
    #[error("volume not empty")]
    NotEmpty,
    #[error("volume already exists")]
    AlreadyExists,
    /// Write/delete attempted on a read-only volume.
    #[error("volume is read-only")]
    ReadOnly,
    #[error("volume size limit exceeded: current {current}, limit {limit}")]
    SizeLimitExceeded { current: u64, limit: u64 },
    #[error("volume not initialized")]
    NotInitialized,
    /// Wrapped error from needle encoding/decoding.
    #[error("needle error: {0}")]
    Needle(#[from] NeedleError),
    /// Wrapped error from superblock parsing/serialization.
    #[error("super block error: {0}")]
    SuperBlock(#[from] crate::storage::super_block::SuperBlockError),
    #[error("IO error: {0}")]
    Io(#[from] io::Error),
}
// ============================================================================
// VolumeInfo (.vif persistence)
// ============================================================================
/// Legacy simple VolumeInfo for backward compat with old .vif files.
/// Only the read-only flag was persisted in this format.
#[derive(serde::Serialize, serde::Deserialize)]
struct VolumeInfo {
    // No #[serde(default)]: a document without `read_only` fails to parse
    // as this legacy shape.
    read_only: bool,
}
pub use crate::pb::volume_server_pb::RemoteFile as PbRemoteFile;
/// Protobuf VolumeInfo type alias.
pub use crate::pb::volume_server_pb::VolumeInfo as PbVolumeInfo;
/// Helper module for (de)serializing protojson uint64 fields, which Go
/// emits as decimal strings but may also appear as plain JSON numbers.
mod string_or_u64 {
    use serde::{self, Deserialize, Deserializer, Serializer};

    /// Serialize a `u64` as a decimal string, matching Go's protojson
    /// encoding of uint64 fields.
    pub fn serialize<S>(value: &u64, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&value.to_string())
    }

    /// Accept either a JSON string or a JSON number and parse it as `u64`.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<u64, D::Error>
    where
        D: Deserializer<'de>,
    {
        // Intermediate shape: protojson may emit uint64 as "123" or 123.
        #[derive(Deserialize)]
        #[serde(untagged)]
        enum Raw {
            Text(String),
            Number(u64),
        }
        Ok(match Raw::deserialize(deserializer)? {
            Raw::Text(s) => s.parse::<u64>().map_err(serde::de::Error::custom)?,
            Raw::Number(n) => n,
        })
    }
}
/// Signed counterpart of `string_or_u64` for protojson int64 fields.
mod string_or_i64 {
    use serde::{self, Deserialize, Deserializer, Serializer};

    /// Serialize an `i64` as a decimal string (protojson int64 format).
    pub fn serialize<S>(value: &i64, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&value.to_string())
    }

    /// Accept either a JSON string or a JSON number and parse it as `i64`.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<i64, D::Error>
    where
        D: Deserializer<'de>,
    {
        // Intermediate shape: protojson may emit int64 as "-5" or -5.
        #[derive(Deserialize)]
        #[serde(untagged)]
        enum Raw {
            Text(String),
            Number(i64),
        }
        Ok(match Raw::deserialize(deserializer)? {
            Raw::Text(s) => s.parse::<i64>().map_err(serde::de::Error::custom)?,
            Raw::Number(n) => n,
        })
    }
}
/// Serde-compatible representation of RemoteFile for .vif JSON serialization.
/// Rust fields are snake_case; the serde renames map them to the camelCase
/// keys emitted by Go's protobuf JSON output (jsonpb).
#[derive(serde::Serialize, serde::Deserialize, Default, Clone)]
pub struct VifRemoteFile {
    // Remote storage backend type identifier.
    #[serde(default, rename = "backendType")]
    pub backend_type: String,
    // Identifier of the configured backend instance.
    #[serde(default, rename = "backendId")]
    pub backend_id: String,
    // Object key within the remote backend.
    #[serde(default)]
    pub key: String,
    // uint64 fields use string_or_u64 to match protojson's string encoding.
    #[serde(default, with = "string_or_u64")]
    pub offset: u64,
    #[serde(default, rename = "fileSize", with = "string_or_u64")]
    pub file_size: u64,
    // Remote modification time — presumably unix seconds; confirm against
    // the writer side.
    #[serde(default, rename = "modifiedTime", with = "string_or_u64")]
    pub modified_time: u64,
    #[serde(default)]
    pub extension: String,
}
/// Serde-compatible representation of VolumeInfo for .vif JSON serialization.
/// Matches Go's protobuf JSON format (jsonpb with EmitUnpopulated=true).
#[derive(serde::Serialize, serde::Deserialize, Default, Clone)]
pub struct VifVolumeInfo {
    // Remote (tiered) file references; empty for purely local volumes.
    #[serde(default)]
    pub files: Vec<VifRemoteFile>,
    // On-disk format version of the volume.
    #[serde(default)]
    pub version: u32,
    // Replication placement string (e.g. as used by ReplicaPlacement).
    #[serde(default)]
    pub replication: String,
    #[serde(default, rename = "bytesOffset")]
    pub bytes_offset: u32,
    // int64/uint64 fields use the string_or_* helpers for protojson compat.
    #[serde(default, rename = "datFileSize", with = "string_or_i64")]
    pub dat_file_size: i64,
    #[serde(default, rename = "expireAtSec", with = "string_or_u64")]
    pub expire_at_sec: u64,
    // When true, loading this .vif marks the volume read-only.
    #[serde(default, rename = "readOnly")]
    pub read_only: bool,
}
impl VifVolumeInfo {
/// Convert from protobuf VolumeInfo to the serde-compatible struct.
pub fn from_pb(pb: &PbVolumeInfo) -> Self {
VifVolumeInfo {
files: pb
.files
.iter()
.map(|f| VifRemoteFile {
backend_type: f.backend_type.clone(),
backend_id: f.backend_id.clone(),
key: f.key.clone(),
offset: f.offset,
file_size: f.file_size,
modified_time: f.modified_time,
extension: f.extension.clone(),
})
.collect(),
version: pb.version,
replication: pb.replication.clone(),
bytes_offset: pb.bytes_offset,
dat_file_size: pb.dat_file_size,
expire_at_sec: pb.expire_at_sec,
read_only: pb.read_only,
}
}
/// Convert to protobuf VolumeInfo.
pub fn to_pb(&self) -> PbVolumeInfo {
PbVolumeInfo {
files: self
.files
.iter()
.map(|f| PbRemoteFile {
backend_type: f.backend_type.clone(),
backend_id: f.backend_id.clone(),
key: f.key.clone(),
offset: f.offset,
file_size: f.file_size,
modified_time: f.modified_time,
extension: f.extension.clone(),
})
.collect(),
version: self.version,
replication: self.replication.clone(),
bytes_offset: self.bytes_offset,
dat_file_size: self.dat_file_size,
expire_at_sec: self.expire_at_sec,
read_only: self.read_only,
ec_shard_config: None,
}
}
}
// ============================================================================
// Streaming read support
// ============================================================================
/// Shared mutable state behind the access-control mutex.
#[derive(Default)]
struct DataFileAccessState {
    // Number of read leases currently outstanding.
    readers: usize,
    // True while a write lease is held (exclusive).
    writer_active: bool,
}

/// Readers-writer coordination for the .dat file: any number of concurrent
/// readers, or exactly one writer.
#[derive(Default)]
pub struct DataFileAccessControl {
    state: Mutex<DataFileAccessState>,
    condvar: Condvar,
}

/// RAII shared-read lease; releases its reader slot on drop.
pub struct DataFileReadLease {
    control: Arc<DataFileAccessControl>,
}

/// RAII exclusive-write lease; clears the writer flag on drop.
pub struct DataFileWriteLease {
    control: Arc<DataFileAccessControl>,
}
impl DataFileAccessControl {
    /// Block until no writer holds the file, then register one more reader.
    pub fn read_lock(self: &Arc<Self>) -> DataFileReadLease {
        let guard = self.state.lock().unwrap();
        // Park until any active writer releases the file.
        let mut guard = self
            .condvar
            .wait_while(guard, |s| s.writer_active)
            .unwrap();
        guard.readers += 1;
        DataFileReadLease {
            control: Arc::clone(self),
        }
    }

    /// Block until the file is completely idle (no readers, no writer),
    /// then claim exclusive write access.
    pub fn write_lock(self: &Arc<Self>) -> DataFileWriteLease {
        let guard = self.state.lock().unwrap();
        let mut guard = self
            .condvar
            .wait_while(guard, |s| s.writer_active || s.readers > 0)
            .unwrap();
        guard.writer_active = true;
        DataFileWriteLease {
            control: Arc::clone(self),
        }
    }
}
impl Drop for DataFileReadLease {
    /// Release one reader slot; the last reader out wakes any waiters
    /// (typically a writer waiting for exclusivity).
    fn drop(&mut self) {
        let mut st = self.control.state.lock().unwrap();
        st.readers -= 1;
        let last_reader = st.readers == 0;
        if last_reader {
            self.control.condvar.notify_all();
        }
    }
}
impl Drop for DataFileWriteLease {
    /// Clear the exclusive-writer flag and wake all waiting readers/writers.
    fn drop(&mut self) {
        let mut st = self.control.state.lock().unwrap();
        st.writer_active = false;
        drop(st);
        self.control.condvar.notify_all();
    }
}
/// Information needed to stream needle data directly from the dat file
/// without loading the entire payload into memory.
///
/// NOTE(review): `try_clone`d file handles share one seek cursor with the
/// original, so consumers should use positioned reads (pread) rather than
/// seek+read on this handle.
pub struct NeedleStreamInfo {
    /// Cloned file handle for the dat file.
    pub dat_file: File,
    /// Absolute byte offset within the dat file where needle data starts.
    pub data_file_offset: u64,
    /// Size of the data payload in bytes.
    pub data_size: u32,
    /// Per-volume file access lock used to match Go's slow-read behavior.
    pub data_file_access_control: Arc<DataFileAccessControl>,
}
// ============================================================================
// Volume
// ============================================================================
/// A single storage volume: one .dat data file plus its needle index.
pub struct Volume {
    pub id: VolumeId,
    // Directory holding the .dat file.
    dir: String,
    // Directory holding the index files (may differ from `dir`).
    dir_idx: String,
    pub collection: String,
    // Open handle to the .dat file; None until load() succeeds.
    dat_file: Option<File>,
    // NeedleId -> (offset, size) index; None until the index is loaded.
    nm: Option<NeedleMap>,
    // Which index backend to use when loading (in-memory vs redb).
    needle_map_kind: NeedleMapKind,
    // Readers-writer lease coordinating access to the .dat file.
    data_file_access_control: Arc<DataFileAccessControl>,
    pub super_block: SuperBlock,
    // Volume is fully read-only (no writes, no deletes).
    no_write_or_delete: bool,
    // Volume refuses writes but still accepts deletes.
    no_write_can_delete: bool,
    // Last modification time in unix seconds (file mtime at load, then
    // updated from written needles).
    last_modified_ts_seconds: u64,
    // Timestamp (ns) of the most recent append, for monotonic ordering.
    last_append_at_ns: u64,
    last_compact_index_offset: u64,
    last_compact_revision: u16,
    is_compacting: bool,
    /// Compaction speed limit in bytes per second (0 = unlimited).
    pub compaction_byte_per_second: i64,
    _last_io_error: Option<io::Error>,
    /// Protobuf VolumeInfo for tiered storage (.vif file).
    pub volume_info: PbVolumeInfo,
    /// Whether this volume has a remote file reference.
    pub has_remote_file: bool,
}
/// Windows helper: loop seek_read until the buffer is fully filled,
/// emulating Unix's `read_exact_at` (pread).
#[cfg(windows)]
fn read_exact_at(file: &File, buf: &mut [u8], offset: u64) -> io::Result<()> {
    use std::os::windows::fs::FileExt;
    let total = buf.len();
    let mut done = 0usize;
    while done < total {
        match file.seek_read(&mut buf[done..], offset + done as u64)? {
            // A zero-length read before the buffer is full means EOF.
            0 => {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "unexpected EOF in seek_read",
                ))
            }
            n => done += n,
        }
    }
    Ok(())
}
impl Volume {
/// Create and load a volume from disk.
pub fn new(
    dirname: &str,
    dir_idx: &str,
    collection: &str,
    id: VolumeId,
    needle_map_kind: NeedleMapKind,
    replica_placement: Option<ReplicaPlacement>,
    ttl: Option<crate::storage::needle::ttl::TTL>,
    preallocate: u64,
    version: Version,
) -> Result<Self, VolumeError> {
    // Seed the superblock with the requested placement/TTL; load() will
    // overwrite it from disk when the .dat already has a superblock.
    let super_block = SuperBlock {
        replica_placement: replica_placement.unwrap_or_default(),
        ttl: ttl.unwrap_or(crate::storage::needle::ttl::TTL::EMPTY),
        ..SuperBlock::default()
    };
    let mut volume = Volume {
        id,
        dir: dirname.to_string(),
        dir_idx: dir_idx.to_string(),
        collection: collection.to_string(),
        dat_file: None,
        nm: None,
        needle_map_kind,
        data_file_access_control: Arc::new(DataFileAccessControl::default()),
        super_block,
        no_write_or_delete: false,
        no_write_can_delete: false,
        last_modified_ts_seconds: 0,
        last_append_at_ns: 0,
        last_compact_index_offset: 0,
        last_compact_revision: 0,
        is_compacting: false,
        compaction_byte_per_second: 0,
        _last_io_error: None,
        volume_info: PbVolumeInfo::default(),
        has_remote_file: false,
    };
    volume.load(true, true, preallocate, version)?;
    Ok(volume)
}
// ---- File naming (matching Go) ----
/// Base .dat filename stem: dir/collection_id or dir/id.
pub fn data_file_name(&self) -> String {
    volume_file_name(&self.dir, &self.collection, self.id)
}

/// Index filename stem, rooted at the (possibly separate) index dir.
pub fn index_file_name(&self) -> String {
    volume_file_name(&self.dir_idx, &self.collection, self.id)
}

/// Full path for a volume-related file with the given extension.
/// Index-family extensions live under the index dir; everything else
/// under the data dir.
pub fn file_name(&self, ext: &str) -> String {
    let stem = if matches!(ext, ".idx" | ".cpx" | ".ldb" | ".cpldb" | ".rdb") {
        self.index_file_name()
    } else {
        self.data_file_name()
    };
    format!("{}{}", stem, ext)
}
/// Current on-disk format version, as recorded in the superblock.
pub fn version(&self) -> Version {
    self.super_block.version
}
// ---- Loading ----
/// Open (or create) the .dat file, read or stamp the superblock, and
/// optionally load the needle index plus the .vif sidecar.
///
/// * `also_load_index` — also build the needle map from the .idx file.
/// * `create_dat_if_missing` — create an empty .dat when none exists.
/// * `preallocate` — size hint for a freshly created .dat (bytes).
/// * `version` — format version to stamp into a brand-new superblock.
fn load(
    &mut self,
    also_load_index: bool,
    create_dat_if_missing: bool,
    preallocate: u64,
    version: Version,
) -> Result<(), VolumeError> {
    let dat_path = self.file_name(".dat");
    let mut already_has_super_block = false;
    if Path::new(&dat_path).exists() {
        let metadata = fs::metadata(&dat_path)?;
        // Try to open read-write; fall back to read-only
        match OpenOptions::new().read(true).write(true).open(&dat_path) {
            Ok(file) => {
                self.dat_file = Some(file);
            }
            Err(e) if e.kind() == io::ErrorKind::PermissionDenied => {
                // Permission denied: keep serving reads, refuse writes.
                self.dat_file = Some(File::open(&dat_path)?);
                self.no_write_or_delete = true;
            }
            Err(e) => return Err(e.into()),
        }
        // Seed last-modified from the file mtime (0 on any error).
        self.last_modified_ts_seconds = metadata
            .modified()
            .unwrap_or(SystemTime::UNIX_EPOCH)
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        if metadata.len() >= SUPER_BLOCK_SIZE as u64 {
            already_has_super_block = true;
        }
    } else if create_dat_if_missing {
        // Create directory if needed
        if let Some(parent) = Path::new(&dat_path).parent() {
            fs::create_dir_all(parent)?;
        }
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&dat_path)?;
        if preallocate > 0 {
            file.set_len(preallocate)?;
            file.set_len(0)?; // truncate back — the preallocate is just a hint
        }
        self.dat_file = Some(file);
    } else {
        return Err(VolumeError::Io(io::Error::new(
            io::ErrorKind::NotFound,
            format!("volume data file {} does not exist", dat_path),
        )));
    }
    if already_has_super_block {
        self.read_super_block()?;
        if !self.super_block.version.is_supported() {
            return Err(VolumeError::UnsupportedVersion(self.super_block.version.0));
        }
    } else {
        // Brand-new .dat: stamp a superblock with the requested version.
        self.maybe_write_super_block(version)?;
    }
    if also_load_index {
        self.load_index()?;
    }
    // Best effort: the .vif sidecar is optional and errors are ignored.
    self.load_vif();
    Ok(())
}
/// Load the needle index, choosing the backing store from the configured
/// needle-map kind (the LevelDb* kinds are served by the redb map here).
fn load_index(&mut self) -> Result<(), VolumeError> {
    let use_redb = matches!(
        self.needle_map_kind,
        NeedleMapKind::LevelDb | NeedleMapKind::LevelDbMedium | NeedleMapKind::LevelDbLarge
    );
    let idx_path = self.file_name(".idx");
    // Make sure the index directory exists before touching files in it.
    if let Some(parent) = Path::new(&idx_path).parent() {
        fs::create_dir_all(parent)?;
    }
    if use_redb {
        self.load_index_redb(&idx_path)
    } else {
        self.load_index_inmemory(&idx_path)
    }
}
/// Load index using in-memory CompactNeedleMap.
///
/// Read-only volumes load the .idx (if present) without keeping a write
/// handle; writable volumes also reopen the .idx append-only so new
/// entries can be persisted.
fn load_index_inmemory(&mut self, idx_path: &str) -> Result<(), VolumeError> {
    if self.no_write_or_delete {
        // Open read-only
        if Path::new(&idx_path).exists() {
            let mut idx_file = File::open(&idx_path)?;
            let nm = CompactNeedleMap::load_from_idx(&mut idx_file)?;
            self.nm = Some(NeedleMap::InMemory(nm));
        } else {
            // Missing .idx with existing .dat could orphan needles
            let dat_path = self.file_name(".dat");
            if Path::new(&dat_path).exists() {
                let dat_size = fs::metadata(&dat_path).map(|m| m.len()).unwrap_or(0);
                if dat_size > SUPER_BLOCK_SIZE as u64 {
                    warn!(
                        volume_id = self.id.0,
                        ".idx file missing but .dat exists with data; needles may be orphaned"
                    );
                }
            }
            // Start with an empty map; lookups will simply miss.
            self.nm = Some(NeedleMap::InMemory(CompactNeedleMap::new()));
        }
    } else {
        // Open read-write (create if missing)
        let idx_file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&idx_path)?;
        let idx_size = idx_file.metadata()?.len();
        let mut idx_reader = io::BufReader::new(&idx_file);
        let mut nm = CompactNeedleMap::load_from_idx(&mut idx_reader)?;
        // Re-open for append-only writes
        let write_file = OpenOptions::new()
            .write(true)
            .append(true)
            .open(&idx_path)?;
        nm.set_idx_file(Box::new(write_file), idx_size);
        self.nm = Some(NeedleMap::InMemory(nm));
    }
    Ok(())
}
/// Load index using disk-backed RedbNeedleMap.
///
/// Mirrors `load_index_inmemory`: read-only volumes get no idx write
/// handle; writable volumes reopen the .idx append-only.
fn load_index_redb(&mut self, idx_path: &str) -> Result<(), VolumeError> {
    // The redb database file is stored alongside the volume files
    let rdb_path = self.file_name(".rdb");
    if self.no_write_or_delete {
        // Open read-only
        if Path::new(&idx_path).exists() {
            let mut idx_file = File::open(&idx_path)?;
            let nm = RedbNeedleMap::load_from_idx(&rdb_path, &mut idx_file)?;
            self.nm = Some(NeedleMap::Redb(nm));
        } else {
            // Missing .idx with existing .dat could orphan needles
            let dat_path = self.file_name(".dat");
            if Path::new(&dat_path).exists() {
                let dat_size = fs::metadata(&dat_path).map(|m| m.len()).unwrap_or(0);
                if dat_size > SUPER_BLOCK_SIZE as u64 {
                    warn!(
                        volume_id = self.id.0,
                        ".idx file missing but .dat exists with data; needles may be orphaned"
                    );
                }
            }
            // Fresh, empty redb-backed map.
            self.nm = Some(NeedleMap::Redb(RedbNeedleMap::new(&rdb_path)?));
        }
    } else {
        // Open read-write (create if missing)
        let idx_file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&idx_path)?;
        let idx_size = idx_file.metadata()?.len();
        let mut idx_reader = io::BufReader::new(&idx_file);
        let mut nm = RedbNeedleMap::load_from_idx(&rdb_path, &mut idx_reader)?;
        // Re-open for append-only writes
        let write_file = OpenOptions::new()
            .write(true)
            .append(true)
            .open(&idx_path)?;
        nm.set_idx_file(Box::new(write_file), idx_size);
        self.nm = Some(NeedleMap::Redb(nm));
    }
    Ok(())
}
// ---- SuperBlock I/O ----
/// Read and parse the superblock at offset 0 of the .dat file, including
/// the variable-length extra section whose size is encoded big-endian in
/// header bytes 6..8.
fn read_super_block(&mut self) -> Result<(), VolumeError> {
    let dat_file = self.dat_file.as_mut().ok_or_else(|| {
        VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
    })?;
    dat_file.seek(SeekFrom::Start(0))?;
    let mut header = [0u8; SUPER_BLOCK_SIZE];
    dat_file.read_exact(&mut header)?;
    // Bytes 6..8 hold the size of the optional extra section.
    let extra_size = u16::from_be_bytes([header[6], header[7]]);
    let total_size = SUPER_BLOCK_SIZE + extra_size as usize;
    let mut full_buf = vec![0u8; total_size];
    full_buf[..SUPER_BLOCK_SIZE].copy_from_slice(&header);
    if extra_size > 0 {
        dat_file.read_exact(&mut full_buf[SUPER_BLOCK_SIZE..])?;
    }
    self.super_block = SuperBlock::from_bytes(&full_buf)?;
    Ok(())
}
/// Write a fresh superblock stamped with `version`, but only when the
/// .dat file is still empty; an existing file is left untouched.
fn maybe_write_super_block(&mut self, version: Version) -> Result<(), VolumeError> {
    let dat_file = self.dat_file.as_mut().ok_or_else(|| {
        VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
    })?;
    let dat_size = dat_file.metadata()?.len();
    if dat_size == 0 {
        if !version.is_supported() {
            return Err(VolumeError::UnsupportedVersion(version.0));
        }
        self.super_block.version = version;
        let bytes = self.super_block.to_bytes();
        dat_file.seek(SeekFrom::Start(0))?;
        dat_file.write_all(&bytes)?;
        // Make the superblock durable before any needle writes happen.
        dat_file.sync_all()?;
    }
    Ok(())
}
// ---- Read ----
/// Read a needle by its ID from the volume.
/// Soft-deleted entries are reported as `VolumeError::Deleted`.
pub fn read_needle(&self, n: &mut Needle) -> Result<i32, VolumeError> {
    self.read_needle_opt(n, false)
}
/// Read a needle, optionally including soft-deleted entries.
///
/// Looks up `n.id` in the needle map, loads and parses the record from
/// the .dat file, then rejects entries whose TTL has expired. Returns
/// the data size on success.
pub fn read_needle_opt(&self, n: &mut Needle, read_deleted: bool) -> Result<i32, VolumeError> {
    // Shared read lease: blocks only while a writer holds the file.
    let _guard = self.data_file_access_control.read_lock();
    let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
    let nv = nm.get(n.id).ok_or(VolumeError::NotFound)?;
    if nv.offset.is_zero() {
        return Err(VolumeError::NotFound);
    }
    let mut read_size = nv.size;
    if read_size.is_deleted() {
        if read_deleted && !read_size.is_tombstone() {
            // Negate to get original size
            read_size = Size(-read_size.0);
        } else {
            return Err(VolumeError::Deleted);
        }
    }
    // Zero-length needle: nothing to load from disk.
    if read_size.0 == 0 {
        return Ok(0);
    }
    self.read_needle_data_at_unlocked(n, nv.offset.to_actual_offset(), read_size)?;
    // TTL expiry check
    if n.has_ttl() {
        if let Some(ref ttl) = n.ttl {
            let ttl_minutes = ttl.minutes();
            if ttl_minutes > 0 && n.has_last_modified_date() && n.append_at_ns > 0 {
                // Expired entries read as missing, not deleted.
                let expire_at_ns = n.append_at_ns + (ttl_minutes as u64) * 60 * 1_000_000_000;
                let now_ns = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_nanos() as u64;
                if now_ns >= expire_at_ns {
                    return Err(VolumeError::NotFound);
                }
            }
        }
    }
    Ok(n.data_size as i32)
}
/// Read needle data from .dat file at given offset.
/// Takes the shared read lease, then delegates to the unlocked variant.
pub fn read_needle_data_at(
    &self,
    n: &mut Needle,
    offset: i64,
    size: Size,
) -> Result<(), VolumeError> {
    let _guard = self.data_file_access_control.read_lock();
    self.read_needle_data_at_unlocked(n, offset, size)
}
/// Read and parse a full needle record at `offset`. Callers must already
/// hold the read lease (or otherwise be safe from concurrent writers).
fn read_needle_data_at_unlocked(
    &self,
    n: &mut Needle,
    offset: i64,
    size: Size,
) -> Result<(), VolumeError> {
    let dat_file = self.dat_file.as_ref().ok_or_else(|| {
        VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
    })?;
    let version = self.version();
    let actual_size = get_actual_size(size, version);
    // Use pread (read_at) to avoid seeking with shared reference
    let mut buf = vec![0u8; actual_size as usize];
    #[cfg(unix)]
    {
        use std::os::unix::fs::FileExt;
        dat_file.read_exact_at(&mut buf, offset as u64)?;
    }
    #[cfg(windows)]
    {
        // Positioned-read emulation (see the free fn above).
        read_exact_at(dat_file, &mut buf, offset as u64)?;
    }
    #[cfg(not(any(unix, windows)))]
    {
        compile_error!("Platform not supported: only unix and windows are supported");
    }
    // Parsing also verifies the payload checksum (see scrub()).
    n.read_bytes(&mut buf, offset, size, version)?;
    Ok(())
}
/// Read raw needle blob at a specific offset.
pub fn read_needle_blob(&self, offset: i64, size: Size) -> Result<Vec<u8>, VolumeError> {
    let _guard = self.data_file_access_control.read_lock();
    self.read_needle_blob_unlocked(offset, size)
}

/// Read the raw on-disk bytes of one needle record without taking the
/// read lease; callers must already hold it.
fn read_needle_blob_unlocked(&self, offset: i64, size: Size) -> Result<Vec<u8>, VolumeError> {
    let dat_file = self.dat_file.as_ref().ok_or_else(|| {
        VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
    })?;
    let version = self.version();
    // Full record length (header + body + padding) for this size/version.
    let actual_size = get_actual_size(size, version);
    let mut buf = vec![0u8; actual_size as usize];
    #[cfg(unix)]
    {
        use std::os::unix::fs::FileExt;
        dat_file.read_exact_at(&mut buf, offset as u64)?;
    }
    #[cfg(windows)]
    {
        read_exact_at(dat_file, &mut buf, offset as u64)?;
    }
    Ok(buf)
}
/// Read needle metadata (header + flags/name/mime/etc) without loading the data payload,
/// and return a `NeedleStreamInfo` that can be used to stream data directly from the dat file.
///
/// This is used for large needles to avoid loading the entire payload into memory.
pub fn read_needle_stream_info(
    &self,
    n: &mut Needle,
    read_deleted: bool,
) -> Result<NeedleStreamInfo, VolumeError> {
    // Shared read lease for the metadata read; the returned stream info
    // carries the access-control handle so callers can re-acquire it.
    let _guard = self.data_file_access_control.read_lock();
    let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
    let nv = nm.get(n.id).ok_or(VolumeError::NotFound)?;
    if nv.offset.is_zero() {
        return Err(VolumeError::NotFound);
    }
    let mut read_size = nv.size;
    if read_size.is_deleted() {
        if read_deleted && !read_size.is_tombstone() {
            // Deleted entries store the negated size; recover the original.
            read_size = Size(-read_size.0);
        } else {
            return Err(VolumeError::Deleted);
        }
    }
    // Nothing to stream for an empty needle.
    if read_size.0 == 0 {
        return Err(VolumeError::NotFound);
    }
    let dat_file = self.dat_file.as_ref().ok_or_else(|| {
        VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
    })?;
    let offset = nv.offset.to_actual_offset();
    let version = self.version();
    let actual_size = get_actual_size(read_size, version);
    // Read the full needle bytes (including data) for metadata parsing.
    // We use read_bytes_meta_only which skips copying the data payload.
    let mut buf = vec![0u8; actual_size as usize];
    #[cfg(unix)]
    {
        use std::os::unix::fs::FileExt;
        dat_file.read_exact_at(&mut buf, offset as u64)?;
    }
    #[cfg(windows)]
    {
        read_exact_at(dat_file, &mut buf, offset as u64)?;
    }
    n.read_bytes_meta_only(&mut buf, offset, read_size, version)?;
    // TTL expiry check
    if n.has_ttl() {
        if let Some(ref ttl) = n.ttl {
            let ttl_minutes = ttl.minutes();
            if ttl_minutes > 0 && n.has_last_modified_date() && n.append_at_ns > 0 {
                // Expired entries read as missing, mirroring read_needle_opt.
                let expire_at_ns = n.append_at_ns + (ttl_minutes as u64) * 60 * 1_000_000_000;
                let now_ns = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_nanos() as u64;
                if now_ns >= expire_at_ns {
                    return Err(VolumeError::NotFound);
                }
            }
        }
    }
    // For V1, data starts right after the header
    // For V2/V3, data starts at header + 4 (DataSize field)
    let data_file_offset = if version == VERSION_1 {
        offset as u64 + NEEDLE_HEADER_SIZE as u64
    } else {
        offset as u64 + NEEDLE_HEADER_SIZE as u64 + 4 // skip DataSize (4 bytes)
    };
    let cloned_file = dat_file.try_clone().map_err(VolumeError::Io)?;
    Ok(NeedleStreamInfo {
        dat_file: cloned_file,
        data_file_offset,
        data_size: n.data_size,
        data_file_access_control: self.data_file_access_control.clone(),
    })
}
// ---- Write ----
/// Write a needle to the volume (synchronous path).
/// Takes the exclusive write lease for the duration of the write.
pub fn write_needle(
    &mut self,
    n: &mut Needle,
    check_cookie: bool,
) -> Result<(u64, Size, bool), VolumeError> {
    let _write_lease = self.data_file_access_control.write_lock();
    match self.no_write_or_delete {
        true => Err(VolumeError::ReadOnly),
        false => self.do_write_request(n, check_cookie),
    }
}
/// Core write path: checksum/dedup check, cookie validation, append to
/// the .dat file, then needle-map update. Assumes the write lease is
/// already held. Returns (offset, size, unchanged).
fn do_write_request(
    &mut self,
    n: &mut Needle,
    check_cookie: bool,
) -> Result<(u64, Size, bool), VolumeError> {
    // Ensure checksum is computed before dedup check
    if n.checksum == crate::storage::needle::crc::CRC(0) && !n.data.is_empty() {
        n.checksum = crate::storage::needle::crc::CRC::new(&n.data);
    }
    // Dedup check
    if self.is_file_unchanged(n) {
        // Identical content already stored: report success without writing.
        return Ok((0, Size(n.data_size as i32), true));
    }
    // Cookie validation for existing needle
    if let Some(nm) = &self.nm {
        if let Some(nv) = nm.get(n.id) {
            if !nv.offset.is_zero() && nv.size.is_valid() {
                let mut existing = Needle::default();
                // Read only the header to check cookie
                self.read_needle_header_unlocked(&mut existing, nv.offset.to_actual_offset())?;
                // No explicit cookie and no strict check: adopt the stored
                // cookie so the comparison below passes.
                if n.cookie.0 == 0 && !check_cookie {
                    n.cookie = existing.cookie;
                }
                if existing.cookie != n.cookie {
                    return Err(VolumeError::CookieMismatch(n.cookie.0));
                }
            }
        }
    }
    // Update append timestamp
    n.append_at_ns = get_append_at_ns(self.last_append_at_ns);
    // Append to .dat file
    let (offset, size, _actual_size) = self.append_needle(n)?;
    self.last_append_at_ns = n.append_at_ns;
    // Update needle map — but never let an older record win over a newer
    // one already present in the map.
    let should_update = if let Some(nm) = &self.nm {
        match nm.get(n.id) {
            Some(nv) => (nv.offset.to_actual_offset() as u64) < offset,
            None => true,
        }
    } else {
        true
    };
    if should_update {
        if let Some(nm) = &mut self.nm {
            nm.put(n.id, Offset::from_actual_offset(offset as i64), n.size)?;
        }
    }
    if self.last_modified_ts_seconds < n.last_modified {
        self.last_modified_ts_seconds = n.last_modified;
    }
    Ok((offset, size, false))
}
/// Read just the fixed-size needle header (cookie/id/size) at `offset`
/// into `n`, without touching the payload. Caller holds the lease.
fn read_needle_header_unlocked(&self, n: &mut Needle, offset: i64) -> Result<(), VolumeError> {
    let dat_file = self.dat_file.as_ref().ok_or_else(|| {
        VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
    })?;
    let mut header = [0u8; NEEDLE_HEADER_SIZE];
    #[cfg(unix)]
    {
        use std::os::unix::fs::FileExt;
        dat_file.read_exact_at(&mut header, offset as u64)?;
    }
    #[cfg(windows)]
    {
        read_exact_at(dat_file, &mut header, offset as u64)?;
    }
    n.read_header(&header);
    Ok(())
}
/// Dedup check: true when the stored needle for `n.id` already carries
/// exactly the same cookie, checksum, and payload bytes.
fn is_file_unchanged(&self, n: &Needle) -> bool {
    // Don't dedup for volumes with TTL
    if self.super_block.ttl != crate::storage::needle::ttl::TTL::EMPTY {
        return false;
    }
    let Some(nm) = &self.nm else { return false };
    let Some(nv) = nm.get(n.id) else { return false };
    if nv.offset.is_zero() || !nv.size.is_valid() {
        return false;
    }
    // Load the stored record; any read failure means "changed".
    let mut old = Needle::default();
    if self
        .read_needle_data_at_unlocked(&mut old, nv.offset.to_actual_offset(), nv.size)
        .is_err()
    {
        return false;
    }
    old.cookie == n.cookie && old.checksum == n.checksum && old.data == n.data
}
/// Append a needle to the .dat file. Returns (offset, size, actual_size).
fn append_needle(&mut self, n: &mut Needle) -> Result<(u64, Size, i64), VolumeError> {
    let version = self.version();
    // Encode the full record (header + body) for the current version.
    let encoded = n.write_bytes(version);
    let dat_file = self.dat_file.as_mut().ok_or_else(|| {
        VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
    })?;
    // New records always go at the end of the file.
    let start = dat_file.seek(SeekFrom::End(0))?;
    dat_file.write_all(&encoded)?;
    Ok((start, n.size, encoded.len() as i64))
}
// ---- Delete ----
/// Delete a needle from the volume.
/// Takes the exclusive write lease, then appends a tombstone record.
pub fn delete_needle(&mut self, n: &mut Needle) -> Result<Size, VolumeError> {
    let _write_lease = self.data_file_access_control.write_lock();
    match self.no_write_or_delete {
        true => Err(VolumeError::ReadOnly),
        false => self.do_delete_request(n),
    }
}
/// Core delete path: append a tombstone (empty-data needle) and mark the
/// index entry deleted. Returns the freed size, or Size(0) when the
/// needle was absent or already deleted. Assumes the write lease is held.
fn do_delete_request(&mut self, n: &mut Needle) -> Result<Size, VolumeError> {
    // Look up the live entry; bail out quietly when there is nothing to do.
    let size = {
        let Some(nm) = &self.nm else { return Ok(Size(0)) };
        match nm.get(n.id) {
            Some(nv) if !nv.size.is_deleted() => nv.size,
            _ => return Ok(Size(0)),
        }
    };
    // Write tombstone: append needle with empty data
    n.data = Vec::new();
    n.append_at_ns = get_append_at_ns(self.last_append_at_ns);
    let (offset, _, _) = self.append_needle(n)?;
    self.last_append_at_ns = n.append_at_ns;
    // Update index
    if let Some(nm) = &mut self.nm {
        nm.delete(n.id, Offset::from_actual_offset(offset as i64))?;
    }
    Ok(size)
}
// ---- Metrics ----
/// Total bytes of live (non-deleted) needle content.
pub fn content_size(&self) -> u64 {
    self.nm.as_ref().map_or(0, |nm| nm.content_size())
}

/// Total bytes occupied by deleted needles (reclaimable by compaction).
pub fn deleted_size(&self) -> u64 {
    self.nm.as_ref().map_or(0, |nm| nm.deleted_size())
}

/// Number of live needles in the volume.
pub fn file_count(&self) -> i64 {
    self.nm.as_ref().map_or(0, |nm| nm.file_count())
}

/// Number of deleted needles still occupying space.
pub fn deleted_count(&self) -> i64 {
    self.nm.as_ref().map_or(0, |nm| nm.deleted_count())
}

/// Highest needle id recorded in the index.
pub fn max_file_key(&self) -> NeedleId {
    self.nm.as_ref().map_or(NeedleId(0), |nm| nm.max_file_key())
}

/// True when the volume refuses writes (it may still allow deletes when
/// only `no_write_can_delete` is set).
pub fn is_read_only(&self) -> bool {
    self.no_write_or_delete || self.no_write_can_delete
}

/// Revision counter of the last compaction.
pub fn last_compact_revision(&self) -> u16 {
    self.last_compact_revision
}

/// Last-modified time of the volume, in unix seconds.
pub fn last_modified_ts(&self) -> u64 {
    self.last_modified_ts_seconds
}
/// Read all live needles from the volume (for ReadAllNeedles streaming RPC).
/// Deleted entries are skipped; unreadable records are silently dropped.
pub fn read_all_needles(&self) -> Result<Vec<Needle>, VolumeError> {
    let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
    let mut out = Vec::new();
    for (key, nv) in nm.iter_entries() {
        // Only valid (non-deleted) entries are returned.
        if !nv.size.is_valid() {
            continue;
        }
        let mut needle = Needle {
            id: key,
            ..Needle::default()
        };
        let loaded =
            self.read_needle_data_at(&mut needle, nv.offset.to_actual_offset(), nv.size);
        if loaded.is_ok() {
            out.push(needle);
        }
    }
    Ok(out)
}
/// Scrub the volume by reading and verifying all needles.
/// Returns (files_checked, broken_needles) tuple.
/// Each needle is read from disk and its CRC checksum is verified.
///
/// Entries whose offset falls outside the .dat file are reported as
/// broken without being read (and without counting toward
/// `files_checked`, matching the original skip-before-count behavior).
pub fn scrub(&self) -> Result<(u64, Vec<String>), VolumeError> {
    let _dat_file = self.dat_file.as_ref().ok_or(VolumeError::NotFound)?;
    let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
    // Idiom fix: pass the variant constructor directly instead of the
    // redundant closure `|e| VolumeError::Io(e)`.
    let dat_size = self.dat_file_size().map_err(VolumeError::Io)?;
    let mut files_checked: u64 = 0;
    let mut broken = Vec::new();
    for (needle_id, nv) in nm.iter_entries() {
        // Skip tombstones and never-written entries.
        if nv.offset.is_zero() || nv.size.is_deleted() {
            continue;
        }
        let offset = nv.offset.to_actual_offset();
        if offset < 0 || offset as u64 >= dat_size {
            broken.push(format!(
                "needle {} offset {} out of range (dat_size={})",
                needle_id.0, offset, dat_size
            ));
            continue;
        }
        // Read and verify the needle (read_needle_data_at checks CRC via read_bytes/read_tail)
        let mut n = Needle {
            id: needle_id,
            ..Needle::default()
        };
        if let Err(e) = self.read_needle_data_at(&mut n, offset, nv.size) {
            broken.push(format!("needle {} error: {}", needle_id.0, e));
        }
        files_checked += 1;
    }
    Ok((files_checked, broken))
}
/// Scan raw needle entries from the .dat file starting at `from_offset`.
/// Returns (needle_header_bytes, needle_body_bytes, append_at_ns) for each needle.
/// Used by VolumeTailSender to stream raw bytes.
///
/// Deleted/tombstone records are skipped. Scanning stops at EOF or at a
/// zero header (zero size and empty id), which marks unwritten space.
pub fn scan_raw_needles_from(
    &self,
    from_offset: u64,
) -> Result<Vec<(Vec<u8>, Vec<u8>, u64)>, VolumeError> {
    let dat_file = self.dat_file.as_ref().ok_or(VolumeError::NotFound)?;
    let version = self.super_block.version;
    let dat_size = dat_file.metadata()?.len();
    let mut entries = Vec::new();
    let mut offset = from_offset;
    // Clone the handle so seeking here does not disturb other readers.
    let mut dat = dat_file.try_clone()?;
    while offset < dat_size {
        // Read needle header (16 bytes)
        let mut header = [0u8; NEEDLE_HEADER_SIZE];
        dat.seek(SeekFrom::Start(offset))?;
        match dat.read_exact(&mut header) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e.into()),
        }
        let (_cookie, _id, size) = Needle::parse_header(&header);
        // Zero size with an empty id: hit unwritten space — stop.
        if size.0 == 0 && _id.is_empty() {
            break;
        }
        let body_length = needle::needle_body_length(size, version);
        let total_size = NEEDLE_HEADER_SIZE as u64 + body_length as u64;
        // Skip deleted/tombstone records but keep walking the file.
        if size.is_deleted() || size.0 <= 0 {
            offset += total_size;
            continue;
        }
        // Read body bytes
        let mut body = vec![0u8; body_length as usize];
        dat.seek(SeekFrom::Start(offset + NEEDLE_HEADER_SIZE as u64))?;
        match dat.read_exact(&mut body) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e.into()),
        }
        // Parse the needle to get append_at_ns
        let mut full = vec![0u8; total_size as usize];
        full[..NEEDLE_HEADER_SIZE].copy_from_slice(&header);
        full[NEEDLE_HEADER_SIZE..].copy_from_slice(&body);
        let mut n = Needle::default();
        // Parse failures are tolerated; append_at_ns stays 0 in that case.
        let _ = n.read_bytes(&full, offset as i64, size, version);
        entries.push((header.to_vec(), body, n.append_at_ns));
        offset += total_size;
    }
    Ok(entries)
}
/// Insert or update a needle index entry (for low-level blob writes).
pub fn put_needle_index(
&mut self,
key: NeedleId,
offset: Offset,
size: Size,
) -> Result<(), VolumeError> {
if let Some(ref mut nm) = self.nm {
nm.put(key, offset, size).map_err(VolumeError::Io)?;
}
Ok(())
}
/// Mark this volume as read-only (no writes or deletes).
///
/// The flag is also persisted to the .vif sidecar, which `load_vif`
/// reads back on startup.
pub fn set_read_only(&mut self) {
    self.no_write_or_delete = true;
    self.save_vif();
}
/// Mark this volume as writable (allow writes and deletes).
///
/// Clears both restriction flags and persists the state to the .vif file.
pub fn set_writable(&mut self) {
    self.no_write_can_delete = false;
    self.no_write_or_delete = false;
    self.save_vif();
}
/// Path to .vif file.
///
/// NOTE(review): this builds "dir/id.vif" directly, without the collection
/// prefix that `file_name()` (used for .dat/.idx) presumably includes —
/// confirm the asymmetry is intentional for volumes in a named collection.
fn vif_path(&self) -> String {
    format!("{}/{}.vif", self.dir, self.id.0)
}
/// Load volume info from .vif file.
/// Supports both the protobuf-JSON format (Go-compatible) and legacy JSON.
///
/// Missing, unreadable, or blank files are ignored. On success the
/// read-only flag, remote-file flag, and volume info are updated.
fn load_vif(&mut self) {
    let content = match fs::read_to_string(&self.vif_path()) {
        Ok(c) => c,
        Err(_) => return,
    };
    if content.trim().is_empty() {
        return;
    }
    // Preferred format: protobuf-JSON (Go-compatible VolumeInfo via VifVolumeInfo).
    if let Ok(vif_info) = serde_json::from_str::<VifVolumeInfo>(&content) {
        let pb_info = vif_info.to_pb();
        if pb_info.read_only {
            self.no_write_or_delete = true;
        }
        self.has_remote_file = !pb_info.files.is_empty();
        self.volume_info = pb_info;
        return;
    }
    // Legacy JSON fallback: only the read_only flag is honored.
    if let Ok(info) = serde_json::from_str::<VolumeInfo>(&content) {
        if info.read_only {
            self.no_write_or_delete = true;
        }
    }
}
/// Save volume info to .vif file in protobuf-JSON format (Go-compatible).
///
/// Best-effort: serialization and write failures are silently ignored.
fn save_vif(&self) {
    let mut vif = VifVolumeInfo::from_pb(&self.volume_info);
    vif.read_only = self.no_write_or_delete;
    match serde_json::to_string_pretty(&vif) {
        Ok(content) => {
            let _ = fs::write(self.vif_path(), content);
        }
        Err(_) => {}
    }
}
/// Save full VolumeInfo to .vif file (for tiered storage).
pub fn save_volume_info(&mut self) -> Result<(), VolumeError> {
self.volume_info.read_only = self.no_write_or_delete;
let vif = VifVolumeInfo::from_pb(&self.volume_info);
let content = serde_json::to_string_pretty(&vif)
.map_err(|e| VolumeError::Io(io::Error::new(io::ErrorKind::Other, e.to_string())))?;
fs::write(&self.vif_path(), content)?;
Ok(())
}
/// Get the remote storage backend name and key from this volume .vif.
///
/// Returns a pair of empty strings when no remote files are recorded.
/// The backend name is "type" or "type.id" when a backend id is present.
pub fn remote_storage_name_key(&self) -> (String, String) {
    match self.volume_info.files.first() {
        None => (String::new(), String::new()),
        Some(rf) => {
            let backend_name = if rf.backend_id.is_empty() {
                rf.backend_type.clone()
            } else {
                format!("{}.{}", rf.backend_type, rf.backend_id)
            };
            (backend_name, rf.key.clone())
        }
    }
}
/// Get the dat file path for this volume.
///
/// Delegates to the shared base-name builder with the ".dat" extension.
pub fn dat_path(&self) -> String {
    self.file_name(".dat")
}
/// Get the directory this volume is stored in.
///
/// Returns the borrowed path string; callers clone if they need ownership.
pub fn dir(&self) -> &str {
    &self.dir
}
/// Throttle IO during compaction to avoid saturating disk.
///
/// No-op unless a compaction is in flight and a positive byte rate is
/// configured. Sleeps long enough for `bytes_written` to fit the rate.
pub fn maybe_throttle_compaction(&self, bytes_written: u64) {
    if self.is_compacting && self.compaction_byte_per_second > 0 {
        let rate = self.compaction_byte_per_second as f64;
        let sleep_us = (bytes_written as f64 / rate * 1_000_000.0) as u64;
        if sleep_us > 0 {
            std::thread::sleep(std::time::Duration::from_micros(sleep_us));
        }
    }
}
/// Change the replication placement and rewrite the super block.
pub fn set_replica_placement(&mut self, rp: ReplicaPlacement) -> Result<(), VolumeError> {
self.super_block.replica_placement = rp;
let bytes = self.super_block.to_bytes();
let dat_file = self.dat_file.as_mut().ok_or_else(|| {
VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
})?;
dat_file.seek(SeekFrom::Start(0))?;
dat_file.write_all(&bytes)?;
dat_file.sync_all()?;
Ok(())
}
// ---- Binary search for incremental copy ----
/// Read a single index entry's offset from the .idx file by entry index.
///
/// Opens the .idx file fresh each call and reads the fixed-size entry at
/// slot `m`. On unix a positioned read avoids seeking a shared cursor.
fn read_offset_from_index(&self, m: i64) -> Result<Offset, VolumeError> {
    let idx_file = File::open(self.file_name(".idx"))?;
    let mut entry = [0u8; NEEDLE_MAP_ENTRY_SIZE];
    let pos = (m as u64) * NEEDLE_MAP_ENTRY_SIZE as u64;
    #[cfg(unix)]
    {
        use std::os::unix::fs::FileExt;
        idx_file.read_exact_at(&mut entry, pos)?;
    }
    #[cfg(not(unix))]
    {
        // No positional read available: seek the (locally owned) handle.
        let mut f = idx_file;
        f.seek(SeekFrom::Start(pos))?;
        std::io::Read::read_exact(&mut f, &mut entry)?;
    }
    let (_key, offset, _size) = idx_entry_from_bytes(&entry);
    Ok(offset)
}
/// Read the append_at_ns timestamp from a needle at the given offset in the .dat file.
///
/// Returns 0 (not an error) for deleted/tombstone records whose stored size
/// is non-positive. On unix, positioned reads leave the shared file cursor
/// untouched; elsewhere the `read_exact_at` helper is used.
fn read_append_at_ns(&self, offset: Offset) -> Result<u64, VolumeError> {
    let dat_file = self.dat_file.as_ref().ok_or_else(|| {
        VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
    })?;
    let actual_offset = offset.to_actual_offset() as u64;
    let version = self.version();
    // First read just the 16-byte header to learn the record size.
    let mut header_buf = [0u8; NEEDLE_HEADER_SIZE];
    #[cfg(unix)]
    {
        use std::os::unix::fs::FileExt;
        dat_file.read_exact_at(&mut header_buf, actual_offset)?;
    }
    #[cfg(not(unix))]
    {
        read_exact_at(dat_file, &mut header_buf, actual_offset)?;
    }
    let (_cookie, _id, size) = Needle::parse_header(&header_buf);
    // Non-positive size: deleted needle, no timestamp to report.
    if size.0 <= 0 {
        return Ok(0);
    }
    // Re-read the whole record (header included) so metadata parsing sees
    // the buffer laid out exactly as it is on disk.
    let actual_size = get_actual_size(size, version);
    let mut buf = vec![0u8; actual_size as usize];
    #[cfg(unix)]
    {
        use std::os::unix::fs::FileExt;
        dat_file.read_exact_at(&mut buf, actual_offset)?;
    }
    #[cfg(not(unix))]
    {
        read_exact_at(dat_file, &mut buf, actual_offset)?;
    }
    let mut n = Needle::default();
    n.read_bytes_meta_only(&mut buf, offset.to_actual_offset(), size, version)?;
    Ok(n.append_at_ns)
}
/// Search right from position m to find the first non-deleted entry.
///
/// Returns (index, offset, append_at_ns) of the first live entry strictly
/// after `m`, or (first index >= max, zero offset, 0) when none exists.
fn read_right_ns(&self, m: i64, max: i64) -> Result<(i64, Offset, u64), VolumeError> {
    let mut i = m + 1;
    while i < max {
        let offset = self.read_offset_from_index(i)?;
        if !offset.is_zero() {
            let ts = self.read_append_at_ns(offset)?;
            return Ok((i, offset, ts));
        }
        i += 1;
    }
    Ok((i, Offset::default(), 0))
}
/// Search left from position m to find the first non-deleted entry.
///
/// Returns (index, offset, append_at_ns) of the first live entry strictly
/// before `m`, or (-1, zero offset, 0) when none exists.
fn read_left_ns(&self, m: i64) -> Result<(i64, Offset, u64), VolumeError> {
    let mut i = m - 1;
    while i >= 0 {
        let offset = self.read_offset_from_index(i)?;
        if !offset.is_zero() {
            let ts = self.read_append_at_ns(offset)?;
            return Ok((i, offset, ts));
        }
        i -= 1;
    }
    Ok((i, Offset::default(), 0))
}
/// Binary search through the .idx file to find the first needle
/// with append_at_ns > since_ns. Returns (offset, is_last).
/// Matches Go's BinarySearchByAppendAtNs in volume_backup.go.
///
/// Assumes append_at_ns is non-decreasing in .idx entry order. Deleted
/// entries (zero offset) carry no timestamp, so when the probe lands on
/// one the nearest live neighbors are consulted to decide which half of
/// the range to keep.
pub fn binary_search_by_append_at_ns(
    &self,
    since_ns: u64,
) -> Result<(Offset, bool), VolumeError> {
    let file_size = self.idx_file_size() as i64;
    // The .idx file must be a whole number of fixed-size entries.
    if file_size % NEEDLE_MAP_ENTRY_SIZE as i64 != 0 {
        return Err(VolumeError::Io(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("unexpected idx file size: {}", file_size),
        )));
    }
    let entry_count = file_size / NEEDLE_MAP_ENTRY_SIZE as i64;
    let mut l: i64 = 0;
    let mut h: i64 = entry_count;
    while l < h {
        let m = (l + h) / 2;
        if m == entry_count {
            // Probe ran past the last entry: everything is <= since_ns.
            return Ok((Offset::default(), true));
        }
        let offset = self.read_offset_from_index(m)?;
        if offset.is_zero() {
            // Deleted entry: fall back to the nearest live neighbors.
            let (left_index, _left_offset, left_ns) = self.read_left_ns(m)?;
            let (right_index, right_offset, right_ns) = self.read_right_ns(m, entry_count)?;
            if right_ns <= since_ns {
                // Even the right neighbor is too old; continue to its right.
                l = right_index;
                if l == entry_count {
                    return Ok((Offset::default(), true));
                } else {
                    continue;
                }
            }
            if since_ns < left_ns {
                // The left neighbor is already newer; continue to its left.
                h = left_index + 1;
                continue;
            }
            // since_ns falls between the neighbors: the right one is the answer.
            return Ok((right_offset, false));
        }
        let m_ns = self.read_append_at_ns(offset)?;
        if m_ns <= since_ns {
            l = m + 1;
        } else {
            h = m;
        }
    }
    if l == entry_count {
        // Every entry is <= since_ns: the caller is already caught up.
        return Ok((Offset::default(), true));
    }
    let offset = self.read_offset_from_index(l)?;
    Ok((offset, false))
}
/// Write a raw needle blob at a specific offset in the .dat file.
pub fn write_needle_blob(
&mut self,
offset: i64,
needle_blob: &[u8],
) -> Result<(), VolumeError> {
if self.no_write_or_delete {
return Err(VolumeError::ReadOnly);
}
let dat_file = self.dat_file.as_mut().ok_or_else(|| {
VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
})?;
dat_file.seek(SeekFrom::Start(offset as u64))?;
dat_file.write_all(needle_blob)?;
Ok(())
}
/// True when the replica placement requests more than one copy.
pub fn needs_replication(&self) -> bool {
    self.super_block.replica_placement.get_copy_count() > 1
}
/// Garbage ratio: deleted_size / (content_size + deleted_size)
///
/// An empty volume reports 0.0 (guards against 0/0 = NaN).
pub fn garbage_level(&self) -> f64 {
    let deleted = self.deleted_size();
    let total = self.content_size() + deleted;
    if total == 0 {
        0.0
    } else {
        deleted as f64 / total as f64
    }
}
pub fn dat_file_size(&self) -> io::Result<u64> {
if let Some(ref f) = self.dat_file {
Ok(f.metadata()?.len())
} else {
Ok(0)
}
}
/// Get the modification time of the .dat file as Unix seconds.
///
/// Any failure along the way (closed volume, metadata error, pre-epoch
/// timestamp) degrades to 0 rather than an error.
pub fn dat_file_mod_time(&self) -> u64 {
    let meta = match self.dat_file.as_ref().map(|f| f.metadata()) {
        Some(Ok(m)) => m,
        _ => return 0,
    };
    match meta.modified() {
        Ok(t) => t
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0),
        Err(_) => 0,
    }
}
/// Size in bytes of the .idx file backing the needle map (0 when unloaded).
pub fn idx_file_size(&self) -> u64 {
    match &self.nm {
        Some(nm) => nm.index_file_size(),
        None => 0,
    }
}
// ---- Compaction / Vacuum ----
/// Compact the volume by copying only live needles to new .cpd/.cpx files.
/// This reads from the current .dat/.idx and writes to .cpd/.cpx.
/// Call `commit_compact()` after to swap the files.
///
/// A second compaction request while one is running is a silent no-op.
/// The `is_compacting` flag is cleared on success and failure alike.
pub fn compact_by_index<F>(
    &mut self,
    _preallocate: u64,
    _max_bytes_per_second: i64,
    progress_fn: F,
) -> Result<(), VolumeError>
where
    F: Fn(i64) -> bool,
{
    if self.is_compacting {
        return Ok(());
    }
    self.is_compacting = true;
    let outcome = self.do_compact_by_index(progress_fn);
    self.is_compacting = false;
    outcome
}
/// Copy all live, non-expired needles into fresh .cpd/.cpx files.
///
/// Writes a new super block with an incremented compaction revision,
/// appends each live needle in ascending .dat-offset order while building
/// a new in-memory index, then persists that index as the .cpx file.
/// Returns `Interrupted` (and removes the partial .cpd) when the
/// progress callback returns false.
fn do_compact_by_index<F>(&mut self, progress_fn: F) -> Result<(), VolumeError>
where
    F: Fn(i64) -> bool,
{
    // Record state before compaction for makeupDiff
    self.last_compact_index_offset = self.nm.as_ref().map_or(0, |nm| nm.index_file_size());
    self.last_compact_revision = self.super_block.compaction_revision;
    // Sync current data
    self.sync_to_disk()?;
    let cpd_path = self.file_name(".cpd");
    let cpx_path = self.file_name(".cpx");
    let version = self.version();
    // Write new super block with incremented compaction revision
    let mut new_sb = self.super_block.clone();
    new_sb.compaction_revision += 1;
    let sb_bytes = new_sb.to_bytes();
    let mut dst = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(&cpd_path)?;
    dst.write_all(&sb_bytes)?;
    let mut new_offset = sb_bytes.len() as i64;
    // Build new index in memory
    let mut new_nm = CompactNeedleMap::new();
    // Current wall-clock seconds, used below for TTL expiry checks.
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    // Collect live entries from needle map (sorted ascending)
    let nm = self.nm.as_ref().ok_or(VolumeError::NotInitialized)?;
    let mut entries: Vec<(NeedleId, Offset, Size)> = Vec::new();
    for (id, nv) in nm.iter_entries() {
        // Zero offset or tombstone size = deleted; skip.
        if nv.offset.is_zero() || nv.size.is_deleted() {
            continue;
        }
        entries.push((id, nv.offset, nv.size));
    }
    // Sort by source offset so the copy is one sequential forward read.
    entries.sort_by_key(|(_, offset, _)| *offset);
    for (id, offset, size) in entries {
        // Progress callback
        if !progress_fn(offset.to_actual_offset()) {
            // Interrupted
            let _ = fs::remove_file(&cpd_path);
            return Err(VolumeError::Io(io::Error::new(
                io::ErrorKind::Interrupted,
                "compaction interrupted",
            )));
        }
        // Read needle from source
        let mut n = Needle {
            id,
            ..Needle::default()
        };
        self.read_needle_data_at(&mut n, offset.to_actual_offset(), size)?;
        // Skip TTL-expired needles
        if n.has_ttl() {
            if let Some(ref ttl) = n.ttl {
                let ttl_minutes = ttl.minutes();
                if ttl_minutes > 0 && n.last_modified > 0 {
                    let expire_at = n.last_modified + (ttl_minutes as u64) * 60;
                    if now >= expire_at {
                        continue;
                    }
                }
            }
        }
        // Write needle to destination
        let bytes = n.write_bytes(version);
        dst.write_all(&bytes)?;
        // Update new index
        new_nm.put(id, Offset::from_actual_offset(new_offset), n.size)?;
        new_offset += bytes.len() as i64;
    }
    dst.sync_all()?;
    // Save new index
    new_nm.save_to_idx(&cpx_path)?;
    Ok(())
}
/// Commit a previously completed compaction: swap .cpd/.cpx to .dat/.idx and reload.
///
/// Closes the live needle map and .dat handle first so the renames are
/// safe, then reloads the volume from the swapped-in files.
pub fn commit_compact(&mut self) -> Result<(), VolumeError> {
    // Close current files
    if let Some(ref mut nm) = self.nm {
        nm.close();
    }
    self.nm = None;
    if let Some(ref dat_file) = self.dat_file {
        let _ = dat_file.sync_all();
    }
    self.dat_file = None;
    let cpd_path = self.file_name(".cpd");
    let cpx_path = self.file_name(".cpx");
    let dat_path = self.file_name(".dat");
    let idx_path = self.file_name(".idx");
    // Check that compact files exist
    // NOTE(review): the volume is already closed at this point, so a failed
    // commit leaves it unloaded until the caller reloads it.
    if !Path::new(&cpd_path).exists() || !Path::new(&cpx_path).exists() {
        return Err(VolumeError::Io(io::Error::new(
            io::ErrorKind::NotFound,
            "compact files (.cpd/.cpx) not found",
        )));
    }
    // Swap files: .cpd → .dat, .cpx → .idx
    fs::rename(&cpd_path, &dat_path)?;
    fs::rename(&cpx_path, &idx_path)?;
    // Remove any leveldb files
    let ldb_path = self.file_name(".ldb");
    let _ = fs::remove_dir_all(&ldb_path);
    // Reload
    self.load(true, false, 0, self.version())?;
    Ok(())
}
/// Clean up leftover compaction files (.cpd, .cpx) and the .cpldb directory.
///
/// All three removals are attempted regardless of individual failures, and
/// only then is the first non-`NotFound` error (in .cpd, .cpx, .cpldb order)
/// returned. Missing files are not an error, so this is safe to call even
/// when no compaction ever ran.
pub fn cleanup_compact(&self) -> Result<(), VolumeError> {
    // Attempt every removal first so one failure does not leave the
    // remaining leftovers behind; report errors afterwards.
    let results = [
        fs::remove_file(self.file_name(".cpd")),
        fs::remove_file(self.file_name(".cpx")),
        fs::remove_dir_all(self.file_name(".cpldb")),
    ];
    for result in results {
        match result {
            Ok(()) => {}
            // A file that never existed is fine.
            Err(e) if e.kind() == io::ErrorKind::NotFound => {}
            Err(e) => return Err(e.into()),
        }
    }
    Ok(())
}
// ---- Sync / Close ----
pub fn sync_to_disk(&mut self) -> io::Result<()> {
if let Some(ref dat_file) = self.dat_file {
dat_file.sync_all()?;
}
if let Some(ref nm) = self.nm {
nm.sync()?;
}
Ok(())
}
pub fn close(&mut self) {
if let Some(ref dat_file) = self.dat_file {
let _ = dat_file.sync_all();
}
self.dat_file = None;
if let Some(ref nm) = self.nm {
let _ = nm.sync();
}
self.nm = None;
}
/// Remove all volume files from disk.
pub fn destroy(&mut self) -> Result<(), VolumeError> {
if self.is_compacting {
return Err(VolumeError::Io(io::Error::new(
io::ErrorKind::Other,
format!("volume {} is compacting", self.id),
)));
}
self.close();
remove_volume_files(&self.data_file_name());
remove_volume_files(&self.index_file_name());
Ok(())
}
#[allow(dead_code)]
/// Record the last I/O error when it is an EIO (raw OS error 5 on Linux).
fn check_read_write_error(&mut self, err: &io::Error) {
    let is_eio = err.raw_os_error() == Some(5);
    if is_eio {
        // io::Error is not Clone; rebuild an equivalent error to keep.
        self._last_io_error = Some(io::Error::new(err.kind(), err.to_string()));
    }
}
}
// ============================================================================
// Helpers
// ============================================================================
/// Generate volume file base name: dir/collection_id or dir/id
pub fn volume_file_name(dir: &str, collection: &str, id: VolumeId) -> String {
    match collection {
        "" => format!("{}/{}", dir, id.0),
        c => format!("{}/{}_{}", dir, c, id.0),
    }
}
/// Generate a monotonically increasing append timestamp.
///
/// Returns the current wall-clock time in nanoseconds, bumped to
/// `last + 1` whenever the clock has not advanced past `last` (e.g.
/// same-nanosecond writes or a clock stepping backwards).
fn get_append_at_ns(last: u64) -> u64 {
    let now_ns = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos() as u64;
    if now_ns > last {
        now_ns
    } else {
        last + 1
    }
}
/// Remove all files associated with a volume.
///
/// Best-effort: missing files and directories are silently ignored.
fn remove_volume_files(base: &str) {
    const EXTENSIONS: [&str; 7] = [".dat", ".idx", ".vif", ".sdx", ".cpd", ".cpx", ".note"];
    for ext in EXTENSIONS {
        let _ = fs::remove_file(format!("{}{}", base, ext));
    }
    // The leveldb-backed index is a directory, not a flat file.
    let _ = fs::remove_dir_all(format!("{}.ldb", base));
}
// ============================================================================
// ScanVolumeFile — iterate all needles in a .dat file
// ============================================================================
/// Callback for scanning needles in a volume file.
pub trait VolumeFileVisitor {
    /// Called once with the parsed super block before any needles are visited.
    fn visit_super_block(&mut self, sb: &SuperBlock) -> Result<(), VolumeError>;
    /// Whether `scan_volume_file` should read and parse full needle bodies
    /// for live needles (true) or only their headers (false).
    fn read_needle_body(&self) -> bool;
    /// Called for every needle found; `offset` is its byte position in .dat.
    fn visit_needle(&mut self, n: &Needle, offset: i64) -> Result<(), VolumeError>;
}
/// Scan all needles in a volume's .dat file.
///
/// Reads and reports the super block first, then walks every needle record
/// sequentially until end of file, a truncated header, or an all-zero
/// header. Deleted/tombstone records are reported from their header only;
/// live records get a full body parse only when the visitor requests it.
pub fn scan_volume_file(
    dat_path: &str,
    visitor: &mut dyn VolumeFileVisitor,
) -> Result<(), VolumeError> {
    let mut file = File::open(dat_path)?;
    // Read super block
    let mut sb_buf = [0u8; SUPER_BLOCK_SIZE];
    file.read_exact(&mut sb_buf)?;
    let sb = SuperBlock::from_bytes(&sb_buf)?;
    visitor.visit_super_block(&sb)?;
    let version = sb.version;
    // Needle records begin immediately after the super block.
    let mut offset = sb.block_size() as i64;
    loop {
        // Read needle header
        let mut header = [0u8; NEEDLE_HEADER_SIZE];
        file.seek(SeekFrom::Start(offset as u64))?;
        match file.read_exact(&mut header) {
            Ok(()) => {}
            // Truncated header: reached the end of the file.
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e.into()),
        }
        let (_cookie, _id, size) = Needle::parse_header(&header);
        if size.0 == 0 && _id.is_empty() {
            break; // end of valid data
        }
        let body_length = needle::needle_body_length(size, version);
        let total_size = NEEDLE_HEADER_SIZE as i64 + body_length;
        // Skip full body parsing for deleted needles (tombstone or negative size)
        if size.is_deleted() || size.0 <= 0 {
            let mut n = Needle::default();
            n.read_header(&header);
            visitor.visit_needle(&n, offset)?;
        } else if visitor.read_needle_body() {
            // Visitor wants the payload: re-read header + body in one buffer.
            let mut buf = vec![0u8; total_size as usize];
            file.seek(SeekFrom::Start(offset as u64))?;
            file.read_exact(&mut buf)?;
            let mut n = Needle::default();
            n.read_bytes(&buf, offset, size, version)?;
            visitor.visit_needle(&n, offset)?;
        } else {
            // Header-only visit for live needles.
            let mut n = Needle::default();
            n.read_header(&header);
            visitor.visit_needle(&n, offset)?;
        }
        offset += total_size;
    }
    Ok(())
}
// ============================================================================
// Tests
// ============================================================================
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::needle::crc::CRC;
use tempfile::TempDir;
// Create a fresh in-memory-indexed volume with id 1 and no collection.
fn make_test_volume(dir: &str) -> Volume {
Volume::new(
dir,
dir,
"",
VolumeId(1),
NeedleMapKind::InMemory,
None,
None,
0,
Version::current(),
)
.unwrap()
}
// A writer must block while a read lease is held, and proceed once it drops.
#[test]
fn test_data_file_access_control_blocks_writer_until_reader_releases() {
let control = Arc::new(DataFileAccessControl::default());
let read_lease = control.read_lock();
let writer_control = control.clone();
let acquired = Arc::new(std::sync::atomic::AtomicBool::new(false));
let acquired_clone = acquired.clone();
let writer = std::thread::spawn(move || {
let _write_lease = writer_control.write_lock();
acquired_clone.store(true, std::sync::atomic::Ordering::Relaxed);
});
std::thread::sleep(std::time::Duration::from_millis(50));
assert!(!acquired.load(std::sync::atomic::Ordering::Relaxed));
drop(read_lease);
writer.join().unwrap();
assert!(acquired.load(std::sync::atomic::Ordering::Relaxed));
}
// Base name includes the collection prefix only when one is set.
#[test]
fn test_volume_file_name() {
assert_eq!(volume_file_name("/data", "", VolumeId(1)), "/data/1");
assert_eq!(
volume_file_name("/data", "pics", VolumeId(42)),
"/data/pics_42"
);
}
#[test]
fn test_volume_create_and_load() {
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
let v = make_test_volume(dir);
assert_eq!(v.version(), VERSION_3);
assert_eq!(v.file_count(), 0);
assert_eq!(v.content_size(), 0);
// .dat and .idx files should exist
assert!(Path::new(&v.file_name(".dat")).exists());
assert!(Path::new(&v.file_name(".idx")).exists());
}
// Round-trip: a written needle can be read back with data and cookie intact.
#[test]
fn test_volume_write_read() {
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
let mut v = make_test_volume(dir);
// Write a needle
let mut n = Needle {
id: NeedleId(1),
cookie: Cookie(0x12345678),
data: b"hello world".to_vec(),
data_size: 11,
flags: 0,
..Needle::default()
};
let (offset, size, unchanged) = v.write_needle(&mut n, true).unwrap();
assert!(!unchanged);
assert!(offset > 0); // after superblock
assert!(size.0 > 0);
assert_eq!(v.file_count(), 1);
// Read it back
let mut read_n = Needle {
id: NeedleId(1),
..Needle::default()
};
let count = v.read_needle(&mut read_n).unwrap();
assert_eq!(count, 11);
assert_eq!(read_n.data, b"hello world");
assert_eq!(read_n.cookie, Cookie(0x12345678));
}
// Re-writing identical content for the same id reports "unchanged".
#[test]
fn test_volume_write_dedup() {
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
let mut v = make_test_volume(dir);
let mut n = Needle {
id: NeedleId(1),
cookie: Cookie(0xaa),
data: b"same data".to_vec(),
data_size: 9,
..Needle::default()
};
v.write_needle(&mut n, true).unwrap();
// Write same needle again — should be unchanged
let mut n2 = Needle {
id: NeedleId(1),
cookie: Cookie(0xaa),
data: b"same data".to_vec(),
data_size: 9,
..Needle::default()
};
n2.checksum = CRC::new(&n2.data);
let (_, _, unchanged) = v.write_needle(&mut n2, true).unwrap();
assert!(unchanged);
}
// Deleting a needle updates counters and makes reads fail with Deleted.
#[test]
fn test_volume_delete() {
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
let mut v = make_test_volume(dir);
let mut n = Needle {
id: NeedleId(1),
cookie: Cookie(0xbb),
data: b"delete me".to_vec(),
data_size: 9,
..Needle::default()
};
v.write_needle(&mut n, true).unwrap();
assert_eq!(v.file_count(), 1);
let deleted_size = v
.delete_needle(&mut Needle {
id: NeedleId(1),
cookie: Cookie(0xbb),
..Needle::default()
})
.unwrap();
assert!(deleted_size.0 > 0);
assert_eq!(v.file_count(), 0);
assert_eq!(v.deleted_count(), 1);
// Read should fail with Deleted
let mut read_n = Needle {
id: NeedleId(1),
..Needle::default()
};
let err = v.read_needle(&mut read_n).unwrap_err();
assert!(matches!(err, VolumeError::Deleted));
}
#[test]
fn test_volume_multiple_needles() {
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
let mut v = make_test_volume(dir);
for i in 1..=10 {
let data = format!("needle data {}", i);
let mut n = Needle {
id: NeedleId(i),
cookie: Cookie(i as u32),
data: data.as_bytes().to_vec(),
data_size: data.len() as u32,
..Needle::default()
};
v.write_needle(&mut n, true).unwrap();
}
assert_eq!(v.file_count(), 10);
assert_eq!(v.max_file_key(), NeedleId(10));
// Read back needle 5
let mut n = Needle {
id: NeedleId(5),
..Needle::default()
};
v.read_needle(&mut n).unwrap();
assert_eq!(n.data, b"needle data 5");
}
// Data written by one Volume instance survives a close-and-reopen cycle.
#[test]
fn test_volume_reload_from_disk() {
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
// Write some needles
{
let mut v = make_test_volume(dir);
for i in 1..=3 {
let data = format!("data {}", i);
let mut n = Needle {
id: NeedleId(i),
cookie: Cookie(i as u32),
data: data.as_bytes().to_vec(),
data_size: data.len() as u32,
..Needle::default()
};
v.write_needle(&mut n, true).unwrap();
}
v.sync_to_disk().unwrap();
}
// Reload and verify
let v = Volume::new(
dir,
dir,
"",
VolumeId(1),
NeedleMapKind::InMemory,
None,
None,
0,
Version::current(),
)
.unwrap();
assert_eq!(v.file_count(), 3);
let mut n = Needle {
id: NeedleId(2),
..Needle::default()
};
v.read_needle(&mut n).unwrap();
assert_eq!(std::str::from_utf8(&n.data).unwrap(), "data 2");
}
// Overwriting an existing id with the wrong cookie is rejected.
#[test]
fn test_volume_cookie_mismatch() {
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
let mut v = make_test_volume(dir);
let mut n = Needle {
id: NeedleId(1),
cookie: Cookie(0xaa),
data: b"original".to_vec(),
data_size: 8,
..Needle::default()
};
v.write_needle(&mut n, true).unwrap();
// Write with wrong cookie
let mut n2 = Needle {
id: NeedleId(1),
cookie: Cookie(0xbb),
data: b"overwrite".to_vec(),
data_size: 9,
..Needle::default()
};
let err = v.write_needle(&mut n2, true).unwrap_err();
assert!(matches!(err, VolumeError::CookieMismatch(_)));
}
// destroy() removes the on-disk .dat and .idx files.
#[test]
fn test_volume_destroy() {
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
let dat_path;
let idx_path;
{
let mut v = make_test_volume(dir);
dat_path = v.file_name(".dat");
idx_path = v.file_name(".idx");
assert!(Path::new(&dat_path).exists());
v.destroy().unwrap();
}
assert!(!Path::new(&dat_path).exists());
assert!(!Path::new(&idx_path).exists());
}
#[test]
fn test_get_append_at_ns() {
let t1 = get_append_at_ns(0);
assert!(t1 > 0);
let t2 = get_append_at_ns(t1);
assert!(t2 > t1);
// If we pass a future timestamp, should return last+1
let future = u64::MAX - 1;
let t3 = get_append_at_ns(future);
assert_eq!(t3, future + 1);
}
// End-to-end compaction: deleted needle is dropped, live data survives.
#[test]
fn test_volume_compact() {
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
let mut v = make_test_volume(dir);
// Write 3 needles
for i in 1..=3u64 {
let mut n = Needle {
id: NeedleId(i),
cookie: Cookie(i as u32),
data: format!("data-{}", i).into_bytes(),
data_size: format!("data-{}", i).len() as u32,
..Needle::default()
};
v.write_needle(&mut n, true).unwrap();
}
assert_eq!(v.file_count(), 3);
// Delete needle 2
let mut del = Needle {
id: NeedleId(2),
cookie: Cookie(2),
..Needle::default()
};
v.delete_needle(&mut del).unwrap();
assert_eq!(v.file_count(), 2);
assert_eq!(v.deleted_count(), 1);
let dat_size_before = v.dat_file_size().unwrap();
// Compact
v.compact_by_index(0, 0, |_| true).unwrap();
// Verify compact files exist
assert!(Path::new(&v.file_name(".cpd")).exists());
assert!(Path::new(&v.file_name(".cpx")).exists());
// Commit: swap files and reload
v.commit_compact().unwrap();
// After compaction: 2 live needles, 0 deleted
assert_eq!(v.file_count(), 2);
assert_eq!(v.deleted_count(), 0);
// Dat should be smaller (deleted needle removed)
let dat_size_after = v.dat_file_size().unwrap();
assert!(
dat_size_after < dat_size_before,
"dat should shrink after compact"
);
// Read back live needles
let mut n1 = Needle {
id: NeedleId(1),
..Needle::default()
};
v.read_needle(&mut n1).unwrap();
assert_eq!(n1.data, b"data-1");
let mut n3 = Needle {
id: NeedleId(3),
..Needle::default()
};
v.read_needle(&mut n3).unwrap();
assert_eq!(n3.data, b"data-3");
// Needle 2 should not exist
let mut n2 = Needle {
id: NeedleId(2),
..Needle::default()
};
assert!(v.read_needle(&mut n2).is_err());
// Compact files should not exist after commit
assert!(!Path::new(&v.file_name(".cpd")).exists());
assert!(!Path::new(&v.file_name(".cpx")).exists());
// Cleanup should be a no-op
v.cleanup_compact().unwrap();
}
}