Browse Source

Match Go gRPC: EC multi-disk placement, per-shard mount/unmount, no auto-mount on reconstruct, streaming ReadAll/EcShardRead, ReceiveFile cleanup, version check, proxy streaming, redirect Content-Type

rust-volume-server
Chris Lu 3 days ago
parent
commit
b3cf5ee896
  1. 504
      seaweed-volume/src/server/grpc_server.rs

504
seaweed-volume/src/server/grpc_server.rs

@ -1114,7 +1114,7 @@ impl VolumeServer for VolumeGrpcService {
// Find last_append_at_ns from copied files
let last_append_at_ns = if !has_remote_dat {
find_last_append_at_ns(&idx_path, &format!("{}.dat", data_base_name))
find_last_append_at_ns(&idx_path, &format!("{}.dat", data_base_name), vol_info.version)
.unwrap_or(vol_info.dat_file_timestamp_seconds * 1_000_000_000)
} else {
vol_info.dat_file_timestamp_seconds * 1_000_000_000
@ -1328,73 +1328,91 @@ impl VolumeServer for VolumeGrpcService {
let mut stream = request.into_inner();
let mut target_file: Option<std::fs::File> = None;
let mut file_path;
let mut file_path: Option<String> = None;
let mut bytes_written: u64 = 0;
while let Some(req) = stream.message().await? {
match req.data {
Some(volume_server_pb::receive_file_request::Data::Info(info)) => {
// Determine file path
if info.is_ec_volume {
let store = self.state.store.read().unwrap();
let dir = store
.locations
.first()
.map(|loc| loc.directory.clone())
.unwrap_or_default();
drop(store);
let ec_base = if info.collection.is_empty() {
format!("{}", info.volume_id)
let result: Result<(), Status> = async {
while let Some(req) = stream.message().await? {
match req.data {
Some(volume_server_pb::receive_file_request::Data::Info(info)) => {
// Determine file path
let path = if info.is_ec_volume {
let store = self.state.store.read().unwrap();
let dir = store
.locations
.first()
.map(|loc| loc.directory.clone())
.unwrap_or_default();
drop(store);
let ec_base = if info.collection.is_empty() {
format!("{}", info.volume_id)
} else {
format!("{}_{}", info.collection, info.volume_id)
};
format!("{}/{}{}", dir, ec_base, info.ext)
} else {
format!("{}_{}", info.collection, info.volume_id)
let store = self.state.store.read().unwrap();
let (_, v) =
store.find_volume(VolumeId(info.volume_id)).ok_or_else(|| {
Status::not_found(format!(
"volume {} not found",
info.volume_id
))
})?;
let p = v.file_name(&info.ext);
drop(store);
p
};
file_path = format!("{}/{}{}", dir, ec_base, info.ext);
} else {
let store = self.state.store.read().unwrap();
let (_, v) =
store.find_volume(VolumeId(info.volume_id)).ok_or_else(|| {
Status::not_found(format!("volume {} not found", info.volume_id))
target_file =
Some(std::fs::File::create(&path).map_err(|e| {
Status::internal(format!("failed to create file: {}", e))
})?);
file_path = Some(path);
}
Some(volume_server_pb::receive_file_request::Data::FileContent(content)) => {
if let Some(ref mut f) = target_file {
use std::io::Write;
let n = f.write(&content).map_err(|e| {
Status::internal(format!("failed to write file: {}", e))
})?;
file_path = v.file_name(&info.ext);
drop(store);
bytes_written += n as u64;
} else {
return Err(Status::invalid_argument(
"file info must be sent first",
));
}
}
target_file =
Some(std::fs::File::create(&file_path).map_err(|e| {
Status::internal(format!("failed to create file: {}", e))
})?);
}
Some(volume_server_pb::receive_file_request::Data::FileContent(content)) => {
if let Some(ref mut f) = target_file {
use std::io::Write;
let n = f.write(&content).map_err(|e| {
Status::internal(format!("failed to write file: {}", e))
})?;
bytes_written += n as u64;
} else {
return Ok(Response::new(volume_server_pb::ReceiveFileResponse {
error: "file info must be sent first".to_string(),
bytes_written: 0,
}));
None => {
return Err(Status::invalid_argument("unknown message type"));
}
}
None => {
return Ok(Response::new(volume_server_pb::ReceiveFileResponse {
error: "unknown message type".to_string(),
bytes_written: 0,
}));
}
}
Ok(())
}
.await;
if let Some(ref f) = target_file {
let _ = f.sync_all();
match result {
Ok(()) => {
if let Some(ref f) = target_file {
let _ = f.sync_all();
}
Ok(Response::new(volume_server_pb::ReceiveFileResponse {
error: String::new(),
bytes_written,
}))
}
Err(e) => {
// Clean up partial file on stream error (Go parity: closes file, removes it)
if let Some(f) = target_file.take() {
drop(f);
}
if let Some(ref p) = file_path {
let _ = std::fs::remove_file(p);
}
Err(e)
}
}
Ok(Response::new(volume_server_pb::ReceiveFileResponse {
error: String::new(),
bytes_written,
}))
}
async fn read_needle_blob(
@ -1489,49 +1507,57 @@ impl VolumeServer for VolumeGrpcService {
request: Request<volume_server_pb::ReadAllNeedlesRequest>,
) -> Result<Response<Self::ReadAllNeedlesStream>, Status> {
let req = request.into_inner();
let mut results: Vec<Result<volume_server_pb::ReadAllNeedlesResponse, Status>> = Vec::new();
let state = self.state.clone();
let store = self.state.store.read().unwrap();
for &raw_vid in &req.volume_ids {
let vid = VolumeId(raw_vid);
let v = match store.find_volume(vid) {
Some((_, v)) => v,
None => {
// Push error as last item so previous volumes' data is streamed first
results.push(Err(Status::not_found(format!(
"not found volume id {}",
vid
))));
break;
}
};
let (tx, rx) = tokio::sync::mpsc::channel(32);
let needles = match v.read_all_needles() {
Ok(n) => n,
Err(e) => {
results.push(Err(Status::internal(e.to_string())));
break;
}
};
// Stream needles lazily via a blocking task (matches Go's scanner pattern)
tokio::task::spawn_blocking(move || {
let store = state.store.read().unwrap();
for &raw_vid in &req.volume_ids {
let vid = VolumeId(raw_vid);
let v = match store.find_volume(vid) {
Some((_, v)) => v,
None => {
let _ = tx.blocking_send(Err(Status::not_found(format!(
"not found volume id {}",
vid
))));
return;
}
};
for n in needles {
let compressed = n.is_compressed();
results.push(Ok(volume_server_pb::ReadAllNeedlesResponse {
volume_id: raw_vid,
needle_id: n.id.into(),
cookie: n.cookie.0,
needle_blob: n.data,
needle_blob_compressed: compressed,
last_modified: n.last_modified,
crc: n.checksum.0,
name: n.name,
mime: n.mime,
}));
let needles = match v.read_all_needles() {
Ok(n) => n,
Err(e) => {
let _ = tx.blocking_send(Err(Status::internal(e.to_string())));
return;
}
};
for n in needles {
let compressed = n.is_compressed();
if tx
.blocking_send(Ok(volume_server_pb::ReadAllNeedlesResponse {
volume_id: raw_vid,
needle_id: n.id.into(),
cookie: n.cookie.0,
needle_blob: n.data,
needle_blob_compressed: compressed,
last_modified: n.last_modified,
crc: n.checksum.0,
name: n.name,
mime: n.mime,
}))
.is_err()
{
return; // receiver dropped
}
}
}
}
drop(store);
});
let stream = tokio_stream::iter(results);
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
Ok(Response::new(Box::pin(stream)))
}
@ -1856,13 +1882,89 @@ impl VolumeServer for VolumeGrpcService {
let vid = VolumeId(req.volume_id);
let collection = &req.collection;
// Find the directory with EC files
// Search ALL locations for shards, pick the best rebuild location
// (most shards + has .ecx), collect additional dirs.
// Matches Go's multi-location search in VolumeEcShardsRebuild.
let base_name = if collection.is_empty() {
format!("{}", vid.0)
} else {
format!("{}_{}", collection, vid.0)
};
struct LocInfo {
dir: String,
idx_dir: String,
shard_count: usize,
has_ecx: bool,
}
let store = self.state.store.read().unwrap();
let dir = store.find_ec_dir(vid, collection);
let mut loc_infos: Vec<LocInfo> = Vec::new();
for loc in &store.locations {
// Count shards in this location's directory
let mut shard_count = 0usize;
if let Ok(entries) = std::fs::read_dir(&loc.directory) {
for entry in entries.flatten() {
let name = entry.file_name();
let name = name.to_string_lossy();
if name.starts_with(&format!("{}.ec", base_name)) {
let suffix = &name[base_name.len() + 3..];
if suffix.len() == 2 && suffix.chars().all(|c| c.is_ascii_digit()) {
shard_count += 1;
}
}
}
}
// Check for .ecx in idx_directory first, then data directory
let idx_base = format!("{}/{}", loc.idx_directory, base_name);
let data_base = format!("{}/{}", loc.directory, base_name);
let has_ecx = std::path::Path::new(&format!("{}.ecx", idx_base)).exists()
|| (loc.idx_directory != loc.directory
&& std::path::Path::new(&format!("{}.ecx", data_base)).exists());
if shard_count == 0 && !has_ecx {
continue;
}
loc_infos.push(LocInfo {
dir: loc.directory.clone(),
idx_dir: loc.idx_directory.clone(),
shard_count,
has_ecx,
});
}
drop(store);
let dir = match dir {
Some(d) => d,
if loc_infos.is_empty() {
return Ok(Response::new(
volume_server_pb::VolumeEcShardsRebuildResponse {
rebuilt_shard_ids: vec![],
},
));
}
// Pick rebuild location: has .ecx and most shards
let mut rebuild_loc_idx: Option<usize> = None;
let mut other_dirs: Vec<String> = Vec::new();
for (i, info) in loc_infos.iter().enumerate() {
if info.has_ecx
&& (rebuild_loc_idx.is_none()
|| info.shard_count > loc_infos[rebuild_loc_idx.unwrap()].shard_count)
{
if let Some(prev) = rebuild_loc_idx {
other_dirs.push(loc_infos[prev].dir.clone());
}
rebuild_loc_idx = Some(i);
} else {
other_dirs.push(info.dir.clone());
}
}
let rebuild_loc_idx = match rebuild_loc_idx {
Some(i) => i,
None => {
return Ok(Response::new(
volume_server_pb::VolumeEcShardsRebuildResponse {
@ -1872,19 +1974,35 @@ impl VolumeServer for VolumeGrpcService {
}
};
// Check which shards are missing
let rebuild_dir = loc_infos[rebuild_loc_idx].dir.clone();
let rebuild_idx_dir = loc_infos[rebuild_loc_idx].idx_dir.clone();
// Determine data/parity shard config from rebuild dir
let (data_shards, parity_shards) =
crate::storage::erasure_coding::ec_volume::read_ec_shard_config(
&dir, collection, vid,
&rebuild_dir, collection, vid,
);
let total_shards = data_shards + parity_shards;
// Check which shards are missing (check rebuild dir and all other dirs)
let mut missing: Vec<u32> = Vec::new();
for shard_id in 0..total_shards as u8 {
let shard = crate::storage::erasure_coding::ec_shard::EcVolumeShard::new(
&dir, collection, vid, shard_id,
&rebuild_dir, collection, vid, shard_id,
);
if !std::path::Path::new(&shard.file_name()).exists() {
let mut found = std::path::Path::new(&shard.file_name()).exists();
if !found {
for other_dir in &other_dirs {
let other_shard = crate::storage::erasure_coding::ec_shard::EcVolumeShard::new(
other_dir, collection, vid, shard_id,
);
if std::path::Path::new(&other_shard.file_name()).exists() {
found = true;
break;
}
}
}
if !found {
missing.push(shard_id as u32);
}
}
@ -1897,25 +2015,34 @@ impl VolumeServer for VolumeGrpcService {
));
}
// Rebuild missing shards by regenerating all missing EC files via Reed-Solomon reconstruct
// Rebuild missing shards, searching all locations for input shards
crate::storage::erasure_coding::ec_encoder::rebuild_ec_files(
&dir,
&rebuild_dir,
collection,
vid,
&missing,
data_shards as usize,
parity_shards as usize,
)
.map_err(|e| Status::internal(format!("rebuild ec shards: {}", e)))?;
.map_err(|e| Status::internal(format!("RebuildEcFiles: {}", e)))?;
// Rebuild .ecx; use idx_directory with fallback to data directory
let ecx_base = format!("{}/{}", rebuild_idx_dir, base_name);
let ecx_rebuild_dir = if std::path::Path::new(&format!("{}.ecx", ecx_base)).exists() {
rebuild_idx_dir
} else if rebuild_idx_dir != rebuild_dir {
rebuild_dir.clone()
} else {
rebuild_idx_dir
};
// Rebuild the .ecx index file from the data shards (matching Go's RebuildEcxFile)
crate::storage::erasure_coding::ec_encoder::rebuild_ecx_file(
&dir,
&ecx_rebuild_dir,
collection,
vid,
data_shards as usize,
)
.map_err(|e| Status::internal(format!("rebuild ecx file: {}", e)))?;
.map_err(|e| Status::internal(format!("RebuildEcxFile: {}", e)))?;
Ok(Response::new(
volume_server_pb::VolumeEcShardsRebuildResponse {
@ -1932,19 +2059,47 @@ impl VolumeServer for VolumeGrpcService {
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
// Validate disk_id
let (_loc_count, dest_dir) = {
// Select target location matching Go's 3-tier fallback:
// When disk_id > 0: use that specific location
// When disk_id == 0 (unset): (1) location with existing EC shards, (2) any HDD, (3) any
let (dest_dir, dest_idx_dir) = {
let store = self.state.store.read().unwrap();
let count = store.locations.len();
let dir = if (req.disk_id as usize) < count {
store.locations[req.disk_id as usize].directory.clone()
if req.disk_id > 0 {
// Explicit disk selection
if (req.disk_id as usize) >= count {
return Err(Status::invalid_argument(format!(
"invalid disk_id {}: only have {} disks",
req.disk_id, count
)));
}
let loc = &store.locations[req.disk_id as usize];
(loc.directory.clone(), loc.idx_directory.clone())
} else {
return Err(Status::invalid_argument(format!(
"invalid disk_id {}: only have {} disks",
req.disk_id, count
)));
};
(count, dir)
// Auto-select: prefer location with existing EC shards for this volume
let loc_idx = store
.find_free_location_predicate(|loc| loc.has_ec_volume(vid))
.or_else(|| {
// Fall back to any HDD location
store.find_free_location_predicate(|loc| {
loc.disk_type == DiskType::HardDrive
})
})
.or_else(|| {
// Fall back to any location
store.find_free_location_predicate(|_| true)
});
match loc_idx {
Some(i) => {
let loc = &store.locations[i];
(loc.directory.clone(), loc.idx_directory.clone())
}
None => {
return Err(Status::internal("no space left".to_string()));
}
}
}
};
// Connect to source and copy shard files via CopyFile
@ -2045,7 +2200,7 @@ impl VolumeServer for VolumeGrpcService {
let file_path = {
let base =
crate::storage::volume::volume_file_name(&dest_dir, &req.collection, vid);
crate::storage::volume::volume_file_name(&dest_idx_dir, &req.collection, vid);
format!("{}.ecx", base)
};
let mut file = std::fs::File::create(&file_path)
@ -2086,7 +2241,7 @@ impl VolumeServer for VolumeGrpcService {
let file_path = {
let base =
crate::storage::volume::volume_file_name(&dest_dir, &req.collection, vid);
crate::storage::volume::volume_file_name(&dest_idx_dir, &req.collection, vid);
format!("{}.ecj", base)
};
let mut file = std::fs::File::create(&file_path)
@ -2168,26 +2323,16 @@ impl VolumeServer for VolumeGrpcService {
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
// Check all requested shards exist on disk
{
let store = self.state.store.read().unwrap();
for &shard_id in &req.shard_ids {
if store
.find_ec_shard_dir(vid, &req.collection, shard_id as u8)
.is_none()
{
return Err(Status::not_found(format!(
"ec volume {} shards {:?} not found",
req.volume_id, req.shard_ids
)));
}
}
}
// Mount one shard at a time, returning error on first failure.
// Matches Go: for _, shardId := range req.ShardIds { err = vs.store.MountEcShards(...) }
let mut store = self.state.store.write().unwrap();
store
.mount_ec_shards(vid, &req.collection, &req.shard_ids)
.map_err(|e| Status::internal(e.to_string()))?;
for &shard_id in &req.shard_ids {
store
.mount_ec_shard(vid, &req.collection, shard_id)
.map_err(|e| {
Status::internal(format!("mount {}.{}: {}", req.volume_id, shard_id, e))
})?;
}
Ok(Response::new(
volume_server_pb::VolumeEcShardsMountResponse {},
@ -2200,8 +2345,15 @@ impl VolumeServer for VolumeGrpcService {
) -> Result<Response<volume_server_pb::VolumeEcShardsUnmountResponse>, Status> {
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
// Unmount one shard at a time, returning error on first failure.
// Matches Go: for _, shardId := range req.ShardIds { err = vs.store.UnmountEcShards(...) }
let mut store = self.state.store.write().unwrap();
store.unmount_ec_shards(vid, &req.shard_ids);
for &shard_id in &req.shard_ids {
store.unmount_ec_shard(vid, shard_id).map_err(|e| {
Status::internal(format!("unmount {}.{}: {}", req.volume_id, shard_id, e))
})?;
}
Ok(Response::new(
volume_server_pb::VolumeEcShardsUnmountResponse {},
))
@ -2250,21 +2402,40 @@ impl VolumeServer for VolumeGrpcService {
))
})?;
let read_size = if req.size > 0 {
let total_size = if req.size > 0 {
req.size as usize
} else {
1024 * 1024
};
let mut buf = vec![0u8; read_size];
let n = shard
.read_at(&mut buf, req.offset as u64)
.map_err(|e| Status::internal(e.to_string()))?;
buf.truncate(n);
let results = vec![Ok(volume_server_pb::VolumeEcShardReadResponse {
data: buf,
is_deleted: false,
})];
// Stream in 2MB chunks (matching Go's BufferSizeLimit)
const BUFFER_SIZE_LIMIT: usize = 2 * 1024 * 1024;
let mut results: Vec<Result<volume_server_pb::VolumeEcShardReadResponse, Status>> =
Vec::new();
let mut bytes_read: usize = 0;
let mut current_offset = req.offset as u64;
while bytes_read < total_size {
let chunk_size = std::cmp::min(BUFFER_SIZE_LIMIT, total_size - bytes_read);
let mut buf = vec![0u8; chunk_size];
let n = shard
.read_at(&mut buf, current_offset)
.map_err(|e| Status::internal(e.to_string()))?;
if n == 0 {
break;
}
buf.truncate(n);
bytes_read += n;
current_offset += n as u64;
results.push(Ok(volume_server_pb::VolumeEcShardReadResponse {
data: buf,
is_deleted: false,
}));
if n < chunk_size {
break; // short read means EOF
}
}
Ok(Response::new(Box::pin(tokio_stream::iter(results))))
}
@ -2277,8 +2448,19 @@ impl VolumeServer for VolumeGrpcService {
let vid = VolumeId(req.volume_id);
let needle_id = NeedleId(req.file_key);
// Go checks if needle is already deleted (via ecx) before journaling.
// Search all locations for the EC volume.
let mut store = self.state.store.write().unwrap();
if let Some(ec_vol) = store.find_ec_volume_mut(vid) {
// Check if already deleted via ecx index
if let Ok(Some((_offset, size))) = ec_vol.find_needle_from_ecx(needle_id) {
if size.is_deleted() {
// Already deleted, no-op
return Ok(Response::new(
volume_server_pb::VolumeEcBlobDeleteResponse {},
));
}
}
ec_vol
.journal_delete(needle_id)
.map_err(|e| Status::internal(e.to_string()))?;
@ -2379,19 +2561,8 @@ impl VolumeServer for VolumeGrpcService {
)
.map_err(|e| Status::internal(format!("WriteIdxFileFromEcIndex: {}", e)))?;
// Unmount EC shards and mount the reconstructed volume
{
let mut store = self.state.store.write().unwrap();
// Remove EC volume
if let Some(mut ec_vol) = store.remove_ec_volume(vid) {
ec_vol.close();
}
// Unmount existing volume if any, then mount fresh
store.unmount_volume(vid);
store
.mount_volume(vid, &collection, DiskType::HardDrive)
.map_err(|e| Status::internal(format!("mount volume: {}", e)))?;
}
// Go does NOT unmount EC shards or mount the volume here.
// The caller (ec.balance / ec.decode) handles mount/unmount separately.
Ok(Response::new(
volume_server_pb::VolumeEcShardsToVolumeResponse {},
@ -3624,7 +3795,12 @@ fn check_copy_file_size(path: &str, expected: u64) -> Result<(), Status> {
}
/// Find the last append timestamp from copied .idx and .dat files.
fn find_last_append_at_ns(idx_path: &str, dat_path: &str) -> Option<u64> {
/// Go returns (0, nil) for versions < Version3 since timestamps only exist in V3.
fn find_last_append_at_ns(idx_path: &str, dat_path: &str, version: u32) -> Option<u64> {
// Only Version3 has the append timestamp in the needle tail
if version < VERSION_3.0 as u32 {
return None;
}
use std::io::{Read, Seek, SeekFrom};
let mut idx_file = std::fs::File::open(idx_path).ok()?;

Loading…
Cancel
Save