Browse Source

fix: round 4 Go parity — VolumeConfigure unmount, 304 headers, EC encoding, compaction TTL

- VolumeConfigure: don't fail on unmount of non-existent volume (Go returns nil)
- 304 responses include ETag/Last-Modified headers per HTTP spec
- Conditional header checks run before chunk manifest expansion
- EC encoder uses two-phase approach: 1GB large blocks then 1MB small blocks
- Compaction uses volume-level TTL (not per-needle TTL) for filtering
- VolumeConfigure does unmount/modify-disk/remount cycle matching Go
- VolumeMarkReadonly persists flag to .vif when persist=true
- AllocateVolume accepts version parameter
- Multipart boundary uses leading CRLF per RFC 2046
- MIME type override skipped for chunk manifests
rust-volume-server
Chris Lu 1 day ago
parent
commit
fc2ab297ba
  1. 49
      seaweed-volume/src/server/grpc_server.rs
  2. 51
      seaweed-volume/src/server/handlers.rs
  3. 4
      seaweed-volume/src/server/heartbeat.rs
  4. 19
      seaweed-volume/src/storage/disk_location.rs
  5. 37
      seaweed-volume/src/storage/erasure_coding/ec_encoder.rs
  6. 82
      seaweed-volume/src/storage/store.rs
  7. 23
      seaweed-volume/src/storage/volume.rs
  8. 4
      seaweed-volume/tests/http_integration.rs

49
seaweed-volume/src/server/grpc_server.rs

