Browse Source

Address all code review comments from PR #8539

Security:
- Require fid claim in JWT tokens (reject tokens without fid)

gRPC:
- Honor only_empty flag in VolumeDelete RPC

Storage:
- Reject duplicate volume IDs in add_volume
- Persist .idx entries before mutating in-memory state (crash consistency)
- Validate cookie on needle delete to prevent unauthorized tombstoning
- Skip full body parsing for deleted needles during volume scans
- Log warning when unsupported needle_map_kind is requested

Erasure coding:
- Sort index entries by actual offset for correct dedup ordering
- Propagate .dat read errors instead of encoding zero-filled shards
- Add non-unix fallback for ec_shard reads

Main:
- Propagate startup errors via Result instead of silent return
- Add TODO for JWT config wiring
rust-volume-server
Chris Lu 5 days ago
parent
commit
df604b0c14
  1. 16
      seaweed-volume/src/main.rs
  2. 11
      seaweed-volume/src/security.rs
  3. 10
      seaweed-volume/src/server/grpc_server.rs
  4. 13
      seaweed-volume/src/storage/erasure_coding/ec_encoder.rs
  5. 10
      seaweed-volume/src/storage/erasure_coding/ec_shard.rs
  6. 22
      seaweed-volume/src/storage/needle_map.rs
  7. 3
      seaweed-volume/src/storage/store.rs
  8. 37
      seaweed-volume/src/storage/volume.rs

16
seaweed-volume/src/main.rs

