
Merge branch 'master' into volume_check_disk_fix_2

pull/7605/head
Chris Lu committed 3 days ago (via GitHub)
commit e4e458d6d2
  1. test/s3/sse/s3_sse_integration_test.go (72 changes)
  2. weed/s3api/s3api_object_handlers_copy.go (117 changes)
  3. weed/s3api/s3api_object_handlers_copy_unified.go (106 changes)
  4. weed/s3api/s3api_streaming_copy.go (60 changes)
  5. weed/shell/command_volume_check_disk.go (28 changes)
  6. weed/shell/common.go (8 changes)

test/s3/sse/s3_sse_integration_test.go (72 changes)

@@ -2082,6 +2082,78 @@ func TestCopyToBucketDefaultEncryptedRegression(t *testing.T) {
		require.NoError(t, err, "Failed to read object")
		assertDataEqual(t, testData, data, "Data mismatch")
	})
t.Run("LargeFileCopyEncrypted_ToTemp_ToEncrypted", func(t *testing.T) {
// Test with large file (1MB) to exercise chunk-by-chunk copy path
// This verifies consistent behavior with SSE-C and SSE-KMS
largeTestData := generateTestData(1024 * 1024) // 1MB
objectKey := "large-file-test.bin"
// Step 1: Upload large object to source bucket (will be automatically encrypted)
_, err = client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(srcBucket),
Key: aws.String(objectKey),
Body: bytes.NewReader(largeTestData),
})
require.NoError(t, err, "Failed to upload large file to source bucket")
// Verify source object is encrypted
srcHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: aws.String(srcBucket),
Key: aws.String(objectKey),
})
require.NoError(t, err, "Failed to HEAD source object")
assert.Equal(t, types.ServerSideEncryptionAes256, srcHead.ServerSideEncryption,
"Source object should be SSE-S3 encrypted")
// Step 2: Copy to temp bucket (unencrypted) - exercises chunk-by-chunk decrypt
_, err = client.CopyObject(ctx, &s3.CopyObjectInput{
Bucket: aws.String(tempBucket),
Key: aws.String(objectKey),
CopySource: aws.String(fmt.Sprintf("%s/%s", srcBucket, objectKey)),
})
require.NoError(t, err, "Failed to copy large file to temp bucket")
// Verify temp object is unencrypted and data is correct
tempGet, err := client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(tempBucket),
Key: aws.String(objectKey),
})
require.NoError(t, err, "Failed to GET temp object")
tempData, err := io.ReadAll(tempGet.Body)
tempGet.Body.Close()
require.NoError(t, err, "Failed to read temp object")
assertDataEqual(t, largeTestData, tempData, "Temp object data mismatch after decrypt")
// Step 3: Copy from temp bucket to dest bucket (with default encryption)
// This exercises chunk-by-chunk encrypt copy
_, err = client.CopyObject(ctx, &s3.CopyObjectInput{
Bucket: aws.String(dstBucket),
Key: aws.String(objectKey),
CopySource: aws.String(fmt.Sprintf("%s/%s", tempBucket, objectKey)),
})
require.NoError(t, err, "Failed to copy large file to destination bucket")
// Verify destination object is encrypted
dstHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: aws.String(dstBucket),
Key: aws.String(objectKey),
})
require.NoError(t, err, "Failed to HEAD destination object")
assert.Equal(t, types.ServerSideEncryptionAes256, dstHead.ServerSideEncryption,
"Destination object should be SSE-S3 encrypted via bucket default")
// Verify destination object content is correct after re-encryption
dstGet, err := client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(dstBucket),
Key: aws.String(objectKey),
})
require.NoError(t, err, "Failed to GET destination object")
dstData, err := io.ReadAll(dstGet.Body)
dstGet.Body.Close()
require.NoError(t, err, "Failed to read destination object")
assertDataEqual(t, largeTestData, dstData, "Large file data mismatch after re-encryption")
})
}

// REGRESSION TESTS FOR CRITICAL BUGS FIXED

weed/s3api/s3api_object_handlers_copy.go (117 changes)

@@ -199,7 +199,9 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
	}

	// Process metadata and tags and apply to destination
-	processedMetadata, tagErr := processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging)
+	// Use dstEntry.Extended (already filtered) as the source, not entry.Extended,
+	// to preserve the encryption header filtering. Fixes GitHub #7562.
+	processedMetadata, tagErr := processMetadataBytes(r.Header, dstEntry.Extended, replaceMeta, replaceTagging)
	if tagErr != nil {
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
		return
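Note: dstEntry.Extended has already had the source's encryption headers stripped earlier in the handler, which is why it is the safe source map here. A minimal sketch of that filtering idea follows; the helper and the exact header keys are illustrative, not the actual codebase function:

// Hypothetical sketch: drop source-side SSE metadata keys before reusing
// extended attributes for the destination entry, so a copy into a bucket
// with different (or no) encryption does not inherit stale SSE headers.
func filterSourceEncryptionHeaders(src map[string][]byte) map[string][]byte {
	filtered := make(map[string][]byte, len(src))
	for k, v := range src {
		switch k {
		case "X-Amz-Server-Side-Encryption",
			"X-Amz-Server-Side-Encryption-Customer-Algorithm",
			"X-Amz-Server-Side-Encryption-Aws-Kms-Key-Id":
			continue // destination encryption state is decided separately
		}
		filtered[k] = v
	}
	return filtered
}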
@@ -1522,7 +1524,7 @@ func (s3a *S3ApiServer) copyMultipartSSECChunk(chunk *filer_pb.FileChunk, copySo
}

// copyMultipartCrossEncryption handles all cross-encryption and decrypt-only copy scenarios
-// This unified function supports: SSE-C↔SSE-KMS, SSE-C→Plain, SSE-KMS→Plain
+// This unified function supports: SSE-C↔SSE-KMS↔SSE-S3, and any→Plain
func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
	var dstChunks []*filer_pb.FileChunk
@@ -1531,6 +1533,7 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
	var destKMSKeyID string
	var destKMSEncryptionContext map[string]string
	var destKMSBucketKeyEnabled bool
+	var destSSES3Key *SSES3Key

	if state.DstSSEC {
		var err error
@@ -1544,7 +1547,13 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
		if err != nil {
			return nil, nil, fmt.Errorf("failed to parse destination SSE-KMS headers: %w", err)
		}
-	} else {
+	} else if state.DstSSES3 {
+		// Generate SSE-S3 key for destination
+		var err error
+		destSSES3Key, err = GenerateSSES3Key()
+		if err != nil {
+			return nil, nil, fmt.Errorf("failed to generate SSE-S3 key: %w", err)
+		}
	}

	// Parse source encryption parameters
@@ -1563,13 +1572,19 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
		var err error
		if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
-			copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, sourceSSECKey, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state)
+			copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, sourceSSECKey, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
		} else if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS {
-			copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state)
+			copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
+		} else if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 {
+			copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
+		} else {
+			// Unencrypted chunk - may need encryption if destination requires it
+			if state.DstSSEC || state.DstSSEKMS || state.DstSSES3 {
+				copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
			} else {
				// Unencrypted chunk, copy directly
				copiedChunk, err = s3a.copySingleChunk(chunk, dstPath)
			}
+		}

		if err != nil {
			return nil, nil, fmt.Errorf("failed to copy chunk %s: %w", chunk.GetFileIdString(), err)
@@ -1619,6 +1634,40 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
		} else {
			glog.Errorf("Failed to serialize SSE-KMS metadata: %v", serErr)
		}
+	} else if state.DstSSES3 && destSSES3Key != nil {
+		// For SSE-S3 destination, create object-level metadata
+		var sses3Metadata *SSES3Key
+		if len(dstChunks) == 0 {
+			// Handle 0-byte files - generate IV for metadata even though there's no content to encrypt
+			if entry.Attributes.FileSize != 0 {
+				return nil, nil, fmt.Errorf("internal error: no chunks created for non-empty SSE-S3 destination object")
+			}
+			// Generate IV for 0-byte object metadata
+			iv := make([]byte, s3_constants.AESBlockSize)
+			if _, err := io.ReadFull(rand.Reader, iv); err != nil {
+				return nil, nil, fmt.Errorf("generate IV for 0-byte object: %w", err)
+			}
+			destSSES3Key.IV = iv
+			sses3Metadata = destSSES3Key
+		} else {
+			// For non-empty objects, use the first chunk's metadata
+			if dstChunks[0].GetSseType() != filer_pb.SSEType_SSE_S3 || len(dstChunks[0].GetSseMetadata()) == 0 {
+				return nil, nil, fmt.Errorf("internal error: first chunk is missing expected SSE-S3 metadata for destination object")
+			}
+			keyManager := GetSSES3KeyManager()
+			var err error
+			sses3Metadata, err = DeserializeSSES3Metadata(dstChunks[0].GetSseMetadata(), keyManager)
+			if err != nil {
+				return nil, nil, fmt.Errorf("failed to deserialize SSE-S3 metadata from first chunk: %w", err)
+			}
+		}
+		// Use the derived key with its IV for object-level metadata
+		keyData, serErr := SerializeSSES3Metadata(sses3Metadata)
+		if serErr != nil {
+			return nil, nil, fmt.Errorf("failed to serialize SSE-S3 metadata: %w", serErr)
+		}
+		dstMetadata[s3_constants.SeaweedFSSSES3Key] = keyData
+		dstMetadata[s3_constants.AmzServerSideEncryption] = []byte("AES256")
	}

	// For unencrypted destination, no metadata needed (dstMetadata remains empty)
@@ -1626,7 +1675,7 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
}

// copyCrossEncryptionChunk handles copying a single chunk with cross-encryption support
-func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sourceSSECKey *SSECustomerKey, destSSECKey *SSECustomerKey, destKMSKeyID string, destKMSEncryptionContext map[string]string, destKMSBucketKeyEnabled bool, dstPath, dstBucket string, state *EncryptionState) (*filer_pb.FileChunk, error) {
+func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sourceSSECKey *SSECustomerKey, destSSECKey *SSECustomerKey, destKMSKeyID string, destKMSEncryptionContext map[string]string, destKMSBucketKeyEnabled bool, destSSES3Key *SSES3Key, dstPath, dstBucket string, state *EncryptionState) (*filer_pb.FileChunk, error) {
	// Create destination chunk
	dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)
@@ -1726,6 +1775,30 @@ func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sour
			previewLen = len(finalData)
		}
+	} else if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 {
+		// Decrypt SSE-S3 source
+		if len(chunk.GetSseMetadata()) == 0 {
+			return nil, fmt.Errorf("SSE-S3 chunk missing per-chunk metadata")
+		}
+		keyManager := GetSSES3KeyManager()
+		sourceSSEKey, err := DeserializeSSES3Metadata(chunk.GetSseMetadata(), keyManager)
+		if err != nil {
+			return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err)
+		}
+		decryptedReader, decErr := CreateSSES3DecryptedReader(bytes.NewReader(encryptedData), sourceSSEKey, sourceSSEKey.IV)
+		if decErr != nil {
+			return nil, fmt.Errorf("create SSE-S3 decrypted reader: %w", decErr)
+		}
+		decryptedData, readErr := io.ReadAll(decryptedReader)
+		if readErr != nil {
+			return nil, fmt.Errorf("decrypt SSE-S3 chunk data: %w", readErr)
+		}
+		finalData = decryptedData
+		glog.V(4).Infof("Decrypted SSE-S3 chunk, size: %d", len(finalData))
	} else {
		// Source is unencrypted
		finalData = encryptedData
@@ -1787,6 +1860,36 @@ func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sour
		dstChunk.SseMetadata = kmsMetadata
		glog.V(4).Infof("Re-encrypted chunk with SSE-KMS")
+	} else if state.DstSSES3 && destSSES3Key != nil {
+		// Encrypt with SSE-S3
+		encryptedReader, iv, encErr := CreateSSES3EncryptedReader(bytes.NewReader(finalData), destSSES3Key)
+		if encErr != nil {
+			return nil, fmt.Errorf("create SSE-S3 encrypted reader: %w", encErr)
+		}
+		reencryptedData, readErr := io.ReadAll(encryptedReader)
+		if readErr != nil {
+			return nil, fmt.Errorf("re-encrypt with SSE-S3: %w", readErr)
+		}
+		finalData = reencryptedData
+
+		// Create per-chunk SSE-S3 metadata with chunk-specific IV
+		chunkSSEKey := &SSES3Key{
+			Key:       destSSES3Key.Key,
+			KeyID:     destSSES3Key.KeyID,
+			Algorithm: destSSES3Key.Algorithm,
+			IV:        iv,
+		}
+		sses3Metadata, err := SerializeSSES3Metadata(chunkSSEKey)
+		if err != nil {
+			return nil, fmt.Errorf("serialize SSE-S3 metadata: %w", err)
+		}
+		dstChunk.SseType = filer_pb.SSEType_SSE_S3
+		dstChunk.SseMetadata = sses3Metadata
+		glog.V(4).Infof("Re-encrypted chunk with SSE-S3")
	}

	// For unencrypted destination, finalData remains as decrypted plaintext
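The design above reuses one data key per destination object but gives every chunk its own IV, so each chunk's ciphertext stays independently decryptable. For context, a sketch of the read-side counterpart using the same helpers that appear in this diff (the wrapper function itself is illustrative, not repository code):

// Illustrative wrapper: decrypt one SSE-S3 chunk from its per-chunk metadata.
// DeserializeSSES3Metadata and CreateSSES3DecryptedReader are the helpers used above.
func decryptSSES3ChunkSketch(chunk *filer_pb.FileChunk, ciphertext []byte) ([]byte, error) {
	keyManager := GetSSES3KeyManager()
	key, err := DeserializeSSES3Metadata(chunk.GetSseMetadata(), keyManager)
	if err != nil {
		return nil, fmt.Errorf("deserialize SSE-S3 metadata: %w", err)
	}
	reader, err := CreateSSES3DecryptedReader(bytes.NewReader(ciphertext), key, key.IV)
	if err != nil {
		return nil, fmt.Errorf("create SSE-S3 decrypted reader: %w", err)
	}
	return io.ReadAll(reader) // plaintext for this chunk only
}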

weed/s3api/s3api_object_handlers_copy_unified.go (106 changes)

@@ -1,7 +1,6 @@
package s3api

import (
-	"context"
	"errors"
	"fmt"
	"net/http"
@@ -133,9 +132,9 @@ func (s3a *S3ApiServer) executeEncryptCopy(entry *filer_pb.Entry, r *http.Reques
	}

	if state.DstSSES3 {
-		// Use streaming copy for SSE-S3 encryption
-		chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
-		return chunks, nil, err
+		// Use chunk-by-chunk copy for SSE-S3 encryption (consistent with SSE-C and SSE-KMS)
+		glog.V(2).Infof("Plain→SSE-S3 copy: using unified multipart encrypt copy")
+		return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
	}

	return nil, nil, fmt.Errorf("unknown target encryption type")
@@ -143,30 +142,18 @@ func (s3a *S3ApiServer) executeEncryptCopy(entry *filer_pb.Entry, r *http.Reques
// executeDecryptCopy handles encrypted → plain copies
func (s3a *S3ApiServer) executeDecryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
-	// Use unified multipart-aware decrypt copy for all encryption types
-	if state.SrcSSEC || state.SrcSSEKMS {
+	// Use unified multipart-aware decrypt copy for all encryption types (consistent chunk-by-chunk)
+	if state.SrcSSEC || state.SrcSSEKMS || state.SrcSSES3 {
		glog.V(2).Infof("Encrypted→Plain copy: using unified multipart decrypt copy")
		return s3a.copyMultipartCrossEncryption(entry, r, state, "", dstPath)
	}

-	if state.SrcSSES3 {
-		// Use streaming copy for SSE-S3 decryption
-		chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
-		return chunks, nil, err
-	}
-
	return nil, nil, fmt.Errorf("unknown source encryption type")
}
// executeReencryptCopy handles encrypted → encrypted copies with different keys/methods
func (s3a *S3ApiServer) executeReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
-	// Check if we should use streaming copy for better performance
-	if s3a.shouldUseStreamingCopy(entry, state) {
-		chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
-		return chunks, nil, err
-	}
-
-	// Fallback to chunk-by-chunk approach for compatibility
+	// Use chunk-by-chunk approach for all cross-encryption scenarios (consistent behavior)
	if state.SrcSSEC && state.DstSSEC {
		return s3a.copyChunksWithSSEC(entry, r)
	}
@@ -177,83 +164,8 @@ func (s3a *S3ApiServer) executeReencryptCopy(entry *filer_pb.Entry, r *http.Requ
		return chunks, dstMetadata, err
	}

-	if state.SrcSSEC && state.DstSSEKMS {
-		// SSE-C → SSE-KMS: use unified multipart-aware cross-encryption copy
-		glog.V(2).Infof("SSE-C→SSE-KMS cross-encryption copy: using unified multipart copy")
+	// All other cross-encryption scenarios use unified multipart copy
+	// This includes: SSE-C↔SSE-KMS, SSE-C↔SSE-S3, SSE-KMS↔SSE-S3, SSE-S3↔SSE-S3
+	glog.V(2).Infof("Cross-encryption copy: using unified multipart copy")
	return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
}
-
-	if state.SrcSSEKMS && state.DstSSEC {
-		// SSE-KMS → SSE-C: use unified multipart-aware cross-encryption copy
-		glog.V(2).Infof("SSE-KMS→SSE-C cross-encryption copy: using unified multipart copy")
-		return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
-	}
-
-	// Handle SSE-S3 cross-encryption scenarios
-	if state.SrcSSES3 || state.DstSSES3 {
-		// Any scenario involving SSE-S3 uses streaming copy
-		chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
-		return chunks, nil, err
-	}
-
-	return nil, nil, fmt.Errorf("unsupported cross-encryption scenario")
-}
-
-// shouldUseStreamingCopy determines if streaming copy should be used
-func (s3a *S3ApiServer) shouldUseStreamingCopy(entry *filer_pb.Entry, state *EncryptionState) bool {
-	// Use streaming copy for large files or when beneficial
-	fileSize := entry.Attributes.FileSize
-
-	// Use streaming for files larger than 10MB
-	if fileSize > 10*1024*1024 {
-		return true
-	}
-
-	// Check if this is a multipart encrypted object
-	isMultipartEncrypted := false
-	if state.IsSourceEncrypted() {
-		encryptedChunks := 0
-		for _, chunk := range entry.GetChunks() {
-			if chunk.GetSseType() != filer_pb.SSEType_NONE {
-				encryptedChunks++
-			}
-		}
-		isMultipartEncrypted = encryptedChunks > 1
-	}
-
-	// For multipart encrypted objects, avoid streaming copy to use per-chunk metadata approach
-	if isMultipartEncrypted {
-		glog.V(3).Infof("Multipart encrypted object detected, using chunk-by-chunk approach")
-		return false
-	}
-
-	// Use streaming for cross-encryption scenarios (for single-part objects only)
-	if state.IsSourceEncrypted() && state.IsTargetEncrypted() {
-		srcType := s3a.getEncryptionTypeString(state.SrcSSEC, state.SrcSSEKMS, state.SrcSSES3)
-		dstType := s3a.getEncryptionTypeString(state.DstSSEC, state.DstSSEKMS, state.DstSSES3)
-		if srcType != dstType {
-			return true
-		}
-	}
-
-	// Use streaming for compressed files
-	if isCompressedEntry(entry) {
-		return true
-	}
-
-	// Use streaming for SSE-S3 scenarios (always)
-	if state.SrcSSES3 || state.DstSSES3 {
-		return true
-	}
-
-	return false
-}
-
-// executeStreamingReencryptCopy performs streaming re-encryption copy
-func (s3a *S3ApiServer) executeStreamingReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, error) {
-	// Create streaming copy manager
-	streamingManager := NewStreamingCopyManager(s3a)
-
-	// Execute streaming copy
-	return streamingManager.ExecuteStreamingCopy(context.Background(), entry, r, dstPath, state)
-}

weed/s3api/s3api_streaming_copy.go (60 changes)

@@ -59,18 +59,19 @@ func NewStreamingCopyManager(s3a *S3ApiServer) *StreamingCopyManager {
	}
}

-// ExecuteStreamingCopy performs a streaming copy operation
-func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, error) {
+// ExecuteStreamingCopy performs a streaming copy operation and returns the encryption spec
+// The encryption spec is needed for SSE-S3 to properly set destination metadata (fixes GitHub #7562)
+func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, *EncryptionSpec, error) {
	// Create streaming copy specification
	spec, err := scm.createStreamingSpec(entry, r, state)
	if err != nil {
-		return nil, fmt.Errorf("create streaming spec: %w", err)
+		return nil, nil, fmt.Errorf("create streaming spec: %w", err)
	}

	// Create source reader from entry
	sourceReader, err := scm.createSourceReader(entry)
	if err != nil {
-		return nil, fmt.Errorf("create source reader: %w", err)
+		return nil, nil, fmt.Errorf("create source reader: %w", err)
	}
	defer sourceReader.Close()
@@ -79,11 +80,16 @@ func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry
	// Create processing pipeline
	processedReader, err := scm.createProcessingPipeline(spec)
	if err != nil {
-		return nil, fmt.Errorf("create processing pipeline: %w", err)
+		return nil, nil, fmt.Errorf("create processing pipeline: %w", err)
	}

	// Stream to destination
-	return scm.streamToDestination(ctx, processedReader, spec, dstPath)
+	chunks, err := scm.streamToDestination(ctx, processedReader, spec, dstPath)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return chunks, spec.EncryptionSpec, nil
}
// createStreamingSpec creates a streaming specification based on copy parameters
@@ -453,8 +459,8 @@ func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.R
	for {
		n, err := reader.Read(buffer)
		if n > 0 {
-			// Create chunk for this data
-			chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath)
+			// Create chunk for this data, setting SSE type and per-chunk metadata (including chunk-specific IVs for SSE-S3)
+			chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath, spec.EncryptionSpec)
			if chunkErr != nil {
				return nil, fmt.Errorf("create chunk from data: %w", chunkErr)
			}
@@ -474,7 +480,7 @@ func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.R
}

// createChunkFromData creates a chunk from streaming data
-func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string) (*filer_pb.FileChunk, error) {
+func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string, encSpec *EncryptionSpec) (*filer_pb.FileChunk, error) {
	// Assign new volume
	assignResult, err := scm.s3a.assignNewVolume(dstPath)
	if err != nil {
@@ -487,6 +493,42 @@ func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64,
		Size:   uint64(len(data)),
	}

+	// Set SSE type and metadata on chunk if destination is encrypted
+	// This is critical for GetObject to know to decrypt the data - fixes GitHub #7562
+	if encSpec != nil && encSpec.NeedsEncryption {
+		switch encSpec.DestinationType {
+		case EncryptionTypeSSEC:
+			chunk.SseType = filer_pb.SSEType_SSE_C
+			// SSE-C metadata is handled at object level, not per-chunk for streaming copy
+		case EncryptionTypeSSEKMS:
+			chunk.SseType = filer_pb.SSEType_SSE_KMS
+			// SSE-KMS metadata is handled at object level, not per-chunk for streaming copy
+		case EncryptionTypeSSES3:
+			chunk.SseType = filer_pb.SSEType_SSE_S3
+			// Create per-chunk SSE-S3 metadata with chunk-specific IV
+			if sseKey, ok := encSpec.DestinationKey.(*SSES3Key); ok {
+				// Calculate chunk-specific IV using base IV and chunk offset
+				baseIV := encSpec.DestinationIV
+				if len(baseIV) == 0 {
+					return nil, fmt.Errorf("SSE-S3 encryption requires DestinationIV to be set for chunk at offset %d", offset)
+				}
+				chunkIV, _ := calculateIVWithOffset(baseIV, offset)
+
+				// Create chunk key with the chunk-specific IV
+				chunkSSEKey := &SSES3Key{
+					Key:       sseKey.Key,
+					KeyID:     sseKey.KeyID,
+					Algorithm: sseKey.Algorithm,
+					IV:        chunkIV,
+				}
+				chunkMetadata, serErr := SerializeSSES3Metadata(chunkSSEKey)
+				if serErr != nil {
+					return nil, fmt.Errorf("failed to serialize chunk SSE-S3 metadata: %w", serErr)
+				}
+				chunk.SseMetadata = chunkMetadata
+			}
+		}
+	}

	// Set file ID
	if err := scm.s3a.setChunkFileId(chunk, assignResult); err != nil {
		return nil, err
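calculateIVWithOffset above derives a chunk IV from the object's base IV. Assuming CTR-mode semantics, which is what keeps per-chunk IVs consistent with a single-pass encryption over the whole stream, the arithmetic is: treat the 16-byte IV as a big-endian counter and advance it by the number of whole AES blocks before the chunk's offset. A hedged sketch follows; the actual function may differ in details such as its second return value, taken here to be the intra-block skip:

// Sketch of CTR-style IV offsetting. Assumption: the base IV is a 128-bit
// big-endian counter; a chunk starting at `offset` begins offset/16 blocks
// into the stream, and offset%16 bytes must still be skipped within the
// first block of that chunk.
func calculateIVWithOffsetSketch(baseIV []byte, offset int64) ([]byte, int64) {
	iv := make([]byte, len(baseIV))
	copy(iv, baseIV)
	carry := uint64(offset / 16) // whole AES blocks before this chunk
	for i := len(iv) - 1; i >= 0 && carry > 0; i-- {
		sum := uint64(iv[i]) + (carry & 0xff)
		iv[i] = byte(sum)
		carry = (carry >> 8) + (sum >> 8) // propagate the carry leftwards
	}
	return iv, offset % 16
}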

weed/shell/command_volume_check_disk.go (28 changes)

@@ -43,6 +43,8 @@ type volumeCheckDisk struct {
	syncDeletions      bool
	fixReadOnly        bool
	nonRepairThreshold float64
+
+	ewg *ErrorWaitGroup
}
func (c *commandVolumeCheckDisk) Name() string {
@@ -95,6 +97,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
	applyChangesAlias := fsckCommand.Bool("force", false, "apply the fix (alias for -apply)")
	fixReadOnly := fsckCommand.Bool("fixReadOnly", false, "apply the fix even on readonly volumes (EXPERIMENTAL!)")
	syncDeletions := fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix")
+	maxParallelization := fsckCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
	nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit")
	if err = fsckCommand.Parse(args); err != nil {
		return nil
@@ -118,6 +121,8 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
		syncDeletions:      *syncDeletions,
		fixReadOnly:        *fixReadOnly,
		nonRepairThreshold: *nonRepairThreshold,
+		ewg:                NewErrorWaitGroup(*maxParallelization),
	}
	// collect topology information
@@ -140,11 +145,9 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
	if err := vcd.checkWritableVolumes(volumeReplicas); err != nil {
		return err
	}

-	if err := vcd.checkReadOnlyVolumes(volumeReplicas); err != nil {
-		return err
-	}
+	vcd.checkReadOnlyVolumes(volumeReplicas)

-	return nil
+	return vcd.ewg.Wait()
}
// checkWritableVolumes fixes volume replicas which are not read-only.
@@ -231,9 +234,9 @@ func (vcd *volumeCheckDisk) makeVolumeReadonly(vid uint32, vr *VolumeReplica) er
	return nil
}

-func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
+func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) {
	if !vcd.fixReadOnly {
-		return nil
+		return
	}

	vcd.write("Pass #2 (read-only volumes)")
@@ -264,13 +267,14 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo
		skip, err := vcd.shouldSkipVolume(r, source)
		if err != nil {
-			vcd.write("error checking if volume %d should be skipped: %v", r.info.Id, err)
+			vcd.ewg.AddErrorf("failed to check if volume %d should be skipped: %v\n", r.info.Id, err)
			continue
		}
		if skip {
			continue
		}

+		vcd.ewg.Add(func() error {
			// make volume writable...
			if err := vcd.makeVolumeWritable(vid, r); err != nil {
				return err
			}

			// ...fix it...
			// TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes.
			if err := vcd.syncTwoReplicas(source, r, false); err != nil {
-				vcd.write("sync read-only volume %d on %s from %s: %v", vid, r.location.dataNode.Id, source.location.dataNode.Id, err)
+				vcd.write("sync read-only volume %d on %s from %s: %v\n", vid, r.location.dataNode.Id, source.location.dataNode.Id, err)
				// ...or revert it back to read-only, if something went wrong.
+				// TODO: we should keep unchanged volumes as read-only, so we don't modify valid volumes which are full.
				if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil {
					return fmt.Errorf("failed to make volume %d on %s readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr)
				}
-				vcd.write("volume %d on %s is now read-only", vid, r.location.dataNode.Id)
+				vcd.write("volume %d on %s is now read-only\n", vid, r.location.dataNode.Id)
				return err
			}
			return nil
+		})
	}
}
func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption {

weed/shell/common.go (8 changes)

@@ -2,6 +2,7 @@ package shell
import (
	"errors"
+	"fmt"
	"sync"
)
@@ -64,6 +65,13 @@ func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) {
	}()
}

+// AddErrorf adds an error to an ErrorWaitGroupTask result, without queueing any goroutines.
+func (ewg *ErrorWaitGroup) AddErrorf(format string, a ...interface{}) {
+	ewg.errorsMu.Lock()
+	ewg.errors = append(ewg.errors, fmt.Errorf(format, a...))
+	ewg.errorsMu.Unlock()
+}

// Wait sleeps until all ErrorWaitGroupTasks are completed, then returns errors for them.
func (ewg *ErrorWaitGroup) Wait() error {
	ewg.wg.Wait()
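Taken together with the command_volume_check_disk.go changes, the intended usage pattern looks roughly like this. checkAllVolumes, repairVolume, and volumeIDs are hypothetical stand-ins, not repository code:

// Hedged usage sketch of the ErrorWaitGroup API added above.
func checkAllVolumes(volumeIDs []uint32) error {
	ewg := NewErrorWaitGroup(DefaultMaxParallelization)
	for _, vid := range volumeIDs {
		vid := vid // capture the loop variable for the closure
		// Add queues the task, running up to the configured number in parallel.
		ewg.Add(func() error {
			return repairVolume(vid) // hypothetical repair task
		})
	}
	// AddErrorf records a failure observed synchronously, without a goroutine.
	if len(volumeIDs) == 0 {
		ewg.AddErrorf("nothing to check: %d volumes", len(volumeIDs))
	}
	// Wait blocks until every queued task finishes, then returns their combined errors.
	return ewg.Wait()
}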
