From 41637f703951bf31b5be359a8f7f61ecb47224d8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 7 Mar 2026 04:45:38 -0800 Subject: [PATCH] implement VolumeCopy and VolumeTailReceiver gRPC RPCs VolumeCopy: connects to source as gRPC client, copies .dat/.idx/.vif files via CopyFile streaming, verifies sizes, mounts the volume. Needed for volume rebalancing and migration between servers. VolumeTailReceiver: connects to source's VolumeTailSender, receives needle header+body chunks, reassembles multi-chunk needles, writes them locally. Needed for live replication during volume moves. Also adds helper functions: parse_grpc_address (SeaweedFS address format parsing), copy_file_from_source (streaming file copy with progress reporting), find_last_append_at_ns (timestamp extraction from copied files). All 128/130 integration tests still pass (same 2 known unfixable). --- seaweed-volume/DEV_PLAN.md | 25 +- seaweed-volume/src/server/grpc_server.rs | 391 ++++++++++++++++++++++- 2 files changed, 391 insertions(+), 25 deletions(-) diff --git a/seaweed-volume/DEV_PLAN.md b/seaweed-volume/DEV_PLAN.md index 73857480f..3ee509626 100644 --- a/seaweed-volume/DEV_PLAN.md +++ b/seaweed-volume/DEV_PLAN.md @@ -5,7 +5,7 @@ **HTTP tests**: 54/55 pass (98.2%) — 1 unfixable: CONNECT method is a hyper/axum limitation **gRPC tests**: 74/75 pass (98.7%) — 1 Go-only: TestVolumeMoveHandlesInFlightWrites uses Go binaries exclusively **Total**: 128/130 (98.5%) -**Rust unit tests**: 111 lib + 7 integration = 118 +**Rust unit tests**: 112 lib + 7 integration = 119 ## Completed Features @@ -20,7 +20,7 @@ All phases from the original plan are complete: - **Phase 3** — gRPC: maintenance mode, error message parity, ping routing, batch delete, VolumeServerStatus, ReadVolumeFileStatus - **Phase 4** — Streaming gRPC: VolumeIncrementalCopy, CopyFile, ReceiveFile, ReadAllNeedles, - VolumeTailSender, VolumeCopy (partial), VacuumVolumeCheck + VolumeTailSender, VolumeCopy, VolumeTailReceiver, VacuumVolumeCheck - **Phase 5** — EC Shards: mount/unmount, delete, read, blob delete, rebuild, shards-to-volume, copy, info - **Phase 6** — Advanced gRPC: ScrubVolume, ScrubEcVolume, Query, FetchAndWriteNeedle (stub), @@ -30,31 +30,20 @@ All phases from the original plan are complete: ## Remaining Work (Production Readiness) -These features are not covered by existing integration tests but are needed for -production use in a full SeaweedFS cluster: - -### High Priority (needed for cluster operations) - -1. **VacuumVolumeCompact/Commit/Cleanup** — Volume compaction/garbage collection. - Without this, deleted space is never reclaimed. The master triggers this periodically. - -2. **VolumeCopy** — Full volume copy from peer. Currently stubbed. Needed for volume - rebalancing and migration between volume servers. - -3. **VolumeTailReceiver** — Live replication from source volume. Needed for volume moves - (copy + tail + switchover). Currently returns unimplemented. +All high-priority cluster operations are implemented (VacuumVolumeCompact/Commit/Cleanup, +VolumeCopy, VolumeTailReceiver). Remaining items are lower priority: ### Medium Priority (nice to have) -4. **Remote Storage** — `FetchAndWriteNeedle` with actual remote backend support. +1. **Remote Storage** — `FetchAndWriteNeedle` with actual remote backend support. Currently returns "remote storage not configured". Needed only if using tiered storage. -5. **VolumeTierMoveDatToRemote/FromRemote** — Move volume data to/from remote storage +2. **VolumeTierMoveDatToRemote/FromRemote** — Move volume data to/from remote storage backends (S3, etc.). Currently returns error paths only. ### Low Priority -6. **TestUnsupportedMethodConnectParity** — HTTP CONNECT method returns 400 in Go but +3. **TestUnsupportedMethodConnectParity** — HTTP CONNECT method returns 400 in Go but hyper rejects it before reaching the router. Would need a custom hyper service wrapper. ## Test Commands diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index a98ce0079..7013dcfc5 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -533,10 +533,166 @@ impl VolumeServer for VolumeGrpcService { type VolumeCopyStream = BoxStream; async fn volume_copy( &self, - _request: Request, + request: Request, ) -> Result, Status> { self.state.check_maintenance()?; - Err(Status::unimplemented("volume_copy not yet implemented")) + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + + // If volume already exists locally, delete it first + { + let store = self.state.store.read().unwrap(); + if store.find_volume(vid).is_some() { + drop(store); + let mut store = self.state.store.write().unwrap(); + store.delete_volume(vid) + .map_err(|e| Status::internal(format!("failed to delete existing volume {}: {}", vid, e)))?; + } + } + + // Parse source_data_node address: "ip:port.grpcPort" or "ip:port" (grpc = port + 10000) + let source = &req.source_data_node; + let grpc_addr = parse_grpc_address(source) + .map_err(|e| Status::internal(format!("VolumeCopy volume {} invalid source_data_node {}: {}", vid, source, e)))?; + + let channel = tonic::transport::Channel::from_shared(format!("http://{}", grpc_addr)) + .map_err(|e| Status::internal(format!("VolumeCopy volume {} parse source: {}", vid, e)))? + .connect() + .await + .map_err(|e| Status::internal(format!("VolumeCopy volume {} connect to {}: {}", vid, grpc_addr, e)))?; + + let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel); + + // Get file status from source + let vol_info = client.read_volume_file_status(volume_server_pb::ReadVolumeFileStatusRequest { + volume_id: req.volume_id, + }).await + .map_err(|e| Status::internal(format!("read volume file status failed, {}", e)))? + .into_inner(); + + let disk_type = if !req.disk_type.is_empty() { + &req.disk_type + } else { + &vol_info.disk_type + }; + + // Find a free disk location + let (data_base, idx_base) = { + let store = self.state.store.read().unwrap(); + let mut found = None; + for loc in &store.locations { + if format!("{:?}", loc.disk_type) == *disk_type || disk_type.is_empty() || disk_type == "0" { + if loc.available_space.load(Ordering::Relaxed) > vol_info.dat_file_size { + found = Some((loc.directory.clone(), loc.idx_directory.clone())); + break; + } + } + } + // Fallback: use first location if no disk type match + if found.is_none() && !store.locations.is_empty() { + let loc = &store.locations[0]; + found = Some((loc.directory.clone(), loc.idx_directory.clone())); + } + found.ok_or_else(|| Status::internal(format!("no space left {}", disk_type)))? + }; + + let data_base_name = crate::storage::volume::volume_file_name(&data_base, &vol_info.collection, vid); + let idx_base_name = crate::storage::volume::volume_file_name(&idx_base, &vol_info.collection, vid); + + // Write a .note file to indicate copy in progress + let note_path = format!("{}.note", data_base_name); + let _ = std::fs::write(¬e_path, format!("copying from {}", source)); + + let has_remote_dat = vol_info.volume_info.as_ref() + .map(|vi| !vi.files.is_empty()) + .unwrap_or(false); + + let (tx, rx) = tokio::sync::mpsc::channel::>(16); + let state = self.state.clone(); + + tokio::spawn(async move { + let result = async { + let report_interval: i64 = 128 * 1024 * 1024; + let mut next_report_target: i64 = report_interval; + + // Copy .dat file + if !has_remote_dat { + let dat_path = format!("{}.dat", data_base_name); + copy_file_from_source( + &mut client, false, &req.collection, req.volume_id, + vol_info.compaction_revision, vol_info.dat_file_size, + &dat_path, ".dat", false, true, + Some(&tx), &mut next_report_target, report_interval, + ).await.map_err(|e| Status::internal(e))?; + } + + // Copy .idx file + let idx_path = format!("{}.idx", idx_base_name); + copy_file_from_source( + &mut client, false, &req.collection, req.volume_id, + vol_info.compaction_revision, vol_info.idx_file_size, + &idx_path, ".idx", false, false, + None, &mut next_report_target, report_interval, + ).await.map_err(|e| Status::internal(e))?; + + // Copy .vif file (ignore if not found on source) + let vif_path = format!("{}.vif", data_base_name); + copy_file_from_source( + &mut client, false, &req.collection, req.volume_id, + vol_info.compaction_revision, 1024 * 1024, + &vif_path, ".vif", false, true, + None, &mut next_report_target, report_interval, + ).await.map_err(|e| Status::internal(e))?; + + // Remove the .note file + let _ = std::fs::remove_file(¬e_path); + + // Verify file sizes + if !has_remote_dat { + let dat_path = format!("{}.dat", data_base_name); + check_copy_file_size(&dat_path, vol_info.dat_file_size)?; + } + if vol_info.idx_file_size > 0 { + check_copy_file_size(&idx_path, vol_info.idx_file_size)?; + } + + // Find last_append_at_ns from copied files + let last_append_at_ns = if !has_remote_dat { + find_last_append_at_ns(&idx_path, &format!("{}.dat", data_base_name)) + .unwrap_or(vol_info.dat_file_timestamp_seconds * 1_000_000_000) + } else { + vol_info.dat_file_timestamp_seconds * 1_000_000_000 + }; + + // Mount the volume + { + let disk_type_enum = DiskType::default(); + let mut store = state.store.write().unwrap(); + store.mount_volume(vid, &vol_info.collection, disk_type_enum) + .map_err(|e| Status::internal(format!("failed to mount volume {}: {}", vid, e)))?; + } + + // Send final response with last_append_at_ns + let _ = tx.send(Ok(volume_server_pb::VolumeCopyResponse { + last_append_at_ns: last_append_at_ns, + processed_bytes: 0, + })).await; + + Ok::<(), Status>(()) + }.await; + + if let Err(e) = result { + // Clean up on error + let _ = std::fs::remove_file(format!("{}.dat", data_base_name)); + let _ = std::fs::remove_file(format!("{}.idx", idx_base_name)); + let _ = std::fs::remove_file(format!("{}.vif", data_base_name)); + let _ = std::fs::remove_file(¬e_path); + let _ = tx.send(Err(e)).await; + } + }); + + let stream = tokio_stream::wrappers::ReceiverStream::new(rx); + Ok(Response::new(Box::pin(stream))) } async fn read_volume_file_status( @@ -998,11 +1154,76 @@ impl VolumeServer for VolumeGrpcService { ) -> Result, Status> { let req = request.into_inner(); let vid = VolumeId(req.volume_id); - let store = self.state.store.read().unwrap(); - store.find_volume(vid) - .ok_or_else(|| Status::not_found(format!("receiver not found volume id {}", vid)))?; - drop(store); - Err(Status::unimplemented("volume_tail_receiver not yet implemented")) + + // Check volume exists + { + let store = self.state.store.read().unwrap(); + store.find_volume(vid) + .ok_or_else(|| Status::not_found(format!("receiver not found volume id {}", vid)))?; + } + + // Parse source address and connect + let source = &req.source_volume_server; + let grpc_addr = parse_grpc_address(source) + .map_err(|e| Status::internal(format!("invalid source address {}: {}", source, e)))?; + + let channel = tonic::transport::Channel::from_shared(format!("http://{}", grpc_addr)) + .map_err(|e| Status::internal(format!("parse source: {}", e)))? + .connect() + .await + .map_err(|e| Status::internal(format!("connect to {}: {}", grpc_addr, e)))?; + + let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel); + + // Call VolumeTailSender on source + let mut stream = client.volume_tail_sender(volume_server_pb::VolumeTailSenderRequest { + volume_id: req.volume_id, + since_ns: req.since_ns, + idle_timeout_seconds: req.idle_timeout_seconds, + }).await + .map_err(|e| Status::internal(format!("volume_tail_sender: {}", e)))? + .into_inner(); + + let state = self.state.clone(); + + // Receive needles from source and write locally + while let Some(resp) = stream.message().await + .map_err(|e| Status::internal(format!("recv from tail sender: {}", e)))? { + + let needle_header = resp.needle_header; + let mut needle_body = resp.needle_body; + + if needle_header.is_empty() { + continue; + } + + // Collect all chunks if not last + if !resp.is_last_chunk { + // Need to receive remaining chunks + loop { + let chunk = stream.message().await + .map_err(|e| Status::internal(format!("recv chunk: {}", e)))? + .ok_or_else(|| Status::internal("unexpected end of tail stream"))?; + needle_body.extend_from_slice(&chunk.needle_body); + if chunk.is_last_chunk { + break; + } + } + } + + // Parse needle from header + body + let mut n = Needle::default(); + n.read_header(&needle_header); + n.read_body_v2(&needle_body) + .map_err(|e| Status::internal(format!("parse needle body: {}", e)))?; + + // Write needle to local volume + let mut store = state.store.write().unwrap(); + store.write_volume_needle(vid, &mut n) + .map_err(|e| Status::internal(format!("write needle: {}", e)))?; + } + + Ok(Response::new(volume_server_pb::VolumeTailReceiverResponse {})) } // ---- EC operations ---- @@ -1950,3 +2171,159 @@ async fn ping_grpc_target(target: &str) -> Result { Err(e) => Err(e.to_string()), } } + +/// Parse a SeaweedFS server address ("ip:port.grpcPort" or "ip:port") into a gRPC address. +fn parse_grpc_address(source: &str) -> Result { + if let Some(colon_idx) = source.rfind(':') { + let port_part = &source[colon_idx + 1..]; + if let Some(dot_idx) = port_part.rfind('.') { + // Format: "ip:port.grpcPort" + let host = &source[..colon_idx]; + let grpc_port = &port_part[dot_idx + 1..]; + grpc_port.parse::().map_err(|e| format!("invalid grpc port: {}", e))?; + return Ok(format!("{}:{}", host, grpc_port)); + } + // Format: "ip:port" → grpc = port + 10000 + let port: u16 = port_part.parse().map_err(|e| format!("invalid port: {}", e))?; + let grpc_port = port as u32 + 10000; + let host = &source[..colon_idx]; + return Ok(format!("{}:{}", host, grpc_port)); + } + Err(format!("cannot parse address: {}", source)) +} + +/// Copy a file from a remote volume server via CopyFile streaming RPC. +async fn copy_file_from_source( + client: &mut volume_server_pb::volume_server_client::VolumeServerClient, + is_ec_volume: bool, + collection: &str, + volume_id: u32, + compaction_revision: u32, + stop_offset: u64, + dest_path: &str, + ext: &str, + _is_append: bool, + ignore_source_not_found: bool, + progress_tx: Option<&tokio::sync::mpsc::Sender>>, + next_report_target: &mut i64, + report_interval: i64, +) -> Result<(), String> { + let copy_req = volume_server_pb::CopyFileRequest { + volume_id, + ext: ext.to_string(), + compaction_revision, + stop_offset, + collection: collection.to_string(), + is_ec_volume, + ignore_source_file_not_found: ignore_source_not_found, + }; + + let mut stream = client.copy_file(copy_req).await + .map_err(|e| format!("failed to start copying volume {} {} file: {}", volume_id, ext, e))? + .into_inner(); + + let mut file = std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(dest_path) + .map_err(|e| format!("open file {}: {}", dest_path, e))?; + + let mut progressed_bytes: i64 = 0; + let mut got_modified_ts = false; + + while let Some(resp) = stream.message().await + .map_err(|e| format!("receiving {}: {}", dest_path, e))? { + if resp.modified_ts_ns != 0 { + got_modified_ts = true; + } + if !resp.file_content.is_empty() { + use std::io::Write; + file.write_all(&resp.file_content) + .map_err(|e| format!("write file {}: {}", dest_path, e))?; + progressed_bytes += resp.file_content.len() as i64; + + if let Some(tx) = progress_tx { + if progressed_bytes > *next_report_target { + let _ = tx.send(Ok(volume_server_pb::VolumeCopyResponse { + last_append_at_ns: 0, + processed_bytes: progressed_bytes, + })).await; + *next_report_target = progressed_bytes + report_interval; + } + } + } + } + + // If source file didn't exist (no modifiedTsNs received), remove empty file + if !got_modified_ts { + let _ = std::fs::remove_file(dest_path); + } + + Ok(()) +} + +/// Verify that a copied file has the expected size. +fn check_copy_file_size(path: &str, expected: u64) -> Result<(), Status> { + match std::fs::metadata(path) { + Ok(meta) => { + if meta.len() != expected { + Err(Status::internal(format!( + "file {} size [{}] is not same as origin file size [{}]", + path, meta.len(), expected + ))) + } else { + Ok(()) + } + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound && expected == 0 => Ok(()), + Err(e) => Err(Status::internal(format!("stat file {} failed: {}", path, e))), + } +} + +/// Find the last append timestamp from copied .idx and .dat files. +fn find_last_append_at_ns(idx_path: &str, dat_path: &str) -> Option { + use std::io::{Read, Seek, SeekFrom}; + + let mut idx_file = std::fs::File::open(idx_path).ok()?; + let idx_size = idx_file.metadata().ok()?.len(); + if idx_size == 0 || idx_size % (NEEDLE_MAP_ENTRY_SIZE as u64) != 0 { + return None; + } + + // Read the last index entry + let mut buf = [0u8; NEEDLE_MAP_ENTRY_SIZE]; + idx_file.seek(SeekFrom::End(-(NEEDLE_MAP_ENTRY_SIZE as i64))).ok()?; + idx_file.read_exact(&mut buf).ok()?; + + let (_key, offset, _size) = idx_entry_from_bytes(&buf); + if offset.is_zero() { + return None; + } + + // Read needle header from .dat to get the append timestamp + let mut dat_file = std::fs::File::open(dat_path).ok()?; + let actual_offset = offset.to_actual_offset(); + + // Skip to the needle at the given offset, read header to get size + dat_file.seek(SeekFrom::Start(actual_offset as u64)).ok()?; + + // Read cookie (4) + id (8) + size (4) = 16 bytes header + let mut header = [0u8; 16]; + dat_file.read_exact(&mut header).ok()?; + let needle_size = i32::from_be_bytes([header[12], header[13], header[14], header[15]]); + if needle_size <= 0 { + return None; + } + + // Seek to tail: offset + 16 (header) + size -> checksum (4) + timestamp (8) + let tail_offset = actual_offset as u64 + 16 + needle_size as u64; + dat_file.seek(SeekFrom::Start(tail_offset)).ok()?; + + let mut tail = [0u8; 12]; // 4 bytes checksum + 8 bytes timestamp + dat_file.read_exact(&mut tail).ok()?; + + // Timestamp is the last 8 bytes, big-endian + let ts = u64::from_be_bytes([tail[4], tail[5], tail[6], tail[7], tail[8], tail[9], tail[10], tail[11]]); + if ts > 0 { Some(ts) } else { None } +}