From df75a325bf13804b434e0b594f6ab6828581bfc2 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 9 Mar 2026 01:25:45 -0700 Subject: [PATCH] refactor: move EC volumes from Store to per-DiskLocation Matches Go's architecture where EC volumes are managed per disk location, enabling correct per-location max volume count calculation and proper distribution of EC shards across disks. --- seaweed-volume/src/server/grpc_server.rs | 24 +++--- seaweed-volume/src/server/heartbeat.rs | 28 +++--- seaweed-volume/src/storage/disk_location.rs | 84 +++++++++++++++++- seaweed-volume/src/storage/store.rs | 95 ++++++++++++--------- 4 files changed, 165 insertions(+), 66 deletions(-) diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 9b115f16d..d5e5424a0 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -122,7 +122,7 @@ impl VolumeServer for VolumeGrpcService { // Check if this is an EC volume let is_ec_volume = { let store = self.state.store.read().unwrap(); - store.ec_volumes.contains_key(&file_id.volume_id) + store.has_ec_volume(file_id.volume_id) }; // Cookie validation (unless skip_cookie_check) @@ -146,7 +146,7 @@ impl VolumeServer for VolumeGrpcService { } else { // For EC volumes, verify needle exists in ecx index let store = self.state.store.read().unwrap(); - if let Some(ec_vol) = store.ec_volumes.get(&file_id.volume_id) { + if let Some(ec_vol) = store.find_ec_volume(file_id.volume_id) { match ec_vol.find_needle_from_ecx(n.id) { Ok(Some((_, size))) if !size.is_deleted() => { // Needle exists and is not deleted — cookie check not possible @@ -247,7 +247,7 @@ impl VolumeServer for VolumeGrpcService { } else { // EC volume deletion: journal the delete locally (with cookie validation, matching Go) let mut store = self.state.store.write().unwrap(); - if let Some(ec_vol) = store.ec_volumes.get_mut(&file_id.volume_id) { + if let Some(ec_vol) = store.find_ec_volume_mut(file_id.volume_id) { match ec_vol.journal_delete_with_cookie(n.id, n.cookie) { Ok(()) => { results.push(volume_server_pb::DeleteResult { @@ -2085,7 +2085,7 @@ impl VolumeServer for VolumeGrpcService { let vid = VolumeId(req.volume_id); let store = self.state.store.read().unwrap(); - let ec_vol = store.ec_volumes.get(&vid).ok_or_else(|| { + let ec_vol = store.find_ec_volume(vid).ok_or_else(|| { Status::not_found(format!( "ec volume {} shard {} not found", req.volume_id, req.shard_id @@ -2147,7 +2147,7 @@ impl VolumeServer for VolumeGrpcService { let needle_id = NeedleId(req.file_key); let mut store = self.state.store.write().unwrap(); - if let Some(ec_vol) = store.ec_volumes.get_mut(&vid) { + if let Some(ec_vol) = store.find_ec_volume_mut(vid) { ec_vol .journal_delete(needle_id) .map_err(|e| Status::internal(e.to_string()))?; @@ -2168,8 +2168,7 @@ impl VolumeServer for VolumeGrpcService { let store = self.state.store.read().unwrap(); let ec_vol = store - .ec_volumes - .get(&vid) + .find_ec_volume(vid) .ok_or_else(|| Status::not_found(format!("ec volume {} not found", req.volume_id)))?; if ec_vol.collection != req.collection { @@ -2253,7 +2252,7 @@ impl VolumeServer for VolumeGrpcService { { let mut store = self.state.store.write().unwrap(); // Remove EC volume - if let Some(mut ec_vol) = store.ec_volumes.remove(&vid) { + if let Some(mut ec_vol) = store.remove_ec_volume(vid) { ec_vol.close(); } // Unmount existing volume if any, then mount fresh @@ -2277,8 +2276,7 @@ impl VolumeServer for VolumeGrpcService { let store = self.state.store.read().unwrap(); let ec_vol = store - .ec_volumes - .get(&vid) + .find_ec_volume(vid) .ok_or_else(|| Status::not_found(format!("ec volume {} not found", req.volume_id)))?; let mut shard_infos = Vec::new(); @@ -2853,7 +2851,7 @@ impl VolumeServer for VolumeGrpcService { let store = self.state.store.read().unwrap(); let vids: Vec = if req.volume_ids.is_empty() { - store.ec_volumes.keys().copied().collect() + store.locations.iter().flat_map(|loc| loc.ec_volumes().map(|(vid, _)| *vid)).collect() } else { req.volume_ids.iter().map(|&id| VolumeId(id)).collect() }; @@ -2866,7 +2864,7 @@ impl VolumeServer for VolumeGrpcService { for vid in &vids { let collection = { - if let Some(ecv) = store.ec_volumes.get(vid) { + if let Some(ecv) = store.find_ec_volume(*vid) { ecv.collection.clone() } else { return Err(Status::not_found(format!( @@ -3114,7 +3112,7 @@ impl VolumeServer for VolumeGrpcService { } // Fall back to EC shards - if let Some(ec_vol) = store.ec_volumes.get(&vid) { + if let Some(ec_vol) = store.find_ec_volume(vid) { match ec_vol.find_needle_from_ecx(needle_id) { Ok(Some((offset, size))) if !size.is_deleted() && !offset.is_zero() => { // Read the needle header from EC shards to get cookie diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index 587b01ea5..3fbed6338 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -527,20 +527,22 @@ fn collect_ec_heartbeat(state: &Arc) -> master_pb::Heartbeat let store = state.store.read().unwrap(); let mut ec_shards = Vec::new(); - for (vid, ec_vol) in &store.ec_volumes { - let mut ec_index_bits: u32 = 0; - for shard_opt in &ec_vol.shards { - if let Some(shard) = shard_opt { - ec_index_bits |= 1u32 << shard.shard_id; + for loc in &store.locations { + for (vid, ec_vol) in loc.ec_volumes() { + let mut ec_index_bits: u32 = 0; + for shard_opt in &ec_vol.shards { + if let Some(shard) = shard_opt { + ec_index_bits |= 1u32 << shard.shard_id; + } + } + if ec_index_bits > 0 { + ec_shards.push(master_pb::VolumeEcShardInformationMessage { + id: vid.0, + collection: ec_vol.collection.clone(), + ec_index_bits, + ..Default::default() + }); } - } - if ec_index_bits > 0 { - ec_shards.push(master_pb::VolumeEcShardInformationMessage { - id: vid.0, - collection: ec_vol.collection.clone(), - ec_index_bits, - ..Default::default() - }); } } diff --git a/seaweed-volume/src/storage/disk_location.rs b/seaweed-volume/src/storage/disk_location.rs index 4cc3f080c..e3ff03a3e 100644 --- a/seaweed-volume/src/storage/disk_location.rs +++ b/seaweed-volume/src/storage/disk_location.rs @@ -13,8 +13,10 @@ use tracing::{info, warn}; use crate::config::MinFreeSpace; use crate::storage::erasure_coding::ec_shard::{ - DATA_SHARDS_COUNT, ERASURE_CODING_LARGE_BLOCK_SIZE, ERASURE_CODING_SMALL_BLOCK_SIZE, + EcVolumeShard, DATA_SHARDS_COUNT, ERASURE_CODING_LARGE_BLOCK_SIZE, + ERASURE_CODING_SMALL_BLOCK_SIZE, }; +use crate::storage::erasure_coding::ec_volume::EcVolume; use crate::storage::needle_map::NeedleMapKind; use crate::storage::super_block::ReplicaPlacement; use crate::storage::types::*; @@ -30,6 +32,7 @@ pub struct DiskLocation { pub max_volume_count: AtomicI32, pub original_max_volume_count: i32, volumes: HashMap, + ec_volumes: HashMap, pub is_disk_space_low: AtomicBool, pub available_space: AtomicU64, pub min_free_space: MinFreeSpace, @@ -65,6 +68,7 @@ impl DiskLocation { max_volume_count: AtomicI32::new(max_volume_count), original_max_volume_count: max_volume_count, volumes: HashMap::new(), + ec_volumes: HashMap::new(), is_disk_space_low: AtomicBool::new(false), available_space: AtomicU64::new(0), min_free_space, @@ -475,12 +479,90 @@ impl DiskLocation { .set(free as f64); } + // ---- EC volume operations ---- + + /// Find an EC volume by ID. + pub fn find_ec_volume(&self, vid: VolumeId) -> Option<&EcVolume> { + self.ec_volumes.get(&vid) + } + + /// Find an EC volume by ID (mutable). + pub fn find_ec_volume_mut(&mut self, vid: VolumeId) -> Option<&mut EcVolume> { + self.ec_volumes.get_mut(&vid) + } + + /// Check if this location has an EC volume. + pub fn has_ec_volume(&self, vid: VolumeId) -> bool { + self.ec_volumes.contains_key(&vid) + } + + /// Remove an EC volume, returning it. + pub fn remove_ec_volume(&mut self, vid: VolumeId) -> Option { + self.ec_volumes.remove(&vid) + } + + /// Mount EC shards for a volume on this location. + pub fn mount_ec_shards( + &mut self, + vid: VolumeId, + collection: &str, + shard_ids: &[u32], + ) -> Result<(), VolumeError> { + let dir = self.directory.clone(); + let ec_vol = self + .ec_volumes + .entry(vid) + .or_insert_with(|| EcVolume::new(&dir, &dir, collection, vid).unwrap()); + + for &shard_id in shard_ids { + let shard = EcVolumeShard::new(&dir, collection, vid, shard_id as u8); + ec_vol.add_shard(shard).map_err(VolumeError::Io)?; + crate::metrics::VOLUME_GAUGE + .with_label_values(&[collection, "ec_shards"]) + .inc(); + } + Ok(()) + } + + /// Unmount EC shards for a volume on this location. + pub fn unmount_ec_shards(&mut self, vid: VolumeId, shard_ids: &[u32]) { + if let Some(ec_vol) = self.ec_volumes.get_mut(&vid) { + let collection = ec_vol.collection.clone(); + for &shard_id in shard_ids { + ec_vol.remove_shard(shard_id as u8); + crate::metrics::VOLUME_GAUGE + .with_label_values(&[&collection, "ec_shards"]) + .dec(); + } + if ec_vol.shard_count() == 0 { + let mut vol = self.ec_volumes.remove(&vid).unwrap(); + vol.close(); + } + } + } + + /// Total number of EC shards on this location. + pub fn ec_shard_count(&self) -> usize { + self.ec_volumes + .values() + .map(|ecv| ecv.shards.iter().filter(|s| s.is_some()).count()) + .sum() + } + + /// Iterate over all EC volumes. + pub fn ec_volumes(&self) -> impl Iterator { + self.ec_volumes.iter() + } + /// Close all volumes. pub fn close(&mut self) { for (_, v) in self.volumes.iter_mut() { v.close(); } self.volumes.clear(); + for (_, mut ec_vol) in self.ec_volumes.drain() { + ec_vol.close(); + } } } diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs index 1602b7ea9..5997f7f57 100644 --- a/seaweed-volume/src/storage/store.rs +++ b/seaweed-volume/src/storage/store.rs @@ -4,7 +4,6 @@ //! It coordinates volume placement, lookup, and lifecycle operations. //! Matches Go's storage/store.go. -use std::collections::HashMap; use std::io; use std::sync::atomic::{AtomicU64, Ordering}; @@ -30,7 +29,6 @@ pub struct Store { pub public_url: String, pub data_center: String, pub rack: String, - pub ec_volumes: HashMap, } impl Store { @@ -46,7 +44,6 @@ impl Store { public_url: String::new(), data_center: String::new(), rack: String::new(), - ec_volumes: HashMap::new(), } } @@ -373,12 +370,9 @@ impl Store { .sum() } - /// Total EC shard count across all EC volumes. + /// Total EC shard count across all locations. pub fn ec_shard_count(&self) -> usize { - self.ec_volumes - .values() - .map(|ecv| ecv.shards.iter().filter(|s| s.is_some()).count()) - .sum() + self.locations.iter().map(|loc| loc.ec_shard_count()).sum() } /// Recalculate max volume counts for locations with original_max_volume_count == 0. @@ -392,8 +386,6 @@ impl Store { let mut has_changes = false; let mut new_max_total: i32 = 0; - let total_ec_shards = self.ec_shard_count(); - for loc in &self.locations { if loc.original_max_volume_count == 0 { let current = loc.max_volume_count.load(Ordering::Relaxed); @@ -403,7 +395,8 @@ impl Store { let unclaimed = (free as i64) - (unused_space as i64); let vol_count = loc.volumes_len() as i32; - let ec_equivalent = ((total_ec_shards + let loc_ec_shards = loc.ec_shard_count(); + let ec_equivalent = ((loc_ec_shards + crate::storage::erasure_coding::ec_shard::DATA_SHARDS_COUNT) / crate::storage::erasure_coding::ec_shard::DATA_SHARDS_COUNT) as i32; @@ -454,45 +447,72 @@ impl Store { collection: &str, shard_ids: &[u32], ) -> Result<(), VolumeError> { - // Find the directory where the EC files live - let dir = self.find_ec_dir(vid, collection).ok_or_else(|| { + // Find the location where the EC files live + let loc_idx = self.find_ec_location(vid, collection).ok_or_else(|| { VolumeError::Io(io::Error::new( io::ErrorKind::NotFound, format!("ec volume {} shards not found on disk", vid), )) })?; - let ec_vol = self - .ec_volumes - .entry(vid) - .or_insert_with(|| EcVolume::new(&dir, &dir, collection, vid).unwrap()); + self.locations[loc_idx].mount_ec_shards(vid, collection, shard_ids) + } - for &shard_id in shard_ids { - let shard = EcVolumeShard::new(&dir, collection, vid, shard_id as u8); - ec_vol.add_shard(shard).map_err(|e| VolumeError::Io(e))?; - crate::metrics::VOLUME_GAUGE - .with_label_values(&[collection, "ec_shards"]) - .inc(); + /// Unmount EC shards for a volume. + pub fn unmount_ec_shards(&mut self, vid: VolumeId, shard_ids: &[u32]) { + for loc in &mut self.locations { + if loc.has_ec_volume(vid) { + loc.unmount_ec_shards(vid, shard_ids); + return; + } } + } - Ok(()) + /// Find an EC volume across all locations. + pub fn find_ec_volume(&self, vid: VolumeId) -> Option<&EcVolume> { + for loc in &self.locations { + if let Some(ecv) = loc.find_ec_volume(vid) { + return Some(ecv); + } + } + None } - /// Unmount EC shards for a volume. - pub fn unmount_ec_shards(&mut self, vid: VolumeId, shard_ids: &[u32]) { - if let Some(ec_vol) = self.ec_volumes.get_mut(&vid) { - let collection = ec_vol.collection.clone(); - for &shard_id in shard_ids { - ec_vol.remove_shard(shard_id as u8); - crate::metrics::VOLUME_GAUGE - .with_label_values(&[&collection, "ec_shards"]) - .dec(); + /// Find an EC volume across all locations (mutable). + pub fn find_ec_volume_mut(&mut self, vid: VolumeId) -> Option<&mut EcVolume> { + for loc in &mut self.locations { + if let Some(ecv) = loc.find_ec_volume_mut(vid) { + return Some(ecv); } - if ec_vol.shard_count() == 0 { - let mut vol = self.ec_volumes.remove(&vid).unwrap(); - vol.close(); + } + None + } + + /// Check if any location has an EC volume. + pub fn has_ec_volume(&self, vid: VolumeId) -> bool { + self.locations.iter().any(|loc| loc.has_ec_volume(vid)) + } + + /// Remove an EC volume from whichever location has it. + pub fn remove_ec_volume(&mut self, vid: VolumeId) -> Option { + for loc in &mut self.locations { + if let Some(ecv) = loc.remove_ec_volume(vid) { + return Some(ecv); } } + None + } + + /// Find the location index containing EC files for a volume. + pub fn find_ec_location(&self, vid: VolumeId, collection: &str) -> Option { + for (i, loc) in self.locations.iter().enumerate() { + let base = crate::storage::volume::volume_file_name(&loc.directory, collection, vid); + let ecx_path = format!("{}.ecx", base); + if std::path::Path::new(&ecx_path).exists() { + return Some(i); + } + } + None } /// Delete EC shard files from disk. @@ -650,9 +670,6 @@ impl Store { for loc in &mut self.locations { loc.close(); } - for (_, mut ec_vol) in self.ec_volumes.drain() { - ec_vol.close(); - } } }