Browse Source

fix SetState to persist state to disk with rollback on failure

Go's State.Update saves VolumeServerState to a state.pb file after
each SetState call, and rolls back the in-memory state if persistence
fails. Rust was only updating in-memory atomics, so maintenance mode
would be lost on server restart. Now saves protobuf-encoded state.pb
and loads it on startup.
rust-volume-server
Chris Lu 1 week ago
parent
commit
add0dbde5a
  1. 20
      seaweed-volume/src/main.rs
  2. 48
      seaweed-volume/src/server/grpc_server.rs
  3. 1
      seaweed-volume/src/server/heartbeat.rs
  4. 2
      seaweed-volume/src/server/volume_server.rs
  5. 1
      seaweed-volume/src/server/write_queue.rs

20
seaweed-volume/src/main.rs

@ -326,8 +326,28 @@ async fn run(
read_buffer_size_bytes: (config.read_buffer_size_mb.max(1) as usize) * 1024 * 1024,
security_file,
cli_white_list,
state_file_path: if config.folders.is_empty() {
String::new()
} else {
std::path::Path::new(&config.folders[0])
.join("state.pb")
.to_string_lossy()
.into_owned()
},
});
// Load persisted state from disk if it exists (matches Go's State.Load on startup)
if let Some(saved) =
seaweed_volume::server::grpc_server::load_state_file(&state.state_file_path)
{
state
.maintenance
.store(saved.maintenance, std::sync::atomic::Ordering::Relaxed);
state
.state_version
.store(saved.version, std::sync::atomic::Ordering::Relaxed);
}
if !config.masters.is_empty() {
let hb_config = seaweed_volume::server::heartbeat::HeartbeatConfig {
ip: config.ip.clone(),

48
seaweed-volume/src/server/grpc_server.rs

@ -28,6 +28,31 @@ fn volume_is_remote_only(dat_path: &str, has_remote_file: bool) -> bool {
has_remote_file && !std::path::Path::new(dat_path).exists()
}
/// Persist VolumeServerState to a state.pb file (matches Go's State.save).
fn save_state_file(
path: &str,
state: &volume_server_pb::VolumeServerState,
) -> Result<(), std::io::Error> {
if path.is_empty() {
return Ok(());
}
use prost::Message;
let buf = state.encode_to_vec();
std::fs::write(path, buf)
}
/// Load VolumeServerState from a state.pb file (matches Go's State.Load).
pub fn load_state_file(
path: &str,
) -> Option<volume_server_pb::VolumeServerState> {
if path.is_empty() || !std::path::Path::new(path).exists() {
return None;
}
let data = std::fs::read(path).ok()?;
use prost::Message;
volume_server_pb::VolumeServerState::decode(data.as_slice()).ok()
}
struct WriteThrottler {
bytes_per_second: i64,
last_size_counter: i64,
@ -876,16 +901,29 @@ impl VolumeServer for VolumeGrpcService {
)));
}
// Save previous state for rollback on persistence failure (matches Go)
let prev_maintenance = self.state.maintenance.load(Ordering::Relaxed);
let prev_version = current_version;
self.state
.maintenance
.store(new_state.maintenance, Ordering::Relaxed);
let new_version = self.state.state_version.fetch_add(1, Ordering::Relaxed) + 1;
// Persist to disk (matches Go's State.save)
let pb = volume_server_pb::VolumeServerState {
maintenance: new_state.maintenance,
version: new_version,
};
if let Err(e) = save_state_file(&self.state.state_file_path, &pb) {
// Rollback in-memory state on save failure (matches Go)
self.state.maintenance.store(prev_maintenance, Ordering::Relaxed);
self.state.state_version.store(prev_version, Ordering::Relaxed);
return Err(Status::internal(format!("failed to save state: {}", e)));
}
Ok(Response::new(volume_server_pb::SetStateResponse {
state: Some(volume_server_pb::VolumeServerState {
maintenance: new_state.maintenance,
version: new_version,
}),
state: Some(pb),
}))
} else {
// nil state = no-op, return current state
@ -4282,6 +4320,7 @@ mod tests {
read_buffer_size_bytes: 1024 * 1024,
security_file: String::new(),
cli_white_list: vec![],
state_file_path: String::new(),
});
(
@ -4383,6 +4422,7 @@ mod tests {
read_buffer_size_bytes: 1024 * 1024,
security_file: String::new(),
cli_white_list: vec![],
state_file_path: String::new(),
});
(VolumeGrpcService { state }, tmp)

1
seaweed-volume/src/server/heartbeat.rs

@ -1034,6 +1034,7 @@ mod tests {
read_buffer_size_bytes: 4 * 1024 * 1024,
security_file: String::new(),
cli_white_list: vec![],
state_file_path: String::new(),
})
}

2
seaweed-volume/src/server/volume_server.rs

@ -101,6 +101,8 @@ pub struct VolumeServerState {
pub security_file: String,
/// Original CLI whitelist entries — stored for SIGHUP reload.
pub cli_white_list: Vec<String>,
/// Path to state.pb file for persisting VolumeServerState across restarts.
pub state_file_path: String,
}
impl VolumeServerState {

1
seaweed-volume/src/server/write_queue.rs

@ -218,6 +218,7 @@ mod tests {
read_buffer_size_bytes: 4 * 1024 * 1024,
security_file: String::new(),
cli_white_list: vec![],
state_file_path: String::new(),
})
}

Loading…
Cancel
Save