Browse Source

Use chunk-by-chunk encryption for SSE-S3 copy (consistent with SSE-C/SSE-KMS)

Instead of streaming encryption (which had IV mismatch issues for multi-chunk
files), SSE-S3 now uses the same chunk-by-chunk approach as SSE-C and SSE-KMS:

1. Extended copyMultipartCrossEncryption to handle SSE-S3:
   - Added SSE-S3 source decryption in copyCrossEncryptionChunk
   - Added SSE-S3 destination encryption with per-chunk IVs
   - Added object-level metadata generation for SSE-S3 destinations

2. Updated routing in executeEncryptCopy/executeDecryptCopy/executeReencryptCopy
   to use copyMultipartCrossEncryption for all SSE-S3 scenarios

3. Removed streaming copy functions (shouldUseStreamingCopy,
   executeStreamingReencryptCopy) as they're no longer used

4. Added large file (1MB) integration test to verify chunk-by-chunk copy works

This ensures consistent behavior across all SSE types and fixes data corruption
that occurred with large files in the streaming copy approach.
pull/7598/head
chrislu 2 days ago
parent
commit
e8b57780d2
  1. 78
      test/s3/sse/s3_sse_integration_test.go
  2. 95
      weed/s3api/s3api_object_handlers_copy.go
  3. 130
      weed/s3api/s3api_object_handlers_copy_unified.go

78
test/s3/sse/s3_sse_integration_test.go

@@ -2083,13 +2083,77 @@ func TestCopyToBucketDefaultEncryptedRegression(t *testing.T) {
assertDataEqual(t, testData, data, "Data mismatch")
})
// TODO: Large file SSE-S3 copy has a known issue with streaming encryption
// The streaming copy encrypts the entire stream with a single IV, but stores
// data in multiple chunks with calculated per-chunk IVs. This causes decryption
// to fail because each chunk tries to decrypt with its per-chunk IV, but the
// data was encrypted with the base IV. This needs architectural changes to fix:
// either use chunk-by-chunk encryption like SSE-C/SSE-KMS, or store a single IV.
// For now, small inline files work correctly (the original #7562 bug fix).
t.Run("LargeFileCopyEncrypted_ToTemp_ToEncrypted", func(t *testing.T) {
// Test with large file (1MB) to exercise chunk-by-chunk copy path
// This verifies consistent behavior with SSE-C and SSE-KMS
largeTestData := generateTestData(1024 * 1024) // 1MB
objectKey := "large-file-test.bin"
// Step 1: Upload large object to source bucket (will be automatically encrypted)
_, err = client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(srcBucket),
Key: aws.String(objectKey),
Body: bytes.NewReader(largeTestData),
})
require.NoError(t, err, "Failed to upload large file to source bucket")
// Verify source object is encrypted
srcHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: aws.String(srcBucket),
Key: aws.String(objectKey),
})
require.NoError(t, err, "Failed to HEAD source object")
assert.Equal(t, types.ServerSideEncryptionAes256, srcHead.ServerSideEncryption,
"Source object should be SSE-S3 encrypted")
// Step 2: Copy to temp bucket (unencrypted) - exercises chunk-by-chunk decrypt
_, err = client.CopyObject(ctx, &s3.CopyObjectInput{
Bucket: aws.String(tempBucket),
Key: aws.String(objectKey),
CopySource: aws.String(fmt.Sprintf("%s/%s", srcBucket, objectKey)),
})
require.NoError(t, err, "Failed to copy large file to temp bucket")
// Verify temp object is unencrypted and data is correct
tempGet, err := client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(tempBucket),
Key: aws.String(objectKey),
})
require.NoError(t, err, "Failed to GET temp object")
tempData, err := io.ReadAll(tempGet.Body)
tempGet.Body.Close()
require.NoError(t, err, "Failed to read temp object")
assertDataEqual(t, largeTestData, tempData, "Temp object data mismatch after decrypt")
// Step 3: Copy from temp bucket to dest bucket (with default encryption)
// This exercises chunk-by-chunk encrypt copy
_, err = client.CopyObject(ctx, &s3.CopyObjectInput{
Bucket: aws.String(dstBucket),
Key: aws.String(objectKey),
CopySource: aws.String(fmt.Sprintf("%s/%s", tempBucket, objectKey)),
})
require.NoError(t, err, "Failed to copy large file to destination bucket")
// Verify destination object is encrypted
dstHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: aws.String(dstBucket),
Key: aws.String(objectKey),
})
require.NoError(t, err, "Failed to HEAD destination object")
assert.Equal(t, types.ServerSideEncryptionAes256, dstHead.ServerSideEncryption,
"Destination object should be SSE-S3 encrypted via bucket default")
// Verify destination object content is correct after re-encryption
dstGet, err := client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(dstBucket),
Key: aws.String(objectKey),
})
require.NoError(t, err, "Failed to GET destination object")
dstData, err := io.ReadAll(dstGet.Body)
dstGet.Body.Close()
require.NoError(t, err, "Failed to read destination object")
assertDataEqual(t, largeTestData, dstData, "Large file data mismatch after re-encryption")
})
}
// REGRESSION TESTS FOR CRITICAL BUGS FIXED

