@ -61,16 +61,23 @@ func NewStreamingCopyManager(s3a *S3ApiServer) *StreamingCopyManager {
// ExecuteStreamingCopy performs a streaming copy operation
// ExecuteStreamingCopy performs a streaming copy operation
func ( scm * StreamingCopyManager ) ExecuteStreamingCopy ( ctx context . Context , entry * filer_pb . Entry , r * http . Request , dstPath string , state * EncryptionState ) ( [ ] * filer_pb . FileChunk , error ) {
func ( scm * StreamingCopyManager ) ExecuteStreamingCopy ( ctx context . Context , entry * filer_pb . Entry , r * http . Request , dstPath string , state * EncryptionState ) ( [ ] * filer_pb . FileChunk , error ) {
chunks , _ , err := scm . ExecuteStreamingCopyWithMetadata ( ctx , entry , r , dstPath , state )
return chunks , err
}
// ExecuteStreamingCopyWithMetadata performs a streaming copy operation and returns the encryption spec
// This is needed for SSE-S3 to properly set destination metadata (fixes GitHub #7562)
func ( scm * StreamingCopyManager ) ExecuteStreamingCopyWithMetadata ( ctx context . Context , entry * filer_pb . Entry , r * http . Request , dstPath string , state * EncryptionState ) ( [ ] * filer_pb . FileChunk , * EncryptionSpec , error ) {
// Create streaming copy specification
// Create streaming copy specification
spec , err := scm . createStreamingSpec ( entry , r , state )
spec , err := scm . createStreamingSpec ( entry , r , state )
if err != nil {
if err != nil {
return nil , fmt . Errorf ( "create streaming spec: %w" , err )
return nil , nil , fmt . Errorf ( "create streaming spec: %w" , err )
}
}
// Create source reader from entry
// Create source reader from entry
sourceReader , err := scm . createSourceReader ( entry )
sourceReader , err := scm . createSourceReader ( entry )
if err != nil {
if err != nil {
return nil , fmt . Errorf ( "create source reader: %w" , err )
return nil , nil , fmt . Errorf ( "create source reader: %w" , err )
}
}
defer sourceReader . Close ( )
defer sourceReader . Close ( )
@ -79,11 +86,16 @@ func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry
// Create processing pipeline
// Create processing pipeline
processedReader , err := scm . createProcessingPipeline ( spec )
processedReader , err := scm . createProcessingPipeline ( spec )
if err != nil {
if err != nil {
return nil , fmt . Errorf ( "create processing pipeline: %w" , err )
return nil , nil , fmt . Errorf ( "create processing pipeline: %w" , err )
}
}
// Stream to destination
// Stream to destination
return scm . streamToDestination ( ctx , processedReader , spec , dstPath )
chunks , err := scm . streamToDestination ( ctx , processedReader , spec , dstPath )
if err != nil {
return nil , nil , err
}
return chunks , spec . EncryptionSpec , nil
}
}
// createStreamingSpec creates a streaming specification based on copy parameters
// createStreamingSpec creates a streaming specification based on copy parameters
@ -453,8 +465,8 @@ func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.R
for {
for {
n , err := reader . Read ( buffer )
n , err := reader . Read ( buffer )
if n > 0 {
if n > 0 {
// Create chunk for this data
chunk , chunkErr := scm . createChunkFromData ( buffer [ : n ] , offset , dstPath )
// Create chunk for this data, passing encryption spec for SSE type
chunk , chunkErr := scm . createChunkFromData ( buffer [ : n ] , offset , dstPath , spec . EncryptionSpec )
if chunkErr != nil {
if chunkErr != nil {
return nil , fmt . Errorf ( "create chunk from data: %w" , chunkErr )
return nil , fmt . Errorf ( "create chunk from data: %w" , chunkErr )
}
}
@ -474,7 +486,7 @@ func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.R
}
}
// createChunkFromData creates a chunk from streaming data
// createChunkFromData creates a chunk from streaming data
func ( scm * StreamingCopyManager ) createChunkFromData ( data [ ] byte , offset int64 , dstPath string ) ( * filer_pb . FileChunk , error ) {
func ( scm * StreamingCopyManager ) createChunkFromData ( data [ ] byte , offset int64 , dstPath string , encSpec * EncryptionSpec ) ( * filer_pb . FileChunk , error ) {
// Assign new volume
// Assign new volume
assignResult , err := scm . s3a . assignNewVolume ( dstPath )
assignResult , err := scm . s3a . assignNewVolume ( dstPath )
if err != nil {
if err != nil {
@ -487,6 +499,39 @@ func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64,
Size : uint64 ( len ( data ) ) ,
Size : uint64 ( len ( data ) ) ,
}
}
// Set SSE type and metadata on chunk if destination is encrypted
// This is critical for GetObject to know to decrypt the data - fixes GitHub #7562
if encSpec != nil && encSpec . NeedsEncryption {
switch encSpec . DestinationType {
case EncryptionTypeSSEC :
chunk . SseType = filer_pb . SSEType_SSE_C
// SSE-C metadata is handled at object level, not per-chunk for streaming copy
case EncryptionTypeSSEKMS :
chunk . SseType = filer_pb . SSEType_SSE_KMS
// SSE-KMS metadata is handled at object level, not per-chunk for streaming copy
case EncryptionTypeSSES3 :
chunk . SseType = filer_pb . SSEType_SSE_S3
// Create per-chunk SSE-S3 metadata with chunk-specific IV
if sseKey , ok := encSpec . DestinationKey . ( * SSES3Key ) ; ok {
// Calculate chunk-specific IV using base IV and chunk offset
baseIV := encSpec . DestinationIV
if len ( baseIV ) > 0 {
chunkIV , _ := calculateIVWithOffset ( baseIV , offset )
// Create chunk key with the chunk-specific IV
chunkSSEKey := & SSES3Key {
Key : sseKey . Key ,
KeyID : sseKey . KeyID ,
Algorithm : sseKey . Algorithm ,
IV : chunkIV ,
}
if chunkMetadata , serErr := SerializeSSES3Metadata ( chunkSSEKey ) ; serErr == nil {
chunk . SseMetadata = chunkMetadata
}
}
}
}
}
// Set file ID
// Set file ID
if err := scm . s3a . setChunkFileId ( chunk , assignResult ) ; err != nil {
if err := scm . s3a . setChunkFileId ( chunk , assignResult ) ; err != nil {
return nil , err
return nil , err