Browse Source

Address PR review: fix index loading, broadcast shutdown, Windows portability

- Fix needle_map index loading condition to correctly treat zero-offset
  and deleted-size entries as deletions (matches Go behavior)
- Update test to use realistic non-zero offsets (SuperBlock occupies offset 0)
- Replace single shutdown future with broadcast channel so all servers
  (HTTP, gRPC, public HTTP) receive graceful shutdown signal
- Add Windows platform support for volume file reads via FileExt::seek_read
rust-volume-server
Chris Lu 4 days ago
parent
commit
6f66448261
  1. 58
      seaweed-volume/src/main.rs
  2. 9
      seaweed-volume/src/storage/needle_map.rs
  3. 15
      seaweed-volume/src/storage/volume.rs

58
seaweed-volume/src/main.rs

@ -98,9 +98,12 @@ async fn run(config: VolumeServerConfig) {
info!("Starting public HTTP server on {}:{}", config.bind_ip, public_port);
}
// Set up graceful shutdown via SIGINT/SIGTERM
// Set up graceful shutdown via SIGINT/SIGTERM using broadcast channel
let (shutdown_tx, _) = tokio::sync::broadcast::channel::<()>(1);
let state_shutdown = state.clone();
let shutdown = async move {
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
let ctrl_c = tokio::signal::ctrl_c();
#[cfg(unix)]
{
@ -118,7 +121,8 @@ async fn run(config: VolumeServerConfig) {
info!("Received shutdown signal...");
}
*state_shutdown.is_stopping.write().unwrap() = true;
};
let _ = shutdown_tx_clone.send(());
});
// Spawn all servers concurrently
let admin_listener = tokio::net::TcpListener::bind(&admin_addr)
@ -126,24 +130,32 @@ async fn run(config: VolumeServerConfig) {
.unwrap_or_else(|e| panic!("Failed to bind HTTP to {}: {}", admin_addr, e));
info!("HTTP server listening on {}", admin_addr);
let http_handle = tokio::spawn(async move {
if let Err(e) = axum::serve(admin_listener, admin_router)
.with_graceful_shutdown(shutdown)
.await
{
error!("HTTP server error: {}", e);
}
});
let http_handle = {
let mut shutdown_rx = shutdown_tx.subscribe();
tokio::spawn(async move {
if let Err(e) = axum::serve(admin_listener, admin_router)
.with_graceful_shutdown(async move { let _ = shutdown_rx.recv().await; })
.await
{
error!("HTTP server error: {}", e);
}
})
};
let grpc_handle = tokio::spawn(async move {
let addr = grpc_addr.parse().expect("Invalid gRPC address");
info!("gRPC server listening on {}", addr);
tonic::transport::Server::builder()
.add_service(VolumeServerServer::new(grpc_service))
.serve(addr)
.await
.unwrap_or_else(|e| error!("gRPC server error: {}", e));
});
let grpc_handle = {
let mut shutdown_rx = shutdown_tx.subscribe();
tokio::spawn(async move {
let addr = grpc_addr.parse().expect("Invalid gRPC address");
info!("gRPC server listening on {}", addr);
if let Err(e) = tonic::transport::Server::builder()
.add_service(VolumeServerServer::new(grpc_service))
.serve_with_shutdown(addr, async move { let _ = shutdown_rx.recv().await; })
.await
{
error!("gRPC server error: {}", e);
}
})
};
let public_handle = if needs_public {
let public_router = seaweed_volume::server::volume_server::build_public_router(state.clone());
@ -152,8 +164,12 @@ async fn run(config: VolumeServerConfig) {
.await
.unwrap_or_else(|e| panic!("Failed to bind public HTTP to {}: {}", public_addr, e));
info!("Public HTTP server listening on {}", public_addr);
let mut shutdown_rx = shutdown_tx.subscribe();
Some(tokio::spawn(async move {
if let Err(e) = axum::serve(listener, public_router).await {
if let Err(e) = axum::serve(listener, public_router)
.with_graceful_shutdown(async move { let _ = shutdown_rx.recv().await; })
.await
{
error!("Public HTTP server error: {}", e);
}
}))

9
seaweed-volume/src/storage/needle_map.rs

@ -122,10 +122,10 @@ impl CompactNeedleMap {
pub fn load_from_idx<R: Read + Seek>(reader: &mut R) -> io::Result<Self> {
let mut nm = CompactNeedleMap::new();
idx::walk_index_file(reader, 0, |key, offset, size| {
if !offset.is_zero() || !size.is_deleted() {
nm.set(key, NeedleValue { offset, size });
} else {
if offset.is_zero() || size.is_deleted() {
nm.delete_from_map(key);
} else {
nm.set(key, NeedleValue { offset, size });
}
Ok(())
})?;
@ -300,8 +300,9 @@ mod tests {
#[test]
fn test_needle_map_load_from_idx() {
// Build an idx file in memory
// Note: offset 0 is reserved for the SuperBlock, so real needles start at offset >= 8
let mut idx_data = Vec::new();
idx::write_index_entry(&mut idx_data, NeedleId(1), Offset::from_actual_offset(0), Size(100)).unwrap();
idx::write_index_entry(&mut idx_data, NeedleId(1), Offset::from_actual_offset(8), Size(100)).unwrap();
idx::write_index_entry(&mut idx_data, NeedleId(2), Offset::from_actual_offset(128), Size(200)).unwrap();
idx::write_index_entry(&mut idx_data, NeedleId(3), Offset::from_actual_offset(384), Size(300)).unwrap();
// Delete needle 2

15
seaweed-volume/src/storage/volume.rs

@ -374,10 +374,14 @@ impl Volume {
use std::os::unix::fs::FileExt;
dat_file.read_exact_at(&mut buf, offset as u64)?;
}
#[cfg(not(unix))]
#[cfg(windows)]
{
// Fallback for non-unix (requires &mut)
compile_error!("Windows support requires different file I/O approach");
use std::os::windows::fs::FileExt;
dat_file.seek_read(&mut buf, offset as u64)?;
}
#[cfg(not(any(unix, windows)))]
{
compile_error!("Platform not supported: only unix and windows are supported");
}
n.read_bytes(&mut buf, offset, size, version)?;
@ -399,6 +403,11 @@ impl Volume {
use std::os::unix::fs::FileExt;
dat_file.read_exact_at(&mut buf, offset as u64)?;
}
#[cfg(windows)]
{
use std::os::windows::fs::FileExt;
dat_file.seek_read(&mut buf, offset as u64)?;
}
Ok(buf)
}

Loading…
Cancel
Save