Browse Source

Add per-shard EC mount/unmount, location predicate search, idx directory for EC

rust-volume-server
Chris Lu 3 days ago
parent
commit
886dac6fcb
  1. 3
      seaweed-volume/src/storage/disk_location.rs
  2. 53
      seaweed-volume/src/storage/store.rs

3
seaweed-volume/src/storage/disk_location.rs

@ -546,10 +546,11 @@ impl DiskLocation {
shard_ids: &[u32],
) -> Result<(), VolumeError> {
let dir = self.directory.clone();
let idx_dir = self.idx_directory.clone();
let ec_vol = self
.ec_volumes
.entry(vid)
.or_insert_with(|| EcVolume::new(&dir, &dir, collection, vid).unwrap());
.or_insert_with(|| EcVolume::new(&dir, &idx_dir, collection, vid).unwrap());
for &shard_id in shard_ids {
let shard = EcVolumeShard::new(&dir, collection, vid, shard_id as u8);

53
seaweed-volume/src/storage/store.rs

@ -147,6 +147,20 @@ impl Store {
best.map(|(i, _)| i)
}
/// Find a free location matching a predicate.
/// Matches Go's Store.FindFreeLocation: finds first location passing the filter.
pub fn find_free_location_predicate<F>(&self, pred: F) -> Option<usize>
where
F: Fn(&DiskLocation) -> bool,
{
for (i, loc) in self.locations.iter().enumerate() {
if pred(loc) {
return Some(i);
}
}
None
}
/// Create a new volume, placing it on the location with the most free space.
pub fn add_volume(
&mut self,
@ -467,7 +481,7 @@ impl Store {
// ---- EC volume operations ----
/// Mount EC shards for a volume.
/// Mount EC shards for a volume (batch).
pub fn mount_ec_shards(
&mut self,
vid: VolumeId,
@ -485,7 +499,29 @@ impl Store {
self.locations[loc_idx].mount_ec_shards(vid, collection, shard_ids)
}
/// Unmount EC shards for a volume.
/// Mount a single EC shard, searching all locations for the shard file.
/// Matches Go's Store.MountEcShards which mounts one shard at a time.
pub fn mount_ec_shard(
&mut self,
vid: VolumeId,
collection: &str,
shard_id: u32,
) -> Result<(), VolumeError> {
for loc in &mut self.locations {
// Check if the shard file exists on this location
let shard = EcVolumeShard::new(&loc.directory, collection, vid, shard_id as u8);
if std::path::Path::new(&shard.file_name()).exists() {
loc.mount_ec_shards(vid, collection, &[shard_id])?;
return Ok(());
}
}
Err(VolumeError::Io(io::Error::new(
io::ErrorKind::NotFound,
format!("MountEcShards {}.{} not found on disk", vid, shard_id),
)))
}
/// Unmount EC shards for a volume (batch).
pub fn unmount_ec_shards(&mut self, vid: VolumeId, shard_ids: &[u32]) {
for loc in &mut self.locations {
if loc.has_ec_volume(vid) {
@ -495,6 +531,19 @@ impl Store {
}
}
/// Unmount a single EC shard, searching all locations.
/// Matches Go's Store.UnmountEcShards which unmounts one shard at a time.
pub fn unmount_ec_shard(&mut self, vid: VolumeId, shard_id: u32) -> Result<(), VolumeError> {
for loc in &mut self.locations {
if loc.has_ec_volume(vid) {
loc.unmount_ec_shards(vid, &[shard_id]);
return Ok(());
}
}
// Go returns nil if shard not found (no error)
Ok(())
}
/// Find an EC volume across all locations.
pub fn find_ec_volume(&self, vid: VolumeId) -> Option<&EcVolume> {
for loc in &self.locations {

Loading…
Cancel
Save