Browse Source

Match Go Destroy: add only_empty parameter to reject non-empty volume deletion

rust-volume-server
Chris Lu 3 days ago
parent
commit
8b6a4a2528
  1. 10
      seaweed-volume/src/server/grpc_server.rs
  2. 4
      seaweed-volume/src/server/heartbeat.rs
  3. 8
      seaweed-volume/src/storage/disk_location.rs
  4. 4
      seaweed-volume/src/storage/store.rs
  5. 19
      seaweed-volume/src/storage/volume.rs

10
seaweed-volume/src/server/grpc_server.rs

@ -653,7 +653,7 @@ impl VolumeServer for VolumeGrpcService {
}
}
store
.delete_volume(vid)
.delete_volume(vid, false)
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(volume_server_pb::VolumeDeleteResponse {}))
}
@ -690,7 +690,8 @@ impl VolumeServer for VolumeGrpcService {
let (_, vol) = store
.find_volume_mut(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
vol.set_read_only_persist(req.persist);
vol.set_read_only_persist(req.persist)
.map_err(|e| Status::internal(e.to_string()))?;
}
self.state.volume_state_notify.notify_one();
@ -729,7 +730,8 @@ impl VolumeServer for VolumeGrpcService {
let (_, vol) = store
.find_volume_mut(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
vol.set_writable();
vol.set_writable()
.map_err(|e| Status::internal(e.to_string()))?;
}
self.state.volume_state_notify.notify_one();
@ -876,7 +878,7 @@ impl VolumeServer for VolumeGrpcService {
if store.find_volume(vid).is_some() {
drop(store);
let mut store = self.state.store.write().unwrap();
store.delete_volume(vid).map_err(|e| {
store.delete_volume(vid, false).map_err(|e| {
Status::internal(format!("failed to delete existing volume {}: {}", vid, e))
})?;
}

4
seaweed-volume/src/server/heartbeat.rs

@ -845,7 +845,7 @@ fn build_heartbeat_with_ec_status(
}
for vid in delete_vids {
let _ = loc.delete_volume(vid);
let _ = loc.delete_volume(vid, false);
}
}
@ -1135,7 +1135,7 @@ mod tests {
{
let (_, volume) = store.find_volume_mut(VolumeId(17)).unwrap();
volume.set_read_only();
volume.set_read_only().unwrap();
volume.volume_info.files.push(Default::default());
volume.refresh_remote_write_mode();
}

8
seaweed-volume/src/storage/disk_location.rs

@ -375,12 +375,12 @@ impl DiskLocation {
}
/// Remove, close, and delete all files for a volume.
pub fn delete_volume(&mut self, vid: VolumeId) -> Result<(), VolumeError> {
pub fn delete_volume(&mut self, vid: VolumeId, only_empty: bool) -> Result<(), VolumeError> {
if let Some(mut v) = self.volumes.remove(&vid) {
crate::metrics::VOLUME_GAUGE
.with_label_values(&[&v.collection, "volume"])
.dec();
v.destroy()?;
v.destroy(only_empty)?;
Ok(())
} else {
Err(VolumeError::NotFound)
@ -401,7 +401,7 @@ impl DiskLocation {
crate::metrics::VOLUME_GAUGE
.with_label_values(&[&v.collection, "volume"])
.dec();
v.destroy()?;
v.destroy(false)?;
}
}
@ -823,7 +823,7 @@ mod tests {
.unwrap();
assert_eq!(loc.volumes_len(), 2);
loc.delete_volume(VolumeId(1)).unwrap();
loc.delete_volume(VolumeId(1), false).unwrap();
assert_eq!(loc.volumes_len(), 1);
assert!(loc.find_volume(VolumeId(1)).is_none());
}

4
seaweed-volume/src/storage/store.rs

@ -197,10 +197,10 @@ impl Store {
}
/// Delete a volume from any location.
pub fn delete_volume(&mut self, vid: VolumeId) -> Result<(), VolumeError> {
pub fn delete_volume(&mut self, vid: VolumeId, only_empty: bool) -> Result<(), VolumeError> {
for loc in &mut self.locations {
if loc.find_volume(vid).is_some() {
return loc.delete_volume(vid);
return loc.delete_volume(vid, only_empty);
}
}
Err(VolumeError::NotFound)

19
seaweed-volume/src/storage/volume.rs

@ -2029,7 +2029,7 @@ impl Volume {
pub fn set_read_only_persist(&mut self, persist: bool) {
self.no_write_or_delete = true;
if persist {
self.save_vif();
let _ = self.save_vif();
}
}
@ -2037,7 +2037,7 @@ impl Volume {
pub fn set_writable(&mut self) {
self.no_write_or_delete = false;
self.no_write_can_delete = self.has_remote_file;
self.save_vif();
let _ = self.save_vif();
}
/// Recompute the Go-style write/delete mode from the current remote tier state.
@ -2138,12 +2138,13 @@ impl Volume {
let vif_path = self.vif_path();
// Match Go: if file exists but is not writable, return an error
if vif_path.exists() {
let metadata = fs::metadata(&vif_path)?;
let path = std::path::Path::new(&vif_path);
if path.exists() {
let metadata = fs::metadata(path)?;
if metadata.permissions().readonly() {
return Err(VolumeError::Io(io::Error::new(
io::ErrorKind::PermissionDenied,
format!("failed to check {} not writable", vif_path.display()),
format!("failed to check {} not writable", vif_path),
)));
}
}
@ -3464,7 +3465,7 @@ mod tests {
dat_path = v.file_name(".dat");
idx_path = v.file_name(".idx");
assert!(Path::new(&dat_path).exists());
v.destroy().unwrap();
v.destroy(false).unwrap();
}
assert!(!Path::new(&dat_path).exists());
@ -3811,7 +3812,7 @@ mod tests {
extension: ".dat".to_string(),
});
v.refresh_remote_write_mode();
v.set_writable();
v.set_writable().unwrap();
assert!(v.is_read_only());
assert!(!v.no_write_or_delete);
@ -4072,7 +4073,7 @@ mod tests {
assert!(std::path::Path::new(&idx_path).exists());
// Destroy the volume
v.destroy().unwrap();
v.destroy(false).unwrap();
// .dat and .idx should be gone
assert!(
@ -4129,7 +4130,7 @@ mod tests {
assert!(std::path::Path::new(&dat_path).exists());
assert!(std::path::Path::new(&idx_path).exists());
v.destroy().unwrap();
v.destroy(false).unwrap();
assert!(
!std::path::Path::new(&dat_path).exists(),

Loading…
Cancel
Save