Browse Source

fix volume copy disk placement and heartbeat notification

rust-volume-server
Chris Lu 1 week ago
parent
commit
6a5d4d3f87
  1. 43
      seaweed-volume/src/server/grpc_server.rs

43
seaweed-volume/src/server/grpc_server.rs

@ -916,6 +916,7 @@ impl VolumeServer for VolumeGrpcService {
store.delete_volume(vid, false).map_err(|e| {
Status::internal(format!("failed to delete existing volume {}: {}", vid, e))
})?;
self.state.volume_state_notify.notify_one();
}
}
@ -958,28 +959,30 @@ impl VolumeServer for VolumeGrpcService {
.map_err(|e| Status::internal(format!("read volume file status failed, {}", e)))?
.into_inner();
let disk_type = if !req.disk_type.is_empty() {
&req.disk_type
let requested_disk_type = if !req.disk_type.is_empty() {
DiskType::from_string(&req.disk_type)
} else {
&vol_info.disk_type
DiskType::from_string(&vol_info.disk_type)
};
// Find a free disk location
let (data_base, idx_base) = {
// Find a free disk location using Go's Store.FindFreeLocation semantics.
let (data_base, idx_base, selected_disk_type) = {
let store = self.state.store.read().unwrap();
let mut found = None;
for loc in &store.locations {
if format!("{:?}", loc.disk_type) == *disk_type
|| disk_type.is_empty()
|| disk_type == "0"
{
if loc.available_space.load(Ordering::Relaxed) > vol_info.dat_file_size {
found = Some((loc.directory.clone(), loc.idx_directory.clone()));
break;
}
}
}
found.ok_or_else(|| Status::internal(format!("no space left {}", disk_type)))?
let Some(loc_idx) = store.find_free_location_predicate(|loc| {
loc.disk_type == requested_disk_type
&& loc.available_space.load(Ordering::Relaxed) > vol_info.dat_file_size
}) else {
return Err(Status::internal(format!(
"no space left {}",
requested_disk_type.readable_string()
)));
};
let loc = &store.locations[loc_idx];
(
loc.directory.clone(),
loc.idx_directory.clone(),
loc.disk_type.clone(),
)
};
let data_base_name =
@ -1146,14 +1149,14 @@ impl VolumeServer for VolumeGrpcService {
// Mount the volume
{
let disk_type_enum = DiskType::default();
let mut store = state.store.write().unwrap();
store
.mount_volume(vid, &vol_info.collection, disk_type_enum)
.mount_volume(vid, &vol_info.collection, selected_disk_type)
.map_err(|e| {
Status::internal(format!("failed to mount volume {}: {}", vid, e))
})?;
}
state.volume_state_notify.notify_one();
// Send final response with last_append_at_ns
let _ = tx

Loading…
Cancel
Save