Browse Source

Match Go gRPC: tail header first-chunk-only, EC cleanup on failure, copy append mode, ecx rebuild, compact cancellation

rust-volume-server
Chris Lu 3 days ago
parent
commit
7790843f5b
  1. 59
      seaweed-volume/src/server/grpc_server.rs

59
seaweed-volume/src/server/grpc_server.rs

@ -378,7 +378,10 @@ impl VolumeServer for VolumeGrpcService {
processed_bytes: processed,
load_avg_1m: 0.0,
};
let _ = tx_clone.blocking_send(Ok(resp));
// If send fails (client disconnected), stop compaction
if tx_clone.blocking_send(Ok(resp)).is_err() {
return false;
}
next_report.store(
processed + report_interval,
std::sync::atomic::Ordering::Relaxed,
@ -1597,16 +1600,19 @@ impl VolumeServer for VolumeGrpcService {
}
sent_any = true;
// Send body in chunks of BUFFER_SIZE_LIMIT
// Go sends needle_header only in the first chunk per needle
let mut i = 0;
let mut first_chunk = true;
while i < body.len() {
let end = std::cmp::min(i + BUFFER_SIZE_LIMIT, body.len());
let is_last_chunk = end >= body.len();
let msg = volume_server_pb::VolumeTailSenderResponse {
needle_header: header.clone(),
needle_header: if first_chunk { header.clone() } else { vec![] },
needle_body: body[i..end].to_vec(),
is_last_chunk,
version,
};
first_chunk = false;
if tx.send(Ok(msg)).await.is_err() {
return;
}
@ -1797,15 +1803,24 @@ impl VolumeServer for VolumeGrpcService {
&dir, collection, vid,
);
crate::storage::erasure_coding::ec_encoder::write_ec_files(
if let Err(e) = crate::storage::erasure_coding::ec_encoder::write_ec_files(
&dir,
&idx_dir,
collection,
vid,
data_shards as usize,
parity_shards as usize,
)
.map_err(|e| Status::internal(e.to_string()))?;
) {
// Cleanup partially-created .ecNN and .ecx files on failure (matching Go defer)
let base = crate::storage::volume::volume_file_name(&dir, collection, vid);
let total_shards = data_shards + parity_shards;
for i in 0..total_shards {
let shard_path = format!("{}.ec{:02}", base, i);
let _ = std::fs::remove_file(&shard_path);
}
let _ = std::fs::remove_file(format!("{}.ecx", base));
return Err(Status::internal(e.to_string()));
}
// Write .vif file with EC shard metadata
{
@ -1893,6 +1908,15 @@ impl VolumeServer for VolumeGrpcService {
)
.map_err(|e| Status::internal(format!("rebuild ec shards: {}", e)))?;
// Rebuild the .ecx index file from the data shards (matching Go's RebuildEcxFile)
crate::storage::erasure_coding::ec_encoder::rebuild_ecx_file(
&dir,
collection,
vid,
data_shards as usize,
)
.map_err(|e| Status::internal(format!("rebuild ecx file: {}", e)))?;
Ok(Response::new(
volume_server_pb::VolumeEcShardsRebuildResponse {
rebuilt_shard_ids: missing,
@ -3482,7 +3506,7 @@ async fn copy_file_from_source<T>(
stop_offset: u64,
dest_path: &str,
ext: &str,
_is_append: bool,
is_append: bool,
ignore_source_not_found: bool,
progress_tx: Option<
&tokio::sync::mpsc::Sender<Result<volume_server_pb::VolumeCopyResponse, Status>>,
@ -3518,12 +3542,20 @@ where
})?
.into_inner();
let mut file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(dest_path)
.map_err(|e| format!("open file {}: {}", dest_path, e))?;
let mut file = if is_append {
std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(dest_path)
.map_err(|e| format!("open file {}: {}", dest_path, e))?
} else {
std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(dest_path)
.map_err(|e| format!("open file {}: {}", dest_path, e))?
};
let mut progressed_bytes: i64 = 0;
let mut modified_ts_ns: i64 = 0;
@ -3560,7 +3592,8 @@ where
}
// If source file didn't exist (no modifiedTsNs received), remove empty file
if modified_ts_ns == 0 {
// Go only removes when !isAppend
if modified_ts_ns == 0 && !is_append {
let _ = std::fs::remove_file(dest_path);
}

Loading…
Cancel
Save