You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
328 lines
8.4 KiB
328 lines
8.4 KiB
package s3api
|
|
|
|
import (
|
|
"testing"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
|
)
|
|
|
|
// TestSSECIsEncrypted tests detection of SSE-C encryption from metadata
|
|
func TestSSECIsEncrypted(t *testing.T) {
|
|
testCases := []struct {
|
|
name string
|
|
metadata map[string][]byte
|
|
expected bool
|
|
}{
|
|
{
|
|
name: "Empty metadata",
|
|
metadata: CreateTestMetadata(),
|
|
expected: false,
|
|
},
|
|
{
|
|
name: "Valid SSE-C metadata",
|
|
metadata: CreateTestMetadataWithSSEC(GenerateTestSSECKey(1)),
|
|
expected: true,
|
|
},
|
|
{
|
|
name: "SSE-C algorithm only",
|
|
metadata: map[string][]byte{
|
|
s3_constants.AmzServerSideEncryptionCustomerAlgorithm: []byte("AES256"),
|
|
},
|
|
expected: true,
|
|
},
|
|
{
|
|
name: "SSE-C key MD5 only",
|
|
metadata: map[string][]byte{
|
|
s3_constants.AmzServerSideEncryptionCustomerKeyMD5: []byte("somemd5"),
|
|
},
|
|
expected: true,
|
|
},
|
|
{
|
|
name: "Other encryption type (SSE-KMS)",
|
|
metadata: map[string][]byte{
|
|
s3_constants.AmzServerSideEncryption: []byte("aws:kms"),
|
|
},
|
|
expected: false,
|
|
},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
result := IsSSECEncrypted(tc.metadata)
|
|
if result != tc.expected {
|
|
t.Errorf("Expected %v, got %v", tc.expected, result)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestSSEKMSIsEncrypted tests detection of SSE-KMS encryption from metadata
|
|
func TestSSEKMSIsEncrypted(t *testing.T) {
|
|
testCases := []struct {
|
|
name string
|
|
metadata map[string][]byte
|
|
expected bool
|
|
}{
|
|
{
|
|
name: "Empty metadata",
|
|
metadata: CreateTestMetadata(),
|
|
expected: false,
|
|
},
|
|
{
|
|
name: "Valid SSE-KMS metadata",
|
|
metadata: map[string][]byte{
|
|
s3_constants.AmzServerSideEncryption: []byte("aws:kms"),
|
|
s3_constants.AmzEncryptedDataKey: []byte("encrypted-key"),
|
|
},
|
|
expected: true,
|
|
},
|
|
{
|
|
name: "SSE-KMS algorithm only",
|
|
metadata: map[string][]byte{
|
|
s3_constants.AmzServerSideEncryption: []byte("aws:kms"),
|
|
},
|
|
expected: true,
|
|
},
|
|
{
|
|
name: "SSE-KMS encrypted data key only",
|
|
metadata: map[string][]byte{
|
|
s3_constants.AmzEncryptedDataKey: []byte("encrypted-key"),
|
|
},
|
|
expected: false, // Only encrypted data key without algorithm header should not be considered SSE-KMS
|
|
},
|
|
{
|
|
name: "Other encryption type (SSE-C)",
|
|
metadata: map[string][]byte{
|
|
s3_constants.AmzServerSideEncryptionCustomerAlgorithm: []byte("AES256"),
|
|
},
|
|
expected: false,
|
|
},
|
|
{
|
|
name: "SSE-S3 (AES256)",
|
|
metadata: map[string][]byte{
|
|
s3_constants.AmzServerSideEncryption: []byte("AES256"),
|
|
},
|
|
expected: false,
|
|
},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
result := IsSSEKMSEncrypted(tc.metadata)
|
|
if result != tc.expected {
|
|
t.Errorf("Expected %v, got %v", tc.expected, result)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestSSETypeDiscrimination tests that SSE types don't interfere with each other
|
|
func TestSSETypeDiscrimination(t *testing.T) {
|
|
// Test SSE-C headers don't trigger SSE-KMS detection
|
|
t.Run("SSE-C headers don't trigger SSE-KMS", func(t *testing.T) {
|
|
req := CreateTestHTTPRequest("PUT", "/bucket/object", nil)
|
|
keyPair := GenerateTestSSECKey(1)
|
|
SetupTestSSECHeaders(req, keyPair)
|
|
|
|
// Should detect SSE-C, not SSE-KMS
|
|
if !IsSSECRequest(req) {
|
|
t.Error("Should detect SSE-C request")
|
|
}
|
|
if IsSSEKMSRequest(req) {
|
|
t.Error("Should not detect SSE-KMS request for SSE-C headers")
|
|
}
|
|
})
|
|
|
|
// Test SSE-KMS headers don't trigger SSE-C detection
|
|
t.Run("SSE-KMS headers don't trigger SSE-C", func(t *testing.T) {
|
|
req := CreateTestHTTPRequest("PUT", "/bucket/object", nil)
|
|
SetupTestSSEKMSHeaders(req, "test-key-id")
|
|
|
|
// Should detect SSE-KMS, not SSE-C
|
|
if IsSSECRequest(req) {
|
|
t.Error("Should not detect SSE-C request for SSE-KMS headers")
|
|
}
|
|
if !IsSSEKMSRequest(req) {
|
|
t.Error("Should detect SSE-KMS request")
|
|
}
|
|
})
|
|
|
|
// Test metadata discrimination
|
|
t.Run("Metadata type discrimination", func(t *testing.T) {
|
|
ssecMetadata := CreateTestMetadataWithSSEC(GenerateTestSSECKey(1))
|
|
|
|
// Should detect as SSE-C, not SSE-KMS
|
|
if !IsSSECEncrypted(ssecMetadata) {
|
|
t.Error("Should detect SSE-C encrypted metadata")
|
|
}
|
|
if IsSSEKMSEncrypted(ssecMetadata) {
|
|
t.Error("Should not detect SSE-KMS for SSE-C metadata")
|
|
}
|
|
})
|
|
}
|
|
|
|
// TestSSECParseCorruptedMetadata tests handling of corrupted SSE-C metadata
|
|
func TestSSECParseCorruptedMetadata(t *testing.T) {
|
|
testCases := []struct {
|
|
name string
|
|
metadata map[string][]byte
|
|
expectError bool
|
|
errorMessage string
|
|
}{
|
|
{
|
|
name: "Missing algorithm",
|
|
metadata: map[string][]byte{
|
|
s3_constants.AmzServerSideEncryptionCustomerKeyMD5: []byte("valid-md5"),
|
|
},
|
|
expectError: false, // Detection should still work with partial metadata
|
|
},
|
|
{
|
|
name: "Invalid key MD5 format",
|
|
metadata: map[string][]byte{
|
|
s3_constants.AmzServerSideEncryptionCustomerAlgorithm: []byte("AES256"),
|
|
s3_constants.AmzServerSideEncryptionCustomerKeyMD5: []byte("invalid-base64!"),
|
|
},
|
|
expectError: false, // Detection should work, validation happens later
|
|
},
|
|
{
|
|
name: "Empty values",
|
|
metadata: map[string][]byte{
|
|
s3_constants.AmzServerSideEncryptionCustomerAlgorithm: []byte(""),
|
|
s3_constants.AmzServerSideEncryptionCustomerKeyMD5: []byte(""),
|
|
},
|
|
expectError: false,
|
|
},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
// Test that detection doesn't panic on corrupted metadata
|
|
result := IsSSECEncrypted(tc.metadata)
|
|
// The detection should be robust and not crash
|
|
t.Logf("Detection result for %s: %v", tc.name, result)
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestSSEKMSParseCorruptedMetadata tests handling of corrupted SSE-KMS metadata
|
|
func TestSSEKMSParseCorruptedMetadata(t *testing.T) {
|
|
testCases := []struct {
|
|
name string
|
|
metadata map[string][]byte
|
|
}{
|
|
{
|
|
name: "Invalid encrypted data key",
|
|
metadata: map[string][]byte{
|
|
s3_constants.AmzServerSideEncryption: []byte("aws:kms"),
|
|
s3_constants.AmzEncryptedDataKey: []byte("invalid-base64!"),
|
|
},
|
|
},
|
|
{
|
|
name: "Invalid encryption context",
|
|
metadata: map[string][]byte{
|
|
s3_constants.AmzServerSideEncryption: []byte("aws:kms"),
|
|
s3_constants.AmzEncryptionContextMeta: []byte("invalid-json"),
|
|
},
|
|
},
|
|
{
|
|
name: "Empty values",
|
|
metadata: map[string][]byte{
|
|
s3_constants.AmzServerSideEncryption: []byte(""),
|
|
s3_constants.AmzEncryptedDataKey: []byte(""),
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
// Test that detection doesn't panic on corrupted metadata
|
|
result := IsSSEKMSEncrypted(tc.metadata)
|
|
t.Logf("Detection result for %s: %v", tc.name, result)
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestSSEMetadataDeserialization tests SSE-KMS metadata deserialization with various inputs
|
|
func TestSSEMetadataDeserialization(t *testing.T) {
|
|
testCases := []struct {
|
|
name string
|
|
data []byte
|
|
expectError bool
|
|
}{
|
|
{
|
|
name: "Empty data",
|
|
data: []byte{},
|
|
expectError: true,
|
|
},
|
|
{
|
|
name: "Invalid JSON",
|
|
data: []byte("invalid-json"),
|
|
expectError: true,
|
|
},
|
|
{
|
|
name: "Valid JSON but wrong structure",
|
|
data: []byte(`{"wrong": "structure"}`),
|
|
expectError: false, // Our deserialization might be lenient
|
|
},
|
|
{
|
|
name: "Null data",
|
|
data: nil,
|
|
expectError: true,
|
|
},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
_, err := DeserializeSSEKMSMetadata(tc.data)
|
|
if tc.expectError && err == nil {
|
|
t.Error("Expected error but got none")
|
|
}
|
|
if !tc.expectError && err != nil {
|
|
t.Errorf("Expected no error but got: %v", err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestGeneralSSEDetection tests the general SSE detection that works across types
|
|
func TestGeneralSSEDetection(t *testing.T) {
|
|
testCases := []struct {
|
|
name string
|
|
metadata map[string][]byte
|
|
expected bool
|
|
}{
|
|
{
|
|
name: "No encryption",
|
|
metadata: CreateTestMetadata(),
|
|
expected: false,
|
|
},
|
|
{
|
|
name: "SSE-C encrypted",
|
|
metadata: CreateTestMetadataWithSSEC(GenerateTestSSECKey(1)),
|
|
expected: true,
|
|
},
|
|
{
|
|
name: "SSE-KMS encrypted",
|
|
metadata: map[string][]byte{
|
|
s3_constants.AmzServerSideEncryption: []byte("aws:kms"),
|
|
},
|
|
expected: true,
|
|
},
|
|
{
|
|
name: "SSE-S3 encrypted",
|
|
metadata: map[string][]byte{
|
|
s3_constants.AmzServerSideEncryption: []byte("AES256"),
|
|
},
|
|
expected: true,
|
|
},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
result := IsAnySSEEncrypted(tc.metadata)
|
|
if result != tc.expected {
|
|
t.Errorf("Expected %v, got %v", tc.expected, result)
|
|
}
|
|
})
|
|
}
|
|
}
|