Browse Source

Implement 6 more gRPC RPCs: mark readonly/writable, configure, write/read needle, EC generate

Adds set_read_only, set_writable, set_replica_placement, and write_needle_blob
methods to Volume. Makes find_volume_mut public on Store. Fills in previously
stubbed RPCs: volume_mark_readonly, volume_mark_writable, volume_configure,
write_needle_blob, read_needle_meta, volume_ec_shards_generate.
rust-volume-server
Chris Lu 4 days ago
parent
commit
49a6e459a2
  1. 98
      seaweed-volume/src/server/grpc_server.rs
  2. 2
      seaweed-volume/src/storage/store.rs
  3. 37
      seaweed-volume/src/storage/volume.rs

98
seaweed-volume/src/server/grpc_server.rs

@ -210,23 +210,51 @@ impl VolumeServer for VolumeGrpcService {
async fn volume_mark_readonly(
&self,
_request: Request<volume_server_pb::VolumeMarkReadonlyRequest>,
request: Request<volume_server_pb::VolumeMarkReadonlyRequest>,
) -> Result<Response<volume_server_pb::VolumeMarkReadonlyResponse>, Status> {
Err(Status::unimplemented("volume_mark_readonly not yet implemented"))
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let mut store = self.state.store.write().unwrap();
let (_, vol) = store.find_volume_mut(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
vol.set_read_only();
Ok(Response::new(volume_server_pb::VolumeMarkReadonlyResponse {}))
}
async fn volume_mark_writable(
&self,
_request: Request<volume_server_pb::VolumeMarkWritableRequest>,
request: Request<volume_server_pb::VolumeMarkWritableRequest>,
) -> Result<Response<volume_server_pb::VolumeMarkWritableResponse>, Status> {
Err(Status::unimplemented("volume_mark_writable not yet implemented"))
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let mut store = self.state.store.write().unwrap();
let (_, vol) = store.find_volume_mut(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
vol.set_writable();
Ok(Response::new(volume_server_pb::VolumeMarkWritableResponse {}))
}
async fn volume_configure(
&self,
_request: Request<volume_server_pb::VolumeConfigureRequest>,
request: Request<volume_server_pb::VolumeConfigureRequest>,
) -> Result<Response<volume_server_pb::VolumeConfigureResponse>, Status> {
Err(Status::unimplemented("volume_configure not yet implemented"))
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let rp = crate::storage::super_block::ReplicaPlacement::from_string(&req.replication)
.map_err(|e| Status::invalid_argument(e.to_string()))?;
let mut store = self.state.store.write().unwrap();
let (_, vol) = store.find_volume_mut(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
match vol.set_replica_placement(rp) {
Ok(()) => Ok(Response::new(volume_server_pb::VolumeConfigureResponse {
error: String::new(),
})),
Err(e) => Ok(Response::new(volume_server_pb::VolumeConfigureResponse {
error: e.to_string(),
})),
}
}
async fn volume_status(
@ -338,16 +366,47 @@ impl VolumeServer for VolumeGrpcService {
async fn read_needle_meta(
&self,
_request: Request<volume_server_pb::ReadNeedleMetaRequest>,
request: Request<volume_server_pb::ReadNeedleMetaRequest>,
) -> Result<Response<volume_server_pb::ReadNeedleMetaResponse>, Status> {
Err(Status::unimplemented("read_needle_meta not yet implemented"))
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let needle_id = NeedleId(req.needle_id);
let store = self.state.store.read().unwrap();
let mut n = Needle { id: needle_id, ..Needle::default() };
match store.read_volume_needle(vid, &mut n) {
Ok(_) => {
let ttl_str = n.ttl.as_ref().map_or(String::new(), |t| t.to_string());
Ok(Response::new(volume_server_pb::ReadNeedleMetaResponse {
cookie: n.cookie.0,
last_modified: n.last_modified,
crc: n.checksum.0,
ttl: ttl_str,
append_at_ns: n.append_at_ns,
}))
}
Err(e) => Err(Status::not_found(e.to_string())),
}
}
async fn write_needle_blob(
&self,
_request: Request<volume_server_pb::WriteNeedleBlobRequest>,
request: Request<volume_server_pb::WriteNeedleBlobRequest>,
) -> Result<Response<volume_server_pb::WriteNeedleBlobResponse>, Status> {
Err(Status::unimplemented("write_needle_blob not yet implemented"))
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let mut store = self.state.store.write().unwrap();
let (_, vol) = store.find_volume_mut(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
// Write the raw needle blob at the end of the dat file (append)
let dat_size = vol.dat_file_size()
.map_err(|e| Status::internal(e.to_string()))? as i64;
vol.write_needle_blob(dat_size, &req.needle_blob)
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(volume_server_pb::WriteNeedleBlobResponse {}))
}
type ReadAllNeedlesStream = BoxStream<volume_server_pb::ReadAllNeedlesResponse>;
@ -377,9 +436,24 @@ impl VolumeServer for VolumeGrpcService {
async fn volume_ec_shards_generate(
&self,
_request: Request<volume_server_pb::VolumeEcShardsGenerateRequest>,
request: Request<volume_server_pb::VolumeEcShardsGenerateRequest>,
) -> Result<Response<volume_server_pb::VolumeEcShardsGenerateResponse>, Status> {
Err(Status::unimplemented("volume_ec_shards_generate not yet implemented"))
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let collection = &req.collection;
// Find the volume's directory
let dir = {
let store = self.state.store.read().unwrap();
let (loc_idx, _) = store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?;
store.locations[loc_idx].directory.clone()
};
crate::storage::erasure_coding::ec_encoder::write_ec_files(&dir, collection, vid)
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(volume_server_pb::VolumeEcShardsGenerateResponse {}))
}
async fn volume_ec_shards_rebuild(

2
seaweed-volume/src/storage/store.rs

@ -69,7 +69,7 @@ impl Store {
}
/// Find which location contains a volume (mutable).
fn find_volume_mut(&mut self, vid: VolumeId) -> Option<(usize, &mut crate::storage::volume::Volume)> {
pub fn find_volume_mut(&mut self, vid: VolumeId) -> Option<(usize, &mut crate::storage::volume::Volume)> {
for (i, loc) in self.locations.iter_mut().enumerate() {
if let Some(v) = loc.find_volume_mut(vid) {
return Some((i, v));

37
seaweed-volume/src/storage/volume.rs

@ -590,6 +590,43 @@ impl Volume {
self.no_write_or_delete || self.no_write_can_delete
}
/// Mark this volume as read-only (no writes or deletes).
pub fn set_read_only(&mut self) {
self.no_write_or_delete = true;
}
/// Mark this volume as writable (allow writes and deletes).
pub fn set_writable(&mut self) {
self.no_write_or_delete = false;
self.no_write_can_delete = false;
}
/// Change the replication placement and rewrite the super block.
pub fn set_replica_placement(&mut self, rp: ReplicaPlacement) -> Result<(), VolumeError> {
self.super_block.replica_placement = rp;
let bytes = self.super_block.to_bytes();
let dat_file = self.dat_file.as_mut().ok_or_else(|| {
VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
})?;
dat_file.seek(SeekFrom::Start(0))?;
dat_file.write_all(&bytes)?;
dat_file.sync_all()?;
Ok(())
}
/// Write a raw needle blob at a specific offset in the .dat file.
pub fn write_needle_blob(&mut self, offset: i64, needle_blob: &[u8]) -> Result<(), VolumeError> {
if self.no_write_or_delete {
return Err(VolumeError::ReadOnly);
}
let dat_file = self.dat_file.as_mut().ok_or_else(|| {
VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
})?;
dat_file.seek(SeekFrom::Start(offset as u64))?;
dat_file.write_all(needle_blob)?;
Ok(())
}
pub fn needs_replication(&self) -> bool {
self.super_block.replica_placement.get_copy_count() > 1
}

Loading…
Cancel
Save