From def705fd51c68caad18cf1c4280238ba27eda764 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 18 Nov 2025 17:17:44 -0800 Subject: [PATCH] purge unused code --- weed/s3api/s3api_object_handlers.go | 576 ++---------------------- weed/s3api/s3api_object_handlers_put.go | 20 - 2 files changed, 46 insertions(+), 550 deletions(-) diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 8c52fa114..c6fe6f94f 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -113,18 +113,19 @@ func removeDuplicateSlashes(object string) string { // - If a 0-byte object or directory has no children → it's empty → HEAD returns 200 // // Examples: -// hasChildren("bucket", "dataset") where "dataset/file.txt" exists → true -// hasChildren("bucket", "empty-dir") where no children exist → false +// +// hasChildren("bucket", "dataset") where "dataset/file.txt" exists → true +// hasChildren("bucket", "empty-dir") where no children exist → false // // Performance: ~1-5ms per call (one gRPC LIST request with Limit=1) func (s3a *S3ApiServer) hasChildren(bucket, prefix string) bool { // Clean up prefix: remove leading slashes cleanPrefix := strings.TrimPrefix(prefix, "/") - + // The directory to list is bucketDir + cleanPrefix bucketDir := s3a.option.BucketsPath + "/" + bucket fullPath := bucketDir + "/" + cleanPrefix - + // Try to list one child object in the directory err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.ListEntriesRequest{ @@ -132,12 +133,12 @@ func (s3a *S3ApiServer) hasChildren(bucket, prefix string) bool { Limit: 1, InclusiveStartFrom: true, } - + stream, err := client.ListEntries(context.Background(), request) if err != nil { return err } - + // Check if we got at least one entry _, err = stream.Recv() if err == io.EOF { @@ -148,7 +149,7 @@ func (s3a *S3ApiServer) hasChildren(bucket, prefix string) bool { } return nil }) - + // If we got an entry (not EOF), then it has children return err == nil } @@ -402,14 +403,14 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) if versionId != "" { // Request for specific version - must look in .versions directory - glog.V(3).Infof("GetObject: requesting specific version %s for %s%s", versionId, bucket, object) - entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId) - if err != nil { - glog.Errorf("Failed to get specific version %s: %v", versionId, err) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return - } - targetVersionId = versionId + glog.V(3).Infof("GetObject: requesting specific version %s for %s%s", versionId, bucket, object) + entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId) + if err != nil { + glog.Errorf("Failed to get specific version %s: %v", versionId, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + targetVersionId = versionId } else { // Request for latest version - OPTIMIZATION: // Check if .versions/ directory exists quickly (no retries) to decide path @@ -447,15 +448,15 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) } } else { // Transient error checking .versions/, fall back to getLatestObjectVersion with retries - glog.V(2).Infof("GetObject: transient error checking .versions for %s%s: %v, falling back to getLatestObjectVersion", bucket, object, versionsErr) - entry, err = s3a.getLatestObjectVersion(bucket, object) - if err != nil { - glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return + glog.V(2).Infof("GetObject: transient error checking .versions for %s%s: %v, falling back to getLatestObjectVersion", bucket, object, versionsErr) + entry, err = s3a.getLatestObjectVersion(bucket, object) + if err != nil { + glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } } - } - // Extract version ID if not already set + // Extract version ID if not already set if targetVersionId == "" { if entry.Extended != nil { if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { @@ -1751,99 +1752,6 @@ func (s3a *S3ApiServer) addSSEResponseHeadersFromEntry(w http.ResponseWriter, r } } -// pipeWriterWrapper wraps io.PipeWriter to implement http.ResponseWriter interface -type pipeWriterWrapper struct { - *io.PipeWriter -} - -func (pw *pipeWriterWrapper) Header() http.Header { - // Headers are already set on the real ResponseWriter, ignore here - return make(http.Header) -} - -func (pw *pipeWriterWrapper) WriteHeader(statusCode int) { - // Status is already set on the real ResponseWriter, ignore here -} - -// createSSECDecryptedReaderFromEntry creates an SSE-C decrypted reader from entry metadata -func (s3a *S3ApiServer) createSSECDecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) { - // Parse SSE-C headers from request - customerKey, err := ParseSSECHeaders(r) - if err != nil { - return nil, fmt.Errorf("failed to parse SSE-C headers: %w", err) - } - - if customerKey == nil { - return nil, fmt.Errorf("SSE-C key required but not provided") - } - - // Validate key MD5 from entry metadata - if entry.Extended != nil { - storedKeyMD5 := string(entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]) - if storedKeyMD5 != "" && customerKey.KeyMD5 != storedKeyMD5 { - return nil, fmt.Errorf("SSE-C key mismatch") - } - } - - // Get IV from entry metadata (stored as raw bytes, matching filer behavior) - iv := entry.Extended[s3_constants.SeaweedFSSSEIV] - if len(iv) == 0 { - return nil, fmt.Errorf("SSE-C IV not found in metadata") - } - - // Create decrypted reader - return CreateSSECDecryptedReader(encryptedReader, customerKey, iv) -} - -// createSSEKMSDecryptedReaderFromEntry creates an SSE-KMS decrypted reader from entry metadata -func (s3a *S3ApiServer) createSSEKMSDecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) { - // Extract SSE-KMS metadata from entry (stored as raw bytes, matching filer behavior) - if entry.Extended == nil { - return nil, fmt.Errorf("no extended metadata found") - } - - kmsMetadataBytes, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey] - if !exists { - return nil, fmt.Errorf("SSE-KMS metadata not found") - } - - sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes) - if err != nil { - return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err) - } - - // Create decrypted reader - return CreateSSEKMSDecryptedReader(encryptedReader, sseKMSKey) -} - -// createSSES3DecryptedReaderFromEntry creates an SSE-S3 decrypted reader from entry metadata -func (s3a *S3ApiServer) createSSES3DecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) { - // Extract SSE-S3 metadata from entry (stored as raw bytes, matching filer behavior) - if entry.Extended == nil { - return nil, fmt.Errorf("no extended metadata found") - } - - keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key] - if !exists { - return nil, fmt.Errorf("SSE-S3 metadata not found") - } - - keyManager := GetSSES3KeyManager() - sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager) - if err != nil { - return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err) - } - - // Get IV - iv, err := GetSSES3IV(entry, sseS3Key, keyManager) - if err != nil { - return nil, fmt.Errorf("failed to get SSE-S3 IV: %w", err) - } - - // Create decrypted reader - return CreateSSES3DecryptedReader(encryptedReader, sseS3Key, iv) -} - // setResponseHeaders sets all standard HTTP response headers from entry metadata func (s3a *S3ApiServer) setResponseHeaders(w http.ResponseWriter, entry *filer_pb.Entry, totalSize int64) { // Safety check: entry must be valid @@ -1954,10 +1862,11 @@ func (s *simpleMasterClient) GetLookupFileIdFunction() wdclient.LookupFileIdFunc // which correctly identifies directories by checking for children // // Examples: -// HEAD /bucket/dataset (no trailing slash, has children) → 404 Not Found (implicit directory) -// HEAD /bucket/dataset/ (trailing slash) → 200 OK (explicit directory request) -// HEAD /bucket/empty.txt (0-byte file, no children) → 200 OK (legitimate empty file) -// HEAD /bucket/file.txt (regular file) → 200 OK (normal operation) +// +// HEAD /bucket/dataset (no trailing slash, has children) → 404 Not Found (implicit directory) +// HEAD /bucket/dataset/ (trailing slash) → 200 OK (explicit directory request) +// HEAD /bucket/empty.txt (0-byte file, no children) → 200 OK (legitimate empty file) +// HEAD /bucket/file.txt (regular file) → 200 OK (normal operation) // // This behavior only applies to: // - Non-versioned buckets (versioned buckets use different semantics) @@ -2009,14 +1918,14 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request if versionId != "" { // Request for specific version - glog.V(2).Infof("HeadObject: requesting specific version %s for %s%s", versionId, bucket, object) - entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId) - if err != nil { - glog.Errorf("Failed to get specific version %s: %v", versionId, err) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return - } - targetVersionId = versionId + glog.V(2).Infof("HeadObject: requesting specific version %s for %s%s", versionId, bucket, object) + entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId) + if err != nil { + glog.Errorf("Failed to get specific version %s: %v", versionId, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + targetVersionId = versionId } else { // Request for latest version - OPTIMIZATION: // Check if .versions/ directory exists quickly (no retries) to decide path @@ -2054,15 +1963,15 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request } } else { // Transient error checking .versions/, fall back to getLatestObjectVersion with retries - glog.V(2).Infof("HeadObject: transient error checking .versions for %s%s: %v, falling back to getLatestObjectVersion", bucket, object, versionsErr) - entry, err = s3a.getLatestObjectVersion(bucket, object) - if err != nil { - glog.Errorf("HeadObject: Failed to get latest version for %s%s: %v", bucket, object, err) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return + glog.V(2).Infof("HeadObject: transient error checking .versions for %s%s: %v, falling back to getLatestObjectVersion", bucket, object, versionsErr) + entry, err = s3a.getLatestObjectVersion(bucket, object) + if err != nil { + glog.Errorf("HeadObject: Failed to get latest version for %s%s: %v", bucket, object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } } - } - // Extract version ID if not already set + // Extract version ID if not already set if targetVersionId == "" { if entry.Extended != nil { if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { @@ -2176,7 +2085,7 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request if objectEntryForSSE.Attributes != nil { isZeroByteFile := objectEntryForSSE.Attributes.FileSize == 0 && !objectEntryForSSE.IsDirectory isActualDirectory := objectEntryForSSE.IsDirectory - + if isZeroByteFile || isActualDirectory { // Check if it has children (making it an implicit directory) if s3a.hasChildren(bucket, object) { @@ -2520,247 +2429,6 @@ func (s3a *S3ApiServer) handleSSECResponse(r *http.Request, proxyResponse *http. } } -// handleSSEResponse handles both SSE-C and SSE-KMS decryption/validation and response processing -// The objectEntry parameter should be the correct entry for the requested version (if versioned) -func (s3a *S3ApiServer) handleSSEResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter, objectEntry *filer_pb.Entry) (statusCode int, bytesTransferred int64) { - // Check what the client is expecting based on request headers - clientExpectsSSEC := IsSSECRequest(r) - - // Check what the stored object has in headers (may be conflicting after copy) - kmsMetadataHeader := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEKMSKeyHeader) - - // Detect actual object SSE type from the provided entry (respects versionId) - actualObjectType := "Unknown" - if objectEntry != nil { - actualObjectType = s3a.detectPrimarySSEType(objectEntry) - } - - // If objectEntry is nil, we cannot determine SSE type from chunks - // This should only happen for 404s which will be handled by the proxy - if objectEntry == nil { - glog.V(4).Infof("Object entry not available for SSE routing, passing through") - return passThroughResponse(proxyResponse, w) - } - - // Route based on ACTUAL object type (from chunks) rather than conflicting headers - if actualObjectType == s3_constants.SSETypeC && clientExpectsSSEC { - // Object is SSE-C and client expects SSE-C → SSE-C handler - return s3a.handleSSECResponse(r, proxyResponse, w, objectEntry) - } else if actualObjectType == s3_constants.SSETypeKMS && !clientExpectsSSEC { - // Object is SSE-KMS and client doesn't expect SSE-C → SSE-KMS handler - return s3a.handleSSEKMSResponse(r, proxyResponse, w, objectEntry, kmsMetadataHeader) - } else if actualObjectType == s3_constants.SSETypeS3 && !clientExpectsSSEC { - // Object is SSE-S3 and client doesn't expect SSE-C → SSE-S3 handler - return s3a.handleSSES3Response(r, proxyResponse, w, objectEntry) - } else if actualObjectType == "None" && !clientExpectsSSEC { - // Object is unencrypted and client doesn't expect SSE-C → pass through - return passThroughResponse(proxyResponse, w) - } else if actualObjectType == s3_constants.SSETypeC && !clientExpectsSSEC { - // Object is SSE-C but client doesn't provide SSE-C headers → Error - s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) - return http.StatusBadRequest, 0 - } else if actualObjectType == s3_constants.SSETypeKMS && clientExpectsSSEC { - // Object is SSE-KMS but client provides SSE-C headers → Error - s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) - return http.StatusBadRequest, 0 - } else if actualObjectType == s3_constants.SSETypeS3 && clientExpectsSSEC { - // Object is SSE-S3 but client provides SSE-C headers → Error (mismatched encryption) - s3err.WriteErrorResponse(w, r, s3err.ErrSSEEncryptionTypeMismatch) - return http.StatusBadRequest, 0 - } else if actualObjectType == "None" && clientExpectsSSEC { - // Object is unencrypted but client provides SSE-C headers → Error - s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) - return http.StatusBadRequest, 0 - } - - // Unknown state - pass through and let proxy handle it - glog.V(4).Infof("Unknown SSE state: objectType=%s, clientExpectsSSEC=%v", actualObjectType, clientExpectsSSEC) - return passThroughResponse(proxyResponse, w) -} - -// handleSSEKMSResponse handles SSE-KMS decryption and response processing -func (s3a *S3ApiServer) handleSSEKMSResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter, entry *filer_pb.Entry, kmsMetadataHeader string) (statusCode int, bytesTransferred int64) { - // Deserialize SSE-KMS metadata - kmsMetadataBytes, err := base64.StdEncoding.DecodeString(kmsMetadataHeader) - if err != nil { - glog.Errorf("Failed to decode SSE-KMS metadata: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } - - sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes) - if err != nil { - glog.Errorf("Failed to deserialize SSE-KMS metadata: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } - - // For HEAD requests, we don't need to decrypt the body, just add response headers - if r.Method == "HEAD" { - // Capture existing CORS headers that may have been set by middleware - capturedCORSHeaders := captureCORSHeaders(w, corsHeaders) - - // Copy headers from proxy response (excluding internal SeaweedFS headers) - copyResponseHeaders(w, proxyResponse, false) - - // Add SSE-KMS response headers - AddSSEKMSResponseHeaders(w, sseKMSKey) - - return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders) - } - - // For GET requests, check if this is a multipart SSE-KMS object - // We need to check the object structure to determine if it's multipart encrypted - isMultipartSSEKMS := false - - if sseKMSKey != nil && entry != nil { - // Use the entry parameter passed from the caller (avoids redundant lookup) - // Check for multipart SSE-KMS - sseKMSChunks := 0 - for _, chunk := range entry.GetChunks() { - if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 { - sseKMSChunks++ - } - } - isMultipartSSEKMS = sseKMSChunks > 1 - } - - var decryptedReader io.Reader - if isMultipartSSEKMS { - // Handle multipart SSE-KMS objects - each chunk needs independent decryption - multipartReader, decErr := s3a.createMultipartSSEKMSDecryptedReader(r, proxyResponse, entry) - if decErr != nil { - glog.Errorf("Failed to create multipart SSE-KMS decrypted reader: %v", decErr) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } - decryptedReader = multipartReader - glog.V(3).Infof("Using multipart SSE-KMS decryption for object") - } else { - // Handle single-part SSE-KMS objects - singlePartReader, decErr := CreateSSEKMSDecryptedReader(proxyResponse.Body, sseKMSKey) - if decErr != nil { - glog.Errorf("Failed to create SSE-KMS decrypted reader: %v", decErr) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } - decryptedReader = singlePartReader - glog.V(3).Infof("Using single-part SSE-KMS decryption for object") - } - - // Capture existing CORS headers that may have been set by middleware - capturedCORSHeaders := captureCORSHeaders(w, corsHeaders) - - // Copy headers from proxy response (excluding body-related headers that might change and internal SeaweedFS headers) - copyResponseHeaders(w, proxyResponse, true) - - // Set correct Content-Length for SSE-KMS - if proxyResponse.Header.Get("Content-Range") == "" { - // For full object requests, encrypted length equals original length - if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" { - w.Header().Set("Content-Length", contentLengthStr) - } - } - - // Add SSE-KMS response headers - AddSSEKMSResponseHeaders(w, sseKMSKey) - - return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders) -} - -// handleSSES3Response handles SSE-S3 decryption and response processing -func (s3a *S3ApiServer) handleSSES3Response(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter, entry *filer_pb.Entry) (statusCode int, bytesTransferred int64) { - - // For HEAD requests, we don't need to decrypt the body, just add response headers - if r.Method == "HEAD" { - // Capture existing CORS headers that may have been set by middleware - capturedCORSHeaders := captureCORSHeaders(w, corsHeaders) - - // Copy headers from proxy response (excluding internal SeaweedFS headers) - copyResponseHeaders(w, proxyResponse, false) - - // Add SSE-S3 response headers - w.Header().Set(s3_constants.AmzServerSideEncryption, SSES3Algorithm) - - return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders) - } - - // For GET requests, check if this is a multipart SSE-S3 object - isMultipartSSES3 := false - sses3Chunks := 0 - for _, chunk := range entry.GetChunks() { - if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 && len(chunk.GetSseMetadata()) > 0 { - sses3Chunks++ - } - } - isMultipartSSES3 = sses3Chunks > 1 - - var decryptedReader io.Reader - if isMultipartSSES3 { - // Handle multipart SSE-S3 objects - each chunk needs independent decryption - multipartReader, decErr := s3a.createMultipartSSES3DecryptedReader(r, entry) - if decErr != nil { - glog.Errorf("Failed to create multipart SSE-S3 decrypted reader: %v", decErr) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } - decryptedReader = multipartReader - glog.V(3).Infof("Using multipart SSE-S3 decryption for object") - } else { - // Handle single-part SSE-S3 objects - // Extract SSE-S3 key from metadata - keyManager := GetSSES3KeyManager() - if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]; !exists { - glog.Errorf("SSE-S3 key metadata not found in object entry") - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } else { - sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager) - if err != nil { - glog.Errorf("Failed to deserialize SSE-S3 metadata: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } - - // Extract IV from metadata using helper function - iv, err := GetSSES3IV(entry, sseS3Key, keyManager) - if err != nil { - glog.Errorf("Failed to get SSE-S3 IV: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } - - singlePartReader, decErr := CreateSSES3DecryptedReader(proxyResponse.Body, sseS3Key, iv) - if decErr != nil { - glog.Errorf("Failed to create SSE-S3 decrypted reader: %v", decErr) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } - decryptedReader = singlePartReader - glog.V(3).Infof("Using single-part SSE-S3 decryption for object") - } - } - - // Capture existing CORS headers that may have been set by middleware - capturedCORSHeaders := captureCORSHeaders(w, corsHeaders) - - // Copy headers from proxy response (excluding body-related headers that might change and internal SeaweedFS headers) - copyResponseHeaders(w, proxyResponse, true) - - // Set correct Content-Length for SSE-S3 - if proxyResponse.Header.Get("Content-Range") == "" { - // For full object requests, encrypted length equals original length - if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" { - w.Header().Set("Content-Length", contentLengthStr) - } - } - - // Add SSE-S3 response headers - w.Header().Set(s3_constants.AmzServerSideEncryption, SSES3Algorithm) - - return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders) -} - // addObjectLockHeadersToResponse extracts object lock metadata from entry Extended attributes // and adds the appropriate S3 headers to the response func (s3a *S3ApiServer) addObjectLockHeadersToResponse(w http.ResponseWriter, entry *filer_pb.Entry) { @@ -3170,158 +2838,6 @@ func (s3a *S3ApiServer) createMultipartSSES3DecryptedReaderDirect(ctx context.Co return NewMultipartSSEReader(readers), nil } -// createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects -func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) { - // Entry is passed from caller to avoid redundant filer lookup - ctx := r.Context() - - // Sort chunks by offset to ensure correct order - chunks := entry.GetChunks() - sort.Slice(chunks, func(i, j int) bool { - return chunks[i].GetOffset() < chunks[j].GetOffset() - }) - - // Create readers for each chunk, decrypting them independently - var readers []io.Reader - - for _, chunk := range chunks { - // Get this chunk's encrypted data - chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk) - if err != nil { - return nil, fmt.Errorf("failed to create chunk reader: %v", err) - } - - // Get SSE-KMS metadata for this chunk - var chunkSSEKMSKey *SSEKMSKey - - // Check if this chunk has per-chunk SSE-KMS metadata (new architecture) - if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 { - // Use the per-chunk SSE-KMS metadata - kmsKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata()) - if err != nil { - glog.Errorf("Failed to deserialize per-chunk SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err) - } else { - // ChunkOffset is already set from the stored metadata (PartOffset) - chunkSSEKMSKey = kmsKey - } - } - - // Note: No fallback to object-level metadata for multipart objects - // Each chunk in a multipart SSE-KMS object must have its own unique IV - // Falling back to object-level metadata could lead to IV reuse or incorrect decryption - - if chunkSSEKMSKey == nil { - return nil, fmt.Errorf("no SSE-KMS metadata found for chunk %s in multipart object", chunk.GetFileIdString()) - } - - // Create decrypted reader for this chunk - decryptedChunkReader, decErr := CreateSSEKMSDecryptedReader(chunkReader, chunkSSEKMSKey) - if decErr != nil { - chunkReader.Close() // Close the chunk reader if decryption fails - return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr) - } - - // Wrap the decrypted reader with the underlying chunkReader to ensure HTTP body is closed - // This matches the SSE-S3 pattern and prevents resource leaks - readers = append(readers, struct { - io.Reader - io.Closer - }{ - Reader: decryptedChunkReader, - Closer: chunkReader, - }) - glog.V(4).Infof("Added streaming decrypted reader for chunk %s in multipart SSE-KMS object", chunk.GetFileIdString()) - } - - // Combine all decrypted chunk readers into a single stream with proper resource management - multiReader := NewMultipartSSEReader(readers) - glog.V(3).Infof("Created multipart SSE-KMS decrypted reader with %d chunks", len(readers)) - - return multiReader, nil -} - -// createMultipartSSES3DecryptedReader creates a reader for multipart SSE-S3 objects -func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, entry *filer_pb.Entry) (io.Reader, error) { - ctx := r.Context() - - // Sort chunks by offset to ensure correct order - chunks := entry.GetChunks() - sort.Slice(chunks, func(i, j int) bool { - return chunks[i].GetOffset() < chunks[j].GetOffset() - }) - - // Create readers for each chunk, decrypting them independently - var readers []io.Reader - keyManager := GetSSES3KeyManager() - - for _, chunk := range chunks { - // Get this chunk's encrypted data - chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk) - if err != nil { - return nil, fmt.Errorf("failed to create chunk reader: %v", err) - } - - // Handle based on chunk's encryption type - if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 { - var chunkSSES3Key *SSES3Key - - // Check if this chunk has per-chunk SSE-S3 metadata - if len(chunk.GetSseMetadata()) > 0 { - // Use the per-chunk SSE-S3 metadata - sseKey, err := DeserializeSSES3Metadata(chunk.GetSseMetadata(), keyManager) - if err != nil { - glog.Errorf("Failed to deserialize per-chunk SSE-S3 metadata for chunk %s: %v", chunk.GetFileIdString(), err) - chunkReader.Close() - return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %v", err) - } - chunkSSES3Key = sseKey - } - - // Note: No fallback to object-level metadata for multipart objects - // Each chunk in a multipart SSE-S3 object must have its own unique IV - // Falling back to object-level metadata could lead to IV reuse or incorrect decryption - - if chunkSSES3Key == nil { - chunkReader.Close() - return nil, fmt.Errorf("no SSE-S3 metadata found for chunk %s in multipart object", chunk.GetFileIdString()) - } - - // Extract IV from chunk metadata - if len(chunkSSES3Key.IV) == 0 { - chunkReader.Close() - return nil, fmt.Errorf("no IV found in SSE-S3 metadata for chunk %s", chunk.GetFileIdString()) - } - - // Create decrypted reader for this chunk - decryptedChunkReader, decErr := CreateSSES3DecryptedReader(chunkReader, chunkSSES3Key, chunkSSES3Key.IV) - if decErr != nil { - chunkReader.Close() - return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr) - } - - // Use the streaming decrypted reader directly, ensuring the underlying chunkReader can be closed - readers = append(readers, struct { - io.Reader - io.Closer - }{ - Reader: decryptedChunkReader, - Closer: chunkReader, - }) - glog.V(4).Infof("Added streaming decrypted reader for chunk %s in multipart SSE-S3 object", chunk.GetFileIdString()) - } else { - // Non-SSE-S3 chunk (unencrypted or other encryption type), use as-is - readers = append(readers, chunkReader) - glog.V(4).Infof("Added passthrough reader for non-SSE-S3 chunk %s (type: %v)", chunk.GetFileIdString(), chunk.GetSseType()) - } - } - - // Combine all decrypted chunk readers into a single stream - multiReader := NewMultipartSSEReader(readers) - glog.V(3).Infof("Created multipart SSE-S3 decrypted reader with %d chunks", len(readers)) - - return multiReader, nil -} - // createEncryptedChunkReader creates a reader for a single encrypted chunk // Context propagation ensures cancellation if the S3 client disconnects func (s3a *S3ApiServer) createEncryptedChunkReader(ctx context.Context, chunk *filer_pb.FileChunk) (io.ReadCloser, error) { diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index a4f4d8de8..a1cad5cd4 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -641,26 +641,6 @@ func filerErrorToS3Error(errString string) s3err.ErrorCode { } } -func (s3a *S3ApiServer) maybeAddFilerJwtAuthorization(r *http.Request, isWrite bool) { - encodedJwt := s3a.maybeGetFilerJwtAuthorizationToken(isWrite) - - if encodedJwt == "" { - return - } - - r.Header.Set("Authorization", "BEARER "+string(encodedJwt)) -} - -func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string { - var encodedJwt security.EncodedJwt - if isWrite { - encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.SigningKey, s3a.filerGuard.ExpiresAfterSec) - } else { - encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec) - } - return string(encodedJwt) -} - // setObjectOwnerFromRequest sets the object owner metadata based on the authenticated user func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_pb.Entry) { amzAccountId := r.Header.Get(s3_constants.AmzAccountId)