@ -9,11 +9,16 @@ use std::fs;
use std ::io ;
use std ::io ;
use std ::sync ::atomic ::{ AtomicBool , AtomicI32 , AtomicU64 , Ordering } ;
use std ::sync ::atomic ::{ AtomicBool , AtomicI32 , AtomicU64 , Ordering } ;
use tracing ::{ info , warn } ;
use crate ::config ::MinFreeSpace ;
use crate ::config ::MinFreeSpace ;
use crate ::storage ::erasure_coding ::ec_shard ::{
DATA_SHARDS_COUNT , ERASURE_CODING_LARGE_BLOCK_SIZE , ERASURE_CODING_SMALL_BLOCK_SIZE ,
} ;
use crate ::storage ::needle_map ::NeedleMapKind ;
use crate ::storage ::needle_map ::NeedleMapKind ;
use crate ::storage ::super_block ::ReplicaPlacement ;
use crate ::storage ::super_block ::ReplicaPlacement ;
use crate ::storage ::types ::* ;
use crate ::storage ::types ::* ;
use crate ::storage ::volume ::{ Volume , VolumeError } ;
use crate ::storage ::volume ::{ remove_volume_files , volume_file_name , Volume , VolumeError } ;
/// A single disk location managing volumes in one directory.
/// A single disk location managing volumes in one directory.
pub struct DiskLocation {
pub struct DiskLocation {
@ -83,6 +88,10 @@ impl DiskLocation {
// ---- Volume management ----
// ---- Volume management ----
/// Load existing volumes from the directory.
/// Load existing volumes from the directory.
///
/// Matches Go's `loadExistingVolume`: checks for incomplete volumes (.note file),
/// validates EC shards before skipping .dat loading, and cleans up stale
/// compaction temp files (.cpd/.cpx).
pub fn load_existing_volumes ( & mut self , needle_map_kind : NeedleMapKind ) -> io ::Result < ( ) > {
pub fn load_existing_volumes ( & mut self , needle_map_kind : NeedleMapKind ) -> io ::Result < ( ) > {
// Ensure directory exists
// Ensure directory exists
fs ::create_dir_all ( & self . directory ) ? ;
fs ::create_dir_all ( & self . directory ) ? ;
@ -105,6 +114,55 @@ impl DiskLocation {
}
}
for ( collection , vid ) in dat_files {
for ( collection , vid ) in dat_files {
let volume_name = volume_file_name ( & self . directory , & collection , vid ) ;
let idx_name = volume_file_name ( & self . idx_directory , & collection , vid ) ;
// Check for incomplete volume (.note file means a VolumeCopy was interrupted)
let note_path = format ! ( "{}.note" , volume_name ) ;
if std ::path ::Path ::new ( & note_path ) . exists ( ) {
let note = fs ::read_to_string ( & note_path ) . unwrap_or_default ( ) ;
warn ! (
volume_id = vid . 0 ,
"volume was not completed: {}, removing files" , note
) ;
remove_volume_files ( & volume_name ) ;
remove_volume_files ( & idx_name ) ;
continue ;
}
// If valid EC shards exist (.ecx file present), skip loading .dat
let ecx_path = format ! ( "{}.ecx" , idx_name ) ;
if std ::path ::Path ::new ( & ecx_path ) . exists ( ) {
if self . validate_ec_volume ( & collection , vid ) {
// Valid EC volume — don't load .dat
continue ;
} else {
warn ! (
volume_id = vid . 0 ,
"EC volume validation failed, removing incomplete EC files"
) ;
self . remove_ec_volume_files ( & collection , vid ) ;
// Fall through to load .dat file
}
}
// Clean up stale compaction temp files
let cpd_path = format ! ( "{}.cpd" , volume_name ) ;
let cpx_path = format ! ( "{}.cpx" , idx_name ) ;
if std ::path ::Path ::new ( & cpd_path ) . exists ( ) {
info ! ( volume_id = vid . 0 , "removing stale compaction file .cpd" ) ;
let _ = fs ::remove_file ( & cpd_path ) ;
}
if std ::path ::Path ::new ( & cpx_path ) . exists ( ) {
info ! ( volume_id = vid . 0 , "removing stale compaction file .cpx" ) ;
let _ = fs ::remove_file ( & cpx_path ) ;
}
// Skip if already loaded (e.g., from a previous call)
if self . volumes . contains_key ( & vid ) {
continue ;
}
match Volume ::new (
match Volume ::new (
& self . directory ,
& self . directory ,
& self . idx_directory ,
& self . idx_directory ,
@ -120,7 +178,7 @@ impl DiskLocation {
self . volumes . insert ( vid , v ) ;
self . volumes . insert ( vid , v ) ;
}
}
Err ( e ) = > {
Err ( e ) = > {
eprintln ! ( "Error loading volume {}: {}" , vid , e ) ;
warn ! ( volume_id = vid . 0 , error = % e , "failed to load volume" ) ;
}
}
}
}
}
}
@ -128,6 +186,111 @@ impl DiskLocation {
Ok ( ( ) )
Ok ( ( ) )
}
}
/// Validate EC volume shards: all shards must be same size, and if .dat exists,
/// need at least DATA_SHARDS_COUNT shards with size matching expected.
fn validate_ec_volume ( & self , collection : & str , vid : VolumeId ) -> bool {
let base = volume_file_name ( & self . directory , collection , vid ) ;
let dat_path = format ! ( "{}.dat" , base ) ;
let mut expected_shard_size : Option < i64 > = None ;
let dat_exists = std ::path ::Path ::new ( & dat_path ) . exists ( ) ;
if dat_exists {
if let Ok ( meta ) = fs ::metadata ( & dat_path ) {
expected_shard_size = Some ( calculate_expected_shard_size ( meta . len ( ) as i64 ) ) ;
} else {
return false ;
}
}
let mut shard_count = 0 usize ;
let mut actual_shard_size : Option < i64 > = None ;
const MAX_SHARD_COUNT : usize = 32 ;
for i in 0 . . MAX_SHARD_COUNT {
let shard_path = format ! ( "{}.ec{:02}" , base , i ) ;
match fs ::metadata ( & shard_path ) {
Ok ( meta ) if meta . len ( ) > 0 = > {
let size = meta . len ( ) as i64 ;
if let Some ( prev ) = actual_shard_size {
if size ! = prev {
warn ! (
volume_id = vid . 0 ,
shard = i ,
size ,
expected = prev ,
"EC shard size mismatch"
) ;
return false ;
}
} else {
actual_shard_size = Some ( size ) ;
}
shard_count + = 1 ;
}
Err ( e ) if e . kind ( ) ! = io ::ErrorKind ::NotFound = > {
warn ! (
volume_id = vid . 0 ,
shard = i ,
error = % e ,
"failed to stat EC shard"
) ;
return false ;
}
_ = > { } // not found or zero size — skip
}
}
// If .dat exists, validate shard size matches expected
if dat_exists {
if let ( Some ( actual ) , Some ( expected ) ) = ( actual_shard_size , expected_shard_size ) {
if actual ! = expected {
warn ! (
volume_id = vid . 0 ,
actual_shard_size = actual ,
expected_shard_size = expected ,
"EC shard size doesn't match .dat file"
) ;
return false ;
}
}
}
// Distributed EC (no .dat): any shard count is valid
if ! dat_exists {
return true ;
}
// With .dat: need at least DATA_SHARDS_COUNT shards
if shard_count < DATA_SHARDS_COUNT {
warn ! (
volume_id = vid . 0 ,
shard_count ,
required = DATA_SHARDS_COUNT ,
"EC volume has .dat but too few shards"
) ;
return false ;
}
true
}
/// Remove all EC-related files for a volume.
fn remove_ec_volume_files ( & self , collection : & str , vid : VolumeId ) {
let base = volume_file_name ( & self . directory , collection , vid ) ;
let idx_base = volume_file_name ( & self . idx_directory , collection , vid ) ;
const MAX_SHARD_COUNT : usize = 32 ;
// Remove index files first (.ecx, .ecj)
let _ = fs ::remove_file ( format ! ( "{}.ecx" , idx_base ) ) ;
let _ = fs ::remove_file ( format ! ( "{}.ecj" , idx_base ) ) ;
// Remove all EC shard files (.ec00 ~ .ec31)
for i in 0 . . MAX_SHARD_COUNT {
let _ = fs ::remove_file ( format ! ( "{}.ec{:02}" , base , i ) ) ;
}
}
/// Find a volume by ID.
/// Find a volume by ID.
pub fn find_volume ( & self , vid : VolumeId ) -> Option < & Volume > {
pub fn find_volume ( & self , vid : VolumeId ) -> Option < & Volume > {
self . volumes . get ( & vid )
self . volumes . get ( & vid )
@ -298,6 +461,27 @@ pub fn get_disk_stats(path: &str) -> (u64, u64) {
}
}
}
}
/// Calculate expected EC shard size from .dat file size.
/// Matches Go's `calculateExpectedShardSize`: large blocks (1GB * data_shards) first,
/// then small blocks (1MB * data_shards) for the remainder.
fn calculate_expected_shard_size(dat_file_size: i64) -> i64 {
    let data_shards = DATA_SHARDS_COUNT as i64;
    let large_block = ERASURE_CODING_LARGE_BLOCK_SIZE as i64;
    let small_block = ERASURE_CODING_SMALL_BLOCK_SIZE as i64;

    // Whole large batches: each batch stripes one large block onto every data shard.
    let large_batches = dat_file_size / (large_block * data_shards);
    let mut per_shard = large_batches * large_block;

    // Whatever is left is striped in small blocks; round the batch count up so
    // a partial final stripe still claims a full small block per shard.
    let leftover = dat_file_size - large_batches * large_block * data_shards;
    if leftover > 0 {
        let small_batch = small_block * data_shards;
        per_shard += (leftover + small_batch - 1) / small_batch * small_block;
    }

    per_shard
}
/// Parse a volume filename like "collection_42.dat" or "42.dat" into (collection, VolumeId).
/// Parse a volume filename like "collection_42.dat" or "42.dat" into (collection, VolumeId).
fn parse_volume_filename ( filename : & str ) -> Option < ( String , VolumeId ) > {
fn parse_volume_filename ( filename : & str ) -> Option < ( String , VolumeId ) > {
let stem = filename . strip_suffix ( ".dat" ) ? ;
let stem = filename . strip_suffix ( ".dat" ) ? ;