@@ -353,6 +353,12 @@ impl VolumeServer for VolumeGrpcService {
};
let disk_type = DiskType::from_string(&req.disk_type);
let version = if req.version > 0 {
crate::storage::types::Version(req.version as u8)
} else {
crate::storage::types::Version::current()
};
let mut store = self.state.store.write().unwrap();
store
.add_volume(
@@ -362,6 +368,7 @@ impl VolumeServer for VolumeGrpcService {
ttl,
req.preallocate as u64,
disk_type,
version,
)
.map_err(|e| Status::internal(e.to_string()))?;
@@ -541,7 +548,7 @@ impl VolumeServer for VolumeGrpcService {
let (_, vol) = store
.find_volume_mut(vid)
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
vol.set_read_only();
vol.set_read_only_persist(req.persist);
drop(store);
self.state.volume_state_notify.notify_one();
Ok(Response::new(
@@ -582,29 +589,39 @@ impl VolumeServer for VolumeGrpcService {
Ok(rp) => rp,
Err(e) => {
return Ok(Response::new(volume_server_pb::VolumeConfigureResponse {
error: format!("invalid replica placement: {}", e),
error: format!("volume configure replication {}: {}", req.replication, e),
}));
}
};
let mut store = self.state.store.write().unwrap();
let (_, vol) = match store.find_volume_mut(vid) {
Some(v) => v,
None => {
return Ok(Response::new(volume_server_pb::VolumeConfigureResponse {
error: format!("volume {} not found on disk, failed to restore mount", vid),
}));
// Unmount the volume (Go returns nil for non-existent volumes, so we don't
// treat a missing volume as an error here — configure_volume will catch it)
store.unmount_volume(vid);
// Modify the super block on disk (replica_placement byte)
if let Err(e) = store.configure_volume(vid, rp) {
let mut error = format!("volume configure {}: {}", vid, e);
// Error recovery: try to re-mount anyway
if let Err(mount_err) = store.mount_volume(vid, "", DiskType::HardDrive) {
error += &format!(". Also failed to restore mount: {}", mount_err);
}
};
return Ok(Response::new(volume_server_pb::VolumeConfigureResponse {
error,
}));
}
match vol.set_replica_placement(rp) {
Ok(()) => Ok(Response::new(volume_server_pb::VolumeConfigureResponse {
error: String::new(),
})),
Err(e) => Ok(Response::new(volume_server_pb::VolumeConfigureResponse {
error: e.to_string(),
})),
// Re-mount the volume
if let Err(e) = store.mount_volume(vid, "", DiskType::HardDrive) {
return Ok(Response::new(volume_server_pb::VolumeConfigureResponse {
error: format!("volume configure mount {}: {}", vid, e),
}));
}
Ok(Response::new(volume_server_pb::VolumeConfigureResponse {
error: String::new(),
}))
}
async fn volume_status(

51
seaweed-volume/src/server/handlers.rs

@@ -795,15 +795,8 @@ async fn get_or_head_handler_inner(
n = n_full;
}
// Chunk manifest expansion (needs full data)
if n.is_chunk_manifest() && !bypass_cm {
if let Some(resp) = try_expand_chunk_manifest(&state, &n, &headers, &method) {
return resp;
}
// If manifest expansion fails (invalid JSON etc.), fall through to raw data
}
// Build ETag
// Build ETag and Last-Modified BEFORE conditional checks and chunk manifest expansion
// (matches Go order: conditional checks first, then chunk manifest)
let etag = format!("\"{}\"", n.etag());
// Build Last-Modified header (RFC 1123 format) — must be done before conditional checks
@@ -827,7 +820,13 @@ async fn get_or_head_handler_inner(
chrono::NaiveDateTime::parse_from_str(ims_str, "%a, %d %b %Y %H:%M:%S GMT")
{
if (n.last_modified as i64) <= ims_time.and_utc().timestamp() {
return StatusCode::NOT_MODIFIED.into_response();
let mut not_modified_headers = HeaderMap::new();
not_modified_headers.insert(header::ETAG, etag.parse().unwrap());
if let Some(ref lm) = last_modified_str {
not_modified_headers
.insert(header::LAST_MODIFIED, lm.parse().unwrap());
}
return (StatusCode::NOT_MODIFIED, not_modified_headers).into_response();
}
}
}
@@ -838,11 +837,24 @@ async fn get_or_head_handler_inner(
if let Some(if_none_match) = headers.get(header::IF_NONE_MATCH) {
if let Ok(inm) = if_none_match.to_str() {
if inm == etag {
return StatusCode::NOT_MODIFIED.into_response();
let mut not_modified_headers = HeaderMap::new();
not_modified_headers.insert(header::ETAG, etag.parse().unwrap());
if let Some(ref lm) = last_modified_str {
not_modified_headers.insert(header::LAST_MODIFIED, lm.parse().unwrap());
}
return (StatusCode::NOT_MODIFIED, not_modified_headers).into_response();
}
}
}
// Chunk manifest expansion (needs full data) — after conditional checks, before response
if n.is_chunk_manifest() && !bypass_cm {
if let Some(resp) = try_expand_chunk_manifest(&state, &n, &headers, &method) {
return resp;
}
// If manifest expansion fails (invalid JSON etc.), fall through to raw data
}
let mut response_headers = HeaderMap::new();
response_headers.insert(header::ETAG, etag.parse().unwrap());
@@ -875,8 +887,16 @@ async fn get_or_head_handler_inner(
}
// H6: Determine Content-Type: filter application/octet-stream, use mime_guess
// For chunk manifests, skip extension-based MIME override — use stored MIME as-is (Go parity)
let content_type = if let Some(ref ct) = query.response_content_type {
Some(ct.clone())
} else if n.is_chunk_manifest() {
// Chunk manifests: use the stored MIME directly without filtering or extension detection
if !n.mime.is_empty() {
Some(String::from_utf8_lossy(&n.mime).to_string())
} else {
None
}
} else {
// Get MIME from needle, but filter out application/octet-stream
let needle_mime = if !n.mime.is_empty() {
@@ -1172,8 +1192,13 @@ fn handle_range_request(range_str: &str, data: &[u8], mut headers: HeaderMap) ->
.to_string();
let mut body = Vec::new();
for &(start, end) in &ranges {
body.extend_from_slice(format!("\r\n--{}\r\n", boundary).as_bytes());
for (i, &(start, end)) in ranges.iter().enumerate() {
// First boundary has no leading CRLF per RFC 2046
if i == 0 {
body.extend_from_slice(format!("--{}\r\n", boundary).as_bytes());
} else {
body.extend_from_slice(format!("\r\n--{}\r\n", boundary).as_bytes());
}
body.extend_from_slice(format!("Content-Type: {}\r\n", content_type).as_bytes());
body.extend_from_slice(
format!("Content-Range: bytes {}-{}/{}\r\n\r\n", start, end, total).as_bytes(),

4
seaweed-volume/src/server/heartbeat.rs

@@ -484,7 +484,7 @@ mod tests {
use crate::remote_storage::s3_tier::S3TierRegistry;
use crate::security::{Guard, SigningKey};
use crate::storage::needle_map::NeedleMapKind;
use crate::storage::types::{DiskType, VolumeId};
use crate::storage::types::{DiskType, Version, VolumeId};
use std::sync::RwLock;
fn test_config() -> HeartbeatConfig {
@@ -553,7 +553,7 @@ mod tests {
)
.unwrap();
store
.add_volume(VolumeId(7), "pics", None, None, 0, DiskType::HardDrive)
.add_volume(VolumeId(7), "pics", None, None, 0, DiskType::HardDrive, Version::current())
.unwrap();
let heartbeat = build_heartbeat(&test_config(), &store);

19
seaweed-volume/src/storage/disk_location.rs

@@ -152,6 +152,7 @@ impl DiskLocation {
replica_placement: Option<ReplicaPlacement>,
ttl: Option<crate::storage::needle::ttl::TTL>,
preallocate: u64,
version: Version,
) -> Result<(), VolumeError> {
let v = Volume::new(
&self.directory,
@@ -162,7 +163,7 @@ impl DiskLocation {
replica_placement,
ttl,
preallocate,
Version::current(),
version,
)?;
self.volumes.insert(vid, v);
Ok(())
@@ -348,7 +349,7 @@ mod tests {
)
.unwrap();
loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0)
loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0, Version::current())
.unwrap();
assert_eq!(loc.volumes_len(), 1);
@@ -373,9 +374,9 @@ mod tests {
Vec::new(),
)
.unwrap();
loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0)
loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0, Version::current())
.unwrap();
loc.create_volume(VolumeId(2), "test", NeedleMapKind::InMemory, None, None, 0)
loc.create_volume(VolumeId(2), "test", NeedleMapKind::InMemory, None, None, 0, Version::current())
.unwrap();
loc.close();
}
@@ -412,9 +413,9 @@ mod tests {
)
.unwrap();
loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0)
loc.create_volume(VolumeId(1), "", NeedleMapKind::InMemory, None, None, 0, Version::current())
.unwrap();
loc.create_volume(VolumeId(2), "", NeedleMapKind::InMemory, None, None, 0)
loc.create_volume(VolumeId(2), "", NeedleMapKind::InMemory, None, None, 0, Version::current())
.unwrap();
assert_eq!(loc.volumes_len(), 2);
@@ -437,11 +438,11 @@ mod tests {
)
.unwrap();
loc.create_volume(VolumeId(1), "pics", NeedleMapKind::InMemory, None, None, 0)
loc.create_volume(VolumeId(1), "pics", NeedleMapKind::InMemory, None, None, 0, Version::current())
.unwrap();
loc.create_volume(VolumeId(2), "pics", NeedleMapKind::InMemory, None, None, 0)
loc.create_volume(VolumeId(2), "pics", NeedleMapKind::InMemory, None, None, 0, Version::current())
.unwrap();
loc.create_volume(VolumeId(3), "docs", NeedleMapKind::InMemory, None, None, 0)
loc.create_volume(VolumeId(3), "docs", NeedleMapKind::InMemory, None, None, 0, Version::current())
.unwrap();
assert_eq!(loc.volumes_len(), 3);

37
seaweed-volume/src/storage/erasure_coding/ec_encoder.rs

@@ -313,6 +313,10 @@ fn write_sorted_ecx_from_idx(idx_path: &str, ecx_path: &str) -> io::Result<()> {
}
/// Encode the .dat file data into shard files.
///
/// Uses a two-phase approach matching Go's ec_encoder.go:
/// 1. Process as many large blocks (1GB) as possible
/// 2. Process remaining data with small blocks (1MB)
fn encode_dat_file(
dat_file: &File,
dat_size: i64,
@@ -321,19 +325,37 @@ fn encode_dat_file(
data_shards: usize,
parity_shards: usize,
) -> io::Result<()> {
let block_size = ERASURE_CODING_SMALL_BLOCK_SIZE;
let row_size = block_size * data_shards;
let mut remaining = dat_size;
let mut offset: u64 = 0;
// Process all data in small blocks to avoid large memory allocations
// Phase 1: Process large blocks (1GB each) while enough data remains
let large_block_size = ERASURE_CODING_LARGE_BLOCK_SIZE;
let large_row_size = large_block_size * data_shards;
while remaining >= large_row_size as i64 {
encode_one_batch(
dat_file,
offset,
large_block_size,
rs,
shards,
data_shards,
parity_shards,
)?;
offset += large_row_size as u64;
remaining -= large_row_size as i64;
}
// Phase 2: Process remaining data with small blocks (1MB each)
let small_block_size = ERASURE_CODING_SMALL_BLOCK_SIZE;
let small_row_size = small_block_size * data_shards;
while remaining > 0 {
let to_process = remaining.min(row_size as i64);
let to_process = remaining.min(small_row_size as i64);
encode_one_batch(
dat_file,
offset,
block_size,
small_block_size,
rs,
shards,
data_shards,
@@ -365,7 +387,8 @@ fn encode_one_batch(
"block_size * shard count overflows usize",
)
})?;
const MAX_BATCH_ALLOC: usize = 1024 * 1024 * 1024; // 1 GiB safety limit
// Large-block encoding uses 1 GiB * 14 shards = 14 GiB; allow up to 16 GiB.
const MAX_BATCH_ALLOC: usize = 16 * 1024 * 1024 * 1024; // 16 GiB safety limit
if total_alloc > MAX_BATCH_ALLOC {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,

82
seaweed-volume/src/storage/store.rs

@@ -149,6 +149,7 @@ impl Store {
ttl: Option<crate::storage::needle::ttl::TTL>,
preallocate: u64,
disk_type: DiskType,
version: Version,
) -> Result<(), VolumeError> {
if self.find_volume(vid).is_some() {
return Err(VolumeError::AlreadyExists);
@@ -167,6 +168,7 @@ impl Store {
replica_placement,
ttl,
preallocate,
version,
)
}
@@ -208,7 +210,71 @@ impl Store {
let base = crate::storage::volume::volume_file_name(&loc.directory, collection, vid);
let dat_path = format!("{}.dat", base);
if std::path::Path::new(&dat_path).exists() {
return loc.create_volume(vid, collection, self.needle_map_kind, None, None, 0);
return loc.create_volume(vid, collection, self.needle_map_kind, None, None, 0, Version::current());
}
}
Err(VolumeError::Io(io::Error::new(
io::ErrorKind::NotFound,
format!("volume {} not found on disk", vid),
)))
}
/// Configure a volume's replica placement on disk.
/// The volume must already be unmounted. This opens the .dat file directly,
/// modifies the replica_placement byte (offset 1), and writes it back.
pub fn configure_volume(
&self,
vid: VolumeId,
rp: ReplicaPlacement,
) -> Result<(), VolumeError> {
use std::io::{Read, Seek, SeekFrom, Write};
// Find the .dat file across all locations
for loc in &self.locations {
// Try both empty and all known collection prefixes
// We scan the directory for matching .dat files
let dir = &loc.directory;
let patterns = [
format!("{}/{}.dat", dir, vid.0),
];
for dat_path in &patterns {
if std::path::Path::new(dat_path).exists() {
let mut file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(dat_path)
.map_err(VolumeError::Io)?;
// Read the super block header (at least 8 bytes)
let mut header = [0u8; 8];
file.read_exact(&mut header).map_err(VolumeError::Io)?;
// Byte 1 is the replica_placement
header[1] = rp.to_byte();
file.seek(SeekFrom::Start(0)).map_err(VolumeError::Io)?;
file.write_all(&header).map_err(VolumeError::Io)?;
file.sync_all().map_err(VolumeError::Io)?;
return Ok(());
}
}
// Also check collection-prefixed files
if let Ok(entries) = std::fs::read_dir(dir) {
for entry in entries.flatten() {
let name = entry.file_name();
let name = name.to_string_lossy();
if name.ends_with(&format!("_{}.dat", vid.0)) {
let dat_path = entry.path();
let mut file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&dat_path)
.map_err(VolumeError::Io)?;
let mut header = [0u8; 8];
file.read_exact(&mut header).map_err(VolumeError::Io)?;
header[1] = rp.to_byte();
file.seek(SeekFrom::Start(0)).map_err(VolumeError::Io)?;
file.write_all(&header).map_err(VolumeError::Io)?;
file.sync_all().map_err(VolumeError::Io)?;
return Ok(());
}
}
}
}
Err(VolumeError::Io(io::Error::new(
@@ -582,7 +648,7 @@ mod tests {
let mut store = make_test_store(&[dir]);
store
.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive)
.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive, Version::current())
.unwrap();
assert!(store.has_volume(VolumeId(1)));
assert!(!store.has_volume(VolumeId(2)));
@@ -595,7 +661,7 @@ mod tests {
let dir = tmp.path().to_str().unwrap();
let mut store = make_test_store(&[dir]);
store
.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive)
.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive, Version::current())
.unwrap();
// Write
@@ -661,10 +727,10 @@ mod tests {
// Add volumes — should go to location with fewest volumes
store
.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive)
.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive, Version::current())
.unwrap();
store
.add_volume(VolumeId(2), "", None, None, 0, DiskType::HardDrive)
.add_volume(VolumeId(2), "", None, None, 0, DiskType::HardDrive, Version::current())
.unwrap();
assert_eq!(store.total_volume_count(), 2);
@@ -680,13 +746,13 @@ mod tests {
let mut store = make_test_store(&[dir]);
store
.add_volume(VolumeId(1), "pics", None, None, 0, DiskType::HardDrive)
.add_volume(VolumeId(1), "pics", None, None, 0, DiskType::HardDrive, Version::current())
.unwrap();
store
.add_volume(VolumeId(2), "pics", None, None, 0, DiskType::HardDrive)
.add_volume(VolumeId(2), "pics", None, None, 0, DiskType::HardDrive, Version::current())
.unwrap();
store
.add_volume(VolumeId(3), "docs", None, None, 0, DiskType::HardDrive)
.add_volume(VolumeId(3), "docs", None, None, 0, DiskType::HardDrive, Version::current())
.unwrap();
assert_eq!(store.total_volume_count(), 3);

23
seaweed-volume/src/storage/volume.rs

@@ -1305,11 +1305,20 @@ impl Volume {
}
/// Mark this volume as read-only (no writes or deletes).
///
/// The readonly flag is always written through to the .vif file here;
/// callers that need to control persistence should use
/// `set_read_only_persist` instead.
pub fn set_read_only(&mut self) {
self.no_write_or_delete = true;
self.save_vif();
}
/// Mark this volume as read-only, optionally writing the flag through to
/// the on-disk .vif file.
pub fn set_read_only_persist(&mut self, persist: bool) {
    // Flip the in-memory flag first so writes/deletes stop immediately.
    self.no_write_or_delete = true;
    if !persist {
        // Transient readonly state: skip the .vif write entirely.
        return;
    }
    self.save_vif();
}
/// Mark this volume as writable (allow writes and deletes).
pub fn set_writable(&mut self) {
self.no_write_or_delete = false;
@@ -1756,15 +1765,13 @@ impl Volume {
};
self.read_needle_data_at(&mut n, offset.to_actual_offset(), size)?;
// Skip TTL-expired needles
// Skip TTL-expired needles using the volume's TTL (matches Go's volume_vacuum.go)
if n.has_ttl() {
if let Some(ref ttl) = n.ttl {
let ttl_minutes = ttl.minutes();
if ttl_minutes > 0 && n.last_modified > 0 {
let expire_at = n.last_modified + (ttl_minutes as u64) * 60;
if now >= expire_at {
continue;
}
let ttl_minutes = self.super_block.ttl.minutes();
if ttl_minutes > 0 && n.last_modified > 0 {
let expire_at = n.last_modified + (ttl_minutes as u64) * 60;
if now >= expire_at {
continue;
}
}
}

4
seaweed-volume/tests/http_integration.rs

@@ -16,7 +16,7 @@ use seaweed_volume::server::volume_server::{
};
use seaweed_volume::storage::needle_map::NeedleMapKind;
use seaweed_volume::storage::store::Store;
use seaweed_volume::storage::types::{DiskType, VolumeId};
use seaweed_volume::storage::types::{DiskType, Version, VolumeId};
use tempfile::TempDir;
@@ -42,7 +42,7 @@ fn test_state_with_signing_key(signing_key: Vec<u8>) -> (Arc<VolumeServerState>,
)
.expect("failed to add location");
store
.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive)
.add_volume(VolumeId(1), "", None, None, 0, DiskType::HardDrive, Version::current())
.expect("failed to create volume");
let guard = Guard::new(&[], SigningKey(signing_key), 0, SigningKey(vec![]), 0);

Loading…
Cancel
Save