From 6f66448261dcd4f579782f794954d461e6a377c6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 6 Mar 2026 16:13:24 -0800 Subject: [PATCH] Address PR review: fix index loading, broadcast shutdown, Windows portability - Fix needle_map index loading condition to correctly treat zero-offset and deleted-size entries as deletions (matches Go behavior) - Update test to use realistic non-zero offsets (SuperBlock occupies offset 0) - Replace single shutdown future with broadcast channel so all servers (HTTP, gRPC, public HTTP) receive graceful shutdown signal - Add Windows platform support for volume file reads via FileExt::seek_read --- seaweed-volume/src/main.rs | 58 +++++++++++++++--------- seaweed-volume/src/storage/needle_map.rs | 9 ++-- seaweed-volume/src/storage/volume.rs | 15 ++++-- 3 files changed, 54 insertions(+), 28 deletions(-) diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs index 5a4e25b0a..aa0de2c68 100644 --- a/seaweed-volume/src/main.rs +++ b/seaweed-volume/src/main.rs @@ -98,9 +98,12 @@ async fn run(config: VolumeServerConfig) { info!("Starting public HTTP server on {}:{}", config.bind_ip, public_port); } - // Set up graceful shutdown via SIGINT/SIGTERM + // Set up graceful shutdown via SIGINT/SIGTERM using broadcast channel + let (shutdown_tx, _) = tokio::sync::broadcast::channel::<()>(1); + let state_shutdown = state.clone(); - let shutdown = async move { + let shutdown_tx_clone = shutdown_tx.clone(); + tokio::spawn(async move { let ctrl_c = tokio::signal::ctrl_c(); #[cfg(unix)] { @@ -118,7 +121,8 @@ async fn run(config: VolumeServerConfig) { info!("Received shutdown signal..."); } *state_shutdown.is_stopping.write().unwrap() = true; - }; + let _ = shutdown_tx_clone.send(()); + }); // Spawn all servers concurrently let admin_listener = tokio::net::TcpListener::bind(&admin_addr) @@ -126,24 +130,32 @@ async fn run(config: VolumeServerConfig) { .unwrap_or_else(|e| panic!("Failed to bind HTTP to {}: {}", admin_addr, e)); info!("HTTP server listening on {}", admin_addr); - let http_handle = tokio::spawn(async move { - if let Err(e) = axum::serve(admin_listener, admin_router) - .with_graceful_shutdown(shutdown) - .await - { - error!("HTTP server error: {}", e); - } - }); + let http_handle = { + let mut shutdown_rx = shutdown_tx.subscribe(); + tokio::spawn(async move { + if let Err(e) = axum::serve(admin_listener, admin_router) + .with_graceful_shutdown(async move { let _ = shutdown_rx.recv().await; }) + .await + { + error!("HTTP server error: {}", e); + } + }) + }; - let grpc_handle = tokio::spawn(async move { - let addr = grpc_addr.parse().expect("Invalid gRPC address"); - info!("gRPC server listening on {}", addr); - tonic::transport::Server::builder() - .add_service(VolumeServerServer::new(grpc_service)) - .serve(addr) - .await - .unwrap_or_else(|e| error!("gRPC server error: {}", e)); - }); + let grpc_handle = { + let mut shutdown_rx = shutdown_tx.subscribe(); + tokio::spawn(async move { + let addr = grpc_addr.parse().expect("Invalid gRPC address"); + info!("gRPC server listening on {}", addr); + if let Err(e) = tonic::transport::Server::builder() + .add_service(VolumeServerServer::new(grpc_service)) + .serve_with_shutdown(addr, async move { let _ = shutdown_rx.recv().await; }) + .await + { + error!("gRPC server error: {}", e); + } + }) + }; let public_handle = if needs_public { let public_router = seaweed_volume::server::volume_server::build_public_router(state.clone()); @@ -152,8 +164,12 @@ async fn run(config: VolumeServerConfig) { .await .unwrap_or_else(|e| panic!("Failed to bind public HTTP to {}: {}", public_addr, e)); info!("Public HTTP server listening on {}", public_addr); + let mut shutdown_rx = shutdown_tx.subscribe(); Some(tokio::spawn(async move { - if let Err(e) = axum::serve(listener, public_router).await { + if let Err(e) = axum::serve(listener, public_router) + .with_graceful_shutdown(async move { let _ = shutdown_rx.recv().await; }) + .await + { error!("Public HTTP server error: {}", e); } })) diff --git a/seaweed-volume/src/storage/needle_map.rs b/seaweed-volume/src/storage/needle_map.rs index 09dc9d07d..6a8595507 100644 --- a/seaweed-volume/src/storage/needle_map.rs +++ b/seaweed-volume/src/storage/needle_map.rs @@ -122,10 +122,10 @@ impl CompactNeedleMap { pub fn load_from_idx(reader: &mut R) -> io::Result { let mut nm = CompactNeedleMap::new(); idx::walk_index_file(reader, 0, |key, offset, size| { - if !offset.is_zero() || !size.is_deleted() { - nm.set(key, NeedleValue { offset, size }); - } else { + if offset.is_zero() || size.is_deleted() { nm.delete_from_map(key); + } else { + nm.set(key, NeedleValue { offset, size }); } Ok(()) })?; @@ -300,8 +300,9 @@ mod tests { #[test] fn test_needle_map_load_from_idx() { // Build an idx file in memory + // Note: offset 0 is reserved for the SuperBlock, so real needles start at offset >= 8 let mut idx_data = Vec::new(); - idx::write_index_entry(&mut idx_data, NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap(); + idx::write_index_entry(&mut idx_data, NeedleId(1), Offset::from_actual_offset(8), Size(100)).unwrap(); idx::write_index_entry(&mut idx_data, NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap(); idx::write_index_entry(&mut idx_data, NeedleId(3), Offset::from_actual_offset(384), Size(300)).unwrap(); // Delete needle 2 diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 2e7a4ef12..d3334307f 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -374,10 +374,14 @@ impl Volume { use std::os::unix::fs::FileExt; dat_file.read_exact_at(&mut buf, offset as u64)?; } - #[cfg(not(unix))] + #[cfg(windows)] { - // Fallback for non-unix (requires &mut) - compile_error!("Windows support requires different file I/O approach"); + use std::os::windows::fs::FileExt; + dat_file.seek_read(&mut buf, offset as u64)?; + } + #[cfg(not(any(unix, windows)))] + { + compile_error!("Platform not supported: only unix and windows are supported"); } n.read_bytes(&mut buf, offset, size, version)?; @@ -399,6 +403,11 @@ impl Volume { use std::os::unix::fs::FileExt; dat_file.read_exact_at(&mut buf, offset as u64)?; } + #[cfg(windows)] + { + use std::os::windows::fs::FileExt; + dat_file.seek_read(&mut buf, offset as u64)?; + } Ok(buf) }