From cfe173c0acfee7b99c922f7b503a2ec7cd262963 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 6 Mar 2026 15:34:10 -0800 Subject: [PATCH] Add gRPC service implementing all 48 VolumeServer RPCs Implements VolumeGrpcService with the tonic-generated VolumeServer trait. Core operations (batch_delete, allocate, mount/unmount/delete, sync_status, volume_status, read_needle_blob, needle_status, ping) have real implementations. Streaming and EC operations are stubbed with Status::unimplemented. Adds tokio-stream dependency and remote_pb proto module. --- seaweed-volume/Cargo.lock | 2 + seaweed-volume/Cargo.toml | 1 + seaweed-volume/src/main.rs | 13 + seaweed-volume/src/server/grpc_server.rs | 575 +++++++++++++++++++++++ seaweed-volume/src/server/mod.rs | 1 + 5 files changed, 592 insertions(+) create mode 100644 seaweed-volume/src/server/grpc_server.rs diff --git a/seaweed-volume/Cargo.lock b/seaweed-volume/Cargo.lock index 7af1a8a20..029bdb443 100644 --- a/seaweed-volume/Cargo.lock +++ b/seaweed-volume/Cargo.lock @@ -2104,9 +2104,11 @@ dependencies = [ "serde", "serde_json", "sysinfo", + "tempfile", "thiserror 1.0.69", "tokio", "tokio-rustls", + "tokio-stream", "toml", "tonic", "tonic-build", diff --git a/seaweed-volume/Cargo.toml b/seaweed-volume/Cargo.toml index 1ec2fecd3..0ef9171d8 100644 --- a/seaweed-volume/Cargo.toml +++ b/seaweed-volume/Cargo.toml @@ -7,6 +7,7 @@ description = "SeaweedFS Volume Server — Rust implementation" [dependencies] # Async runtime tokio = { version = "1", features = ["full"] } +tokio-stream = "0.1" # gRPC + protobuf tonic = { version = "0.12", features = ["tls"] } diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs index e89d3bb40..d9c6a3f50 100644 --- a/seaweed-volume/src/main.rs +++ b/seaweed-volume/src/main.rs @@ -3,6 +3,19 @@ mod storage; mod security; mod server; +/// Generated protobuf modules. +pub mod pb { + pub mod remote_pb { + tonic::include_proto!("remote_pb"); + } + pub mod volume_server_pb { + tonic::include_proto!("volume_server_pb"); + } + pub mod master_pb { + tonic::include_proto!("master_pb"); + } +} + fn main() { let cli = config::parse_cli(); println!("SeaweedFS Volume Server (Rust)"); diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs new file mode 100644 index 000000000..d4a61d828 --- /dev/null +++ b/seaweed-volume/src/server/grpc_server.rs @@ -0,0 +1,575 @@ +//! gRPC service implementation for the volume server. +//! +//! Implements the VolumeServer trait generated from volume_server.proto. +//! 48 RPCs: core volume operations are fully implemented, streaming and +//! EC operations are stubbed with appropriate error messages. + +use std::pin::Pin; +use std::sync::Arc; + +use tokio_stream::Stream; +use tonic::{Request, Response, Status, Streaming}; + +use crate::pb::volume_server_pb; +use crate::pb::volume_server_pb::volume_server_server::VolumeServer; +use crate::storage::needle::needle::{self, Needle}; +use crate::storage::types::*; + +use super::volume_server::VolumeServerState; + +type BoxStream = Pin> + Send + 'static>>; + +pub struct VolumeGrpcService { + pub state: Arc, +} + +#[tonic::async_trait] +impl VolumeServer for VolumeGrpcService { + // ---- Core volume operations ---- + + async fn batch_delete( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let mut results = Vec::new(); + + for fid_str in &req.file_ids { + let result = match needle::FileId::parse(fid_str) { + Ok(file_id) => { + let mut n = Needle { + id: file_id.key, + cookie: file_id.cookie, + ..Needle::default() + }; + let mut store = self.state.store.write().unwrap(); + match store.delete_volume_needle(file_id.volume_id, &mut n) { + Ok(size) => volume_server_pb::DeleteResult { + file_id: fid_str.clone(), + status: 0, + error: String::new(), + size: size.0 as u32, + version: 0, + }, + Err(e) => volume_server_pb::DeleteResult { + file_id: fid_str.clone(), + status: 1, + error: e.to_string(), + size: 0, + version: 0, + }, + } + } + Err(e) => volume_server_pb::DeleteResult { + file_id: fid_str.clone(), + status: 1, + error: e, + size: 0, + version: 0, + }, + }; + results.push(result); + } + + Ok(Response::new(volume_server_pb::BatchDeleteResponse { results })) + } + + async fn vacuum_volume_check( + &self, + request: Request, + ) -> Result, Status> { + let vid = VolumeId(request.into_inner().volume_id); + let store = self.state.store.read().unwrap(); + let garbage_ratio = match store.find_volume(vid) { + Some((_, vol)) => vol.garbage_level(), + None => return Err(Status::not_found(format!("volume {} not found", vid))), + }; + Ok(Response::new(volume_server_pb::VacuumVolumeCheckResponse { garbage_ratio })) + } + + type VacuumVolumeCompactStream = BoxStream; + async fn vacuum_volume_compact( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("vacuum_volume_compact not yet implemented")) + } + + async fn vacuum_volume_commit( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("vacuum_volume_commit not yet implemented")) + } + + async fn vacuum_volume_cleanup( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("vacuum_volume_cleanup not yet implemented")) + } + + async fn delete_collection( + &self, + request: Request, + ) -> Result, Status> { + let collection = &request.into_inner().collection; + let mut store = self.state.store.write().unwrap(); + store.delete_collection(collection); + Ok(Response::new(volume_server_pb::DeleteCollectionResponse {})) + } + + async fn allocate_volume( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + let rp = crate::storage::super_block::ReplicaPlacement::from_string(&req.replication) + .map_err(|e| Status::invalid_argument(e.to_string()))?; + let ttl = if req.ttl.is_empty() { + None + } else { + Some(crate::storage::needle::ttl::TTL::read(&req.ttl) + .map_err(|e| Status::invalid_argument(e))?) + }; + let disk_type = DiskType::from_string(&req.disk_type); + + let mut store = self.state.store.write().unwrap(); + store.add_volume(vid, &req.collection, Some(rp), ttl, req.preallocate as u64, disk_type) + .map_err(|e| Status::internal(e.to_string()))?; + + Ok(Response::new(volume_server_pb::AllocateVolumeResponse {})) + } + + async fn volume_sync_status( + &self, + request: Request, + ) -> Result, Status> { + let vid = VolumeId(request.into_inner().volume_id); + let store = self.state.store.read().unwrap(); + let (_, vol) = store.find_volume(vid) + .ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?; + + Ok(Response::new(volume_server_pb::VolumeSyncStatusResponse { + volume_id: vid.0, + collection: vol.collection.clone(), + replication: vol.super_block.replica_placement.to_string(), + ttl: String::new(), + tail_offset: vol.content_size(), + compact_revision: vol.super_block.compaction_revision as u32, + idx_file_size: 0, + version: vol.version().0 as u32, + })) + } + + type VolumeIncrementalCopyStream = BoxStream; + async fn volume_incremental_copy( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_incremental_copy not yet implemented")) + } + + async fn volume_mount( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + + let mut store = self.state.store.write().unwrap(); + store.mount_volume(vid, "", DiskType::HardDrive) + .map_err(|e| Status::internal(e.to_string()))?; + + Ok(Response::new(volume_server_pb::VolumeMountResponse {})) + } + + async fn volume_unmount( + &self, + request: Request, + ) -> Result, Status> { + let vid = VolumeId(request.into_inner().volume_id); + let mut store = self.state.store.write().unwrap(); + if !store.unmount_volume(vid) { + return Err(Status::not_found(format!("volume {} not found", vid))); + } + Ok(Response::new(volume_server_pb::VolumeUnmountResponse {})) + } + + async fn volume_delete( + &self, + request: Request, + ) -> Result, Status> { + let vid = VolumeId(request.into_inner().volume_id); + let mut store = self.state.store.write().unwrap(); + store.delete_volume(vid) + .map_err(|e| Status::internal(e.to_string()))?; + Ok(Response::new(volume_server_pb::VolumeDeleteResponse {})) + } + + async fn volume_mark_readonly( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_mark_readonly not yet implemented")) + } + + async fn volume_mark_writable( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_mark_writable not yet implemented")) + } + + async fn volume_configure( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_configure not yet implemented")) + } + + async fn volume_status( + &self, + request: Request, + ) -> Result, Status> { + let vid = VolumeId(request.into_inner().volume_id); + let store = self.state.store.read().unwrap(); + let (_, vol) = store.find_volume(vid) + .ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?; + + Ok(Response::new(volume_server_pb::VolumeStatusResponse { + is_read_only: vol.is_read_only(), + volume_size: vol.content_size(), + file_count: vol.file_count() as u64, + file_deleted_count: vol.deleted_count() as u64, + })) + } + + async fn get_state( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(volume_server_pb::GetStateResponse { + state: Some(volume_server_pb::VolumeServerState { + maintenance: false, + version: 0, + }), + })) + } + + async fn set_state( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + Ok(Response::new(volume_server_pb::SetStateResponse { + state: req.state, + })) + } + + type VolumeCopyStream = BoxStream; + async fn volume_copy( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_copy not yet implemented")) + } + + async fn read_volume_file_status( + &self, + request: Request, + ) -> Result, Status> { + let vid = VolumeId(request.into_inner().volume_id); + let store = self.state.store.read().unwrap(); + let (_, vol) = store.find_volume(vid) + .ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?; + + Ok(Response::new(volume_server_pb::ReadVolumeFileStatusResponse { + volume_id: vid.0, + idx_file_timestamp_seconds: 0, + idx_file_size: 0, + dat_file_timestamp_seconds: 0, + dat_file_size: vol.dat_file_size().unwrap_or(0), + file_count: vol.file_count() as u64, + compaction_revision: vol.super_block.compaction_revision as u32, + collection: vol.collection.clone(), + disk_type: String::new(), + volume_info: None, + version: vol.version().0 as u32, + })) + } + + type CopyFileStream = BoxStream; + async fn copy_file( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("copy_file not yet implemented")) + } + + async fn receive_file( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented("receive_file not yet implemented")) + } + + async fn read_needle_blob( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + let offset = req.offset; + let size = Size(req.size); + + let store = self.state.store.read().unwrap(); + let (_, vol) = store.find_volume(vid) + .ok_or_else(|| Status::not_found(format!("volume {} not found", vid)))?; + + let blob = vol.read_needle_blob(offset, size) + .map_err(|e| Status::internal(e.to_string()))?; + + Ok(Response::new(volume_server_pb::ReadNeedleBlobResponse { + needle_blob: blob, + })) + } + + async fn read_needle_meta( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("read_needle_meta not yet implemented")) + } + + async fn write_needle_blob( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("write_needle_blob not yet implemented")) + } + + type ReadAllNeedlesStream = BoxStream; + async fn read_all_needles( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("read_all_needles not yet implemented")) + } + + type VolumeTailSenderStream = BoxStream; + async fn volume_tail_sender( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_tail_sender not yet implemented")) + } + + async fn volume_tail_receiver( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_tail_receiver not yet implemented")) + } + + // ---- EC operations ---- + + async fn volume_ec_shards_generate( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_ec_shards_generate not yet implemented")) + } + + async fn volume_ec_shards_rebuild( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_ec_shards_rebuild not yet implemented")) + } + + async fn volume_ec_shards_copy( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_ec_shards_copy not yet implemented")) + } + + async fn volume_ec_shards_delete( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_ec_shards_delete not yet implemented")) + } + + async fn volume_ec_shards_mount( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_ec_shards_mount not yet implemented")) + } + + async fn volume_ec_shards_unmount( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_ec_shards_unmount not yet implemented")) + } + + type VolumeEcShardReadStream = BoxStream; + async fn volume_ec_shard_read( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_ec_shard_read not yet implemented")) + } + + async fn volume_ec_blob_delete( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_ec_blob_delete not yet implemented")) + } + + async fn volume_ec_shards_to_volume( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_ec_shards_to_volume not yet implemented")) + } + + async fn volume_ec_shards_info( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_ec_shards_info not yet implemented")) + } + + // ---- Tiered storage ---- + + type VolumeTierMoveDatToRemoteStream = BoxStream; + async fn volume_tier_move_dat_to_remote( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_tier_move_dat_to_remote not yet implemented")) + } + + type VolumeTierMoveDatFromRemoteStream = BoxStream; + async fn volume_tier_move_dat_from_remote( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("volume_tier_move_dat_from_remote not yet implemented")) + } + + // ---- Server management ---- + + async fn volume_server_status( + &self, + _request: Request, + ) -> Result, Status> { + let store = self.state.store.read().unwrap(); + + let mut disk_statuses = Vec::new(); + for loc in &store.locations { + disk_statuses.push(volume_server_pb::DiskStatus { + dir: loc.directory.clone(), + all: 0, + used: 0, + free: loc.available_space.load(std::sync::atomic::Ordering::Relaxed), + percent_free: 0.0, + percent_used: 0.0, + disk_type: loc.disk_type.to_string(), + }); + } + + Ok(Response::new(volume_server_pb::VolumeServerStatusResponse { + disk_statuses, + memory_status: None, + version: env!("CARGO_PKG_VERSION").to_string(), + data_center: String::new(), + rack: String::new(), + state: Some(volume_server_pb::VolumeServerState { + maintenance: false, + version: 0, + }), + })) + } + + async fn volume_server_leave( + &self, + _request: Request, + ) -> Result, Status> { + *self.state.is_stopping.write().unwrap() = true; + Ok(Response::new(volume_server_pb::VolumeServerLeaveResponse {})) + } + + async fn fetch_and_write_needle( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("fetch_and_write_needle not yet implemented")) + } + + async fn scrub_volume( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("scrub_volume not yet implemented")) + } + + async fn scrub_ec_volume( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("scrub_ec_volume not yet implemented")) + } + + type QueryStream = BoxStream; + async fn query( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("query not yet implemented")) + } + + async fn volume_needle_status( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + let needle_id = NeedleId(req.needle_id); + + let store = self.state.store.read().unwrap(); + let mut n = Needle { id: needle_id, ..Needle::default() }; + match store.read_volume_needle(vid, &mut n) { + Ok(_) => Ok(Response::new(volume_server_pb::VolumeNeedleStatusResponse { + needle_id: needle_id.0, + cookie: n.cookie.0, + size: n.data_size, + last_modified: n.last_modified, + crc: n.checksum.0, + ttl: String::new(), + })), + Err(e) => Err(Status::not_found(e.to_string())), + } + } + + async fn ping( + &self, + _request: Request, + ) -> Result, Status> { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as i64; + Ok(Response::new(volume_server_pb::PingResponse { + start_time_ns: now, + remote_time_ns: now, + stop_time_ns: now, + })) + } +} diff --git a/seaweed-volume/src/server/mod.rs b/seaweed-volume/src/server/mod.rs index db03f6061..da7e6d012 100644 --- a/seaweed-volume/src/server/mod.rs +++ b/seaweed-volume/src/server/mod.rs @@ -1,2 +1,3 @@ pub mod volume_server; pub mod handlers; +pub mod grpc_server;