From d2bc9c43f70366bb4f27236d42a90ce3e48591d7 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 8 Mar 2026 17:47:44 -0700 Subject: [PATCH] feat: incomplete volume cleanup on startup matching Go behavior - Check for .note file (interrupted VolumeCopy) and remove partial volumes - Validate EC shards before skipping .dat loading: check shard count, uniform size, and expected size from .dat file - Remove stale .cpd/.cpx compaction temp files on startup - Skip already-loaded volumes on repeated load_existing_volumes calls - Fix healthz test: set is_heartbeating=true in test state --- seaweed-volume/src/storage/disk_location.rs | 188 +++++++++++++++++++- seaweed-volume/src/storage/volume.rs | 2 +- seaweed-volume/tests/http_integration.rs | 2 +- 3 files changed, 188 insertions(+), 4 deletions(-) diff --git a/seaweed-volume/src/storage/disk_location.rs b/seaweed-volume/src/storage/disk_location.rs index 0ed307781..7fa89248d 100644 --- a/seaweed-volume/src/storage/disk_location.rs +++ b/seaweed-volume/src/storage/disk_location.rs @@ -9,11 +9,16 @@ use std::fs; use std::io; use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, Ordering}; +use tracing::{info, warn}; + use crate::config::MinFreeSpace; +use crate::storage::erasure_coding::ec_shard::{ + DATA_SHARDS_COUNT, ERASURE_CODING_LARGE_BLOCK_SIZE, ERASURE_CODING_SMALL_BLOCK_SIZE, +}; use crate::storage::needle_map::NeedleMapKind; use crate::storage::super_block::ReplicaPlacement; use crate::storage::types::*; -use crate::storage::volume::{Volume, VolumeError}; +use crate::storage::volume::{remove_volume_files, volume_file_name, Volume, VolumeError}; /// A single disk location managing volumes in one directory. pub struct DiskLocation { @@ -83,6 +88,10 @@ impl DiskLocation { // ---- Volume management ---- /// Load existing volumes from the directory. + /// + /// Matches Go's `loadExistingVolume`: checks for incomplete volumes (.note file), + /// validates EC shards before skipping .dat loading, and cleans up stale + /// compaction temp files (.cpd/.cpx). pub fn load_existing_volumes(&mut self, needle_map_kind: NeedleMapKind) -> io::Result<()> { // Ensure directory exists fs::create_dir_all(&self.directory)?; @@ -105,6 +114,55 @@ impl DiskLocation { } for (collection, vid) in dat_files { + let volume_name = volume_file_name(&self.directory, &collection, vid); + let idx_name = volume_file_name(&self.idx_directory, &collection, vid); + + // Check for incomplete volume (.note file means a VolumeCopy was interrupted) + let note_path = format!("{}.note", volume_name); + if std::path::Path::new(¬e_path).exists() { + let note = fs::read_to_string(¬e_path).unwrap_or_default(); + warn!( + volume_id = vid.0, + "volume was not completed: {}, removing files", note + ); + remove_volume_files(&volume_name); + remove_volume_files(&idx_name); + continue; + } + + // If valid EC shards exist (.ecx file present), skip loading .dat + let ecx_path = format!("{}.ecx", idx_name); + if std::path::Path::new(&ecx_path).exists() { + if self.validate_ec_volume(&collection, vid) { + // Valid EC volume — don't load .dat + continue; + } else { + warn!( + volume_id = vid.0, + "EC volume validation failed, removing incomplete EC files" + ); + self.remove_ec_volume_files(&collection, vid); + // Fall through to load .dat file + } + } + + // Clean up stale compaction temp files + let cpd_path = format!("{}.cpd", volume_name); + let cpx_path = format!("{}.cpx", idx_name); + if std::path::Path::new(&cpd_path).exists() { + info!(volume_id = vid.0, "removing stale compaction file .cpd"); + let _ = fs::remove_file(&cpd_path); + } + if std::path::Path::new(&cpx_path).exists() { + info!(volume_id = vid.0, "removing stale compaction file .cpx"); + let _ = fs::remove_file(&cpx_path); + } + + // Skip if already loaded (e.g., from a previous call) + if self.volumes.contains_key(&vid) { + continue; + } + match Volume::new( &self.directory, &self.idx_directory, @@ -120,7 +178,7 @@ impl DiskLocation { self.volumes.insert(vid, v); } Err(e) => { - eprintln!("Error loading volume {}: {}", vid, e); + warn!(volume_id = vid.0, error = %e, "failed to load volume"); } } } @@ -128,6 +186,111 @@ impl DiskLocation { Ok(()) } + /// Validate EC volume shards: all shards must be same size, and if .dat exists, + /// need at least DATA_SHARDS_COUNT shards with size matching expected. + fn validate_ec_volume(&self, collection: &str, vid: VolumeId) -> bool { + let base = volume_file_name(&self.directory, collection, vid); + let dat_path = format!("{}.dat", base); + + let mut expected_shard_size: Option = None; + let dat_exists = std::path::Path::new(&dat_path).exists(); + + if dat_exists { + if let Ok(meta) = fs::metadata(&dat_path) { + expected_shard_size = Some(calculate_expected_shard_size(meta.len() as i64)); + } else { + return false; + } + } + + let mut shard_count = 0usize; + let mut actual_shard_size: Option = None; + const MAX_SHARD_COUNT: usize = 32; + + for i in 0..MAX_SHARD_COUNT { + let shard_path = format!("{}.ec{:02}", base, i); + match fs::metadata(&shard_path) { + Ok(meta) if meta.len() > 0 => { + let size = meta.len() as i64; + if let Some(prev) = actual_shard_size { + if size != prev { + warn!( + volume_id = vid.0, + shard = i, + size, + expected = prev, + "EC shard size mismatch" + ); + return false; + } + } else { + actual_shard_size = Some(size); + } + shard_count += 1; + } + Err(e) if e.kind() != io::ErrorKind::NotFound => { + warn!( + volume_id = vid.0, + shard = i, + error = %e, + "failed to stat EC shard" + ); + return false; + } + _ => {} // not found or zero size — skip + } + } + + // If .dat exists, validate shard size matches expected + if dat_exists { + if let (Some(actual), Some(expected)) = (actual_shard_size, expected_shard_size) { + if actual != expected { + warn!( + volume_id = vid.0, + actual_shard_size = actual, + expected_shard_size = expected, + "EC shard size doesn't match .dat file" + ); + return false; + } + } + } + + // Distributed EC (no .dat): any shard count is valid + if !dat_exists { + return true; + } + + // With .dat: need at least DATA_SHARDS_COUNT shards + if shard_count < DATA_SHARDS_COUNT { + warn!( + volume_id = vid.0, + shard_count, + required = DATA_SHARDS_COUNT, + "EC volume has .dat but too few shards" + ); + return false; + } + + true + } + + /// Remove all EC-related files for a volume. + fn remove_ec_volume_files(&self, collection: &str, vid: VolumeId) { + let base = volume_file_name(&self.directory, collection, vid); + let idx_base = volume_file_name(&self.idx_directory, collection, vid); + const MAX_SHARD_COUNT: usize = 32; + + // Remove index files first (.ecx, .ecj) + let _ = fs::remove_file(format!("{}.ecx", idx_base)); + let _ = fs::remove_file(format!("{}.ecj", idx_base)); + + // Remove all EC shard files (.ec00 ~ .ec31) + for i in 0..MAX_SHARD_COUNT { + let _ = fs::remove_file(format!("{}.ec{:02}", base, i)); + } + } + /// Find a volume by ID. pub fn find_volume(&self, vid: VolumeId) -> Option<&Volume> { self.volumes.get(&vid) @@ -298,6 +461,27 @@ pub fn get_disk_stats(path: &str) -> (u64, u64) { } } +/// Calculate expected EC shard size from .dat file size. +/// Matches Go's `calculateExpectedShardSize`: large blocks (1GB * data_shards) first, +/// then small blocks (1MB * data_shards) for the remainder. +fn calculate_expected_shard_size(dat_file_size: i64) -> i64 { + let large_batch_size = + ERASURE_CODING_LARGE_BLOCK_SIZE as i64 * DATA_SHARDS_COUNT as i64; + let num_large_batches = dat_file_size / large_batch_size; + let mut shard_size = num_large_batches * ERASURE_CODING_LARGE_BLOCK_SIZE as i64; + let remaining = dat_file_size - (num_large_batches * large_batch_size); + + if remaining > 0 { + let small_batch_size = + ERASURE_CODING_SMALL_BLOCK_SIZE as i64 * DATA_SHARDS_COUNT as i64; + // Ceiling division + let num_small_batches = (remaining + small_batch_size - 1) / small_batch_size; + shard_size += num_small_batches * ERASURE_CODING_SMALL_BLOCK_SIZE as i64; + } + + shard_size +} + /// Parse a volume filename like "collection_42.dat" or "42.dat" into (collection, VolumeId). fn parse_volume_filename(filename: &str) -> Option<(String, VolumeId)> { let stem = filename.strip_suffix(".dat")?; diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 7bb985437..a5ad16660 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -2068,7 +2068,7 @@ fn get_append_at_ns(last: u64) -> u64 { } /// Remove all files associated with a volume. -fn remove_volume_files(base: &str) { +pub(crate) fn remove_volume_files(base: &str) { for ext in &[".dat", ".idx", ".vif", ".sdx", ".cpd", ".cpx", ".note"] { let _ = fs::remove_file(format!("{}{}", base, ext)); } diff --git a/seaweed-volume/tests/http_integration.rs b/seaweed-volume/tests/http_integration.rs index cb363e706..669b2dbaa 100644 --- a/seaweed-volume/tests/http_integration.rs +++ b/seaweed-volume/tests/http_integration.rs @@ -63,7 +63,7 @@ fn test_state_with_signing_key(signing_key: Vec) -> (Arc, data_center: String::new(), rack: String::new(), file_size_limit_bytes: 0, - is_heartbeating: std::sync::atomic::AtomicBool::new(false), + is_heartbeating: std::sync::atomic::AtomicBool::new(true), has_master: false, pre_stop_seconds: 0, volume_state_notify: tokio::sync::Notify::new(),