From 7790843f5b7a45dd6f6004ba75b4181d8aed8c48 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 16 Mar 2026 21:03:06 -0700 Subject: [PATCH] Match Go gRPC: tail header first-chunk-only, EC cleanup on failure, copy append mode, ecx rebuild, compact cancellation --- seaweed-volume/src/server/grpc_server.rs | 59 ++++++++++++++++++------ 1 file changed, 46 insertions(+), 13 deletions(-) diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 0888170f9..4ab295322 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -378,7 +378,10 @@ impl VolumeServer for VolumeGrpcService { processed_bytes: processed, load_avg_1m: 0.0, }; - let _ = tx_clone.blocking_send(Ok(resp)); + // If send fails (client disconnected), stop compaction + if tx_clone.blocking_send(Ok(resp)).is_err() { + return false; + } next_report.store( processed + report_interval, std::sync::atomic::Ordering::Relaxed, @@ -1597,16 +1600,19 @@ impl VolumeServer for VolumeGrpcService { } sent_any = true; // Send body in chunks of BUFFER_SIZE_LIMIT + // Go sends needle_header only in the first chunk per needle let mut i = 0; + let mut first_chunk = true; while i < body.len() { let end = std::cmp::min(i + BUFFER_SIZE_LIMIT, body.len()); let is_last_chunk = end >= body.len(); let msg = volume_server_pb::VolumeTailSenderResponse { - needle_header: header.clone(), + needle_header: if first_chunk { header.clone() } else { vec![] }, needle_body: body[i..end].to_vec(), is_last_chunk, version, }; + first_chunk = false; if tx.send(Ok(msg)).await.is_err() { return; } @@ -1797,15 +1803,24 @@ impl VolumeServer for VolumeGrpcService { &dir, collection, vid, ); - crate::storage::erasure_coding::ec_encoder::write_ec_files( + if let Err(e) = crate::storage::erasure_coding::ec_encoder::write_ec_files( &dir, &idx_dir, collection, vid, data_shards as usize, parity_shards as usize, - ) - .map_err(|e| Status::internal(e.to_string()))?; + ) { + // Cleanup partially-created .ecNN and .ecx files on failure (matching Go defer) + let base = crate::storage::volume::volume_file_name(&dir, collection, vid); + let total_shards = data_shards + parity_shards; + for i in 0..total_shards { + let shard_path = format!("{}.ec{:02}", base, i); + let _ = std::fs::remove_file(&shard_path); + } + let _ = std::fs::remove_file(format!("{}.ecx", base)); + return Err(Status::internal(e.to_string())); + } // Write .vif file with EC shard metadata { @@ -1893,6 +1908,15 @@ impl VolumeServer for VolumeGrpcService { ) .map_err(|e| Status::internal(format!("rebuild ec shards: {}", e)))?; + // Rebuild the .ecx index file from the data shards (matching Go's RebuildEcxFile) + crate::storage::erasure_coding::ec_encoder::rebuild_ecx_file( + &dir, + collection, + vid, + data_shards as usize, + ) + .map_err(|e| Status::internal(format!("rebuild ecx file: {}", e)))?; + Ok(Response::new( volume_server_pb::VolumeEcShardsRebuildResponse { rebuilt_shard_ids: missing, @@ -3482,7 +3506,7 @@ async fn copy_file_from_source( stop_offset: u64, dest_path: &str, ext: &str, - _is_append: bool, + is_append: bool, ignore_source_not_found: bool, progress_tx: Option< &tokio::sync::mpsc::Sender>, @@ -3518,12 +3542,20 @@ where })? .into_inner(); - let mut file = std::fs::OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(dest_path) - .map_err(|e| format!("open file {}: {}", dest_path, e))?; + let mut file = if is_append { + std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(dest_path) + .map_err(|e| format!("open file {}: {}", dest_path, e))? + } else { + std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(dest_path) + .map_err(|e| format!("open file {}: {}", dest_path, e))? + }; let mut progressed_bytes: i64 = 0; let mut modified_ts_ns: i64 = 0; @@ -3560,7 +3592,8 @@ where } // If source file didn't exist (no modifiedTsNs received), remove empty file - if modified_ts_ns == 0 { + // Go only removes when !isAppend + if modified_ts_ns == 0 && !is_append { let _ = std::fs::remove_file(dest_path); }