diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index e52c9b235..281e21906 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -261,6 +261,24 @@ fn duplicate_directories(store: &Store, duplicated_uuids: &[String]) -> Vec bool { + let mut volume_opts_changed = false; + if store.get_preallocate() != hb_resp.preallocate { + store.set_preallocate(hb_resp.preallocate); + volume_opts_changed = true; + } + if hb_resp.volume_size_limit > 0 + && store.volume_size_limit.load(Ordering::Relaxed) != hb_resp.volume_size_limit + { + store + .volume_size_limit + .store(hb_resp.volume_size_limit, Ordering::Relaxed); + volume_opts_changed = true; + } + + volume_opts_changed && store.maybe_adjust_volume_max() +} + /// Perform one heartbeat session with a master server. async fn do_heartbeat( config: &HeartbeatConfig, @@ -320,21 +338,16 @@ async fn do_heartbeat( resp = response_stream.message() => { match resp { Ok(Some(hb_resp)) => { - if hb_resp.volume_size_limit > 0 { - let changed = { - let s = state.store.read().unwrap(); - s.volume_size_limit.store( - hb_resp.volume_size_limit, - std::sync::atomic::Ordering::Relaxed, - ); - s.maybe_adjust_volume_max() - }; - if changed { - let adjusted_hb = collect_heartbeat(config, state); - last_volumes = adjusted_hb.volumes.iter().map(|v| (v.id, v.clone())).collect(); - if tx.send(adjusted_hb).await.is_err() { - return Ok(None); - } + let changed = { + let s = state.store.read().unwrap(); + apply_master_volume_options(&s, &hb_resp) + }; + if changed { + let adjusted_hb = collect_heartbeat(config, state); + last_volumes = + adjusted_hb.volumes.iter().map(|v| (v.id, v.clone())).collect(); + if tx.send(adjusted_hb).await.is_err() { + return Ok(None); } } let metrics_changed = apply_metrics_push_settings( @@ -1357,4 +1370,23 @@ mod tests { assert_eq!(duplicate_dirs, vec![dir.to_string()]); } + + #[test] + fn test_apply_master_volume_options_updates_preallocate_and_size_limit() { + let store = Store::new(NeedleMapKind::InMemory); + store.volume_size_limit.store(1024, Ordering::Relaxed); + + let changed = apply_master_volume_options( + &store, + &master_pb::HeartbeatResponse { + volume_size_limit: 2048, + preallocate: true, + ..Default::default() + }, + ); + + assert!(store.get_preallocate()); + assert_eq!(store.volume_size_limit.load(Ordering::Relaxed), 2048); + assert!(!changed); + } } diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs index 4c6c3caef..2702b82d8 100644 --- a/seaweed-volume/src/storage/store.rs +++ b/seaweed-volume/src/storage/store.rs @@ -5,7 +5,7 @@ //! Matches Go's storage/store.go. use std::io; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use crate::config::MinFreeSpace; use crate::pb::master_pb; @@ -22,6 +22,7 @@ use crate::storage::volume::{VifVolumeInfo, VolumeError}; pub struct Store { pub locations: Vec, pub needle_map_kind: NeedleMapKind, + preallocate: AtomicBool, pub volume_size_limit: AtomicU64, pub id: String, pub ip: String, @@ -37,6 +38,7 @@ impl Store { Store { locations: Vec::new(), needle_map_kind, + preallocate: AtomicBool::new(false), volume_size_limit: AtomicU64::new(0), id: String::new(), ip: String::new(), @@ -404,6 +406,14 @@ impl Store { self.locations.iter().map(|loc| loc.volumes_len()).sum() } + pub fn set_preallocate(&self, preallocate: bool) { + self.preallocate.store(preallocate, Ordering::Relaxed); + } + + pub fn get_preallocate(&self) -> bool { + self.preallocate.load(Ordering::Relaxed) + } + /// Total max volumes across all locations. pub fn max_volume_count(&self) -> i32 { self.locations @@ -433,7 +443,11 @@ impl Store { let current = loc.max_volume_count.load(Ordering::Relaxed); let (_, free) = super::disk_location::get_disk_stats(&loc.directory); - let unused_space = loc.unused_space(volume_size_limit); + let unused_space = if self.get_preallocate() { + 0 + } else { + loc.unused_space(volume_size_limit) + }; let unclaimed = (free as i64) - (unused_space as i64); let vol_count = loc.volumes_len() as i32; @@ -1087,6 +1101,68 @@ mod tests { assert!(store.has_volume(VolumeId(3))); } + #[test] + fn test_maybe_adjust_volume_max_honors_preallocate_flag() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + + let mut store = Store::new(NeedleMapKind::InMemory); + store + .add_location( + dir, + dir, + 2, + DiskType::HardDrive, + MinFreeSpace::Percent(1.0), + Vec::new(), + ) + .unwrap(); + store.volume_size_limit.store(1024, Ordering::Relaxed); + store + .add_volume( + VolumeId(61), + "preallocate_case", + None, + None, + 0, + DiskType::HardDrive, + Version::current(), + ) + .unwrap(); + store + .add_volume( + VolumeId(62), + "preallocate_case", + None, + None, + 0, + DiskType::HardDrive, + Version::current(), + ) + .unwrap(); + for vid in [VolumeId(61), VolumeId(62)] { + let dat_path = store.find_volume(vid).unwrap().1.dat_path(); + std::fs::OpenOptions::new() + .write(true) + .open(dat_path) + .unwrap() + .set_len((crate::storage::super_block::SUPER_BLOCK_SIZE + 1) as u64) + .unwrap(); + } + store.locations[0].original_max_volume_count = 0; + store.locations[0].max_volume_count.store(0, Ordering::Relaxed); + + store.set_preallocate(false); + assert!(store.maybe_adjust_volume_max()); + let without_preallocate = store.locations[0].max_volume_count.load(Ordering::Relaxed); + + store.set_preallocate(true); + assert!(store.maybe_adjust_volume_max()); + let with_preallocate = store.locations[0].max_volume_count.load(Ordering::Relaxed); + + assert!(with_preallocate > without_preallocate); + } + #[test] fn test_delete_expired_ec_volumes_removes_expired_entries() { let tmp = TempDir::new().unwrap();