From 49a6e459a2c3ebd1653a13d65cece052511bb4c6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 6 Mar 2026 15:49:06 -0800 Subject: [PATCH] Implement 6 more gRPC RPCs: mark readonly/writable, configure, write/read needle, EC generate Adds set_read_only, set_writable, set_replica_placement, and write_needle_blob methods to Volume. Makes find_volume_mut public on Store. Fills in previously stubbed RPCs: volume_mark_readonly, volume_mark_writable, volume_configure, write_needle_blob, read_needle_meta, volume_ec_shards_generate. --- seaweed-volume/src/server/grpc_server.rs | 98 +++++++++++++++++++++--- seaweed-volume/src/storage/store.rs | 2 +- seaweed-volume/src/storage/volume.rs | 37 +++++++++ 3 files changed, 124 insertions(+), 13 deletions(-) diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index d4a61d828..abd8b0a71 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -210,23 +210,51 @@ impl VolumeServer for VolumeGrpcService { async fn volume_mark_readonly( &self, - _request: Request, + request: Request, ) -> Result, Status> { - Err(Status::unimplemented("volume_mark_readonly not yet implemented")) + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + let mut store = self.state.store.write().unwrap(); + let (_, vol) = store.find_volume_mut(vid) + .ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?; + vol.set_read_only(); + Ok(Response::new(volume_server_pb::VolumeMarkReadonlyResponse {})) } async fn volume_mark_writable( &self, - _request: Request, + request: Request, ) -> Result, Status> { - Err(Status::unimplemented("volume_mark_writable not yet implemented")) + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + let mut store = self.state.store.write().unwrap(); + let (_, vol) = store.find_volume_mut(vid) + .ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?; + vol.set_writable(); + Ok(Response::new(volume_server_pb::VolumeMarkWritableResponse {})) } async fn volume_configure( &self, - _request: Request, + request: Request, ) -> Result, Status> { - Err(Status::unimplemented("volume_configure not yet implemented")) + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + let rp = crate::storage::super_block::ReplicaPlacement::from_string(&req.replication) + .map_err(|e| Status::invalid_argument(e.to_string()))?; + + let mut store = self.state.store.write().unwrap(); + let (_, vol) = store.find_volume_mut(vid) + .ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?; + + match vol.set_replica_placement(rp) { + Ok(()) => Ok(Response::new(volume_server_pb::VolumeConfigureResponse { + error: String::new(), + })), + Err(e) => Ok(Response::new(volume_server_pb::VolumeConfigureResponse { + error: e.to_string(), + })), + } } async fn volume_status( @@ -338,16 +366,47 @@ impl VolumeServer for VolumeGrpcService { async fn read_needle_meta( &self, - _request: Request, + request: Request, ) -> Result, Status> { - Err(Status::unimplemented("read_needle_meta not yet implemented")) + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + let needle_id = NeedleId(req.needle_id); + + let store = self.state.store.read().unwrap(); + let mut n = Needle { id: needle_id, ..Needle::default() }; + match store.read_volume_needle(vid, &mut n) { + Ok(_) => { + let ttl_str = n.ttl.as_ref().map_or(String::new(), |t| t.to_string()); + Ok(Response::new(volume_server_pb::ReadNeedleMetaResponse { + cookie: n.cookie.0, + last_modified: n.last_modified, + crc: n.checksum.0, + ttl: ttl_str, + append_at_ns: n.append_at_ns, + })) + } + Err(e) => Err(Status::not_found(e.to_string())), + } } async fn write_needle_blob( &self, - _request: Request, + request: Request, ) -> Result, Status> { - Err(Status::unimplemented("write_needle_blob not yet implemented")) + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + + let mut store = self.state.store.write().unwrap(); + let (_, vol) = store.find_volume_mut(vid) + .ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?; + + // Write the raw needle blob at the end of the dat file (append) + let dat_size = vol.dat_file_size() + .map_err(|e| Status::internal(e.to_string()))? as i64; + vol.write_needle_blob(dat_size, &req.needle_blob) + .map_err(|e| Status::internal(e.to_string()))?; + + Ok(Response::new(volume_server_pb::WriteNeedleBlobResponse {})) } type ReadAllNeedlesStream = BoxStream; @@ -377,9 +436,24 @@ impl VolumeServer for VolumeGrpcService { async fn volume_ec_shards_generate( &self, - _request: Request, + request: Request, ) -> Result, Status> { - Err(Status::unimplemented("volume_ec_shards_generate not yet implemented")) + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + let collection = &req.collection; + + // Find the volume's directory + let dir = { + let store = self.state.store.read().unwrap(); + let (loc_idx, _) = store.find_volume(vid) + .ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?; + store.locations[loc_idx].directory.clone() + }; + + crate::storage::erasure_coding::ec_encoder::write_ec_files(&dir, collection, vid) + .map_err(|e| Status::internal(e.to_string()))?; + + Ok(Response::new(volume_server_pb::VolumeEcShardsGenerateResponse {})) } async fn volume_ec_shards_rebuild( diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs index 06c3dfac8..3dc13854d 100644 --- a/seaweed-volume/src/storage/store.rs +++ b/seaweed-volume/src/storage/store.rs @@ -69,7 +69,7 @@ impl Store { } /// Find which location contains a volume (mutable). - fn find_volume_mut(&mut self, vid: VolumeId) -> Option<(usize, &mut crate::storage::volume::Volume)> { + pub fn find_volume_mut(&mut self, vid: VolumeId) -> Option<(usize, &mut crate::storage::volume::Volume)> { for (i, loc) in self.locations.iter_mut().enumerate() { if let Some(v) = loc.find_volume_mut(vid) { return Some((i, v)); diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index e62ed267d..2e7a4ef12 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -590,6 +590,43 @@ impl Volume { self.no_write_or_delete || self.no_write_can_delete } + /// Mark this volume as read-only (no writes or deletes). + pub fn set_read_only(&mut self) { + self.no_write_or_delete = true; + } + + /// Mark this volume as writable (allow writes and deletes). + pub fn set_writable(&mut self) { + self.no_write_or_delete = false; + self.no_write_can_delete = false; + } + + /// Change the replication placement and rewrite the super block. + pub fn set_replica_placement(&mut self, rp: ReplicaPlacement) -> Result<(), VolumeError> { + self.super_block.replica_placement = rp; + let bytes = self.super_block.to_bytes(); + let dat_file = self.dat_file.as_mut().ok_or_else(|| { + VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open")) + })?; + dat_file.seek(SeekFrom::Start(0))?; + dat_file.write_all(&bytes)?; + dat_file.sync_all()?; + Ok(()) + } + + /// Write a raw needle blob at a specific offset in the .dat file. + pub fn write_needle_blob(&mut self, offset: i64, needle_blob: &[u8]) -> Result<(), VolumeError> { + if self.no_write_or_delete { + return Err(VolumeError::ReadOnly); + } + let dat_file = self.dat_file.as_mut().ok_or_else(|| { + VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open")) + })?; + dat_file.seek(SeekFrom::Start(offset as u64))?; + dat_file.write_all(needle_blob)?; + Ok(()) + } + pub fn needs_replication(&self) -> bool { self.super_block.replica_placement.get_copy_count() > 1 }