@@ -32,10 +32,13 @@ fn main() {
.build()
.expect("Failed to build tokio runtime");
rt.block_on(run(config));
if let Err(e) = rt.block_on(run(config)) {
error!("Volume server failed: {}", e);
std::process::exit(1);
}
}
async fn run(config: VolumeServerConfig) {
async fn run(config: VolumeServerConfig) -> Result<(), Box<dyn std::error::Error>> {
// Initialize the store
let mut store = Store::new(config.index_type);
store.ip = config.ip.clone();
@@ -59,13 +62,13 @@ async fn run(config: VolumeServerConfig) {
"Adding storage location: {} (max_volumes={}, disk_type={:?})",
dir, max_volumes, disk_type
);
if let Err(e) = store.add_location(dir, idx_dir, max_volumes, disk_type) {
error!("Failed to add storage location {}: {}", dir, e);
return;
}
store.add_location(dir, idx_dir, max_volumes, disk_type)
.map_err(|e| format!("Failed to add storage location {}: {}", dir, e))?;
}
// Build shared state
// TODO: Wire up JWT signing keys from config. Empty keys are acceptable for now
// while the Rust volume server is still in development.
let guard = Guard::new(
&config.white_list,
SigningKey(vec![]),
@@ -185,4 +188,5 @@ async fn run(config: VolumeServerConfig) {
}
info!("Volume server stopped.");
Ok(())
}

11
seaweed-volume/src/security.rs

@@ -224,13 +224,17 @@ impl Guard {
let token = token.ok_or(JwtError::MissingToken)?;
let claims = decode_jwt(key, token)?;
if let Some(ref fid) = claims.fid {
if fid != expected_fid {
match claims.fid {
None => {
return Err(JwtError::MissingFileIdClaim);
}
Some(ref fid) if fid != expected_fid => {
return Err(JwtError::FileIdMismatch {
expected: expected_fid.to_string(),
got: fid.to_string(),
});
}
_ => {}
}
Ok(())
@@ -301,6 +305,9 @@ pub enum JwtError {
#[error("JWT error: {0}")]
Jwt(#[from] jsonwebtoken::errors::Error),
#[error("JWT token missing required fid claim")]
MissingFileIdClaim,
#[error("file ID mismatch: expected {expected}, got {got}")]
FileIdMismatch { expected: String, got: String },
}

10
seaweed-volume/src/server/grpc_server.rs

@@ -201,8 +201,16 @@ impl VolumeServer for VolumeGrpcService {
&self,
request: Request<volume_server_pb::VolumeDeleteRequest>,
) -> Result<Response<volume_server_pb::VolumeDeleteResponse>, Status> {
let vid = VolumeId(request.into_inner().volume_id);
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let mut store = self.state.store.write().unwrap();
if req.only_empty {
let (_, vol) = store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
if vol.file_count() > 0 {
return Err(Status::failed_precondition("volume is not empty"));
}
}
store.delete_volume(vid)
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(volume_server_pb::VolumeDeleteResponse {}))

13
seaweed-volume/src/storage/erasure_coding/ec_encoder.rs

@@ -72,11 +72,15 @@ fn write_sorted_ecx_from_idx(idx_path: &str, ecx_path: &str) -> io::Result<()> {
Ok(())
})?;
// Sort by NeedleId
entries.sort_by_key(|&(key, _, _)| key);
// Sort by NeedleId, then by actual offset so later entries come last
entries.sort_by_key(|&(key, offset, _)| (key, offset.to_actual_offset()));
// Remove duplicates (keep last entry for each key)
// Remove duplicates (keep last/latest entry for each key).
// dedup_by_key keeps the first in each run, so we reverse first,
// dedup, then reverse back.
entries.reverse();
entries.dedup_by_key(|entry| entry.0);
entries.reverse();
// Write sorted entries to .ecx
let mut ecx_file = File::create(ecx_path)?;
@@ -140,8 +144,7 @@ fn encode_one_batch(
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
// Read what we can; zeros fill the rest (already initialized)
let _ = dat_file.read_at(&mut buffers[i], read_offset);
dat_file.read_at(&mut buffers[i], read_offset)?;
}
}

10
seaweed-volume/src/storage/erasure_coding/ec_shard.rs

@@ -79,6 +79,16 @@ impl EcVolumeShard {
use std::os::unix::fs::FileExt;
file.read_at(buf, offset)
}
#[cfg(not(unix))]
{
use std::io::{Read, Seek, SeekFrom};
// File::read_at is unix-only; fall back to seek + read.
// We need a mutable reference for seek/read, so clone the handle.
let mut f = file.try_clone()?;
f.seek(SeekFrom::Start(offset))?;
f.read(buf)
}
}
/// Write data to the shard file (appends).

22
seaweed-volume/src/storage/needle_map.rs

@@ -142,16 +142,16 @@ impl CompactNeedleMap {
/// Insert or update an entry. Appends to .idx file if present.
pub fn put(&mut self, key: NeedleId, offset: Offset, size: Size) -> io::Result<()> {
let old = self.map.get(&key).cloned();
let nv = NeedleValue { offset, size };
self.metric.on_put(key, old.as_ref(), size);
self.map.insert(key, nv);
// Append to index file
// Persist to idx file BEFORE mutating in-memory state for crash consistency
if let Some(ref mut idx_file) = self.idx_file {
idx::write_index_entry(idx_file, key, offset, size)?;
self.idx_file_offset += NEEDLE_MAP_ENTRY_SIZE as u64;
}
let old = self.map.get(&key).cloned();
let nv = NeedleValue { offset, size };
self.metric.on_put(key, old.as_ref(), size);
self.map.insert(key, nv);
Ok(())
}
@@ -164,15 +164,15 @@ impl CompactNeedleMap {
pub fn delete(&mut self, key: NeedleId, offset: Offset) -> io::Result<Option<Size>> {
if let Some(old) = self.map.get(&key).cloned() {
if old.size.is_valid() {
self.metric.on_delete(&old);
let deleted_size = Size(-(old.size.0));
self.map.insert(key, NeedleValue { offset, size: deleted_size });
// Append tombstone to index file
// Persist tombstone to idx file BEFORE mutating in-memory state for crash consistency
if let Some(ref mut idx_file) = self.idx_file {
idx::write_index_entry(idx_file, key, offset, TOMBSTONE_FILE_SIZE)?;
self.idx_file_offset += NEEDLE_MAP_ENTRY_SIZE as u64;
}
self.metric.on_delete(&old);
let deleted_size = Size(-(old.size.0));
self.map.insert(key, NeedleValue { offset, size: deleted_size });
return Ok(Some(old.size));
}
}

3
seaweed-volume/src/storage/store.rs

@@ -116,6 +116,9 @@ impl Store {
preallocate: u64,
disk_type: DiskType,
) -> Result<(), VolumeError> {
if self.find_volume(vid).is_some() {
return Err(VolumeError::AlreadyExists);
}
let loc_idx = self.find_free_location(&disk_type).ok_or_else(|| {
VolumeError::Io(io::Error::new(
io::ErrorKind::Other,

37
seaweed-volume/src/storage/volume.rs

@@ -14,6 +14,8 @@ use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::warn;
use crate::storage::needle::needle::{self, Needle, NeedleError, get_actual_size};
use crate::storage::needle_map::{CompactNeedleMap, NeedleMapKind};
use crate::storage::super_block::{SuperBlock, ReplicaPlacement, SUPER_BLOCK_SIZE};
@@ -43,6 +45,9 @@ pub enum VolumeError {
#[error("volume not empty")]
NotEmpty,
#[error("volume already exists")]
AlreadyExists,
#[error("volume is read-only")]
ReadOnly,
@@ -234,6 +239,14 @@ impl Volume {
}
fn load_index(&mut self) -> Result<(), VolumeError> {
if self.needle_map_kind != NeedleMapKind::InMemory {
warn!(
volume_id = self.id.0,
kind = ?self.needle_map_kind,
"only InMemory needle map is currently supported, falling back to InMemory"
);
}
let idx_path = self.file_name(".idx");
// Ensure idx directory exists
@@ -541,15 +554,15 @@ impl Volume {
}
fn do_delete_request(&mut self, n: &mut Needle) -> Result<Size, VolumeError> {
let (found, size) = if let Some(nm) = &self.nm {
let (found, size, stored_offset) = if let Some(nm) = &self.nm {
if let Some(nv) = nm.get(n.id) {
if !nv.size.is_deleted() {
(true, nv.size)
(true, nv.size, nv.offset)
} else {
(false, Size(0))
(false, Size(0), Offset::default())
}
} else {
(false, Size(0))
(false, Size(0), Offset::default())
}
} else {
return Ok(Size(0));
@@ -559,6 +572,15 @@ impl Volume {
return Ok(Size(0));
}
// Cookie validation: read stored needle header and verify cookie matches
{
let mut existing = Needle::default();
self.read_needle_header(&mut existing, stored_offset.to_actual_offset())?;
if existing.cookie != n.cookie {
return Err(VolumeError::CookieMismatch(n.cookie.0));
}
}
// Write tombstone: append needle with empty data
n.data = vec![];
n.append_at_ns = get_append_at_ns(self.last_append_at_ns);
@@ -784,7 +806,12 @@ pub fn scan_volume_file(
let body_length = needle::needle_body_length(size, version);
let total_size = NEEDLE_HEADER_SIZE as i64 + body_length;
if visitor.read_needle_body() {
// Skip full body parsing for deleted needles (tombstone or negative size)
if size.is_deleted() || size.0 <= 0 {
let mut n = Needle::default();
n.read_header(&header);
visitor.visit_needle(&n, offset)?;
} else if visitor.read_needle_body() {
let mut buf = vec![0u8; total_size as usize];
file.seek(SeekFrom::Start(offset as u64))?;
file.read_exact(&mut buf)?;

Loading…
Cancel
Save