@ -533,10 +533,166 @@ impl VolumeServer for VolumeGrpcService {
type VolumeCopyStream = BoxStream < volume_server_pb ::VolumeCopyResponse > ;
async fn volume_copy (
& self ,
_ request : Request < volume_server_pb ::VolumeCopyRequest > ,
request : Request < volume_server_pb ::VolumeCopyRequest > ,
) -> Result < Response < Self ::VolumeCopyStream > , Status > {
self . state . check_maintenance ( ) ? ;
Err ( Status ::unimplemented ( "volume_copy not yet implemented" ) )
let req = request . into_inner ( ) ;
let vid = VolumeId ( req . volume_id ) ;
// If volume already exists locally, delete it first
{
let store = self . state . store . read ( ) . unwrap ( ) ;
if store . find_volume ( vid ) . is_some ( ) {
drop ( store ) ;
let mut store = self . state . store . write ( ) . unwrap ( ) ;
store . delete_volume ( vid )
. map_err ( | e | Status ::internal ( format ! ( "failed to delete existing volume {}: {}" , vid , e ) ) ) ? ;
}
}
// Parse source_data_node address: "ip:port.grpcPort" or "ip:port" (grpc = port + 10000)
let source = & req . source_data_node ;
let grpc_addr = parse_grpc_address ( source )
. map_err ( | e | Status ::internal ( format ! ( "VolumeCopy volume {} invalid source_data_node {}: {}" , vid , source , e ) ) ) ? ;
let channel = tonic ::transport ::Channel ::from_shared ( format ! ( "http://{}" , grpc_addr ) )
. map_err ( | e | Status ::internal ( format ! ( "VolumeCopy volume {} parse source: {}" , vid , e ) ) ) ?
. connect ( )
. await
. map_err ( | e | Status ::internal ( format ! ( "VolumeCopy volume {} connect to {}: {}" , vid , grpc_addr , e ) ) ) ? ;
let mut client = volume_server_pb ::volume_server_client ::VolumeServerClient ::new ( channel ) ;
// Get file status from source
let vol_info = client . read_volume_file_status ( volume_server_pb ::ReadVolumeFileStatusRequest {
volume_id : req . volume_id ,
} ) . await
. map_err ( | e | Status ::internal ( format ! ( "read volume file status failed, {}" , e ) ) ) ?
. into_inner ( ) ;
let disk_type = if ! req . disk_type . is_empty ( ) {
& req . disk_type
} else {
& vol_info . disk_type
} ;
// Find a free disk location
let ( data_base , idx_base ) = {
let store = self . state . store . read ( ) . unwrap ( ) ;
let mut found = None ;
for loc in & store . locations {
if format ! ( "{:?}" , loc . disk_type ) = = * disk_type | | disk_type . is_empty ( ) | | disk_type = = "0" {
if loc . available_space . load ( Ordering ::Relaxed ) > vol_info . dat_file_size {
found = Some ( ( loc . directory . clone ( ) , loc . idx_directory . clone ( ) ) ) ;
break ;
}
}
}
// Fallback: use first location if no disk type match
if found . is_none ( ) & & ! store . locations . is_empty ( ) {
let loc = & store . locations [ 0 ] ;
found = Some ( ( loc . directory . clone ( ) , loc . idx_directory . clone ( ) ) ) ;
}
found . ok_or_else ( | | Status ::internal ( format ! ( "no space left {}" , disk_type ) ) ) ?
} ;
let data_base_name = crate ::storage ::volume ::volume_file_name ( & data_base , & vol_info . collection , vid ) ;
let idx_base_name = crate ::storage ::volume ::volume_file_name ( & idx_base , & vol_info . collection , vid ) ;
// Write a .note file to indicate copy in progress
let note_path = format ! ( "{}.note" , data_base_name ) ;
let _ = std ::fs ::write ( & note_path , format ! ( "copying from {}" , source ) ) ;
let has_remote_dat = vol_info . volume_info . as_ref ( )
. map ( | vi | ! vi . files . is_empty ( ) )
. unwrap_or ( false ) ;
let ( tx , rx ) = tokio ::sync ::mpsc ::channel ::< Result < volume_server_pb ::VolumeCopyResponse , Status > > ( 16 ) ;
let state = self . state . clone ( ) ;
tokio ::spawn ( async move {
let result = async {
let report_interval : i64 = 128 * 1024 * 1024 ;
let mut next_report_target : i64 = report_interval ;
// Copy .dat file
if ! has_remote_dat {
let dat_path = format ! ( "{}.dat" , data_base_name ) ;
copy_file_from_source (
& mut client , false , & req . collection , req . volume_id ,
vol_info . compaction_revision , vol_info . dat_file_size ,
& dat_path , ".dat" , false , true ,
Some ( & tx ) , & mut next_report_target , report_interval ,
) . await . map_err ( | e | Status ::internal ( e ) ) ? ;
}
// Copy .idx file
let idx_path = format ! ( "{}.idx" , idx_base_name ) ;
copy_file_from_source (
& mut client , false , & req . collection , req . volume_id ,
vol_info . compaction_revision , vol_info . idx_file_size ,
& idx_path , ".idx" , false , false ,
None , & mut next_report_target , report_interval ,
) . await . map_err ( | e | Status ::internal ( e ) ) ? ;
// Copy .vif file (ignore if not found on source)
let vif_path = format ! ( "{}.vif" , data_base_name ) ;
copy_file_from_source (
& mut client , false , & req . collection , req . volume_id ,
vol_info . compaction_revision , 1024 * 1024 ,
& vif_path , ".vif" , false , true ,
None , & mut next_report_target , report_interval ,
) . await . map_err ( | e | Status ::internal ( e ) ) ? ;
// Remove the .note file
let _ = std ::fs ::remove_file ( & note_path ) ;
// Verify file sizes
if ! has_remote_dat {
let dat_path = format ! ( "{}.dat" , data_base_name ) ;
check_copy_file_size ( & dat_path , vol_info . dat_file_size ) ? ;
}
if vol_info . idx_file_size > 0 {
check_copy_file_size ( & idx_path , vol_info . idx_file_size ) ? ;
}
// Find last_append_at_ns from copied files
let last_append_at_ns = if ! has_remote_dat {
find_last_append_at_ns ( & idx_path , & format ! ( "{}.dat" , data_base_name ) )
. unwrap_or ( vol_info . dat_file_timestamp_seconds * 1_000_000_000 )
} else {
vol_info . dat_file_timestamp_seconds * 1_000_000_000
} ;
// Mount the volume
{
let disk_type_enum = DiskType ::default ( ) ;
let mut store = state . store . write ( ) . unwrap ( ) ;
store . mount_volume ( vid , & vol_info . collection , disk_type_enum )
. map_err ( | e | Status ::internal ( format ! ( "failed to mount volume {}: {}" , vid , e ) ) ) ? ;
}
// Send final response with last_append_at_ns
let _ = tx . send ( Ok ( volume_server_pb ::VolumeCopyResponse {
last_append_at_ns : last_append_at_ns ,
processed_bytes : 0 ,
} ) ) . await ;
Ok ::< ( ) , Status > ( ( ) )
} . await ;
if let Err ( e ) = result {
// Clean up on error
let _ = std ::fs ::remove_file ( format ! ( "{}.dat" , data_base_name ) ) ;
let _ = std ::fs ::remove_file ( format ! ( "{}.idx" , idx_base_name ) ) ;
let _ = std ::fs ::remove_file ( format ! ( "{}.vif" , data_base_name ) ) ;
let _ = std ::fs ::remove_file ( & note_path ) ;
let _ = tx . send ( Err ( e ) ) . await ;
}
} ) ;
let stream = tokio_stream ::wrappers ::ReceiverStream ::new ( rx ) ;
Ok ( Response ::new ( Box ::pin ( stream ) ) )
}
async fn read_volume_file_status (
@ -998,11 +1154,76 @@ impl VolumeServer for VolumeGrpcService {
) -> Result<Response<volume_server_pb::VolumeTailReceiverResponse>, Status> {
    let req = request.into_inner();
    let vid = VolumeId(req.volume_id);

    // The target volume must already exist locally before pulling from the source.
    {
        let store = self.state.store.read().unwrap();
        store
            .find_volume(vid)
            .ok_or_else(|| Status::not_found(format!("receiver not found volume id {}", vid)))?;
    }

    // Parse source address and connect.
    let source = &req.source_volume_server;
    let grpc_addr = parse_grpc_address(source)
        .map_err(|e| Status::internal(format!("invalid source address {}: {}", source, e)))?;
    let channel = tonic::transport::Channel::from_shared(format!("http://{}", grpc_addr))
        .map_err(|e| Status::internal(format!("parse source: {}", e)))?
        .connect()
        .await
        .map_err(|e| Status::internal(format!("connect to {}: {}", grpc_addr, e)))?;
    let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel);

    // Subscribe to the source's VolumeTailSender stream.
    let mut stream = client
        .volume_tail_sender(volume_server_pb::VolumeTailSenderRequest {
            volume_id: req.volume_id,
            since_ns: req.since_ns,
            idle_timeout_seconds: req.idle_timeout_seconds,
        })
        .await
        .map_err(|e| Status::internal(format!("volume_tail_sender: {}", e)))?
        .into_inner();
    let state = self.state.clone();

    // Receive needles from the source and replay them into the local volume.
    while let Some(resp) = stream
        .message()
        .await
        .map_err(|e| Status::internal(format!("recv from tail sender: {}", e)))?
    {
        let needle_header = resp.needle_header;
        let mut needle_body = resp.needle_body;
        // An empty header carries no needle — skip it.
        if needle_header.is_empty() {
            continue;
        }
        // A large needle body may be split across chunks; gather until the last chunk.
        if !resp.is_last_chunk {
            loop {
                let chunk = stream
                    .message()
                    .await
                    .map_err(|e| Status::internal(format!("recv chunk: {}", e)))?
                    .ok_or_else(|| Status::internal("unexpected end of tail stream"))?;
                needle_body.extend_from_slice(&chunk.needle_body);
                if chunk.is_last_chunk {
                    break;
                }
            }
        }
        // Parse the needle from header + body.
        let mut n = Needle::default();
        n.read_header(&needle_header);
        n.read_body_v2(&needle_body)
            .map_err(|e| Status::internal(format!("parse needle body: {}", e)))?;
        // Write the needle locally; the write lock is scoped to this iteration
        // and dropped before the next await on the stream.
        let mut store = state.store.write().unwrap();
        store
            .write_volume_needle(vid, &mut n)
            .map_err(|e| Status::internal(format!("write needle: {}", e)))?;
    }
    Ok(Response::new(volume_server_pb::VolumeTailReceiverResponse {}))
}
// ---- EC operations ----
@ -1950,3 +2171,159 @@ async fn ping_grpc_target(target: &str) -> Result<i64, String> {
Err ( e ) = > Err ( e . to_string ( ) ) ,
}
}
/// Parse a SeaweedFS server address into the gRPC endpoint to dial.
///
/// Accepted forms:
/// - `"ip:port.grpcPort"` — the explicit gRPC port after the dot is used;
/// - `"ip:port"` — the gRPC port defaults to `port + 10000`.
///
/// Returns `"host:grpcPort"` on success, or a descriptive error string.
fn parse_grpc_address(source: &str) -> Result<String, String> {
    let (host, port_part) = source
        .rsplit_once(':')
        .ok_or_else(|| format!("cannot parse address: {}", source))?;
    match port_part.rsplit_once('.') {
        // Explicit form: "ip:port.grpcPort" — validate the grpc port, keep it verbatim.
        Some((_http_port, grpc_port)) => {
            grpc_port
                .parse::<u16>()
                .map_err(|e| format!("invalid grpc port: {}", e))?;
            Ok(format!("{}:{}", host, grpc_port))
        }
        // Implicit form: "ip:port" — derive grpc port as port + 10000.
        None => {
            let port: u16 = port_part
                .parse()
                .map_err(|e| format!("invalid port: {}", e))?;
            Ok(format!("{}:{}", host, port as u32 + 10000))
        }
    }
}
/// Copy one volume file (e.g. ".dat", ".idx", ".vif") from a remote volume
/// server into `dest_path` via the streaming CopyFile RPC.
///
/// When `progress_tx` is set, a `VolumeCopyResponse` with the running byte
/// count is sent every time `next_report_target` is crossed; the target then
/// advances by `report_interval`. If the source never reports a non-zero
/// `modified_ts_ns` (i.e. the source file did not exist), the empty local
/// file is removed and `Ok(())` is still returned.
async fn copy_file_from_source(
    client: &mut volume_server_pb::volume_server_client::VolumeServerClient<tonic::transport::Channel>,
    is_ec_volume: bool,
    collection: &str,
    volume_id: u32,
    compaction_revision: u32,
    stop_offset: u64,
    dest_path: &str,
    ext: &str,
    _is_append: bool,
    ignore_source_not_found: bool,
    progress_tx: Option<&tokio::sync::mpsc::Sender<Result<volume_server_pb::VolumeCopyResponse, Status>>>,
    next_report_target: &mut i64,
    report_interval: i64,
) -> Result<(), String> {
    use std::io::Write;

    let mut stream = client
        .copy_file(volume_server_pb::CopyFileRequest {
            volume_id,
            ext: ext.to_string(),
            compaction_revision,
            stop_offset,
            collection: collection.to_string(),
            is_ec_volume,
            ignore_source_file_not_found: ignore_source_not_found,
        })
        .await
        .map_err(|e| format!("failed to start copying volume {} {} file: {}", volume_id, ext, e))?
        .into_inner();

    // Always start from a fresh (truncated) destination file.
    let mut dest = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(dest_path)
        .map_err(|e| format!("open file {}: {}", dest_path, e))?;

    let mut written: i64 = 0;
    // A non-zero modified_ts_ns on any response proves the source file exists.
    let mut source_file_seen = false;
    while let Some(resp) = stream
        .message()
        .await
        .map_err(|e| format!("receiving {}: {}", dest_path, e))?
    {
        if resp.modified_ts_ns != 0 {
            source_file_seen = true;
        }
        if resp.file_content.is_empty() {
            continue;
        }
        dest.write_all(&resp.file_content)
            .map_err(|e| format!("write file {}: {}", dest_path, e))?;
        written += resp.file_content.len() as i64;
        // Periodic progress report back to the VolumeCopy caller.
        if let Some(tx) = progress_tx {
            if written > *next_report_target {
                let _ = tx
                    .send(Ok(volume_server_pb::VolumeCopyResponse {
                        last_append_at_ns: 0,
                        processed_bytes: written,
                    }))
                    .await;
                *next_report_target = written + report_interval;
            }
        }
    }
    // Source file didn't exist (no modified_ts_ns seen): drop the empty placeholder.
    if !source_file_seen {
        let _ = std::fs::remove_file(dest_path);
    }
    Ok(())
}
/// Verify that a copied file has the expected size.
fn check_copy_file_size ( path : & str , expected : u64 ) -> Result < ( ) , Status > {
match std ::fs ::metadata ( path ) {
Ok ( meta ) = > {
if meta . len ( ) ! = expected {
Err ( Status ::internal ( format ! (
"file {} size [{}] is not same as origin file size [{}]" ,
path , meta . len ( ) , expected
) ) )
} else {
Ok ( ( ) )
}
}
Err ( e ) if e . kind ( ) = = std ::io ::ErrorKind ::NotFound & & expected = = 0 = > Ok ( ( ) ) ,
Err ( e ) = > Err ( Status ::internal ( format ! ( "stat file {} failed: {}" , path , e ) ) ) ,
}
}
/// Find the last append timestamp from copied .idx and .dat files.
fn find_last_append_at_ns ( idx_path : & str , dat_path : & str ) -> Option < u64 > {
use std ::io ::{ Read , Seek , SeekFrom } ;
let mut idx_file = std ::fs ::File ::open ( idx_path ) . ok ( ) ? ;
let idx_size = idx_file . metadata ( ) . ok ( ) ? . len ( ) ;
if idx_size = = 0 | | idx_size % ( NEEDLE_MAP_ENTRY_SIZE as u64 ) ! = 0 {
return None ;
}
// Read the last index entry
let mut buf = [ 0 u8 ; NEEDLE_MAP_ENTRY_SIZE ] ;
idx_file . seek ( SeekFrom ::End ( - ( NEEDLE_MAP_ENTRY_SIZE as i64 ) ) ) . ok ( ) ? ;
idx_file . read_exact ( & mut buf ) . ok ( ) ? ;
let ( _key , offset , _size ) = idx_entry_from_bytes ( & buf ) ;
if offset . is_zero ( ) {
return None ;
}
// Read needle header from .dat to get the append timestamp
let mut dat_file = std ::fs ::File ::open ( dat_path ) . ok ( ) ? ;
let actual_offset = offset . to_actual_offset ( ) ;
// Skip to the needle at the given offset, read header to get size
dat_file . seek ( SeekFrom ::Start ( actual_offset as u64 ) ) . ok ( ) ? ;
// Read cookie (4) + id (8) + size (4) = 16 bytes header
let mut header = [ 0 u8 ; 16 ] ;
dat_file . read_exact ( & mut header ) . ok ( ) ? ;
let needle_size = i32 ::from_be_bytes ( [ header [ 12 ] , header [ 13 ] , header [ 14 ] , header [ 15 ] ] ) ;
if needle_size < = 0 {
return None ;
}
// Seek to tail: offset + 16 (header) + size -> checksum (4) + timestamp (8)
let tail_offset = actual_offset as u64 + 16 + needle_size as u64 ;
dat_file . seek ( SeekFrom ::Start ( tail_offset ) ) . ok ( ) ? ;
let mut tail = [ 0 u8 ; 12 ] ; // 4 bytes checksum + 8 bytes timestamp
dat_file . read_exact ( & mut tail ) . ok ( ) ? ;
// Timestamp is the last 8 bytes, big-endian
let ts = u64 ::from_be_bytes ( [ tail [ 4 ] , tail [ 5 ] , tail [ 6 ] , tail [ 7 ] , tail [ 8 ] , tail [ 9 ] , tail [ 10 ] , tail [ 11 ] ] ) ;
if ts > 0 { Some ( ts ) } else { None }
}