Browse Source

refactor: move EC volumes from Store to per-DiskLocation

Matches Go's architecture where EC volumes are managed per disk location,
enabling correct per-location max volume count calculation and proper
distribution of EC shards across disks.
rust-volume-server
Chris Lu 20 hours ago
parent
commit
df75a325bf
  1. 24
      seaweed-volume/src/server/grpc_server.rs
  2. 28
      seaweed-volume/src/server/heartbeat.rs
  3. 84
      seaweed-volume/src/storage/disk_location.rs
  4. 95
      seaweed-volume/src/storage/store.rs

24
seaweed-volume/src/server/grpc_server.rs

@ -122,7 +122,7 @@ impl VolumeServer for VolumeGrpcService {
// Check if this is an EC volume // Check if this is an EC volume
let is_ec_volume = { let is_ec_volume = {
let store = self.state.store.read().unwrap(); let store = self.state.store.read().unwrap();
store.ec_volumes.contains_key(&file_id.volume_id)
store.has_ec_volume(file_id.volume_id)
}; };
// Cookie validation (unless skip_cookie_check) // Cookie validation (unless skip_cookie_check)
@ -146,7 +146,7 @@ impl VolumeServer for VolumeGrpcService {
} else { } else {
// For EC volumes, verify needle exists in ecx index // For EC volumes, verify needle exists in ecx index
let store = self.state.store.read().unwrap(); let store = self.state.store.read().unwrap();
if let Some(ec_vol) = store.ec_volumes.get(&file_id.volume_id) {
if let Some(ec_vol) = store.find_ec_volume(file_id.volume_id) {
match ec_vol.find_needle_from_ecx(n.id) { match ec_vol.find_needle_from_ecx(n.id) {
Ok(Some((_, size))) if !size.is_deleted() => { Ok(Some((_, size))) if !size.is_deleted() => {
// Needle exists and is not deleted — cookie check not possible // Needle exists and is not deleted — cookie check not possible
@ -247,7 +247,7 @@ impl VolumeServer for VolumeGrpcService {
} else { } else {
// EC volume deletion: journal the delete locally (with cookie validation, matching Go) // EC volume deletion: journal the delete locally (with cookie validation, matching Go)
let mut store = self.state.store.write().unwrap(); let mut store = self.state.store.write().unwrap();
if let Some(ec_vol) = store.ec_volumes.get_mut(&file_id.volume_id) {
if let Some(ec_vol) = store.find_ec_volume_mut(file_id.volume_id) {
match ec_vol.journal_delete_with_cookie(n.id, n.cookie) { match ec_vol.journal_delete_with_cookie(n.id, n.cookie) {
Ok(()) => { Ok(()) => {
results.push(volume_server_pb::DeleteResult { results.push(volume_server_pb::DeleteResult {
@ -2085,7 +2085,7 @@ impl VolumeServer for VolumeGrpcService {
let vid = VolumeId(req.volume_id); let vid = VolumeId(req.volume_id);
let store = self.state.store.read().unwrap(); let store = self.state.store.read().unwrap();
let ec_vol = store.ec_volumes.get(&vid).ok_or_else(|| {
let ec_vol = store.find_ec_volume(vid).ok_or_else(|| {
Status::not_found(format!( Status::not_found(format!(
"ec volume {} shard {} not found", "ec volume {} shard {} not found",
req.volume_id, req.shard_id req.volume_id, req.shard_id
@ -2147,7 +2147,7 @@ impl VolumeServer for VolumeGrpcService {
let needle_id = NeedleId(req.file_key); let needle_id = NeedleId(req.file_key);
let mut store = self.state.store.write().unwrap(); let mut store = self.state.store.write().unwrap();
if let Some(ec_vol) = store.ec_volumes.get_mut(&vid) {
if let Some(ec_vol) = store.find_ec_volume_mut(vid) {
ec_vol ec_vol
.journal_delete(needle_id) .journal_delete(needle_id)
.map_err(|e| Status::internal(e.to_string()))?; .map_err(|e| Status::internal(e.to_string()))?;
@ -2168,8 +2168,7 @@ impl VolumeServer for VolumeGrpcService {
let store = self.state.store.read().unwrap(); let store = self.state.store.read().unwrap();
let ec_vol = store let ec_vol = store
.ec_volumes
.get(&vid)
.find_ec_volume(vid)
.ok_or_else(|| Status::not_found(format!("ec volume {} not found", req.volume_id)))?; .ok_or_else(|| Status::not_found(format!("ec volume {} not found", req.volume_id)))?;
if ec_vol.collection != req.collection { if ec_vol.collection != req.collection {
@ -2253,7 +2252,7 @@ impl VolumeServer for VolumeGrpcService {
{ {
let mut store = self.state.store.write().unwrap(); let mut store = self.state.store.write().unwrap();
// Remove EC volume // Remove EC volume
if let Some(mut ec_vol) = store.ec_volumes.remove(&vid) {
if let Some(mut ec_vol) = store.remove_ec_volume(vid) {
ec_vol.close(); ec_vol.close();
} }
// Unmount existing volume if any, then mount fresh // Unmount existing volume if any, then mount fresh
@ -2277,8 +2276,7 @@ impl VolumeServer for VolumeGrpcService {
let store = self.state.store.read().unwrap(); let store = self.state.store.read().unwrap();
let ec_vol = store let ec_vol = store
.ec_volumes
.get(&vid)
.find_ec_volume(vid)
.ok_or_else(|| Status::not_found(format!("ec volume {} not found", req.volume_id)))?; .ok_or_else(|| Status::not_found(format!("ec volume {} not found", req.volume_id)))?;
let mut shard_infos = Vec::new(); let mut shard_infos = Vec::new();
@ -2853,7 +2851,7 @@ impl VolumeServer for VolumeGrpcService {
let store = self.state.store.read().unwrap(); let store = self.state.store.read().unwrap();
let vids: Vec<VolumeId> = if req.volume_ids.is_empty() { let vids: Vec<VolumeId> = if req.volume_ids.is_empty() {
store.ec_volumes.keys().copied().collect()
store.locations.iter().flat_map(|loc| loc.ec_volumes().map(|(vid, _)| *vid)).collect()
} else { } else {
req.volume_ids.iter().map(|&id| VolumeId(id)).collect() req.volume_ids.iter().map(|&id| VolumeId(id)).collect()
}; };
@ -2866,7 +2864,7 @@ impl VolumeServer for VolumeGrpcService {
for vid in &vids { for vid in &vids {
let collection = { let collection = {
if let Some(ecv) = store.ec_volumes.get(vid) {
if let Some(ecv) = store.find_ec_volume(*vid) {
ecv.collection.clone() ecv.collection.clone()
} else { } else {
return Err(Status::not_found(format!( return Err(Status::not_found(format!(
@ -3114,7 +3112,7 @@ impl VolumeServer for VolumeGrpcService {
} }
// Fall back to EC shards // Fall back to EC shards
if let Some(ec_vol) = store.ec_volumes.get(&vid) {
if let Some(ec_vol) = store.find_ec_volume(vid) {
match ec_vol.find_needle_from_ecx(needle_id) { match ec_vol.find_needle_from_ecx(needle_id) {
Ok(Some((offset, size))) if !size.is_deleted() && !offset.is_zero() => { Ok(Some((offset, size))) if !size.is_deleted() && !offset.is_zero() => {
// Read the needle header from EC shards to get cookie // Read the needle header from EC shards to get cookie

28
seaweed-volume/src/server/heartbeat.rs

@ -527,20 +527,22 @@ fn collect_ec_heartbeat(state: &Arc<VolumeServerState>) -> master_pb::Heartbeat
let store = state.store.read().unwrap(); let store = state.store.read().unwrap();
let mut ec_shards = Vec::new(); let mut ec_shards = Vec::new();
for (vid, ec_vol) in &store.ec_volumes {
let mut ec_index_bits: u32 = 0;
for shard_opt in &ec_vol.shards {
if let Some(shard) = shard_opt {
ec_index_bits |= 1u32 << shard.shard_id;
for loc in &store.locations {
for (vid, ec_vol) in loc.ec_volumes() {
let mut ec_index_bits: u32 = 0;
for shard_opt in &ec_vol.shards {
if let Some(shard) = shard_opt {
ec_index_bits |= 1u32 << shard.shard_id;
}
}
if ec_index_bits > 0 {
ec_shards.push(master_pb::VolumeEcShardInformationMessage {
id: vid.0,
collection: ec_vol.collection.clone(),
ec_index_bits,
..Default::default()
});
} }
}
if ec_index_bits > 0 {
ec_shards.push(master_pb::VolumeEcShardInformationMessage {
id: vid.0,
collection: ec_vol.collection.clone(),
ec_index_bits,
..Default::default()
});
} }
} }

84
seaweed-volume/src/storage/disk_location.rs

@ -13,8 +13,10 @@ use tracing::{info, warn};
use crate::config::MinFreeSpace; use crate::config::MinFreeSpace;
use crate::storage::erasure_coding::ec_shard::{ use crate::storage::erasure_coding::ec_shard::{
DATA_SHARDS_COUNT, ERASURE_CODING_LARGE_BLOCK_SIZE, ERASURE_CODING_SMALL_BLOCK_SIZE,
EcVolumeShard, DATA_SHARDS_COUNT, ERASURE_CODING_LARGE_BLOCK_SIZE,
ERASURE_CODING_SMALL_BLOCK_SIZE,
}; };
use crate::storage::erasure_coding::ec_volume::EcVolume;
use crate::storage::needle_map::NeedleMapKind; use crate::storage::needle_map::NeedleMapKind;
use crate::storage::super_block::ReplicaPlacement; use crate::storage::super_block::ReplicaPlacement;
use crate::storage::types::*; use crate::storage::types::*;
@ -30,6 +32,7 @@ pub struct DiskLocation {
pub max_volume_count: AtomicI32, pub max_volume_count: AtomicI32,
pub original_max_volume_count: i32, pub original_max_volume_count: i32,
volumes: HashMap<VolumeId, Volume>, volumes: HashMap<VolumeId, Volume>,
ec_volumes: HashMap<VolumeId, EcVolume>,
pub is_disk_space_low: AtomicBool, pub is_disk_space_low: AtomicBool,
pub available_space: AtomicU64, pub available_space: AtomicU64,
pub min_free_space: MinFreeSpace, pub min_free_space: MinFreeSpace,
@ -65,6 +68,7 @@ impl DiskLocation {
max_volume_count: AtomicI32::new(max_volume_count), max_volume_count: AtomicI32::new(max_volume_count),
original_max_volume_count: max_volume_count, original_max_volume_count: max_volume_count,
volumes: HashMap::new(), volumes: HashMap::new(),
ec_volumes: HashMap::new(),
is_disk_space_low: AtomicBool::new(false), is_disk_space_low: AtomicBool::new(false),
available_space: AtomicU64::new(0), available_space: AtomicU64::new(0),
min_free_space, min_free_space,
@ -475,12 +479,90 @@ impl DiskLocation {
.set(free as f64); .set(free as f64);
} }
// ---- EC volume operations ----
/// Look up the EC volume registered under `vid` on this disk location.
///
/// Returns a shared reference to the volume, or `None` when this location's
/// EC volume map has no entry for `vid`.
pub fn find_ec_volume(&self, vid: VolumeId) -> Option<&EcVolume> {
self.ec_volumes.get(&vid)
}
/// Look up the EC volume registered under `vid` on this disk location, mutably.
///
/// Returns an exclusive reference to the volume, or `None` when this
/// location's EC volume map has no entry for `vid`.
pub fn find_ec_volume_mut(&mut self, vid: VolumeId) -> Option<&mut EcVolume> {
self.ec_volumes.get_mut(&vid)
}
/// Report whether this disk location has an EC volume registered under `vid`.
///
/// Only the presence of a map entry is checked; the number of shards mounted
/// on that volume is not inspected.
pub fn has_ec_volume(&self, vid: VolumeId) -> bool {
self.ec_volumes.contains_key(&vid)
}
/// Detach the EC volume registered under `vid` from this disk location.
///
/// Returns the removed volume by value, or `None` when no entry exists for
/// `vid`. The volume is not closed here; that is left to the caller.
pub fn remove_ec_volume(&mut self, vid: VolumeId) -> Option<EcVolume> {
self.ec_volumes.remove(&vid)
}
/// Mount EC shards for a volume on this location.
///
/// Reuses the `EcVolume` already registered under `vid`, or creates one rooted
/// at this location's directory when none exists yet. Each id in `shard_ids` is
/// then added to that volume, and the `ec_shards` gauge for `collection` is
/// incremented once per shard that was added successfully.
///
/// # Errors
///
/// Returns `VolumeError::Io` when the EC volume cannot be created or when
/// adding a shard fails. Creation failures used to panic via `unwrap()`; they
/// are now reported to the caller. Shards added before a failing one stay
/// mounted.
pub fn mount_ec_shards(
    &mut self,
    vid: VolumeId,
    collection: &str,
    shard_ids: &[u32],
) -> Result<(), VolumeError> {
    // Clone the directory so it can be borrowed while `self.ec_volumes` is
    // mutably borrowed through the entry below.
    let dir = self.directory.clone();

    let ec_vol = match self.ec_volumes.entry(vid) {
        std::collections::hash_map::Entry::Occupied(slot) => slot.into_mut(),
        std::collections::hash_map::Entry::Vacant(slot) => {
            // NOTE(review): assumes `EcVolume::new` yields an `io::Error` on
            // failure, like `add_shard` below — confirm against ec_volume.rs.
            let created = EcVolume::new(&dir, &dir, collection, vid).map_err(VolumeError::Io)?;
            slot.insert(created)
        }
    };

    for &shard_id in shard_ids {
        let shard = EcVolumeShard::new(&dir, collection, vid, shard_id as u8);
        ec_vol.add_shard(shard).map_err(VolumeError::Io)?;
        // Count the shard only after it has actually been added.
        crate::metrics::VOLUME_GAUGE
            .with_label_values(&[collection, "ec_shards"])
            .inc();
    }
    Ok(())
}
/// Unmount the given EC shards of volume `vid` from this location.
///
/// Does nothing when no EC volume is registered under `vid`. Otherwise every id
/// in `shard_ids` is passed to `remove_shard`, and the `ec_shards` gauge for the
/// volume's collection is decremented once per id. When the volume ends up with
/// no shards, it is dropped from this location and closed.
pub fn unmount_ec_shards(&mut self, vid: VolumeId, shard_ids: &[u32]) {
    let ec_vol = match self.ec_volumes.get_mut(&vid) {
        Some(found) => found,
        None => return,
    };

    // Owned copy of the label so it outlives the mutable use of the volume.
    let collection = ec_vol.collection.clone();
    for shard_id in shard_ids.iter().copied() {
        ec_vol.remove_shard(shard_id as u8);
        crate::metrics::VOLUME_GAUGE
            .with_label_values(&[&collection, "ec_shards"])
            .dec();
    }

    if ec_vol.shard_count() != 0 {
        return;
    }

    // Last shard is gone: release the now-empty EC volume.
    if let Some(mut emptied) = self.ec_volumes.remove(&vid) {
        emptied.close();
    }
}
/// Count the EC shards currently present on this location.
///
/// Sums, over every EC volume held here, the number of occupied (`Some`)
/// slots in that volume's `shards` collection.
pub fn ec_shard_count(&self) -> usize {
    let mut total = 0usize;
    for ecv in self.ec_volumes.values() {
        // `flatten` skips the empty (`None`) shard slots.
        total += ecv.shards.iter().flatten().count();
    }
    total
}
/// Iterate over every EC volume held by this disk location.
///
/// Yields `(&VolumeId, &EcVolume)` pairs straight from the underlying hash
/// map, so the iteration order is unspecified.
pub fn ec_volumes(&self) -> impl Iterator<Item = (&VolumeId, &EcVolume)> {
self.ec_volumes.iter()
}
/// Close all volumes. /// Close all volumes.
pub fn close(&mut self) { pub fn close(&mut self) {
for (_, v) in self.volumes.iter_mut() { for (_, v) in self.volumes.iter_mut() {
v.close(); v.close();
} }
self.volumes.clear(); self.volumes.clear();
for (_, mut ec_vol) in self.ec_volumes.drain() {
ec_vol.close();
}
} }
} }

95
seaweed-volume/src/storage/store.rs

@ -4,7 +4,6 @@
//! It coordinates volume placement, lookup, and lifecycle operations. //! It coordinates volume placement, lookup, and lifecycle operations.
//! Matches Go's storage/store.go. //! Matches Go's storage/store.go.
use std::collections::HashMap;
use std::io; use std::io;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
@ -30,7 +29,6 @@ pub struct Store {
pub public_url: String, pub public_url: String,
pub data_center: String, pub data_center: String,
pub rack: String, pub rack: String,
pub ec_volumes: HashMap<VolumeId, EcVolume>,
} }
impl Store { impl Store {
@ -46,7 +44,6 @@ impl Store {
public_url: String::new(), public_url: String::new(),
data_center: String::new(), data_center: String::new(),
rack: String::new(), rack: String::new(),
ec_volumes: HashMap::new(),
} }
} }
@ -373,12 +370,9 @@ impl Store {
.sum() .sum()
} }
/// Total EC shard count across all EC volumes.
/// Total EC shard count across all locations.
pub fn ec_shard_count(&self) -> usize { pub fn ec_shard_count(&self) -> usize {
self.ec_volumes
.values()
.map(|ecv| ecv.shards.iter().filter(|s| s.is_some()).count())
.sum()
self.locations.iter().map(|loc| loc.ec_shard_count()).sum()
} }
/// Recalculate max volume counts for locations with original_max_volume_count == 0. /// Recalculate max volume counts for locations with original_max_volume_count == 0.
@ -392,8 +386,6 @@ impl Store {
let mut has_changes = false; let mut has_changes = false;
let mut new_max_total: i32 = 0; let mut new_max_total: i32 = 0;
let total_ec_shards = self.ec_shard_count();
for loc in &self.locations { for loc in &self.locations {
if loc.original_max_volume_count == 0 { if loc.original_max_volume_count == 0 {
let current = loc.max_volume_count.load(Ordering::Relaxed); let current = loc.max_volume_count.load(Ordering::Relaxed);
@ -403,7 +395,8 @@ impl Store {
let unclaimed = (free as i64) - (unused_space as i64); let unclaimed = (free as i64) - (unused_space as i64);
let vol_count = loc.volumes_len() as i32; let vol_count = loc.volumes_len() as i32;
let ec_equivalent = ((total_ec_shards
let loc_ec_shards = loc.ec_shard_count();
let ec_equivalent = ((loc_ec_shards
+ crate::storage::erasure_coding::ec_shard::DATA_SHARDS_COUNT) + crate::storage::erasure_coding::ec_shard::DATA_SHARDS_COUNT)
/ crate::storage::erasure_coding::ec_shard::DATA_SHARDS_COUNT) / crate::storage::erasure_coding::ec_shard::DATA_SHARDS_COUNT)
as i32; as i32;
@ -454,45 +447,72 @@ impl Store {
collection: &str, collection: &str,
shard_ids: &[u32], shard_ids: &[u32],
) -> Result<(), VolumeError> { ) -> Result<(), VolumeError> {
// Find the directory where the EC files live
let dir = self.find_ec_dir(vid, collection).ok_or_else(|| {
// Find the location where the EC files live
let loc_idx = self.find_ec_location(vid, collection).ok_or_else(|| {
VolumeError::Io(io::Error::new( VolumeError::Io(io::Error::new(
io::ErrorKind::NotFound, io::ErrorKind::NotFound,
format!("ec volume {} shards not found on disk", vid), format!("ec volume {} shards not found on disk", vid),
)) ))
})?; })?;
let ec_vol = self
.ec_volumes
.entry(vid)
.or_insert_with(|| EcVolume::new(&dir, &dir, collection, vid).unwrap());
self.locations[loc_idx].mount_ec_shards(vid, collection, shard_ids)
}
for &shard_id in shard_ids {
let shard = EcVolumeShard::new(&dir, collection, vid, shard_id as u8);
ec_vol.add_shard(shard).map_err(|e| VolumeError::Io(e))?;
crate::metrics::VOLUME_GAUGE
.with_label_values(&[collection, "ec_shards"])
.inc();
/// Unmount EC shards for a volume.
pub fn unmount_ec_shards(&mut self, vid: VolumeId, shard_ids: &[u32]) {
for loc in &mut self.locations {
if loc.has_ec_volume(vid) {
loc.unmount_ec_shards(vid, shard_ids);
return;
}
} }
}
Ok(())
/// Find an EC volume across all locations.
pub fn find_ec_volume(&self, vid: VolumeId) -> Option<&EcVolume> {
for loc in &self.locations {
if let Some(ecv) = loc.find_ec_volume(vid) {
return Some(ecv);
}
}
None
} }
/// Unmount EC shards for a volume.
pub fn unmount_ec_shards(&mut self, vid: VolumeId, shard_ids: &[u32]) {
if let Some(ec_vol) = self.ec_volumes.get_mut(&vid) {
let collection = ec_vol.collection.clone();
for &shard_id in shard_ids {
ec_vol.remove_shard(shard_id as u8);
crate::metrics::VOLUME_GAUGE
.with_label_values(&[&collection, "ec_shards"])
.dec();
/// Find an EC volume across all locations (mutable).
pub fn find_ec_volume_mut(&mut self, vid: VolumeId) -> Option<&mut EcVolume> {
for loc in &mut self.locations {
if let Some(ecv) = loc.find_ec_volume_mut(vid) {
return Some(ecv);
} }
if ec_vol.shard_count() == 0 {
let mut vol = self.ec_volumes.remove(&vid).unwrap();
vol.close();
}
None
}
/// Report whether any disk location of this store holds EC volume `vid`.
///
/// Locations are checked in order and the scan stops at the first match.
pub fn has_ec_volume(&self, vid: VolumeId) -> bool {
    for loc in &self.locations {
        if loc.has_ec_volume(vid) {
            return true;
        }
    }
    false
}
/// Remove an EC volume from whichever location has it.
pub fn remove_ec_volume(&mut self, vid: VolumeId) -> Option<EcVolume> {
for loc in &mut self.locations {
if let Some(ecv) = loc.remove_ec_volume(vid) {
return Some(ecv);
} }
} }
None
}
/// Find the location index containing EC files for a volume.
pub fn find_ec_location(&self, vid: VolumeId, collection: &str) -> Option<usize> {
for (i, loc) in self.locations.iter().enumerate() {
let base = crate::storage::volume::volume_file_name(&loc.directory, collection, vid);
let ecx_path = format!("{}.ecx", base);
if std::path::Path::new(&ecx_path).exists() {
return Some(i);
}
}
None
} }
/// Delete EC shard files from disk. /// Delete EC shard files from disk.
@ -650,9 +670,6 @@ impl Store {
for loc in &mut self.locations { for loc in &mut self.locations {
loc.close(); loc.close();
} }
for (_, mut ec_vol) in self.ec_volumes.drain() {
ec_vol.close();
}
} }
} }

Loading…
Cancel
Save