From f7bd75ef3bf993a8f4aa5b647617556ab7567596 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 22 Oct 2025 14:12:31 -0700 Subject: [PATCH] S3: Avoid in-memory map concurrent writes in SSE-S3 key manager (#7358) * Fix concurrent map writes in SSE-S3 key manager This commit fixes issue #7352 where parallel uploads to SSE-S3 enabled buckets were causing 'fatal error: concurrent map writes' crashes. The SSES3KeyManager struct had an unsynchronized map that was being accessed from multiple goroutines during concurrent PUT operations. Changes: - Added sync.RWMutex to SSES3KeyManager struct - Protected StoreKey() with write lock - Protected GetKey() with read lock - Updated GetOrCreateKey() with proper read/write locking pattern including double-check to prevent race conditions All existing SSE tests pass successfully. Fixes #7352 * Improve SSE-S3 key manager with envelope encryption Replace in-memory key storage with envelope encryption using a super key (KEK). Instead of storing DEKs in a map, the key manager now: - Uses a randomly generated 256-bit super key (KEK) - Encrypts each DEK with the super key using AES-GCM - Stores the encrypted DEK in object metadata - Decrypts the DEK on-demand when reading objects Benefits: - Eliminates unbounded memory growth from caching DEKs - Provides better security with authenticated encryption (AES-GCM) - Follows envelope encryption best practices (similar to AWS KMS) - No need for mutex-protected map lookups on reads - Each object's encrypted DEK is self-contained in its metadata This approach matches the design pattern used in the local KMS provider and is more suitable for production use. * Persist SSE-S3 KEK in filer for multi-server support Store the SSE-S3 super key (KEK) in the filer at /.seaweedfs/s3/kek instead of generating it per-server. This ensures: 1. **Multi-server consistency**: All S3 API servers use the same KEK 2. **Persistence across restarts**: KEK survives server restarts 3. **Centralized management**: KEK stored in filer, accessible to all servers 4. **Automatic initialization**: KEK is created on first startup if it doesn't exist The KEK is: - Stored as hex-encoded bytes in filer - Protected with file mode 0600 (read/write for owner only) - Located in /.seaweedfs/s3/ directory (mode 0700) - Loaded on S3 API server startup - Reused across all S3 API server instances This matches the architecture of centralized configuration in SeaweedFS and enables proper SSE-S3 support in multi-server deployments. * Change KEK storage location to /etc/s3/kek Move SSE-S3 KEK from /.seaweedfs/s3/kek to /etc/s3/kek for better organization and consistency with other SeaweedFS configuration files. The /etc directory is the standard location for configuration files in SeaweedFS. * use global sse-se key manager when copying * Update volume_growth_reservation_test.go * Rename KEK file to sse_kek for clarity Changed /etc/s3/kek to /etc/s3/sse_kek to make it clear this key is specifically for SSE-S3 encryption, not for other KMS purposes. This improves clarity and avoids potential confusion with the separate KMS provider system used for SSE-KMS. * Use constants for SSE-S3 KEK directory and file name Refactored to use named constants instead of string literals: - SSES3KEKDirectory = "/etc/s3" - SSES3KEKParentDir = "/etc" - SSES3KEKDirName = "s3" - SSES3KEKFileName = "sse_kek" This improves maintainability and makes it easier to change the storage location if needed in the future. * Address PR review: Improve error handling and robustness Addresses review comments from https://github.com/seaweedfs/seaweedfs/pull/7358#pullrequestreview-3367476264 Critical fixes: 1. Distinguish between 'not found' and other errors when loading KEK - Only generate new KEK if ErrNotFound - Fail fast on connectivity/permission errors to prevent data loss - Prevents creating new KEK that would make existing data undecryptable 2. Make SSE-S3 initialization failure fatal - Return error instead of warning when initialization fails - Prevents server from running in broken state 3. Improve directory creation error handling - Only ignore 'file exists' errors - Fail on permission/connectivity errors These changes ensure the SSE-S3 key manager is robust against transient errors and prevents accidental data loss. * Fix KEK path conflict with /etc/s3 file Changed KEK storage from /etc/s3/sse_kek to /etc/seaweedfs/s3_sse_kek to avoid conflict with the circuit breaker config at /etc/s3. The /etc/s3 path is used by CircuitBreakerConfigDir and may exist as a file (circuit_breaker.json), causing the error: 'CreateEntry /etc/s3/sse_kek: /etc/s3 should be a directory' New KEK location: /etc/seaweedfs/s3_sse_kek This uses the seaweedfs subdirectory which is more appropriate for internal SeaweedFS configuration files. Fixes startup failure when /etc/s3 exists as a file. * Revert KEK path back to /etc/s3/sse_kek Changed back from /etc/seaweedfs/s3_sse_kek to /etc/s3/sse_kek as requested. The /etc/s3 directory will be created properly when it doesn't exist. * Fix directory creation with proper ModeDir flag Set FileMode to uint32(0755 | os.ModeDir) when creating /etc/s3 directory to ensure it's created as a directory, not a file. Without the os.ModeDir flag, the entry was being created as a file, which caused the error 'CreateEntry: /etc/s3 is a file' when trying to create the KEK file inside it. Uses 0755 permissions (rwxr-xr-x) for the directory and adds os import for os.ModeDir constant. --- weed/s3api/s3_sse_s3.go | 270 +++++++++++++++--- weed/s3api/s3api_server.go | 5 + weed/s3api/s3api_streaming_copy.go | 6 +- .../volume_growth_reservation_test.go | 6 + 4 files changed, 247 insertions(+), 40 deletions(-) diff --git a/weed/s3api/s3_sse_s3.go b/weed/s3api/s3_sse_s3.go index 6471e04fd..bb563eee5 100644 --- a/weed/s3api/s3_sse_s3.go +++ b/weed/s3api/s3_sse_s3.go @@ -1,18 +1,26 @@ package s3api import ( + "context" "crypto/aes" "crypto/cipher" "crypto/rand" "encoding/base64" + "encoding/hex" "encoding/json" + "errors" "fmt" "io" mathrand "math/rand" "net/http" + "os" + "strings" + "sync" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/util" ) // SSE-S3 uses AES-256 encryption with server-managed keys @@ -112,19 +120,24 @@ func GetSSES3Headers() map[string]string { } } -// SerializeSSES3Metadata serializes SSE-S3 metadata for storage +// SerializeSSES3Metadata serializes SSE-S3 metadata for storage using envelope encryption func SerializeSSES3Metadata(key *SSES3Key) ([]byte, error) { if err := ValidateSSES3Key(key); err != nil { return nil, err } - // For SSE-S3, we typically don't store the actual key in metadata - // Instead, we store a key ID or reference that can be used to retrieve the key - // from a secure key management system + // Encrypt the DEK using the global key manager's super key + keyManager := GetSSES3KeyManager() + encryptedDEK, nonce, err := keyManager.encryptKeyWithSuperKey(key.Key) + if err != nil { + return nil, fmt.Errorf("failed to encrypt DEK: %w", err) + } metadata := map[string]string{ - "algorithm": key.Algorithm, - "keyId": key.KeyID, + "algorithm": key.Algorithm, + "keyId": key.KeyID, + "encryptedDEK": base64.StdEncoding.EncodeToString(encryptedDEK), + "nonce": base64.StdEncoding.EncodeToString(nonce), } // Include IV if present (needed for chunk-level decryption) @@ -141,13 +154,13 @@ func SerializeSSES3Metadata(key *SSES3Key) ([]byte, error) { return data, nil } -// DeserializeSSES3Metadata deserializes SSE-S3 metadata from storage and retrieves the actual key +// DeserializeSSES3Metadata deserializes SSE-S3 metadata from storage and decrypts the DEK func DeserializeSSES3Metadata(data []byte, keyManager *SSES3KeyManager) (*SSES3Key, error) { if len(data) == 0 { return nil, fmt.Errorf("empty SSE-S3 metadata") } - // Parse the JSON metadata to extract keyId + // Parse the JSON metadata var metadata map[string]string if err := json.Unmarshal(data, &metadata); err != nil { return nil, fmt.Errorf("failed to parse SSE-S3 metadata: %w", err) @@ -163,19 +176,40 @@ func DeserializeSSES3Metadata(data []byte, keyManager *SSES3KeyManager) (*SSES3K algorithm = s3_constants.SSEAlgorithmAES256 // Default algorithm } - // Retrieve the actual key using the keyId + // Decode the encrypted DEK and nonce + encryptedDEKStr, exists := metadata["encryptedDEK"] + if !exists { + return nil, fmt.Errorf("encryptedDEK not found in SSE-S3 metadata") + } + encryptedDEK, err := base64.StdEncoding.DecodeString(encryptedDEKStr) + if err != nil { + return nil, fmt.Errorf("failed to decode encrypted DEK: %w", err) + } + + nonceStr, exists := metadata["nonce"] + if !exists { + return nil, fmt.Errorf("nonce not found in SSE-S3 metadata") + } + nonce, err := base64.StdEncoding.DecodeString(nonceStr) + if err != nil { + return nil, fmt.Errorf("failed to decode nonce: %w", err) + } + + // Decrypt the DEK using the key manager if keyManager == nil { return nil, fmt.Errorf("key manager is required for SSE-S3 key retrieval") } - key, err := keyManager.GetOrCreateKey(keyID) + dekBytes, err := keyManager.decryptKeyWithSuperKey(encryptedDEK, nonce) if err != nil { - return nil, fmt.Errorf("failed to retrieve SSE-S3 key with ID %s: %w", keyID, err) + return nil, fmt.Errorf("failed to decrypt DEK: %w", err) } - // Verify the algorithm matches - if key.Algorithm != algorithm { - return nil, fmt.Errorf("algorithm mismatch: expected %s, got %s", algorithm, key.Algorithm) + // Reconstruct the key + key := &SSES3Key{ + Key: dekBytes, + KeyID: keyID, + Algorithm: algorithm, } // Restore IV if present in metadata (for chunk-level decryption) @@ -190,52 +224,211 @@ func DeserializeSSES3Metadata(data []byte, keyManager *SSES3KeyManager) (*SSES3K return key, nil } -// SSES3KeyManager manages SSE-S3 encryption keys +// SSES3KeyManager manages SSE-S3 encryption keys using envelope encryption +// Instead of storing keys in memory, it uses a super key (KEK) to encrypt/decrypt DEKs type SSES3KeyManager struct { - // In a production system, this would interface with a secure key management system - keys map[string]*SSES3Key + mu sync.RWMutex + superKey []byte // 256-bit master key (KEK - Key Encryption Key) + filerClient filer_pb.FilerClient // Filer client for KEK persistence + kekPath string // Path in filer where KEK is stored (e.g., /etc/s3/sse_kek) } -// NewSSES3KeyManager creates a new SSE-S3 key manager +const ( + // KEK storage directory and file name in filer + SSES3KEKDirectory = "/etc/s3" + SSES3KEKParentDir = "/etc" + SSES3KEKDirName = "s3" + SSES3KEKFileName = "sse_kek" + + // Full KEK path in filer + defaultKEKPath = SSES3KEKDirectory + "/" + SSES3KEKFileName +) + +// NewSSES3KeyManager creates a new SSE-S3 key manager with envelope encryption func NewSSES3KeyManager() *SSES3KeyManager { + // This will be initialized properly when attached to an S3ApiServer return &SSES3KeyManager{ - keys: make(map[string]*SSES3Key), + kekPath: defaultKEKPath, + } +} + +// InitializeWithFiler initializes the key manager with a filer client +func (km *SSES3KeyManager) InitializeWithFiler(filerClient filer_pb.FilerClient) error { + km.mu.Lock() + defer km.mu.Unlock() + + km.filerClient = filerClient + + // Try to load existing KEK from filer + if err := km.loadSuperKeyFromFiler(); err != nil { + // Only generate a new key if it does not exist. + // For other errors (e.g. connectivity), we should fail fast to prevent creating a new key + // and making existing data undecryptable. + if errors.Is(err, filer_pb.ErrNotFound) { + glog.V(1).Infof("SSE-S3 KeyManager: KEK not found, generating new KEK (load from filer %s: %v)", km.kekPath, err) + if genErr := km.generateAndSaveSuperKeyToFiler(); genErr != nil { + return fmt.Errorf("failed to generate and save SSE-S3 super key: %w", genErr) + } + } else { + // A different error occurred (e.g., network issue, permission denied). + // Return the error to prevent starting with a broken state. + return fmt.Errorf("failed to load SSE-S3 super key from %s: %w", km.kekPath, err) + } + } else { + glog.V(1).Infof("SSE-S3 KeyManager: Loaded KEK from filer %s", km.kekPath) + } + + return nil +} + +// loadSuperKeyFromFiler loads the KEK from the filer +func (km *SSES3KeyManager) loadSuperKeyFromFiler() error { + if km.filerClient == nil { + return fmt.Errorf("filer client not initialized") + } + + // Get the entry from filer + entry, err := filer_pb.GetEntry(context.Background(), km.filerClient, util.FullPath(km.kekPath)) + if err != nil { + return fmt.Errorf("failed to get KEK entry from filer: %w", err) + } + + // Read the content + if len(entry.Content) == 0 { + return fmt.Errorf("KEK entry is empty") + } + + // Decode hex-encoded key + key, err := hex.DecodeString(string(entry.Content)) + if err != nil { + return fmt.Errorf("failed to decode KEK: %w", err) + } + + if len(key) != SSES3KeySize { + return fmt.Errorf("invalid KEK size: expected %d bytes, got %d", SSES3KeySize, len(key)) } + + km.superKey = key + return nil +} + +// generateAndSaveSuperKeyToFiler generates a new KEK and saves it to the filer +func (km *SSES3KeyManager) generateAndSaveSuperKeyToFiler() error { + if km.filerClient == nil { + return fmt.Errorf("filer client not initialized") + } + + // Generate a random 256-bit super key (KEK) + superKey := make([]byte, SSES3KeySize) + if _, err := io.ReadFull(rand.Reader, superKey); err != nil { + return fmt.Errorf("failed to generate KEK: %w", err) + } + + // Encode as hex for storage + encodedKey := []byte(hex.EncodeToString(superKey)) + + // Create the entry in filer + // First ensure the parent directory exists + if err := filer_pb.Mkdir(context.Background(), km.filerClient, SSES3KEKParentDir, SSES3KEKDirName, func(entry *filer_pb.Entry) { + // Set appropriate permissions for the directory + entry.Attributes.FileMode = uint32(0700 | os.ModeDir) + }); err != nil { + // Only ignore "file exists" errors. + if !strings.Contains(err.Error(), "file exists") { + return fmt.Errorf("failed to create KEK directory %s: %w", SSES3KEKDirectory, err) + } + glog.V(3).Infof("Parent directory %s already exists, continuing.", SSES3KEKDirectory) + } + + // Create the KEK file + if err := filer_pb.MkFile(context.Background(), km.filerClient, SSES3KEKDirectory, SSES3KEKFileName, nil, func(entry *filer_pb.Entry) { + entry.Content = encodedKey + entry.Attributes.FileMode = 0600 // Read/write for owner only + entry.Attributes.FileSize = uint64(len(encodedKey)) + }); err != nil { + return fmt.Errorf("failed to create KEK file in filer: %w", err) + } + + km.superKey = superKey + glog.Infof("SSE-S3 KeyManager: Generated and saved new KEK to filer %s", km.kekPath) + return nil } // GetOrCreateKey gets an existing key or creates a new one +// With envelope encryption, we always generate a new DEK since we don't store them func (km *SSES3KeyManager) GetOrCreateKey(keyID string) (*SSES3Key, error) { - if keyID == "" { - // Generate new key - return GenerateSSES3Key() + // Always generate a new key - we use envelope encryption so no need to cache DEKs + return GenerateSSES3Key() +} + +// encryptKeyWithSuperKey encrypts a DEK using the super key (KEK) with AES-GCM +func (km *SSES3KeyManager) encryptKeyWithSuperKey(dek []byte) ([]byte, []byte, error) { + km.mu.RLock() + defer km.mu.RUnlock() + + block, err := aes.NewCipher(km.superKey) + if err != nil { + return nil, nil, fmt.Errorf("failed to create cipher: %w", err) } - // Check if key exists - if key, exists := km.keys[keyID]; exists { - return key, nil + gcm, err := cipher.NewGCM(block) + if err != nil { + return nil, nil, fmt.Errorf("failed to create GCM: %w", err) } - // Create new key - key, err := GenerateSSES3Key() + // Generate random nonce + nonce := make([]byte, gcm.NonceSize()) + if _, err := io.ReadFull(rand.Reader, nonce); err != nil { + return nil, nil, fmt.Errorf("failed to generate nonce: %w", err) + } + + // Encrypt the DEK + encryptedDEK := gcm.Seal(nil, nonce, dek, nil) + + return encryptedDEK, nonce, nil +} + +// decryptKeyWithSuperKey decrypts a DEK using the super key (KEK) with AES-GCM +func (km *SSES3KeyManager) decryptKeyWithSuperKey(encryptedDEK, nonce []byte) ([]byte, error) { + km.mu.RLock() + defer km.mu.RUnlock() + + block, err := aes.NewCipher(km.superKey) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create cipher: %w", err) } - key.KeyID = keyID - km.keys[keyID] = key + gcm, err := cipher.NewGCM(block) + if err != nil { + return nil, fmt.Errorf("failed to create GCM: %w", err) + } - return key, nil + if len(nonce) != gcm.NonceSize() { + return nil, fmt.Errorf("invalid nonce size: expected %d, got %d", gcm.NonceSize(), len(nonce)) + } + + // Decrypt the DEK + dek, err := gcm.Open(nil, nonce, encryptedDEK, nil) + if err != nil { + return nil, fmt.Errorf("failed to decrypt DEK: %w", err) + } + + return dek, nil } -// StoreKey stores a key in the manager +// StoreKey is now a no-op since we use envelope encryption and don't cache DEKs +// The encrypted DEK is stored in the object metadata, not in the key manager func (km *SSES3KeyManager) StoreKey(key *SSES3Key) { - km.keys[key.KeyID] = key + // No-op: With envelope encryption, we don't need to store keys in memory + // The DEK is encrypted with the super key and stored in object metadata } -// GetKey retrieves a key by ID +// GetKey is now a no-op since we don't cache keys +// Keys are retrieved by decrypting the encrypted DEK from object metadata func (km *SSES3KeyManager) GetKey(keyID string) (*SSES3Key, bool) { - key, exists := km.keys[keyID] - return key, exists + // No-op: With envelope encryption, keys are not cached + // Each object's metadata contains the encrypted DEK + return nil, false } // Global SSE-S3 key manager instance @@ -246,6 +439,11 @@ func GetSSES3KeyManager() *SSES3KeyManager { return globalSSES3KeyManager } +// InitializeGlobalSSES3KeyManager initializes the global key manager with filer access +func InitializeGlobalSSES3KeyManager(s3ApiServer *S3ApiServer) error { + return globalSSES3KeyManager.InitializeWithFiler(s3ApiServer) +} + // ProcessSSES3Request processes an SSE-S3 request and returns encryption metadata func ProcessSSES3Request(r *http.Request) (map[string][]byte, error) { if !IsSSES3RequestInternal(r) { diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 7f5b88566..62a3121f2 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -147,6 +147,11 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl s3ApiServer.registerRouter(router) + // Initialize the global SSE-S3 key manager with filer access + if err := InitializeGlobalSSES3KeyManager(s3ApiServer); err != nil { + return nil, fmt.Errorf("failed to initialize SSE-S3 key manager: %w", err) + } + go s3ApiServer.subscribeMetaEvents("s3", startTsNs, filer.DirectoryEtcRoot, []string{option.BucketsPath}) return s3ApiServer, nil } diff --git a/weed/s3api/s3api_streaming_copy.go b/weed/s3api/s3api_streaming_copy.go index c996e6188..7c52a918c 100644 --- a/weed/s3api/s3api_streaming_copy.go +++ b/weed/s3api/s3api_streaming_copy.go @@ -140,10 +140,8 @@ func (scm *StreamingCopyManager) createEncryptionSpec(entry *filer_pb.Entry, r * spec.SourceType = EncryptionTypeSSES3 // Extract SSE-S3 key from metadata if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]; exists { - // TODO: This should use a proper SSE-S3 key manager from S3ApiServer - // For now, create a temporary key manager to handle deserialization - tempKeyManager := NewSSES3KeyManager() - sseKey, err := DeserializeSSES3Metadata(keyData, tempKeyManager) + keyManager := GetSSES3KeyManager() + sseKey, err := DeserializeSSES3Metadata(keyData, keyManager) if err != nil { return nil, fmt.Errorf("deserialize SSE-S3 metadata: %w", err) } diff --git a/weed/topology/volume_growth_reservation_test.go b/weed/topology/volume_growth_reservation_test.go index 36b57a49c..1f545b9bb 100644 --- a/weed/topology/volume_growth_reservation_test.go +++ b/weed/topology/volume_growth_reservation_test.go @@ -81,11 +81,14 @@ func TestVolumeGrowth_ReservationBasedAllocation(t *testing.T) { } // Simulate successful volume creation + // Must acquire lock before accessing children map to prevent race condition + dn.Lock() disk := dn.children[NodeId(types.HardDriveType.String())].(*Disk) deltaDiskUsage := &DiskUsageCounts{ volumeCount: 1, } disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage) + dn.Unlock() // Release reservation after successful creation reservation.releaseAllReservations() @@ -153,11 +156,14 @@ func TestVolumeGrowth_ConcurrentAllocationPreventsRaceCondition(t *testing.T) { // Simulate completion: increment volume count BEFORE releasing reservation if reservation != nil { // First, increment the volume count to reflect the created volume + // Must acquire lock before accessing children map to prevent race condition + dn.Lock() disk := dn.children[NodeId(types.HardDriveType.String())].(*Disk) deltaDiskUsage := &DiskUsageCounts{ volumeCount: 1, } disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage) + dn.Unlock() // Then release the reservation reservation.releaseAllReservations()