From b77fd536cb148cb069799f6def4da55599de2253 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Sun, 16 Nov 2025 16:47:36 -0800
Subject: [PATCH] multipart sse-s3

---
 weed/s3api/s3api_object_handlers.go | 123 ++++++++++++++++++++++++----
 1 file changed, 109 insertions(+), 14 deletions(-)

diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index c435f08db..94f9d3afd 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -1023,23 +1023,41 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
 		}
 
 	case s3_constants.SSETypeS3:
-		// SSE-S3 typically doesn't use multipart per-chunk encryption, always use stream
-		tStreamFetch := time.Now()
-		encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry)
-		streamFetchTime = time.Since(tStreamFetch)
-		if streamErr != nil {
-			return streamErr
+		sseS3Key := decryptionKey.(*SSES3Key)
+
+		// Check if this is a multipart object (multiple chunks with SSE-S3 metadata)
+		isMultipartSSES3 := false
+		sses3Chunks := 0
+		for _, chunk := range entry.GetChunks() {
+			if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 && len(chunk.GetSseMetadata()) > 0 {
+				sses3Chunks++
+			}
 		}
-		defer encryptedReader.Close()
+		isMultipartSSES3 = sses3Chunks > 1
+		glog.V(3).Infof("SSE-S3 decryption: isMultipart=%v, sses3Chunks=%d", isMultipartSSES3, sses3Chunks)
 
-		sseS3Key := decryptionKey.(*SSES3Key)
-		keyManager := GetSSES3KeyManager()
-		iv, ivErr := GetSSES3IV(entry, sseS3Key, keyManager)
-		if ivErr != nil {
-			return fmt.Errorf("failed to get SSE-S3 IV: %w", ivErr)
+		if isMultipartSSES3 {
+			// For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly
+			decryptedReader, err = s3a.createMultipartSSES3DecryptedReaderDirect(r.Context(), nil, entry)
+			glog.V(2).Infof("Using multipart SSE-S3 decryption for object with %d chunks (no prefetch)", len(entry.GetChunks()))
+		} else {
+			// For single-part, get encrypted stream and decrypt
+			tStreamFetch := time.Now()
+			encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry)
+			streamFetchTime = time.Since(tStreamFetch)
+			if streamErr != nil {
+				return streamErr
+			}
+			defer encryptedReader.Close()
+
+			keyManager := GetSSES3KeyManager()
+			iv, ivErr := GetSSES3IV(entry, sseS3Key, keyManager)
+			if ivErr != nil {
+				return fmt.Errorf("failed to get SSE-S3 IV: %w", ivErr)
+			}
+			glog.V(2).Infof("SSE-S3 decryption: KeyID=%s, IV length=%d", sseS3Key.KeyID, len(iv))
+			decryptedReader, err = CreateSSES3DecryptedReader(encryptedReader, sseS3Key, iv)
 		}
-		glog.V(2).Infof("SSE-S3 decryption: KeyID=%s, IV length=%d", sseS3Key.KeyID, len(iv))
-		decryptedReader, err = CreateSSES3DecryptedReader(encryptedReader, sseS3Key, iv)
 	}
 
 	decryptSetupTime = time.Since(tDecryptSetup)
@@ -2672,6 +2690,83 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(ctx context.C
 	return NewMultipartSSEReader(readers), nil
 }
 
+// createMultipartSSES3DecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-S3 objects (direct volume path)
+func (s3a *S3ApiServer) createMultipartSSES3DecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, entry *filer_pb.Entry) (io.Reader, error) {
+	// Sort chunks by offset to ensure correct order
+	chunks := entry.GetChunks()
+	sort.Slice(chunks, func(i, j int) bool {
+		return chunks[i].GetOffset() < chunks[j].GetOffset()
+	})
+
+	// Create readers for each chunk, decrypting them independently
+	var readers []io.Reader
+
+	// Get key manager and SSE-S3 key from entry metadata
+	keyManager := GetSSES3KeyManager()
+	keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key]
+	sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager)
+	if err != nil {
+		return nil, fmt.Errorf("failed to deserialize SSE-S3 key from entry metadata: %v", err)
+	}
+
+	for _, chunk := range chunks {
+		// Get this chunk's encrypted data
+		chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create chunk reader: %v", err)
+		}
+
+		// Handle based on chunk's encryption type
+		if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 {
+			// Check if this chunk has per-chunk SSE-S3 metadata
+			if len(chunk.GetSseMetadata()) == 0 {
+				chunkReader.Close()
+				return nil, fmt.Errorf("SSE-S3 chunk %s missing per-chunk metadata", chunk.GetFileIdString())
+			}
+
+			// Deserialize the per-chunk SSE-S3 metadata to get the IV
+			chunkSSES3Metadata, err := DeserializeSSES3Metadata(chunk.GetSseMetadata(), keyManager)
+			if err != nil {
+				chunkReader.Close()
+				return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata for chunk %s: %v", chunk.GetFileIdString(), err)
+			}
+
+			// Use the IV from the chunk metadata
+			iv := chunkSSES3Metadata.IV
+			glog.V(4).Infof("Decrypting SSE-S3 chunk %s with KeyID=%s, IV length=%d",
+				chunk.GetFileIdString(), sseS3Key.KeyID, len(iv))
+
+			// Create decrypted reader for this chunk
+			decryptedChunkReader, decErr := CreateSSES3DecryptedReader(chunkReader, sseS3Key, iv)
+			if decErr != nil {
+				chunkReader.Close()
+				return nil, fmt.Errorf("failed to decrypt SSE-S3 chunk: %v", decErr)
+			}
+
+			// Use the streaming decrypted reader directly
+			readers = append(readers, struct {
+				io.Reader
+				io.Closer
+			}{
+				Reader: decryptedChunkReader,
+				Closer: chunkReader,
+			})
+			glog.V(4).Infof("Added streaming decrypted reader for SSE-S3 chunk %s", chunk.GetFileIdString())
+		} else {
+			// Non-SSE-S3 chunk, use as-is
+			readers = append(readers, chunkReader)
+			glog.V(4).Infof("Added non-encrypted reader for chunk %s", chunk.GetFileIdString())
+		}
+	}
+
+	// Close the original encrypted stream since we're reading chunks individually
+	if encryptedStream != nil {
+		encryptedStream.Close()
+	}
+
+	return NewMultipartSSEReader(readers), nil
+}
+
 // createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects
 func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) {
 	// Entry is passed from caller to avoid redundant filer lookup
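
Note on the per-chunk scheme above: every part of a multipart SSE-S3 upload is encrypted independently under one shared object-level key (deserialized from entry.Extended), but each chunk carries its own IV in its per-chunk SSE metadata. Reusing a single IV across chunks would replay the same CTR keystream under the same key, so the multipart path must read each chunk's metadata rather than calling GetSSES3IV once, which is what createMultipartSSES3DecryptedReaderDirect does. The standalone Go sketch below illustrates the shape of that decryption path; it assumes AES-CTR as the underlying stream cipher, and ctrReader is a hypothetical stand-in for CreateSSES3DecryptedReader, which the patch treats as a black box.

package main

import (
	"bytes"
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"fmt"
	"io"
)

// ctrReader streams the plaintext of one encrypted chunk: AES-CTR keyed with
// the shared object key and that chunk's own IV. Hypothetical stand-in for
// CreateSSES3DecryptedReader.
func ctrReader(key, iv []byte, ciphertext io.Reader) (io.Reader, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	return &cipher.StreamReader{S: cipher.NewCTR(block, iv), R: ciphertext}, nil
}

func main() {
	// One object-level key shared by all chunks, like the key deserialized
	// from entry metadata in the patch.
	key := make([]byte, 32)
	if _, err := rand.Read(key); err != nil {
		panic(err)
	}

	// Encrypt two "chunks" independently, each under its own random IV,
	// mirroring the per-chunk SSE metadata written during multipart upload.
	plaintexts := [][]byte{[]byte("part-1 data; "), []byte("part-2 data")}
	var ciphertexts, ivs [][]byte
	for _, p := range plaintexts {
		iv := make([]byte, aes.BlockSize)
		if _, err := rand.Read(iv); err != nil {
			panic(err)
		}
		block, _ := aes.NewCipher(key)
		ct := make([]byte, len(p))
		cipher.NewCTR(block, iv).XORKeyStream(ct, p)
		ciphertexts = append(ciphertexts, ct)
		ivs = append(ivs, iv)
	}

	// Decrypt each chunk with its own IV and concatenate the readers in
	// offset order, the same shape NewMultipartSSEReader(readers) provides.
	var readers []io.Reader
	for i, ct := range ciphertexts {
		r, err := ctrReader(key, ivs[i], bytes.NewReader(ct))
		if err != nil {
			panic(err)
		}
		readers = append(readers, r)
	}
	recovered, err := io.ReadAll(io.MultiReader(readers...))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", recovered) // prints: part-1 data; part-2 data
}

Where this sketch uses io.MultiReader, the patch appends anonymous struct{io.Reader; io.Closer} values that keep each chunk's Closer alongside its decrypted Reader, presumably so NewMultipartSSEReader can release the underlying chunk streams when the combined reader is closed.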