Browse Source

Honor master preallocate in volume max

rust-volume-server
Chris Lu 4 days ago
parent
commit
4fcbe1c6d4
  1. 62
      seaweed-volume/src/server/heartbeat.rs
  2. 80
      seaweed-volume/src/storage/store.rs

62
seaweed-volume/src/server/heartbeat.rs

@ -261,6 +261,24 @@ fn duplicate_directories(store: &Store, duplicated_uuids: &[String]) -> Vec<Stri
duplicate_dirs duplicate_dirs
} }
fn apply_master_volume_options(store: &Store, hb_resp: &master_pb::HeartbeatResponse) -> bool {
let mut volume_opts_changed = false;
if store.get_preallocate() != hb_resp.preallocate {
store.set_preallocate(hb_resp.preallocate);
volume_opts_changed = true;
}
if hb_resp.volume_size_limit > 0
&& store.volume_size_limit.load(Ordering::Relaxed) != hb_resp.volume_size_limit
{
store
.volume_size_limit
.store(hb_resp.volume_size_limit, Ordering::Relaxed);
volume_opts_changed = true;
}
volume_opts_changed && store.maybe_adjust_volume_max()
}
/// Perform one heartbeat session with a master server. /// Perform one heartbeat session with a master server.
async fn do_heartbeat( async fn do_heartbeat(
config: &HeartbeatConfig, config: &HeartbeatConfig,
@ -320,21 +338,16 @@ async fn do_heartbeat(
resp = response_stream.message() => { resp = response_stream.message() => {
match resp { match resp {
Ok(Some(hb_resp)) => { Ok(Some(hb_resp)) => {
if hb_resp.volume_size_limit > 0 {
let changed = {
let s = state.store.read().unwrap();
s.volume_size_limit.store(
hb_resp.volume_size_limit,
std::sync::atomic::Ordering::Relaxed,
);
s.maybe_adjust_volume_max()
};
if changed {
let adjusted_hb = collect_heartbeat(config, state);
last_volumes = adjusted_hb.volumes.iter().map(|v| (v.id, v.clone())).collect();
if tx.send(adjusted_hb).await.is_err() {
return Ok(None);
}
let changed = {
let s = state.store.read().unwrap();
apply_master_volume_options(&s, &hb_resp)
};
if changed {
let adjusted_hb = collect_heartbeat(config, state);
last_volumes =
adjusted_hb.volumes.iter().map(|v| (v.id, v.clone())).collect();
if tx.send(adjusted_hb).await.is_err() {
return Ok(None);
} }
} }
let metrics_changed = apply_metrics_push_settings( let metrics_changed = apply_metrics_push_settings(
@ -1357,4 +1370,23 @@ mod tests {
assert_eq!(duplicate_dirs, vec![dir.to_string()]); assert_eq!(duplicate_dirs, vec![dir.to_string()]);
} }
#[test]
fn test_apply_master_volume_options_updates_preallocate_and_size_limit() {
let store = Store::new(NeedleMapKind::InMemory);
store.volume_size_limit.store(1024, Ordering::Relaxed);
let changed = apply_master_volume_options(
&store,
&master_pb::HeartbeatResponse {
volume_size_limit: 2048,
preallocate: true,
..Default::default()
},
);
assert!(store.get_preallocate());
assert_eq!(store.volume_size_limit.load(Ordering::Relaxed), 2048);
assert!(!changed);
}
} }

80
seaweed-volume/src/storage/store.rs

@ -5,7 +5,7 @@
//! Matches Go's storage/store.go. //! Matches Go's storage/store.go.
use std::io; use std::io;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use crate::config::MinFreeSpace; use crate::config::MinFreeSpace;
use crate::pb::master_pb; use crate::pb::master_pb;
@ -22,6 +22,7 @@ use crate::storage::volume::{VifVolumeInfo, VolumeError};
pub struct Store { pub struct Store {
pub locations: Vec<DiskLocation>, pub locations: Vec<DiskLocation>,
pub needle_map_kind: NeedleMapKind, pub needle_map_kind: NeedleMapKind,
preallocate: AtomicBool,
pub volume_size_limit: AtomicU64, pub volume_size_limit: AtomicU64,
pub id: String, pub id: String,
pub ip: String, pub ip: String,
@ -37,6 +38,7 @@ impl Store {
Store { Store {
locations: Vec::new(), locations: Vec::new(),
needle_map_kind, needle_map_kind,
preallocate: AtomicBool::new(false),
volume_size_limit: AtomicU64::new(0), volume_size_limit: AtomicU64::new(0),
id: String::new(), id: String::new(),
ip: String::new(), ip: String::new(),
@ -404,6 +406,14 @@ impl Store {
self.locations.iter().map(|loc| loc.volumes_len()).sum() self.locations.iter().map(|loc| loc.volumes_len()).sum()
} }
pub fn set_preallocate(&self, preallocate: bool) {
self.preallocate.store(preallocate, Ordering::Relaxed);
}
pub fn get_preallocate(&self) -> bool {
self.preallocate.load(Ordering::Relaxed)
}
/// Total max volumes across all locations. /// Total max volumes across all locations.
pub fn max_volume_count(&self) -> i32 { pub fn max_volume_count(&self) -> i32 {
self.locations self.locations
@ -433,7 +443,11 @@ impl Store {
let current = loc.max_volume_count.load(Ordering::Relaxed); let current = loc.max_volume_count.load(Ordering::Relaxed);
let (_, free) = super::disk_location::get_disk_stats(&loc.directory); let (_, free) = super::disk_location::get_disk_stats(&loc.directory);
let unused_space = loc.unused_space(volume_size_limit);
let unused_space = if self.get_preallocate() {
0
} else {
loc.unused_space(volume_size_limit)
};
let unclaimed = (free as i64) - (unused_space as i64); let unclaimed = (free as i64) - (unused_space as i64);
let vol_count = loc.volumes_len() as i32; let vol_count = loc.volumes_len() as i32;
@ -1087,6 +1101,68 @@ mod tests {
assert!(store.has_volume(VolumeId(3))); assert!(store.has_volume(VolumeId(3)));
} }
#[test]
fn test_maybe_adjust_volume_max_honors_preallocate_flag() {
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
let mut store = Store::new(NeedleMapKind::InMemory);
store
.add_location(
dir,
dir,
2,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
Vec::new(),
)
.unwrap();
store.volume_size_limit.store(1024, Ordering::Relaxed);
store
.add_volume(
VolumeId(61),
"preallocate_case",
None,
None,
0,
DiskType::HardDrive,
Version::current(),
)
.unwrap();
store
.add_volume(
VolumeId(62),
"preallocate_case",
None,
None,
0,
DiskType::HardDrive,
Version::current(),
)
.unwrap();
for vid in [VolumeId(61), VolumeId(62)] {
let dat_path = store.find_volume(vid).unwrap().1.dat_path();
std::fs::OpenOptions::new()
.write(true)
.open(dat_path)
.unwrap()
.set_len((crate::storage::super_block::SUPER_BLOCK_SIZE + 1) as u64)
.unwrap();
}
store.locations[0].original_max_volume_count = 0;
store.locations[0].max_volume_count.store(0, Ordering::Relaxed);
store.set_preallocate(false);
assert!(store.maybe_adjust_volume_max());
let without_preallocate = store.locations[0].max_volume_count.load(Ordering::Relaxed);
store.set_preallocate(true);
assert!(store.maybe_adjust_volume_max());
let with_preallocate = store.locations[0].max_volume_count.load(Ordering::Relaxed);
assert!(with_preallocate > without_preallocate);
}
#[test] #[test]
fn test_delete_expired_ec_volumes_removes_expired_entries() { fn test_delete_expired_ec_volumes_removes_expired_entries() {
let tmp = TempDir::new().unwrap(); let tmp = TempDir::new().unwrap();

Loading…
Cancel
Save