From d736aac06a8e26caf9e348ab8d4a4163d06c8f61 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 16 Mar 2026 13:50:34 -0700 Subject: [PATCH] Honor maintenanceMBps during volume copy --- seaweed-volume/src/main.rs | 1 + seaweed-volume/src/server/grpc_server.rs | 53 ++++++++++++++++++++++ seaweed-volume/src/server/heartbeat.rs | 1 + seaweed-volume/src/server/volume_server.rs | 2 + seaweed-volume/src/server/write_queue.rs | 1 + seaweed-volume/tests/http_integration.rs | 1 + 6 files changed, 59 insertions(+) diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs index db3c62c0a..1e4d29043 100644 --- a/seaweed-volume/src/main.rs +++ b/seaweed-volume/src/main.rs @@ -310,6 +310,7 @@ async fn run(config: VolumeServerConfig) -> Result<(), Box = Pin> + Send + 'static>>; +struct WriteThrottler { + bytes_per_second: i64, + last_size_counter: i64, + last_size_check_time: std::time::Instant, +} + +impl WriteThrottler { + fn new(bytes_per_second: i64) -> Self { + Self { + bytes_per_second, + last_size_counter: 0, + last_size_check_time: std::time::Instant::now(), + } + } + + async fn maybe_slowdown(&mut self, delta: i64) { + if self.bytes_per_second <= 0 { + return; + } + + self.last_size_counter += delta; + let elapsed = self.last_size_check_time.elapsed(); + if elapsed <= std::time::Duration::from_millis(100) { + return; + } + + let over_limit_bytes = self.last_size_counter - self.bytes_per_second / 10; + if over_limit_bytes > 0 { + let over_ratio = over_limit_bytes as f64 / self.bytes_per_second as f64; + let sleep_time = std::time::Duration::from_millis((over_ratio * 1000.0) as u64); + if !sleep_time.is_zero() { + tokio::time::sleep(sleep_time).await; + } + } + + self.last_size_counter = 0; + self.last_size_check_time = std::time::Instant::now(); + } +} + struct MasterVolumeInfo { volume_id: VolumeId, collection: String, @@ -923,6 +963,12 @@ impl VolumeServer for VolumeGrpcService { let result = async { let report_interval: i64 = 128 * 1024 * 1024; let mut next_report_target: i64 = report_interval; + let io_byte_per_second = if req.io_byte_per_second > 0 { + req.io_byte_per_second + } else { + state.maintenance_byte_per_second + }; + let mut throttler = WriteThrottler::new(io_byte_per_second); // Copy .dat file if !has_remote_dat { @@ -941,6 +987,7 @@ impl VolumeServer for VolumeGrpcService { Some(&tx), &mut next_report_target, report_interval, + &mut throttler, ) .await .map_err(|e| Status::internal(e))?; @@ -965,6 +1012,7 @@ impl VolumeServer for VolumeGrpcService { None, &mut next_report_target, report_interval, + &mut throttler, ) .await .map_err(|e| Status::internal(e))?; @@ -988,6 +1036,7 @@ impl VolumeServer for VolumeGrpcService { None, &mut next_report_target, report_interval, + &mut throttler, ) .await .map_err(|e| Status::internal(e))?; @@ -3382,6 +3431,7 @@ async fn copy_file_from_source( >, next_report_target: &mut i64, report_interval: i64, + throttler: &mut WriteThrottler, ) -> Result { let copy_req = volume_server_pb::CopyFileRequest { volume_id, @@ -3427,6 +3477,9 @@ async fn copy_file_from_source( file.write_all(&resp.file_content) .map_err(|e| format!("write file {}: {}", dest_path, e))?; progressed_bytes += resp.file_content.len() as i64; + throttler + .maybe_slowdown(resp.file_content.len() as i64) + .await; if let Some(tx) = progress_tx { if progressed_bytes > *next_report_target { diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index dfe1477c1..d84412ae3 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -658,6 +658,7 @@ mod tests { data_center: String::new(), rack: String::new(), file_size_limit_bytes: 0, + maintenance_byte_per_second: 0, is_heartbeating: std::sync::atomic::AtomicBool::new(false), has_master: true, pre_stop_seconds: 0, diff --git a/seaweed-volume/src/server/volume_server.rs b/seaweed-volume/src/server/volume_server.rs index 64a481ddd..68da3dac9 100644 --- a/seaweed-volume/src/server/volume_server.rs +++ b/seaweed-volume/src/server/volume_server.rs @@ -60,6 +60,8 @@ pub struct VolumeServerState { pub rack: String, /// File size limit in bytes (0 = no limit). pub file_size_limit_bytes: i64, + /// Default IO rate limit for maintenance copy/replication work. + pub maintenance_byte_per_second: i64, /// Whether the server is connected to master (heartbeat active). pub is_heartbeating: AtomicBool, /// Whether master addresses are configured. diff --git a/seaweed-volume/src/server/write_queue.rs b/seaweed-volume/src/server/write_queue.rs index 3a2aadf58..54c5e89fb 100644 --- a/seaweed-volume/src/server/write_queue.rs +++ b/seaweed-volume/src/server/write_queue.rs @@ -186,6 +186,7 @@ mod tests { data_center: String::new(), rack: String::new(), file_size_limit_bytes: 0, + maintenance_byte_per_second: 0, is_heartbeating: AtomicBool::new(false), has_master: false, pre_stop_seconds: 0, diff --git a/seaweed-volume/tests/http_integration.rs b/seaweed-volume/tests/http_integration.rs index 82fd8aa6e..5133d00a2 100644 --- a/seaweed-volume/tests/http_integration.rs +++ b/seaweed-volume/tests/http_integration.rs @@ -71,6 +71,7 @@ fn test_state_with_signing_key(signing_key: Vec) -> (Arc, data_center: String::new(), rack: String::new(), file_size_limit_bytes: 0, + maintenance_byte_per_second: 0, is_heartbeating: std::sync::atomic::AtomicBool::new(true), has_master: false, pre_stop_seconds: 0,