diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs index aa0de2c68..ffe627c2e 100644 --- a/seaweed-volume/src/main.rs +++ b/seaweed-volume/src/main.rs @@ -32,10 +32,13 @@ fn main() { .build() .expect("Failed to build tokio runtime"); - rt.block_on(run(config)); + if let Err(e) = rt.block_on(run(config)) { + error!("Volume server failed: {}", e); + std::process::exit(1); + } } -async fn run(config: VolumeServerConfig) { +async fn run(config: VolumeServerConfig) -> Result<(), Box> { // Initialize the store let mut store = Store::new(config.index_type); store.ip = config.ip.clone(); @@ -59,13 +62,13 @@ async fn run(config: VolumeServerConfig) { "Adding storage location: {} (max_volumes={}, disk_type={:?})", dir, max_volumes, disk_type ); - if let Err(e) = store.add_location(dir, idx_dir, max_volumes, disk_type) { - error!("Failed to add storage location {}: {}", dir, e); - return; - } + store.add_location(dir, idx_dir, max_volumes, disk_type) + .map_err(|e| format!("Failed to add storage location {}: {}", dir, e))?; } // Build shared state + // TODO: Wire up JWT signing keys from config. Empty keys are acceptable for now + // while the Rust volume server is still in development. let guard = Guard::new( &config.white_list, SigningKey(vec![]), @@ -185,4 +188,5 @@ async fn run(config: VolumeServerConfig) { } info!("Volume server stopped."); + Ok(()) } diff --git a/seaweed-volume/src/security.rs b/seaweed-volume/src/security.rs index 9c8dfd970..654538c40 100644 --- a/seaweed-volume/src/security.rs +++ b/seaweed-volume/src/security.rs @@ -224,13 +224,17 @@ impl Guard { let token = token.ok_or(JwtError::MissingToken)?; let claims = decode_jwt(key, token)?; - if let Some(ref fid) = claims.fid { - if fid != expected_fid { + match claims.fid { + None => { + return Err(JwtError::MissingFileIdClaim); + } + Some(ref fid) if fid != expected_fid => { return Err(JwtError::FileIdMismatch { expected: expected_fid.to_string(), got: fid.to_string(), }); } + _ => {} } Ok(()) @@ -301,6 +305,9 @@ pub enum JwtError { #[error("JWT error: {0}")] Jwt(#[from] jsonwebtoken::errors::Error), + #[error("JWT token missing required fid claim")] + MissingFileIdClaim, + #[error("file ID mismatch: expected {expected}, got {got}")] FileIdMismatch { expected: String, got: String }, } diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index abd8b0a71..4b7c7aa2e 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -201,8 +201,16 @@ impl VolumeServer for VolumeGrpcService { &self, request: Request, ) -> Result, Status> { - let vid = VolumeId(request.into_inner().volume_id); + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); let mut store = self.state.store.write().unwrap(); + if req.only_empty { + let (_, vol) = store.find_volume(vid) + .ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?; + if vol.file_count() > 0 { + return Err(Status::failed_precondition("volume is not empty")); + } + } store.delete_volume(vid) .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(volume_server_pb::VolumeDeleteResponse {})) diff --git a/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs b/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs index fcd08c674..3283bda0d 100644 --- a/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs +++ b/seaweed-volume/src/storage/erasure_coding/ec_encoder.rs @@ -72,11 +72,15 @@ fn write_sorted_ecx_from_idx(idx_path: &str, ecx_path: &str) -> io::Result<()> { Ok(()) })?; - // Sort by NeedleId - entries.sort_by_key(|&(key, _, _)| key); + // Sort by NeedleId, then by actual offset so later entries come last + entries.sort_by_key(|&(key, offset, _)| (key, offset.to_actual_offset())); - // Remove duplicates (keep last entry for each key) + // Remove duplicates (keep last/latest entry for each key). + // dedup_by_key keeps the first in each run, so we reverse first, + // dedup, then reverse back. + entries.reverse(); entries.dedup_by_key(|entry| entry.0); + entries.reverse(); // Write sorted entries to .ecx let mut ecx_file = File::create(ecx_path)?; @@ -140,8 +144,7 @@ fn encode_one_batch( #[cfg(unix)] { use std::os::unix::fs::FileExt; - // Read what we can; zeros fill the rest (already initialized) - let _ = dat_file.read_at(&mut buffers[i], read_offset); + dat_file.read_at(&mut buffers[i], read_offset)?; } } diff --git a/seaweed-volume/src/storage/erasure_coding/ec_shard.rs b/seaweed-volume/src/storage/erasure_coding/ec_shard.rs index 2fa4d6f66..5bd410606 100644 --- a/seaweed-volume/src/storage/erasure_coding/ec_shard.rs +++ b/seaweed-volume/src/storage/erasure_coding/ec_shard.rs @@ -79,6 +79,16 @@ impl EcVolumeShard { use std::os::unix::fs::FileExt; file.read_at(buf, offset) } + + #[cfg(not(unix))] + { + use std::io::{Read, Seek, SeekFrom}; + // File::read_at is unix-only; fall back to seek + read. + // We need a mutable reference for seek/read, so clone the handle. + let mut f = file.try_clone()?; + f.seek(SeekFrom::Start(offset))?; + f.read(buf) + } } /// Write data to the shard file (appends). diff --git a/seaweed-volume/src/storage/needle_map.rs b/seaweed-volume/src/storage/needle_map.rs index 6a8595507..284610042 100644 --- a/seaweed-volume/src/storage/needle_map.rs +++ b/seaweed-volume/src/storage/needle_map.rs @@ -142,16 +142,16 @@ impl CompactNeedleMap { /// Insert or update an entry. Appends to .idx file if present. pub fn put(&mut self, key: NeedleId, offset: Offset, size: Size) -> io::Result<()> { - let old = self.map.get(&key).cloned(); - let nv = NeedleValue { offset, size }; - self.metric.on_put(key, old.as_ref(), size); - self.map.insert(key, nv); - - // Append to index file + // Persist to idx file BEFORE mutating in-memory state for crash consistency if let Some(ref mut idx_file) = self.idx_file { idx::write_index_entry(idx_file, key, offset, size)?; self.idx_file_offset += NEEDLE_MAP_ENTRY_SIZE as u64; } + + let old = self.map.get(&key).cloned(); + let nv = NeedleValue { offset, size }; + self.metric.on_put(key, old.as_ref(), size); + self.map.insert(key, nv); Ok(()) } @@ -164,15 +164,15 @@ impl CompactNeedleMap { pub fn delete(&mut self, key: NeedleId, offset: Offset) -> io::Result> { if let Some(old) = self.map.get(&key).cloned() { if old.size.is_valid() { - self.metric.on_delete(&old); - let deleted_size = Size(-(old.size.0)); - self.map.insert(key, NeedleValue { offset, size: deleted_size }); - - // Append tombstone to index file + // Persist tombstone to idx file BEFORE mutating in-memory state for crash consistency if let Some(ref mut idx_file) = self.idx_file { idx::write_index_entry(idx_file, key, offset, TOMBSTONE_FILE_SIZE)?; self.idx_file_offset += NEEDLE_MAP_ENTRY_SIZE as u64; } + + self.metric.on_delete(&old); + let deleted_size = Size(-(old.size.0)); + self.map.insert(key, NeedleValue { offset, size: deleted_size }); return Ok(Some(old.size)); } } diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs index 3dc13854d..03e0058b9 100644 --- a/seaweed-volume/src/storage/store.rs +++ b/seaweed-volume/src/storage/store.rs @@ -116,6 +116,9 @@ impl Store { preallocate: u64, disk_type: DiskType, ) -> Result<(), VolumeError> { + if self.find_volume(vid).is_some() { + return Err(VolumeError::AlreadyExists); + } let loc_idx = self.find_free_location(&disk_type).ok_or_else(|| { VolumeError::Io(io::Error::new( io::ErrorKind::Other, diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index d3334307f..12ffb4371 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -14,6 +14,8 @@ use std::io::{self, Read, Seek, SeekFrom, Write}; use std::path::Path; use std::time::{SystemTime, UNIX_EPOCH}; +use tracing::warn; + use crate::storage::needle::needle::{self, Needle, NeedleError, get_actual_size}; use crate::storage::needle_map::{CompactNeedleMap, NeedleMapKind}; use crate::storage::super_block::{SuperBlock, ReplicaPlacement, SUPER_BLOCK_SIZE}; @@ -43,6 +45,9 @@ pub enum VolumeError { #[error("volume not empty")] NotEmpty, + #[error("volume already exists")] + AlreadyExists, + #[error("volume is read-only")] ReadOnly, @@ -234,6 +239,14 @@ impl Volume { } fn load_index(&mut self) -> Result<(), VolumeError> { + if self.needle_map_kind != NeedleMapKind::InMemory { + warn!( + volume_id = self.id.0, + kind = ?self.needle_map_kind, + "only InMemory needle map is currently supported, falling back to InMemory" + ); + } + let idx_path = self.file_name(".idx"); // Ensure idx directory exists @@ -541,15 +554,15 @@ impl Volume { } fn do_delete_request(&mut self, n: &mut Needle) -> Result { - let (found, size) = if let Some(nm) = &self.nm { + let (found, size, stored_offset) = if let Some(nm) = &self.nm { if let Some(nv) = nm.get(n.id) { if !nv.size.is_deleted() { - (true, nv.size) + (true, nv.size, nv.offset) } else { - (false, Size(0)) + (false, Size(0), Offset::default()) } } else { - (false, Size(0)) + (false, Size(0), Offset::default()) } } else { return Ok(Size(0)); @@ -559,6 +572,15 @@ impl Volume { return Ok(Size(0)); } + // Cookie validation: read stored needle header and verify cookie matches + { + let mut existing = Needle::default(); + self.read_needle_header(&mut existing, stored_offset.to_actual_offset())?; + if existing.cookie != n.cookie { + return Err(VolumeError::CookieMismatch(n.cookie.0)); + } + } + // Write tombstone: append needle with empty data n.data = vec![]; n.append_at_ns = get_append_at_ns(self.last_append_at_ns); @@ -784,7 +806,12 @@ pub fn scan_volume_file( let body_length = needle::needle_body_length(size, version); let total_size = NEEDLE_HEADER_SIZE as i64 + body_length; - if visitor.read_needle_body() { + // Skip full body parsing for deleted needles (tombstone or negative size) + if size.is_deleted() || size.0 <= 0 { + let mut n = Needle::default(); + n.read_header(&header); + visitor.visit_needle(&n, offset)?; + } else if visitor.read_needle_body() { let mut buf = vec![0u8; total_size as usize]; file.seek(SeekFrom::Start(offset as u64))?; file.read_exact(&mut buf)?;