95
weed/s3api/s3api_object_handlers_copy.go

@@ -1524,7 +1524,7 @@ func (s3a *S3ApiServer) copyMultipartSSECChunk(chunk *filer_pb.FileChunk, copySo
}
// copyMultipartCrossEncryption handles all cross-encryption and decrypt-only copy scenarios
// This unified function supports: SSE-C↔SSE-KMS, SSE-C→Plain, SSE-KMS→Plain
// This unified function supports: SSE-C↔SSE-KMS↔SSE-S3, and any→Plain
func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
var dstChunks []*filer_pb.FileChunk
@@ -1533,6 +1533,7 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
var destKMSKeyID string
var destKMSEncryptionContext map[string]string
var destKMSBucketKeyEnabled bool
var destSSES3Key *SSES3Key
if state.DstSSEC {
var err error
@@ -1546,7 +1547,13 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
if err != nil {
return nil, nil, fmt.Errorf("failed to parse destination SSE-KMS headers: %w", err)
}
} else {
} else if state.DstSSES3 {
// Generate SSE-S3 key for destination
var err error
destSSES3Key, err = GenerateSSES3Key()
if err != nil {
return nil, nil, fmt.Errorf("failed to generate SSE-S3 key: %w", err)
}
}
// Parse source encryption parameters
@@ -1565,12 +1572,18 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
var err error
if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, sourceSSECKey, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state)
copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, sourceSSECKey, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
} else if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS {
copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state)
copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
} else if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 {
copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
} else {
// Unencrypted chunk, copy directly
copiedChunk, err = s3a.copySingleChunk(chunk, dstPath)
// Unencrypted chunk - may need encryption if destination requires it
if state.DstSSEC || state.DstSSEKMS || state.DstSSES3 {
copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
} else {
copiedChunk, err = s3a.copySingleChunk(chunk, dstPath)
}
}
if err != nil {
@@ -1621,6 +1634,20 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
} else {
glog.Errorf("Failed to serialize SSE-KMS metadata: %v", serErr)
}
} else if state.DstSSES3 && destSSES3Key != nil {
// For SSE-S3 destination, create object-level metadata using first chunk's IV
if len(dstChunks) > 0 && dstChunks[0].GetSseType() == filer_pb.SSEType_SSE_S3 && len(dstChunks[0].GetSseMetadata()) > 0 {
keyManager := GetSSES3KeyManager()
if sses3Metadata, err := DeserializeSSES3Metadata(dstChunks[0].GetSseMetadata(), keyManager); err == nil {
// Use the first chunk's key with its IV for object-level metadata
keyData, serErr := SerializeSSES3Metadata(sses3Metadata)
if serErr != nil {
return nil, nil, fmt.Errorf("failed to serialize SSE-S3 metadata: %w", serErr)
}
dstMetadata[s3_constants.SeaweedFSSSES3Key] = keyData
dstMetadata[s3_constants.AmzServerSideEncryption] = []byte("AES256")
}
}
}
// For unencrypted destination, no metadata needed (dstMetadata remains empty)
@@ -1628,7 +1655,7 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
}
// copyCrossEncryptionChunk handles copying a single chunk with cross-encryption support
func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sourceSSECKey *SSECustomerKey, destSSECKey *SSECustomerKey, destKMSKeyID string, destKMSEncryptionContext map[string]string, destKMSBucketKeyEnabled bool, dstPath, dstBucket string, state *EncryptionState) (*filer_pb.FileChunk, error) {
func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sourceSSECKey *SSECustomerKey, destSSECKey *SSECustomerKey, destKMSKeyID string, destKMSEncryptionContext map[string]string, destKMSBucketKeyEnabled bool, destSSES3Key *SSES3Key, dstPath, dstBucket string, state *EncryptionState) (*filer_pb.FileChunk, error) {
// Create destination chunk
dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)
@@ -1728,6 +1755,30 @@ func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sour
previewLen = len(finalData)
}
} else if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 {
// Decrypt SSE-S3 source
if len(chunk.GetSseMetadata()) == 0 {
return nil, fmt.Errorf("SSE-S3 chunk missing per-chunk metadata")
}
keyManager := GetSSES3KeyManager()
sourceSSEKey, err := DeserializeSSES3Metadata(chunk.GetSseMetadata(), keyManager)
if err != nil {
return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err)
}
decryptedReader, decErr := CreateSSES3DecryptedReader(bytes.NewReader(encryptedData), sourceSSEKey, sourceSSEKey.IV)
if decErr != nil {
return nil, fmt.Errorf("create SSE-S3 decrypted reader: %w", decErr)
}
decryptedData, readErr := io.ReadAll(decryptedReader)
if readErr != nil {
return nil, fmt.Errorf("decrypt SSE-S3 chunk data: %w", readErr)
}
finalData = decryptedData
glog.V(4).Infof("Decrypted SSE-S3 chunk, size: %d", len(finalData))
} else {
// Source is unencrypted
finalData = encryptedData
@@ -1789,6 +1840,36 @@ func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sour
dstChunk.SseMetadata = kmsMetadata
glog.V(4).Infof("Re-encrypted chunk with SSE-KMS")
} else if state.DstSSES3 && destSSES3Key != nil {
// Encrypt with SSE-S3
encryptedReader, iv, encErr := CreateSSES3EncryptedReader(bytes.NewReader(finalData), destSSES3Key)
if encErr != nil {
return nil, fmt.Errorf("create SSE-S3 encrypted reader: %w", encErr)
}
reencryptedData, readErr := io.ReadAll(encryptedReader)
if readErr != nil {
return nil, fmt.Errorf("re-encrypt with SSE-S3: %w", readErr)
}
finalData = reencryptedData
// Create per-chunk SSE-S3 metadata with chunk-specific IV
chunkSSEKey := &SSES3Key{
Key: destSSES3Key.Key,
KeyID: destSSES3Key.KeyID,
Algorithm: destSSES3Key.Algorithm,
IV: iv,
}
sses3Metadata, err := SerializeSSES3Metadata(chunkSSEKey)
if err != nil {
return nil, fmt.Errorf("serialize SSE-S3 metadata: %w", err)
}
dstChunk.SseType = filer_pb.SSEType_SSE_S3
dstChunk.SseMetadata = sses3Metadata
glog.V(4).Infof("Re-encrypted chunk with SSE-S3")
}
// For unencrypted destination, finalData remains as decrypted plaintext

