|
|
|
@ -11,6 +11,8 @@ use std::sync::Arc; |
|
|
|
use tokio_stream::Stream;
|
|
|
|
use tonic::{Request, Response, Status, Streaming};
|
|
|
|
|
|
|
|
use crate::pb::master_pb;
|
|
|
|
use crate::pb::master_pb::seaweed_client::SeaweedClient;
|
|
|
|
use crate::pb::volume_server_pb;
|
|
|
|
use crate::pb::volume_server_pb::volume_server_server::VolumeServer;
|
|
|
|
use crate::storage::needle::needle::{self, Needle};
|
|
|
|
@ -20,10 +22,65 @@ use super::volume_server::VolumeServerState; |
|
|
|
|
|
|
|
/// Heap-allocated, pinned trait-object stream whose items are `Result<T, Status>`;
/// the stream is required to be `Send + 'static`.
type BoxStream<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send + 'static>>;
|
|
|
|
|
|
|
|
struct MasterVolumeInfo {
|
|
|
|
volume_id: VolumeId,
|
|
|
|
collection: String,
|
|
|
|
replica_placement: u8,
|
|
|
|
ttl: u32,
|
|
|
|
disk_type: String,
|
|
|
|
ip: String,
|
|
|
|
port: u16,
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct VolumeGrpcService {
|
|
|
|
pub state: Arc<VolumeServerState>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl VolumeGrpcService {
|
|
|
|
async fn notify_master_volume_readonly(
|
|
|
|
&self,
|
|
|
|
info: &MasterVolumeInfo,
|
|
|
|
is_readonly: bool,
|
|
|
|
) -> Result<(), Status> {
|
|
|
|
let master_url = self.state.master_url.clone();
|
|
|
|
if master_url.is_empty() {
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
let grpc_addr = parse_grpc_address(&master_url).map_err(|e| {
|
|
|
|
Status::internal(format!("invalid master address {}: {}", master_url, e))
|
|
|
|
})?;
|
|
|
|
let endpoint = tonic::transport::Channel::from_shared(format!("http://{}", grpc_addr))
|
|
|
|
.map_err(|e| Status::internal(format!("master address {}: {}", master_url, e)))?
|
|
|
|
.connect_timeout(std::time::Duration::from_secs(5))
|
|
|
|
.timeout(std::time::Duration::from_secs(30));
|
|
|
|
let channel = endpoint
|
|
|
|
.connect()
|
|
|
|
.await
|
|
|
|
.map_err(|e| Status::internal(format!("connect to master {}: {}", master_url, e)))?;
|
|
|
|
let mut client = SeaweedClient::new(channel);
|
|
|
|
client
|
|
|
|
.volume_mark_readonly(master_pb::VolumeMarkReadonlyRequest {
|
|
|
|
ip: info.ip.clone(),
|
|
|
|
port: info.port as u32,
|
|
|
|
volume_id: info.volume_id.0,
|
|
|
|
collection: info.collection.clone(),
|
|
|
|
replica_placement: info.replica_placement as u32,
|
|
|
|
ttl: info.ttl,
|
|
|
|
disk_type: info.disk_type.clone(),
|
|
|
|
is_readonly,
|
|
|
|
..Default::default()
|
|
|
|
})
|
|
|
|
.await
|
|
|
|
.map_err(|e| {
|
|
|
|
Status::internal(format!(
|
|
|
|
"set volume {} readonly={} on master {}: {}",
|
|
|
|
info.volume_id, is_readonly, master_url, e
|
|
|
|
))
|
|
|
|
})?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[tonic::async_trait]
|
|
|
|
impl VolumeServer for VolumeGrpcService {
|
|
|
|
// ---- Core volume operations ----
|
|
|
|
@ -544,13 +601,37 @@ impl VolumeServer for VolumeGrpcService { |
|
|
|
self.state.check_maintenance()?;
|
|
|
|
let req = request.into_inner();
|
|
|
|
let vid = VolumeId(req.volume_id);
|
|
|
|
let mut store = self.state.store.write().unwrap();
|
|
|
|
let (_, vol) = store
|
|
|
|
.find_volume_mut(vid)
|
|
|
|
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
|
|
|
|
vol.set_read_only_persist(req.persist);
|
|
|
|
drop(store);
|
|
|
|
let info = {
|
|
|
|
let store = self.state.store.read().unwrap();
|
|
|
|
let (loc_idx, vol) = store
|
|
|
|
.find_volume(vid)
|
|
|
|
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
|
|
|
|
MasterVolumeInfo {
|
|
|
|
volume_id: vid,
|
|
|
|
collection: vol.collection.clone(),
|
|
|
|
replica_placement: vol.super_block.replica_placement.to_byte(),
|
|
|
|
ttl: vol.super_block.ttl.to_u32(),
|
|
|
|
disk_type: store.locations[loc_idx].disk_type.to_string(),
|
|
|
|
ip: store.ip.clone(),
|
|
|
|
port: store.port,
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
// Step 1: stop master from redirecting traffic here
|
|
|
|
self.notify_master_volume_readonly(&info, true).await?;
|
|
|
|
|
|
|
|
// Step 2: mark local volume as readonly
|
|
|
|
{
|
|
|
|
let mut store = self.state.store.write().unwrap();
|
|
|
|
let (_, vol) = store
|
|
|
|
.find_volume_mut(vid)
|
|
|
|
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
|
|
|
|
vol.set_read_only_persist(req.persist);
|
|
|
|
}
|
|
|
|
self.state.volume_state_notify.notify_one();
|
|
|
|
|
|
|
|
// Step 3: tell master again to cover race with heartbeat
|
|
|
|
self.notify_master_volume_readonly(&info, true).await?;
|
|
|
|
Ok(Response::new(
|
|
|
|
volume_server_pb::VolumeMarkReadonlyResponse {},
|
|
|
|
))
|
|
|
|
@ -563,13 +644,33 @@ impl VolumeServer for VolumeGrpcService { |
|
|
|
self.state.check_maintenance()?;
|
|
|
|
let req = request.into_inner();
|
|
|
|
let vid = VolumeId(req.volume_id);
|
|
|
|
let mut store = self.state.store.write().unwrap();
|
|
|
|
let (_, vol) = store
|
|
|
|
.find_volume_mut(vid)
|
|
|
|
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
|
|
|
|
vol.set_writable();
|
|
|
|
drop(store);
|
|
|
|
let info = {
|
|
|
|
let store = self.state.store.read().unwrap();
|
|
|
|
let (loc_idx, vol) = store
|
|
|
|
.find_volume(vid)
|
|
|
|
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
|
|
|
|
MasterVolumeInfo {
|
|
|
|
volume_id: vid,
|
|
|
|
collection: vol.collection.clone(),
|
|
|
|
replica_placement: vol.super_block.replica_placement.to_byte(),
|
|
|
|
ttl: vol.super_block.ttl.to_u32(),
|
|
|
|
disk_type: store.locations[loc_idx].disk_type.to_string(),
|
|
|
|
ip: store.ip.clone(),
|
|
|
|
port: store.port,
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
{
|
|
|
|
let mut store = self.state.store.write().unwrap();
|
|
|
|
let (_, vol) = store
|
|
|
|
.find_volume_mut(vid)
|
|
|
|
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
|
|
|
|
vol.set_writable();
|
|
|
|
}
|
|
|
|
self.state.volume_state_notify.notify_one();
|
|
|
|
|
|
|
|
// enable master to redirect traffic here
|
|
|
|
self.notify_master_volume_readonly(&info, false).await?;
|
|
|
|
Ok(Response::new(
|
|
|
|
volume_server_pb::VolumeMarkWritableResponse {},
|
|
|
|
))
|
|
|
|
|