@ -101,6 +101,15 @@ struct StreamingBody {
/// For download throttling — released on drop.
/// For download throttling — released on drop.
state : Option < Arc < VolumeServerState > > ,
state : Option < Arc < VolumeServerState > > ,
tracked_bytes : i64 ,
tracked_bytes : i64 ,
/// Server state used to re-lookup needle offset if compaction occurs during streaming.
server_state : Arc < VolumeServerState > ,
/// Volume ID for compaction-revision re-lookup.
volume_id : crate ::storage ::types ::VolumeId ,
/// Needle ID for compaction-revision re-lookup.
needle_id : crate ::storage ::types ::NeedleId ,
/// Compaction revision at the time of the initial read; if the volume's revision
/// changes between chunks, the needle may have moved and we must re-lookup its offset.
compaction_revision : u16 ,
}
}
impl http_body ::Body for StreamingBody {
impl http_body ::Body for StreamingBody {
@ -143,6 +152,36 @@ impl http_body::Body for StreamingBody {
return std ::task ::Poll ::Ready ( None ) ;
return std ::task ::Poll ::Ready ( None ) ;
}
}
// Check if compaction has changed the needle's disk location (Go parity:
// readNeedleDataInto re-reads the needle offset when CompactionRevision changes).
let relookup_result = {
let store = self . server_state . store . read ( ) . unwrap ( ) ;
if let Some ( ( _ , vol ) ) = store . find_volume ( self . volume_id ) {
if vol . super_block . compaction_revision ! = self . compaction_revision {
// Compaction occurred — re-lookup the needle's data offset
Some ( vol . re_lookup_needle_data_offset ( self . needle_id ) )
} else {
None
}
} else {
None
}
} ;
if let Some ( result ) = relookup_result {
match result {
Ok ( ( new_offset , new_rev ) ) = > {
self . data_offset = new_offset ;
self . compaction_revision = new_rev ;
}
Err ( _ ) = > {
return std ::task ::Poll ::Ready ( Some ( Err ( std ::io ::Error ::new (
std ::io ::ErrorKind ::NotFound ,
"needle not found after compaction" ,
) ) ) ) ;
}
}
}
let chunk_len = std ::cmp ::min ( self . chunk_size , total - self . pos ) ;
let chunk_len = std ::cmp ::min ( self . chunk_size , total - self . pos ) ;
let file_offset = self . data_offset + self . pos as u64 ;
let file_offset = self . data_offset + self . pos as u64 ;
@ -1038,6 +1077,10 @@ async fn get_or_head_handler_inner(
pending : None ,
pending : None ,
state : tracking_state ,
state : tracking_state ,
tracked_bytes ,
tracked_bytes ,
server_state : state . clone ( ) ,
volume_id : info . volume_id ,
needle_id : info . needle_id ,
compaction_revision : info . compaction_revision ,
} ;
} ;
let body = Body ::new ( streaming ) ;
let body = Body ::new ( streaming ) ;
@ -2138,7 +2181,7 @@ pub async fn status_handler(
let json_value = serde_json ::Value ::Object ( m ) ;
let json_value = serde_json ::Value ::Object ( m ) ;
let is_pretty = params . pretty . as_de ref ( ) = = Some ( "y" ) ;
let is_pretty = params . pretty . as_ref ( ) . is_some_and ( | s | ! s . is_empty ( ) ) ;
let json_body = if is_pretty {
let json_body = if is_pretty {
serde_json ::to_string_pretty ( & json_value ) . unwrap ( )
serde_json ::to_string_pretty ( & json_value ) . unwrap ( )
} else {
} else {
@ -2146,7 +2189,7 @@ pub async fn status_handler(
} ;
} ;
if let Some ( ref cb ) = params . callback {
if let Some ( ref cb ) = params . callback {
let jsonp = format ! ( "{}({});\n " , cb , json_body ) ;
let jsonp = format ! ( "{}({})" , cb , json_body ) ;
Response ::builder ( )
Response ::builder ( )
. header ( header ::CONTENT_TYPE , "application/javascript" )
. header ( header ::CONTENT_TYPE , "application/javascript" )
. body ( Body ::from ( jsonp ) )
. body ( Body ::from ( jsonp ) )
@ -2427,7 +2470,7 @@ fn try_expand_chunk_manifest(
// ============================================================================
// ============================================================================
/// Return a JSON error response with optional query string for pretty/JSONP support.
/// Return a JSON error response with optional query string for pretty/JSONP support.
/// Supports `?pretty=y` for pretty-printed JSON and `?callback=fn` for JSONP,
/// Supports `?pretty=<an y non-empty value> ` for pretty-printed JSON and `?callback=fn` for JSONP,
/// matching Go's writeJsonError behavior.
/// matching Go's writeJsonError behavior.
fn json_error_with_query (
fn json_error_with_query (
status : StatusCode ,
status : StatusCode ,
@ -2437,7 +2480,9 @@ fn json_error_with_query(
let body = serde_json ::json ! ( { "error" : msg . into ( ) } ) ;
let body = serde_json ::json ! ( { "error" : msg . into ( ) } ) ;
let ( is_pretty , callback ) = if let Some ( q ) = query {
let ( is_pretty , callback ) = if let Some ( q ) = query {
let pretty = q . split ( '&' ) . any ( | p | p = = "pretty=y" ) ;
let pretty = q . split ( '&' ) . any ( | p | {
p . starts_with ( "pretty=" ) & & p . len ( ) > "pretty=" . len ( )
} ) ;
let cb = q
let cb = q
. split ( '&' )
. split ( '&' )
. find_map ( | p | p . strip_prefix ( "callback=" ) )
. find_map ( | p | p . strip_prefix ( "callback=" ) )
@ -2454,7 +2499,7 @@ fn json_error_with_query(
} ;
} ;
if let Some ( cb ) = callback {
if let Some ( cb ) = callback {
let jsonp = format ! ( "{}({});\n " , cb , json_body ) ;
let jsonp = format ! ( "{}({})" , cb , json_body ) ;
Response ::builder ( )
Response ::builder ( )
. status ( status )
. status ( status )
. header ( header ::CONTENT_TYPE , "application/javascript" )
. header ( header ::CONTENT_TYPE , "application/javascript" )