From add0dbde5affb61a87dc3f584fd7c779755e295b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 18 Mar 2026 11:38:12 -0700 Subject: [PATCH] fix SetState to persist state to disk with rollback on failure Go's State.Update saves VolumeServerState to a state.pb file after each SetState call, and rolls back the in-memory state if persistence fails. Rust was only updating in-memory atomics, so maintenance mode would be lost on server restart. Now saves protobuf-encoded state.pb and loads it on startup. --- seaweed-volume/src/main.rs | 20 +++++++++ seaweed-volume/src/server/grpc_server.rs | 48 ++++++++++++++++++++-- seaweed-volume/src/server/heartbeat.rs | 1 + seaweed-volume/src/server/volume_server.rs | 2 + seaweed-volume/src/server/write_queue.rs | 1 + 5 files changed, 68 insertions(+), 4 deletions(-) diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs index 9f9e3827d..ccf92fdcc 100644 --- a/seaweed-volume/src/main.rs +++ b/seaweed-volume/src/main.rs @@ -326,8 +326,28 @@ async fn run( read_buffer_size_bytes: (config.read_buffer_size_mb.max(1) as usize) * 1024 * 1024, security_file, cli_white_list, + state_file_path: if config.folders.is_empty() { + String::new() + } else { + std::path::Path::new(&config.folders[0]) + .join("state.pb") + .to_string_lossy() + .into_owned() + }, }); + // Load persisted state from disk if it exists (matches Go's State.Load on startup) + if let Some(saved) = + seaweed_volume::server::grpc_server::load_state_file(&state.state_file_path) + { + state + .maintenance + .store(saved.maintenance, std::sync::atomic::Ordering::Relaxed); + state + .state_version + .store(saved.version, std::sync::atomic::Ordering::Relaxed); + } + if !config.masters.is_empty() { let hb_config = seaweed_volume::server::heartbeat::HeartbeatConfig { ip: config.ip.clone(), diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 06a5a5832..29f8ba53c 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -28,6 +28,31 @@ fn volume_is_remote_only(dat_path: &str, has_remote_file: bool) -> bool { has_remote_file && !std::path::Path::new(dat_path).exists() } +/// Persist VolumeServerState to a state.pb file (matches Go's State.save). +fn save_state_file( + path: &str, + state: &volume_server_pb::VolumeServerState, +) -> Result<(), std::io::Error> { + if path.is_empty() { + return Ok(()); + } + use prost::Message; + let buf = state.encode_to_vec(); + std::fs::write(path, buf) +} + +/// Load VolumeServerState from a state.pb file (matches Go's State.Load). +pub fn load_state_file( + path: &str, +) -> Option { + if path.is_empty() || !std::path::Path::new(path).exists() { + return None; + } + let data = std::fs::read(path).ok()?; + use prost::Message; + volume_server_pb::VolumeServerState::decode(data.as_slice()).ok() +} + struct WriteThrottler { bytes_per_second: i64, last_size_counter: i64, @@ -876,16 +901,29 @@ impl VolumeServer for VolumeGrpcService { ))); } + // Save previous state for rollback on persistence failure (matches Go) + let prev_maintenance = self.state.maintenance.load(Ordering::Relaxed); + let prev_version = current_version; + self.state .maintenance .store(new_state.maintenance, Ordering::Relaxed); let new_version = self.state.state_version.fetch_add(1, Ordering::Relaxed) + 1; + // Persist to disk (matches Go's State.save) + let pb = volume_server_pb::VolumeServerState { + maintenance: new_state.maintenance, + version: new_version, + }; + if let Err(e) = save_state_file(&self.state.state_file_path, &pb) { + // Rollback in-memory state on save failure (matches Go) + self.state.maintenance.store(prev_maintenance, Ordering::Relaxed); + self.state.state_version.store(prev_version, Ordering::Relaxed); + return Err(Status::internal(format!("failed to save state: {}", e))); + } + Ok(Response::new(volume_server_pb::SetStateResponse { - state: Some(volume_server_pb::VolumeServerState { - maintenance: new_state.maintenance, - version: new_version, - }), + state: Some(pb), })) } else { // nil state = no-op, return current state @@ -4282,6 +4320,7 @@ mod tests { read_buffer_size_bytes: 1024 * 1024, security_file: String::new(), cli_white_list: vec![], + state_file_path: String::new(), }); ( @@ -4383,6 +4422,7 @@ mod tests { read_buffer_size_bytes: 1024 * 1024, security_file: String::new(), cli_white_list: vec![], + state_file_path: String::new(), }); (VolumeGrpcService { state }, tmp) diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index 502304d3e..e4cb5aafc 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -1034,6 +1034,7 @@ mod tests { read_buffer_size_bytes: 4 * 1024 * 1024, security_file: String::new(), cli_white_list: vec![], + state_file_path: String::new(), }) } diff --git a/seaweed-volume/src/server/volume_server.rs b/seaweed-volume/src/server/volume_server.rs index 72df67fcd..1201a5e83 100644 --- a/seaweed-volume/src/server/volume_server.rs +++ b/seaweed-volume/src/server/volume_server.rs @@ -101,6 +101,8 @@ pub struct VolumeServerState { pub security_file: String, /// Original CLI whitelist entries — stored for SIGHUP reload. pub cli_white_list: Vec, + /// Path to state.pb file for persisting VolumeServerState across restarts. + pub state_file_path: String, } impl VolumeServerState { diff --git a/seaweed-volume/src/server/write_queue.rs b/seaweed-volume/src/server/write_queue.rs index 3e1861bb2..112ae5684 100644 --- a/seaweed-volume/src/server/write_queue.rs +++ b/seaweed-volume/src/server/write_queue.rs @@ -218,6 +218,7 @@ mod tests { read_buffer_size_bytes: 4 * 1024 * 1024, security_file: String::new(), cli_white_list: vec![], + state_file_path: String::new(), }) }