You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

328 lines
8.4 KiB

package s3api
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
)
// TestSSECIsEncrypted tests detection of SSE-C encryption from metadata
func TestSSECIsEncrypted(t *testing.T) {
testCases := []struct {
name string
metadata map[string][]byte
expected bool
}{
{
name: "Empty metadata",
metadata: CreateTestMetadata(),
expected: false,
},
{
name: "Valid SSE-C metadata",
metadata: CreateTestMetadataWithSSEC(GenerateTestSSECKey(1)),
expected: true,
},
{
name: "SSE-C algorithm only",
metadata: map[string][]byte{
s3_constants.AmzServerSideEncryptionCustomerAlgorithm: []byte("AES256"),
},
expected: true,
},
{
name: "SSE-C key MD5 only",
metadata: map[string][]byte{
s3_constants.AmzServerSideEncryptionCustomerKeyMD5: []byte("somemd5"),
},
expected: true,
},
{
name: "Other encryption type (SSE-KMS)",
metadata: map[string][]byte{
s3_constants.AmzServerSideEncryption: []byte("aws:kms"),
},
expected: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := IsSSECEncrypted(tc.metadata)
if result != tc.expected {
t.Errorf("Expected %v, got %v", tc.expected, result)
}
})
}
}
// TestSSEKMSIsEncrypted tests detection of SSE-KMS encryption from metadata
func TestSSEKMSIsEncrypted(t *testing.T) {
testCases := []struct {
name string
metadata map[string][]byte
expected bool
}{
{
name: "Empty metadata",
metadata: CreateTestMetadata(),
expected: false,
},
{
name: "Valid SSE-KMS metadata",
metadata: map[string][]byte{
s3_constants.AmzServerSideEncryption: []byte("aws:kms"),
s3_constants.AmzEncryptedDataKey: []byte("encrypted-key"),
},
expected: true,
},
{
name: "SSE-KMS algorithm only",
metadata: map[string][]byte{
s3_constants.AmzServerSideEncryption: []byte("aws:kms"),
},
expected: true,
},
{
name: "SSE-KMS encrypted data key only",
metadata: map[string][]byte{
s3_constants.AmzEncryptedDataKey: []byte("encrypted-key"),
},
expected: false, // Only encrypted data key without algorithm header should not be considered SSE-KMS
},
{
name: "Other encryption type (SSE-C)",
metadata: map[string][]byte{
s3_constants.AmzServerSideEncryptionCustomerAlgorithm: []byte("AES256"),
},
expected: false,
},
{
name: "SSE-S3 (AES256)",
metadata: map[string][]byte{
s3_constants.AmzServerSideEncryption: []byte("AES256"),
},
expected: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := IsSSEKMSEncrypted(tc.metadata)
if result != tc.expected {
t.Errorf("Expected %v, got %v", tc.expected, result)
}
})
}
}
// TestSSETypeDiscrimination tests that SSE types don't interfere with each other
func TestSSETypeDiscrimination(t *testing.T) {
// Test SSE-C headers don't trigger SSE-KMS detection
t.Run("SSE-C headers don't trigger SSE-KMS", func(t *testing.T) {
req := CreateTestHTTPRequest("PUT", "/bucket/object", nil)
keyPair := GenerateTestSSECKey(1)
SetupTestSSECHeaders(req, keyPair)
// Should detect SSE-C, not SSE-KMS
if !IsSSECRequest(req) {
t.Error("Should detect SSE-C request")
}
if IsSSEKMSRequest(req) {
t.Error("Should not detect SSE-KMS request for SSE-C headers")
}
})
// Test SSE-KMS headers don't trigger SSE-C detection
t.Run("SSE-KMS headers don't trigger SSE-C", func(t *testing.T) {
req := CreateTestHTTPRequest("PUT", "/bucket/object", nil)
SetupTestSSEKMSHeaders(req, "test-key-id")
// Should detect SSE-KMS, not SSE-C
if IsSSECRequest(req) {
t.Error("Should not detect SSE-C request for SSE-KMS headers")
}
if !IsSSEKMSRequest(req) {
t.Error("Should detect SSE-KMS request")
}
})
// Test metadata discrimination
t.Run("Metadata type discrimination", func(t *testing.T) {
ssecMetadata := CreateTestMetadataWithSSEC(GenerateTestSSECKey(1))
// Should detect as SSE-C, not SSE-KMS
if !IsSSECEncrypted(ssecMetadata) {
t.Error("Should detect SSE-C encrypted metadata")
}
if IsSSEKMSEncrypted(ssecMetadata) {
t.Error("Should not detect SSE-KMS for SSE-C metadata")
}
})
}
// TestSSECParseCorruptedMetadata tests handling of corrupted SSE-C metadata
func TestSSECParseCorruptedMetadata(t *testing.T) {
testCases := []struct {
name string
metadata map[string][]byte
expectError bool
errorMessage string
}{
{
name: "Missing algorithm",
metadata: map[string][]byte{
s3_constants.AmzServerSideEncryptionCustomerKeyMD5: []byte("valid-md5"),
},
expectError: false, // Detection should still work with partial metadata
},
{
name: "Invalid key MD5 format",
metadata: map[string][]byte{
s3_constants.AmzServerSideEncryptionCustomerAlgorithm: []byte("AES256"),
s3_constants.AmzServerSideEncryptionCustomerKeyMD5: []byte("invalid-base64!"),
},
expectError: false, // Detection should work, validation happens later
},
{
name: "Empty values",
metadata: map[string][]byte{
s3_constants.AmzServerSideEncryptionCustomerAlgorithm: []byte(""),
s3_constants.AmzServerSideEncryptionCustomerKeyMD5: []byte(""),
},
expectError: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Test that detection doesn't panic on corrupted metadata
result := IsSSECEncrypted(tc.metadata)
// The detection should be robust and not crash
t.Logf("Detection result for %s: %v", tc.name, result)
})
}
}
// TestSSEKMSParseCorruptedMetadata tests handling of corrupted SSE-KMS metadata
func TestSSEKMSParseCorruptedMetadata(t *testing.T) {
testCases := []struct {
name string
metadata map[string][]byte
}{
{
name: "Invalid encrypted data key",
metadata: map[string][]byte{
s3_constants.AmzServerSideEncryption: []byte("aws:kms"),
s3_constants.AmzEncryptedDataKey: []byte("invalid-base64!"),
},
},
{
name: "Invalid encryption context",
metadata: map[string][]byte{
s3_constants.AmzServerSideEncryption: []byte("aws:kms"),
s3_constants.AmzEncryptionContextMeta: []byte("invalid-json"),
},
},
{
name: "Empty values",
metadata: map[string][]byte{
s3_constants.AmzServerSideEncryption: []byte(""),
s3_constants.AmzEncryptedDataKey: []byte(""),
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Test that detection doesn't panic on corrupted metadata
result := IsSSEKMSEncrypted(tc.metadata)
t.Logf("Detection result for %s: %v", tc.name, result)
})
}
}
// TestSSEMetadataDeserialization tests SSE-KMS metadata deserialization with various inputs
func TestSSEMetadataDeserialization(t *testing.T) {
testCases := []struct {
name string
data []byte
expectError bool
}{
{
name: "Empty data",
data: []byte{},
expectError: true,
},
{
name: "Invalid JSON",
data: []byte("invalid-json"),
expectError: true,
},
{
name: "Valid JSON but wrong structure",
data: []byte(`{"wrong": "structure"}`),
expectError: false, // Our deserialization might be lenient
},
{
name: "Null data",
data: nil,
expectError: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, err := DeserializeSSEKMSMetadata(tc.data)
if tc.expectError && err == nil {
t.Error("Expected error but got none")
}
if !tc.expectError && err != nil {
t.Errorf("Expected no error but got: %v", err)
}
})
}
}
// TestGeneralSSEDetection tests the general SSE detection that works across types
func TestGeneralSSEDetection(t *testing.T) {
testCases := []struct {
name string
metadata map[string][]byte
expected bool
}{
{
name: "No encryption",
metadata: CreateTestMetadata(),
expected: false,
},
{
name: "SSE-C encrypted",
metadata: CreateTestMetadataWithSSEC(GenerateTestSSECKey(1)),
expected: true,
},
{
name: "SSE-KMS encrypted",
metadata: map[string][]byte{
s3_constants.AmzServerSideEncryption: []byte("aws:kms"),
},
expected: true,
},
{
name: "SSE-S3 encrypted",
metadata: map[string][]byte{
s3_constants.AmzServerSideEncryption: []byte("AES256"),
},
expected: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := IsAnySSEEncrypted(tc.metadata)
if result != tc.expected {
t.Errorf("Expected %v, got %v", tc.expected, result)
}
})
}
}