diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 236ab2025..33dcef654 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -11,6 +11,8 @@ use std::sync::Arc; use tokio_stream::Stream; use tonic::{Request, Response, Status, Streaming}; +use crate::pb::master_pb; +use crate::pb::master_pb::seaweed_client::SeaweedClient; use crate::pb::volume_server_pb; use crate::pb::volume_server_pb::volume_server_server::VolumeServer; use crate::storage::needle::needle::{self, Needle}; @@ -20,10 +22,65 @@ use super::volume_server::VolumeServerState; type BoxStream = Pin> + Send + 'static>>; +struct MasterVolumeInfo { + volume_id: VolumeId, + collection: String, + replica_placement: u8, + ttl: u32, + disk_type: String, + ip: String, + port: u16, +} + pub struct VolumeGrpcService { pub state: Arc, } +impl VolumeGrpcService { + async fn notify_master_volume_readonly( + &self, + info: &MasterVolumeInfo, + is_readonly: bool, + ) -> Result<(), Status> { + let master_url = self.state.master_url.clone(); + if master_url.is_empty() { + return Ok(()); + } + let grpc_addr = parse_grpc_address(&master_url).map_err(|e| { + Status::internal(format!("invalid master address {}: {}", master_url, e)) + })?; + let endpoint = tonic::transport::Channel::from_shared(format!("http://{}", grpc_addr)) + .map_err(|e| Status::internal(format!("master address {}: {}", master_url, e)))? + .connect_timeout(std::time::Duration::from_secs(5)) + .timeout(std::time::Duration::from_secs(30)); + let channel = endpoint + .connect() + .await + .map_err(|e| Status::internal(format!("connect to master {}: {}", master_url, e)))?; + let mut client = SeaweedClient::new(channel); + client + .volume_mark_readonly(master_pb::VolumeMarkReadonlyRequest { + ip: info.ip.clone(), + port: info.port as u32, + volume_id: info.volume_id.0, + collection: info.collection.clone(), + replica_placement: info.replica_placement as u32, + ttl: info.ttl, + disk_type: info.disk_type.clone(), + is_readonly, + ..Default::default() + }) + .await + .map_err(|e| { + Status::internal(format!( + "set volume {} readonly={} on master {}: {}", + info.volume_id, is_readonly, master_url, e + )) + })?; + Ok(()) + } +} + #[tonic::async_trait] impl VolumeServer for VolumeGrpcService { // ---- Core volume operations ---- @@ -544,13 +601,37 @@ impl VolumeServer for VolumeGrpcService { self.state.check_maintenance()?; let req = request.into_inner(); let vid = VolumeId(req.volume_id); - let mut store = self.state.store.write().unwrap(); - let (_, vol) = store - .find_volume_mut(vid) - .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?; - vol.set_read_only_persist(req.persist); - drop(store); + let info = { + let store = self.state.store.read().unwrap(); + let (loc_idx, vol) = store + .find_volume(vid) + .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?; + MasterVolumeInfo { + volume_id: vid, + collection: vol.collection.clone(), + replica_placement: vol.super_block.replica_placement.to_byte(), + ttl: vol.super_block.ttl.to_u32(), + disk_type: store.locations[loc_idx].disk_type.to_string(), + ip: store.ip.clone(), + port: store.port, + } + }; + + // Step 1: stop master from redirecting traffic here + self.notify_master_volume_readonly(&info, true).await?; + + // Step 2: mark local volume as readonly + { + let mut store = self.state.store.write().unwrap(); + let (_, vol) = store + .find_volume_mut(vid) + .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?; + vol.set_read_only_persist(req.persist); + } self.state.volume_state_notify.notify_one(); + + // Step 3: tell master again to cover race with heartbeat + self.notify_master_volume_readonly(&info, true).await?; Ok(Response::new( volume_server_pb::VolumeMarkReadonlyResponse {}, )) @@ -563,13 +644,33 @@ impl VolumeServer for VolumeGrpcService { self.state.check_maintenance()?; let req = request.into_inner(); let vid = VolumeId(req.volume_id); - let mut store = self.state.store.write().unwrap(); - let (_, vol) = store - .find_volume_mut(vid) - .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?; - vol.set_writable(); - drop(store); + let info = { + let store = self.state.store.read().unwrap(); + let (loc_idx, vol) = store + .find_volume(vid) + .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?; + MasterVolumeInfo { + volume_id: vid, + collection: vol.collection.clone(), + replica_placement: vol.super_block.replica_placement.to_byte(), + ttl: vol.super_block.ttl.to_u32(), + disk_type: store.locations[loc_idx].disk_type.to_string(), + ip: store.ip.clone(), + port: store.port, + } + }; + + { + let mut store = self.state.store.write().unwrap(); + let (_, vol) = store + .find_volume_mut(vid) + .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?; + vol.set_writable(); + } self.state.volume_state_notify.notify_one(); + + // enable master to redirect traffic here + self.notify_master_volume_readonly(&info, false).await?; Ok(Response::new( volume_server_pb::VolumeMarkWritableResponse {}, ))