From eae129e8183246476f203edd665eeb3a468148ba Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 14 Nov 2025 00:20:10 -0800 Subject: [PATCH] fix sse --- weed/s3api/s3api_object_handlers.go | 228 ++++++++++++++++++++++++---- 1 file changed, 199 insertions(+), 29 deletions(-) diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 9c1ba1c8a..836832c0e 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -380,8 +380,6 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) // Fetch the correct entry for SSE processing (respects versionId) // This consolidates entry lookups to avoid multiple filer calls var objectEntryForSSE *filer_pb.Entry - originalRangeHeader := r.Header.Get("Range") - var sseObject = false // Optimization: Reuse already-fetched entry to avoid redundant metadata fetches if versioningConfigured { @@ -416,35 +414,13 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) // NEW OPTIMIZATION: Stream directly from volume servers, bypassing filer proxy // This eliminates the 19ms filer proxy overhead + // SSE decryption is handled inline during streaming - // Check if this is an SSE object for Range request handling + // Detect SSE encryption type primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE) - if originalRangeHeader != "" && (primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS) { - sseObject = true - // Temporarily remove Range header to get full encrypted data - r.Header.Del("Range") - } - - // Add SSE response headers before streaming - if objectEntryForSSE != nil { - // Create a fake response to get SSE headers - fakeResp := &http.Response{Header: make(http.Header)} - s3a.addSSEHeadersToResponse(fakeResp, objectEntryForSSE) - // Copy SSE headers to actual response - for k, v := range fakeResp.Header { - if strings.HasPrefix(k, "X-Amz-Server-Side-Encryption") { - w.Header()[k] = v - } - } - } - - // Restore the original Range header for SSE processing - if sseObject && originalRangeHeader != "" { - r.Header.Set("Range", originalRangeHeader) - } - - // Stream directly from volume servers - err = s3a.streamFromVolumeServers(w, r, objectEntryForSSE, primarySSEType) + + // Stream directly from volume servers with SSE support + err = s3a.streamFromVolumeServersWithSSE(w, r, objectEntryForSSE, primarySSEType) if err != nil { glog.Errorf("GetObjectHandler: failed to stream from volume servers: %v", err) // Don't write error response - headers already sent @@ -600,6 +576,200 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R return streamFn(w) } +// streamFromVolumeServersWithSSE handles streaming with inline SSE decryption +func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) error { + // If not encrypted, use fast path without decryption + if sseType == "" || sseType == "None" { + return s3a.streamFromVolumeServers(w, r, entry, sseType) + } + + glog.V(2).Infof("streamFromVolumeServersWithSSE: Handling %s encrypted object", sseType) + + // Add SSE response headers before streaming + s3a.addSSEResponseHeadersFromEntry(w, r, entry, sseType) + + // For encrypted objects, we need to stream encrypted data through a decryption wrapper + // Create a pipe: encrypted data goes into pipe writer, decrypted data comes from pipe reader + pipeReader, pipeWriter := io.Pipe() + + // Start goroutine to stream encrypted data from volume servers to pipe + encryptedStreamErr := make(chan error, 1) + go func() { + defer pipeWriter.Close() + err := s3a.streamFromVolumeServers(&pipeWriterWrapper{pipeWriter}, r, entry, sseType) + encryptedStreamErr <- err + }() + + // Create decrypted reader based on SSE type + var decryptedReader io.Reader + var decryptErr error + + switch sseType { + case s3_constants.SSETypeC: + decryptedReader, decryptErr = s3a.createSSECDecryptedReaderFromEntry(r, pipeReader, entry) + case s3_constants.SSETypeKMS: + decryptedReader, decryptErr = s3a.createSSEKMSDecryptedReaderFromEntry(r, pipeReader, entry) + case s3_constants.SSETypeS3: + decryptedReader, decryptErr = s3a.createSSES3DecryptedReaderFromEntry(r, pipeReader, entry) + default: + decryptErr = fmt.Errorf("unsupported SSE type: %s", sseType) + } + + if decryptErr != nil { + pipeReader.Close() + return fmt.Errorf("failed to create decrypted reader for %s: %v", sseType, decryptErr) + } + + // Stream decrypted data to response + buf := make([]byte, 128*1024) + _, copyErr := io.CopyBuffer(w, decryptedReader, buf) + + // Check if encrypted streaming had errors + if streamErr := <-encryptedStreamErr; streamErr != nil { + return fmt.Errorf("encrypted stream error: %v", streamErr) + } + + return copyErr +} + +// addSSEResponseHeadersFromEntry adds appropriate SSE response headers based on entry metadata +func (s3a *S3ApiServer) addSSEResponseHeadersFromEntry(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) { + if entry == nil || entry.Extended == nil { + return + } + + switch sseType { + case s3_constants.SSETypeC: + // SSE-C: Echo back algorithm and key MD5 + if algo, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm]; exists { + w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, string(algo)) + } + if keyMD5, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]; exists { + w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, string(keyMD5)) + } + + case s3_constants.SSETypeKMS: + // SSE-KMS: Return algorithm and key ID + w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms") + if kmsMetadataB64, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKeyHeader]; exists { + kmsMetadataBytes, err := base64.StdEncoding.DecodeString(string(kmsMetadataB64)) + if err == nil { + sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes) + if err == nil { + AddSSEKMSResponseHeaders(w, sseKMSKey) + } + } + } + + case s3_constants.SSETypeS3: + // SSE-S3: Return algorithm + w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256) + } +} + +// pipeWriterWrapper wraps io.PipeWriter to implement http.ResponseWriter interface +type pipeWriterWrapper struct { + *io.PipeWriter +} + +func (pw *pipeWriterWrapper) Header() http.Header { + // Headers are already set on the real ResponseWriter, ignore here + return make(http.Header) +} + +func (pw *pipeWriterWrapper) WriteHeader(statusCode int) { + // Status is already set on the real ResponseWriter, ignore here +} + +// createSSECDecryptedReaderFromEntry creates an SSE-C decrypted reader from entry metadata +func (s3a *S3ApiServer) createSSECDecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) { + // Parse SSE-C headers from request + customerKey, err := ParseSSECHeaders(r) + if err != nil { + return nil, fmt.Errorf("failed to parse SSE-C headers: %w", err) + } + + if customerKey == nil { + return nil, fmt.Errorf("SSE-C key required but not provided") + } + + // Validate key MD5 from entry metadata + if entry.Extended != nil { + storedKeyMD5 := string(entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]) + if storedKeyMD5 != "" && customerKey.KeyMD5 != storedKeyMD5 { + return nil, fmt.Errorf("SSE-C key mismatch") + } + } + + // Get IV from entry metadata + ivBase64 := string(entry.Extended[s3_constants.SeaweedFSSSEIVHeader]) + if ivBase64 == "" { + return nil, fmt.Errorf("SSE-C IV not found in metadata") + } + + iv, err := base64.StdEncoding.DecodeString(ivBase64) + if err != nil { + return nil, fmt.Errorf("failed to decode IV: %w", err) + } + + // Create decrypted reader + return CreateSSECDecryptedReader(encryptedReader, customerKey, iv) +} + +// createSSEKMSDecryptedReaderFromEntry creates an SSE-KMS decrypted reader from entry metadata +func (s3a *S3ApiServer) createSSEKMSDecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) { + // Extract SSE-KMS metadata from entry + if entry.Extended == nil { + return nil, fmt.Errorf("no extended metadata found") + } + + kmsMetadataB64, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKeyHeader] + if !exists { + return nil, fmt.Errorf("SSE-KMS metadata not found") + } + + kmsMetadataBytes, err := base64.StdEncoding.DecodeString(string(kmsMetadataB64)) + if err != nil { + return nil, fmt.Errorf("failed to decode SSE-KMS metadata: %w", err) + } + + sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes) + if err != nil { + return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err) + } + + // Create decrypted reader + return CreateSSEKMSDecryptedReader(encryptedReader, sseKMSKey) +} + +// createSSES3DecryptedReaderFromEntry creates an SSE-S3 decrypted reader from entry metadata +func (s3a *S3ApiServer) createSSES3DecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) { + // Extract SSE-S3 metadata from entry + if entry.Extended == nil { + return nil, fmt.Errorf("no extended metadata found") + } + + keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key] + if !exists { + return nil, fmt.Errorf("SSE-S3 metadata not found") + } + + keyManager := GetSSES3KeyManager() + sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager) + if err != nil { + return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err) + } + + // Get IV + iv, err := GetSSES3IV(entry, sseS3Key, keyManager) + if err != nil { + return nil, fmt.Errorf("failed to get SSE-S3 IV: %w", err) + } + + // Create decrypted reader + return CreateSSES3DecryptedReader(encryptedReader, sseS3Key, iv) +} + // setResponseHeaders sets all standard HTTP response headers from entry metadata func (s3a *S3ApiServer) setResponseHeaders(w http.ResponseWriter, entry *filer_pb.Entry, totalSize int64) { // Set content length and accept ranges