130
weed/s3api/s3api_object_handlers_copy_unified.go

@@ -1,14 +1,12 @@
package s3api
import (
"context"
"errors"
"fmt"
"net/http"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
weed_server "github.com/seaweedfs/seaweedfs/weed/server"
)
@@ -134,30 +132,9 @@ func (s3a *S3ApiServer) executeEncryptCopy(entry *filer_pb.Entry, r *http.Reques
}
if state.DstSSES3 {
// Use streaming copy for SSE-S3 encryption
chunks, encSpec, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
if err != nil {
return nil, nil, err
}
// Generate SSE-S3 destination metadata from the encryption spec
dstMetadata := make(map[string][]byte)
if encSpec != nil && encSpec.DestinationKey != nil {
if sseKey, ok := encSpec.DestinationKey.(*SSES3Key); ok {
// Store the IV on the key before serialization
if len(encSpec.DestinationIV) > 0 {
sseKey.IV = encSpec.DestinationIV
}
keyData, serErr := SerializeSSES3Metadata(sseKey)
if serErr != nil {
return nil, nil, fmt.Errorf("failed to serialize SSE-S3 metadata: %w", serErr)
}
dstMetadata[s3_constants.SeaweedFSSSES3Key] = keyData
dstMetadata[s3_constants.AmzServerSideEncryption] = []byte("AES256")
glog.V(3).Infof("Generated SSE-S3 metadata for streaming encrypt copy: %s", dstPath)
}
}
return chunks, dstMetadata, nil
// Use chunk-by-chunk copy for SSE-S3 encryption (consistent with SSE-C and SSE-KMS)
glog.V(2).Infof("Plain→SSE-S3 copy: using unified multipart encrypt copy")
return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
}
return nil, nil, fmt.Errorf("unknown target encryption type")
@@ -165,30 +142,18 @@ func (s3a *S3ApiServer) executeEncryptCopy(entry *filer_pb.Entry, r *http.Reques
// executeDecryptCopy handles encrypted → plain copies
func (s3a *S3ApiServer) executeDecryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
// Use unified multipart-aware decrypt copy for all encryption types
if state.SrcSSEC || state.SrcSSEKMS {
// Use unified multipart-aware decrypt copy for all encryption types (consistent chunk-by-chunk)
if state.SrcSSEC || state.SrcSSEKMS || state.SrcSSES3 {
glog.V(2).Infof("Encrypted→Plain copy: using unified multipart decrypt copy")
return s3a.copyMultipartCrossEncryption(entry, r, state, "", dstPath)
}
if state.SrcSSES3 {
// Use streaming copy for SSE-S3 decryption
chunks, _, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
return chunks, nil, err
}
return nil, nil, fmt.Errorf("unknown source encryption type")
}
// executeReencryptCopy handles encrypted → encrypted copies with different keys/methods
func (s3a *S3ApiServer) executeReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
// Check if we should use streaming copy for better performance
if s3a.shouldUseStreamingCopy(entry, state) {
chunks, _, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
return chunks, nil, err
}
// Fallback to chunk-by-chunk approach for compatibility
// Use chunk-by-chunk approach for all cross-encryption scenarios (consistent behavior)
if state.SrcSSEC && state.DstSSEC {
return s3a.copyChunksWithSSEC(entry, r)
}
@@ -199,84 +164,9 @@ func (s3a *S3ApiServer) executeReencryptCopy(entry *filer_pb.Entry, r *http.Requ
return chunks, dstMetadata, err
}
if state.SrcSSEC && state.DstSSEKMS {
// SSE-C → SSE-KMS: use unified multipart-aware cross-encryption copy
glog.V(2).Infof("SSE-C→SSE-KMS cross-encryption copy: using unified multipart copy")
return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
}
if state.SrcSSEKMS && state.DstSSEC {
// SSE-KMS → SSE-C: use unified multipart-aware cross-encryption copy
glog.V(2).Infof("SSE-KMS→SSE-C cross-encryption copy: using unified multipart copy")
return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
}
// Handle SSE-S3 cross-encryption scenarios
if state.SrcSSES3 || state.DstSSES3 {
// Any scenario involving SSE-S3 uses streaming copy
chunks, _, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
return chunks, nil, err
}
return nil, nil, fmt.Errorf("unsupported cross-encryption scenario")
// All other cross-encryption scenarios use unified multipart copy
// This includes: SSE-C↔SSE-KMS, SSE-C↔SSE-S3, SSE-KMS↔SSE-S3, SSE-S3↔SSE-S3
glog.V(2).Infof("Cross-encryption copy: using unified multipart copy")
return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
}
// shouldUseStreamingCopy determines if streaming copy should be used
func (s3a *S3ApiServer) shouldUseStreamingCopy(entry *filer_pb.Entry, state *EncryptionState) bool {
// Use streaming copy for large files or when beneficial
fileSize := entry.Attributes.FileSize
// Use streaming for files larger than 10MB
if fileSize > 10*1024*1024 {
return true
}
// Check if this is a multipart encrypted object
isMultipartEncrypted := false
if state.IsSourceEncrypted() {
encryptedChunks := 0
for _, chunk := range entry.GetChunks() {
if chunk.GetSseType() != filer_pb.SSEType_NONE {
encryptedChunks++
}
}
isMultipartEncrypted = encryptedChunks > 1
}
// For multipart encrypted objects, avoid streaming copy to use per-chunk metadata approach
if isMultipartEncrypted {
glog.V(3).Infof("Multipart encrypted object detected, using chunk-by-chunk approach")
return false
}
// Use streaming for cross-encryption scenarios (for single-part objects only)
if state.IsSourceEncrypted() && state.IsTargetEncrypted() {
srcType := s3a.getEncryptionTypeString(state.SrcSSEC, state.SrcSSEKMS, state.SrcSSES3)
dstType := s3a.getEncryptionTypeString(state.DstSSEC, state.DstSSEKMS, state.DstSSES3)
if srcType != dstType {
return true
}
}
// Use streaming for compressed files
if isCompressedEntry(entry) {
return true
}
// Use streaming for SSE-S3 scenarios (always)
if state.SrcSSES3 || state.DstSSES3 {
return true
}
return false
}
// executeStreamingReencryptCopy performs streaming re-encryption copy and returns encryption spec
// The encryption spec is needed for SSE-S3 to properly set destination metadata (fixes GitHub #7562)
func (s3a *S3ApiServer) executeStreamingReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, *EncryptionSpec, error) {
// Create streaming copy manager
streamingManager := NewStreamingCopyManager(s3a)
// Execute streaming copy
return streamingManager.ExecuteStreamingCopy(context.Background(), entry, r, dstPath, state)
}
Loading…
Cancel
Save