From abfb67ac74325f463f8e8070e2d1d2912a928531 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 18 Nov 2025 11:29:10 -0800 Subject: [PATCH] Revert "handling directory" This reverts commit 3a335f0ac33c63f51975abc63c40e5328857a74b. --- weed/s3api/s3api_object_handlers.go | 2326 ++++++++++++++--- weed/s3api/s3api_object_handlers_multipart.go | 8 +- .../s3api/s3api_object_handlers_postpolicy.go | 6 +- weed/s3api/s3api_object_handlers_put.go | 480 +++- 4 files changed, 2394 insertions(+), 426 deletions(-) diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index e52421dd8..6dd760859 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -2,12 +2,17 @@ package s3api import ( "bytes" + "context" "encoding/base64" + "encoding/json" "errors" "fmt" "io" + "math" + "mime" "net/http" "net/url" + "path/filepath" "sort" "strconv" "strings" @@ -15,13 +20,14 @@ import ( "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/wdclient" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/util/mem" "github.com/seaweedfs/seaweedfs/weed/glog" - util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) // corsHeaders defines the CORS headers that need to be preserved @@ -35,6 +41,10 @@ var corsHeaders = []string{ "Access-Control-Allow-Credentials", } +// zeroBuf is a reusable buffer of zero bytes for padding operations +// Package-level to avoid per-call allocations in writeZeroBytes +var zeroBuf = make([]byte, 32*1024) + func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser { mimeBuffer := make([]byte, 512) size, _ := dataReader.Read(mimeBuffer) @@ -123,6 +133,13 @@ func (s3a *S3ApiServer) checkDirectoryObject(bucket, object string) (*filer_pb.E // serveDirectoryContent serves the content of a directory object directly func (s3a *S3ApiServer) serveDirectoryContent(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry) { + // Defensive nil checks - entry and attributes should never be nil, but guard against it + if entry == nil || entry.Attributes == nil { + glog.Errorf("serveDirectoryContent: entry or attributes is nil") + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + // Set content type - use stored MIME type or default contentType := entry.Attributes.Mime if contentType == "" { @@ -177,39 +194,6 @@ func (s3a *S3ApiServer) handleDirectoryObjectRequest(w http.ResponseWriter, r *h return false // Not a directory object, continue with normal processing } -// rejectDirectoryObjectWithoutSlash rejects requests that target directories without using the trailing slash form. -// AWS S3 treats such requests as missing objects, which is relied upon by readers that distinguish -// files vs. prefixes via HEAD calls (e.g. PyArrow datasets). Seaweed internally stores directories -// as filer entries, so we need to hide them unless the caller explicitly uses the directory form. 
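-//
-// Illustrative example (editorial addition, not part of the original comment):
-// with a stored directory entry "data", the AWS-compatible behavior described
-// above is:
-//
-//	HEAD /bucket/data   -> 404 NoSuchKey  (directory hidden without slash)
-//	HEAD /bucket/data/  -> 200 OK         (explicit directory form served)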
-// -// This function is optimized to avoid fetching entries when not necessary: -// - If entryRef is provided and populated, use it -// - Otherwise, skip the check (let normal flow handle it) -func (s3a *S3ApiServer) rejectDirectoryObjectWithoutSlash(w http.ResponseWriter, r *http.Request, bucket, object, handlerName string, entryRef **filer_pb.Entry) bool { - if strings.HasSuffix(object, "/") { - return false - } - - // Only check if we already have the entry from a previous operation - // This avoids unnecessary filer lookups that could interfere with the request flow - var currentEntry *filer_pb.Entry - if entryRef != nil && *entryRef != nil { - currentEntry = *entryRef - } else { - // No entry available - skip the check and let normal flow handle it - // This is important for GET requests where we don't want to add extra latency - return false - } - - if currentEntry != nil && currentEntry.IsDirectory { - glog.V(2).Infof("%s: object %s/%s is a directory but requested without trailing slash", handlerName, bucket, object) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return true - } - - return false -} - func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bucketPrefix string, fetchOwner bool, isDirectory bool, encodingTypeUrl bool, iam AccountManager) (listEntry ListEntry) { storageClass := "STANDARD" if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok { @@ -305,36 +289,44 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) bucket, object := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("GetObjectHandler %s %s", bucket, object) + // TTFB Profiling: Track all stages until first byte + tStart := time.Now() + var ( + conditionalHeadersTime time.Duration + versioningCheckTime time.Duration + entryFetchTime time.Duration + streamTime time.Duration + ) + defer func() { + totalTime := time.Since(tStart) + glog.V(2).Infof("GET TTFB PROFILE %s/%s: total=%v | conditional=%v, versioning=%v, entryFetch=%v, stream=%v", + bucket, object, totalTime, conditionalHeadersTime, versioningCheckTime, entryFetchTime, streamTime) + }() + // Handle directory objects with shared logic if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "GetObjectHandler") { return // Directory object request was handled } // Check conditional headers and handle early return if conditions fail + tConditional := time.Now() result, handled := s3a.processConditionalHeaders(w, r, bucket, object, "GetObjectHandler") + conditionalHeadersTime = time.Since(tConditional) if handled { return } - entryFromConditional := result.Entry - if s3a.rejectDirectoryObjectWithoutSlash(w, r, bucket, object, "GetObjectHandler", &entryFromConditional) { - return - } - if entryFromConditional != result.Entry { - result.Entry = entryFromConditional - } - // Check for specific version ID in query parameters versionId := r.URL.Query().Get("versionId") var ( - destUrl string entry *filer_pb.Entry // Declare entry at function scope for SSE processing versioningConfigured bool err error ) // Check if versioning is configured for the bucket (Enabled or Suspended) + tVersioning := time.Now() // Note: We need to check this even if versionId is empty, because versioned buckets // handle even "get latest version" requests differently (through .versions directory) versioningConfigured, err = s3a.isVersioningConfigured(bucket) @@ -347,39 +339,90 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) 
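		// Error-mapping note (editorial): filer_pb.ErrNotFound here means the
		// bucket itself is missing, so the client sees NoSuchBucket above; any
		// other failure (e.g. a transient filer RPC error) must not masquerade
		// as a missing object, hence InternalError rather than NoSuchKey.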
return } - glog.V(1).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId) + glog.V(3).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId) if versioningConfigured { - // Handle versioned GET - all versions are stored in .versions directory + // Handle versioned GET - check if specific version requested var targetVersionId string if versionId != "" { - // Request for specific version - glog.V(2).Infof("GetObject: requesting specific version %s for %s%s", versionId, bucket, object) + // Request for specific version - must look in .versions directory + glog.V(3).Infof("GetObject: requesting specific version %s for %s%s", versionId, bucket, object) entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId) if err != nil { glog.Errorf("Failed to get specific version %s: %v", versionId, err) s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) return } - targetVersionId = versionId - } else { - // Request for latest version - glog.V(1).Infof("GetObject: requesting latest version for %s%s", bucket, object) - entry, err = s3a.getLatestObjectVersion(bucket, object) - if err != nil { - glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err) + // Safety check: entry must be valid after successful retrieval + if entry == nil { + glog.Errorf("GetObject: getSpecificObjectVersion returned nil entry without error for version %s", versionId) s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) return } - if entry.Extended != nil { - if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { - targetVersionId = string(versionIdBytes) + targetVersionId = versionId + } else { + // Request for latest version - OPTIMIZATION: + // Check if .versions/ directory exists quickly (no retries) to decide path + // - If .versions/ exists: real versions available, use getLatestObjectVersion + // - If .versions/ doesn't exist (ErrNotFound): only null version at regular path, use it directly + // - If transient error: fall back to getLatestObjectVersion which has retry logic + bucketDir := s3a.option.BucketsPath + "/" + bucket + normalizedObject := removeDuplicateSlashes(object) + versionsDir := normalizedObject + s3_constants.VersionsFolder + + // Quick check (no retries) for .versions/ directory + versionsEntry, versionsErr := s3a.getEntry(bucketDir, versionsDir) + + if versionsErr == nil && versionsEntry != nil { + // .versions/ exists, meaning real versions are stored there + // Use getLatestObjectVersion which will properly find the newest version + entry, err = s3a.getLatestObjectVersion(bucket, object) + if err != nil { + glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + } else if errors.Is(versionsErr, filer_pb.ErrNotFound) { + // .versions/ doesn't exist (confirmed not found), check regular path for null version + regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject) + if regularErr == nil && regularEntry != nil { + // Found object at regular path - this is the null version + entry = regularEntry + targetVersionId = "null" + } else { + // No object at regular path either - object doesn't exist + glog.Errorf("GetObject: object not found at regular path or .versions for %s%s", bucket, object) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + } else { + // Transient 
error checking .versions/, fall back to getLatestObjectVersion with retries + glog.V(2).Infof("GetObject: transient error checking .versions for %s%s: %v, falling back to getLatestObjectVersion", bucket, object, versionsErr) + entry, err = s3a.getLatestObjectVersion(bucket, object) + if err != nil { + glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return } } - // If no version ID found in entry, this is a pre-versioning object + // Safety check: entry must be valid after successful retrieval + if entry == nil { + glog.Errorf("GetObject: entry is nil after versioned lookup for %s%s", bucket, object) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + // Extract version ID if not already set if targetVersionId == "" { - targetVersionId = "null" + if entry.Extended != nil { + if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { + targetVersionId = string(versionIdBytes) + } + } + // If no version ID found in entry, this is a pre-versioning object + if targetVersionId == "" { + targetVersionId = "null" + } } } @@ -391,16 +434,11 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) } } - // Determine the actual file path based on whether this is a versioned or pre-versioning object + // For versioned objects, log the target version if targetVersionId == "null" { - // Pre-versioning object - stored as regular file - destUrl = s3a.toFilerUrl(bucket, object) - glog.V(2).Infof("GetObject: pre-versioning object URL: %s", destUrl) + glog.V(2).Infof("GetObject: pre-versioning object %s/%s", bucket, object) } else { - // Versioned object - stored in .versions directory - versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId) - destUrl = s3a.toFilerUrl(bucket, versionObjectPath) - glog.V(2).Infof("GetObject: version %s URL: %s", targetVersionId, destUrl) + glog.V(2).Infof("GetObject: version %s for %s/%s", targetVersionId, bucket, object) } // Set version ID in response header @@ -408,21 +446,25 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) // Add object lock metadata to response headers if present s3a.addObjectLockHeadersToResponse(w, entry) - } else { - // Handle regular GET (non-versioned) - destUrl = s3a.toFilerUrl(bucket, object) } + versioningCheckTime = time.Since(tVersioning) + // Fetch the correct entry for SSE processing (respects versionId) // This consolidates entry lookups to avoid multiple filer calls + tEntryFetch := time.Now() var objectEntryForSSE *filer_pb.Entry - originalRangeHeader := r.Header.Get("Range") - var sseObject = false // Optimization: Reuse already-fetched entry to avoid redundant metadata fetches if versioningConfigured { // For versioned objects, reuse the already-fetched entry objectEntryForSSE = entry + // Safety check - this should never happen as versioned path handles errors above + if objectEntryForSSE == nil { + glog.Errorf("GetObjectHandler: unexpected nil entry for versioned object %s/%s", bucket, object) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } } else { // For non-versioned objects, try to reuse entry from conditional header check if result.Entry != nil { @@ -438,7 +480,7 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) var fetchErr error objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object) if fetchErr != nil { - glog.Errorf("GetObjectHandler: 
failed to get entry for SSE check: %v", fetchErr) + glog.Warningf("GetObjectHandler: failed to get entry for %s/%s: %v", bucket, object, fetchErr) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } @@ -449,113 +491,1535 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) } } } + entryFetchTime = time.Since(tEntryFetch) + + // Check if PartNumber query parameter is present (for multipart GET requests) + partNumberStr := r.URL.Query().Get("partNumber") + if partNumberStr == "" { + partNumberStr = r.URL.Query().Get("PartNumber") + } + + // If PartNumber is specified, set headers and modify Range to read only that part + // This replicates the filer handler logic + if partNumberStr != "" { + if partNumber, parseErr := strconv.Atoi(partNumberStr); parseErr == nil && partNumber > 0 { + // Get actual parts count from metadata (not chunk count) + partsCount, partInfo := s3a.getMultipartInfo(objectEntryForSSE, partNumber) + + // Validate part number + if partNumber > partsCount { + glog.Warningf("GetObject: Invalid part number %d, object has %d parts", partNumber, partsCount) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) + return + } - // Check if this is an SSE object for Range request handling - // This applies to both versioned and non-versioned objects - if originalRangeHeader != "" && objectEntryForSSE != nil { - primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE) - if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS { - sseObject = true - // Temporarily remove Range header to get full encrypted data from filer - r.Header.Del("Range") + // Set parts count header + w.Header().Set(s3_constants.AmzMpPartsCount, strconv.Itoa(partsCount)) + glog.V(3).Infof("GetObject: Set PartsCount=%d for multipart GET with PartNumber=%d", partsCount, partNumber) + + // Calculate the byte range for this part + var startOffset, endOffset int64 + if partInfo != nil { + // Use part boundaries from metadata (accurate for multi-chunk parts) + startOffset = objectEntryForSSE.Chunks[partInfo.StartChunk].Offset + lastChunk := objectEntryForSSE.Chunks[partInfo.EndChunk-1] + endOffset = lastChunk.Offset + int64(lastChunk.Size) - 1 + + // Override ETag with the part's ETag from metadata + w.Header().Set("ETag", "\""+partInfo.ETag+"\"") + glog.V(3).Infof("GetObject: Override ETag with part %d ETag: %s (from metadata)", partNumber, partInfo.ETag) + } else { + // Fallback: assume 1:1 part-to-chunk mapping (backward compatibility) + chunkIndex := partNumber - 1 + if chunkIndex >= len(objectEntryForSSE.Chunks) { + glog.Warningf("GetObject: Part %d chunk index %d out of range (chunks: %d)", partNumber, chunkIndex, len(objectEntryForSSE.Chunks)) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) + return + } + partChunk := objectEntryForSSE.Chunks[chunkIndex] + startOffset = partChunk.Offset + endOffset = partChunk.Offset + int64(partChunk.Size) - 1 + + // Override ETag with chunk's ETag (fallback) + if partChunk.ETag != "" { + if md5Bytes, decodeErr := base64.StdEncoding.DecodeString(partChunk.ETag); decodeErr == nil { + partETag := fmt.Sprintf("%x", md5Bytes) + w.Header().Set("ETag", "\""+partETag+"\"") + glog.V(3).Infof("GetObject: Override ETag with part %d ETag: %s (fallback from chunk)", partNumber, partETag) + } + } + } + + // CRITICAL: Set Range header to read only this part's bytes (matches filer logic) + // This ensures we stream only the specific part, not the entire object + rangeHeader := fmt.Sprintf("bytes=%d-%d", 
startOffset, endOffset) + r.Header.Set("Range", rangeHeader) + glog.V(3).Infof("GetObject: Set Range header for part %d: %s", partNumber, rangeHeader) } } - s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { - // Restore the original Range header for SSE processing - if sseObject && originalRangeHeader != "" { - r.Header.Set("Range", originalRangeHeader) + // NEW OPTIMIZATION: Stream directly from volume servers, bypassing filer proxy + // This eliminates the 19ms filer proxy overhead + // SSE decryption is handled inline during streaming + + // Safety check: entry must be valid before streaming + if objectEntryForSSE == nil { + glog.Errorf("GetObjectHandler: objectEntryForSSE is nil for %s/%s (should not happen)", bucket, object) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + + // Detect SSE encryption type + primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE) + + // Stream directly from volume servers with SSE support + tStream := time.Now() + err = s3a.streamFromVolumeServersWithSSE(w, r, objectEntryForSSE, primarySSEType) + streamTime = time.Since(tStream) + if err != nil { + glog.Errorf("GetObjectHandler: failed to stream from volume servers: %v", err) + // Try to write error response. The HTTP library will gracefully handle cases + // where headers are already sent (e.g., during streaming errors). + // For early validation errors, this ensures client gets proper error response. + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } +} + +// streamFromVolumeServers streams object data directly from volume servers, bypassing filer proxy +// This eliminates the ~19ms filer proxy overhead by reading chunks directly +func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) error { + // Profiling: Track overall and stage timings + t0 := time.Now() + var ( + rangeParseTime time.Duration + headerSetTime time.Duration + chunkResolveTime time.Duration + streamPrepTime time.Duration + streamExecTime time.Duration + ) + defer func() { + totalTime := time.Since(t0) + glog.V(2).Infof(" └─ streamFromVolumeServers: total=%v, rangeParse=%v, headerSet=%v, chunkResolve=%v, streamPrep=%v, streamExec=%v", + totalTime, rangeParseTime, headerSetTime, chunkResolveTime, streamPrepTime, streamExecTime) + }() + + if entry == nil { + // Early validation error: write proper HTTP response + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "Internal Server Error: entry is nil") + return fmt.Errorf("entry is nil") + } + + // Get file size + totalSize := int64(filer.FileSize(entry)) + + // Parse Range header if present + tRangeParse := time.Now() + var offset int64 = 0 + var size int64 = totalSize + rangeHeader := r.Header.Get("Range") + isRangeRequest := false + + if rangeHeader != "" && strings.HasPrefix(rangeHeader, "bytes=") { + rangeSpec := rangeHeader[6:] + parts := strings.Split(rangeSpec, "-") + if len(parts) == 2 { + var startOffset, endOffset int64 + + // Handle different Range formats: + // 1. "bytes=0-499" - first 500 bytes (parts[0]="0", parts[1]="499") + // 2. "bytes=500-" - from byte 500 to end (parts[0]="500", parts[1]="") + // 3. 
"bytes=-500" - last 500 bytes (parts[0]="", parts[1]="500") + + if parts[0] == "" && parts[1] != "" { + // Suffix range: bytes=-N (last N bytes) + if suffixLen, err := strconv.ParseInt(parts[1], 10, 64); err == nil { + // RFC 7233: suffix range on empty object or zero-length suffix is unsatisfiable + if totalSize == 0 || suffixLen <= 0 { + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + return fmt.Errorf("invalid suffix range for empty object") + } + if suffixLen > totalSize { + suffixLen = totalSize + } + startOffset = totalSize - suffixLen + endOffset = totalSize - 1 + } else { + // Set header BEFORE WriteHeader + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + return fmt.Errorf("invalid suffix range") + } + } else { + // Regular range or open-ended range + startOffset = 0 + endOffset = totalSize - 1 + + if parts[0] != "" { + if parsed, err := strconv.ParseInt(parts[0], 10, 64); err == nil { + startOffset = parsed + } + } + if parts[1] != "" { + if parsed, err := strconv.ParseInt(parts[1], 10, 64); err == nil { + endOffset = parsed + } + } + + // Validate range + if startOffset < 0 || startOffset >= totalSize { + // Set header BEFORE WriteHeader + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + return fmt.Errorf("invalid range start") + } + + if endOffset >= totalSize { + endOffset = totalSize - 1 + } + + if endOffset < startOffset { + // Set header BEFORE WriteHeader + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + return fmt.Errorf("invalid range: end before start") + } + } + + offset = startOffset + size = endOffset - startOffset + 1 + isRangeRequest = true } + } + rangeParseTime = time.Since(tRangeParse) + + // For small files stored inline in entry.Content - validate BEFORE setting headers + if len(entry.Content) > 0 && totalSize == int64(len(entry.Content)) { + if isRangeRequest { + // Safely convert int64 to int for slice indexing - validate BEFORE WriteHeader + // Use MaxInt32 for portability across 32-bit and 64-bit platforms + if offset < 0 || offset > int64(math.MaxInt32) || size < 0 || size > int64(math.MaxInt32) { + // Early validation error: write proper HTTP response BEFORE headers + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + fmt.Fprintf(w, "Range too large for platform: offset=%d, size=%d", offset, size) + return fmt.Errorf("range too large for platform: offset=%d, size=%d", offset, size) + } + start := int(offset) + end := start + int(size) + // Bounds check (should already be validated, but double-check) - BEFORE WriteHeader + if start < 0 || start > len(entry.Content) || end > len(entry.Content) || end < start { + // Early validation error: write proper HTTP response BEFORE headers + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + fmt.Fprintf(w, "Invalid range for inline content: start=%d, end=%d, len=%d)", start, end, len(entry.Content)) + return fmt.Errorf("invalid range for inline content: start=%d, end=%d, len=%d", start, end, len(entry.Content)) + } + // Validation passed - now set headers and write + s3a.setResponseHeaders(w, entry, totalSize) + w.Header().Set("Content-Range", 
fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize)) + w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) + w.WriteHeader(http.StatusPartialContent) + _, err := w.Write(entry.Content[start:end]) + return err + } + // Non-range request for inline content + s3a.setResponseHeaders(w, entry, totalSize) + w.WriteHeader(http.StatusOK) + _, err := w.Write(entry.Content) + return err + } + + // Get chunks and validate BEFORE setting headers + chunks := entry.GetChunks() + glog.V(4).Infof("streamFromVolumeServers: entry has %d chunks, totalSize=%d, isRange=%v, offset=%d, size=%d", + len(chunks), totalSize, isRangeRequest, offset, size) + + if len(chunks) == 0 { + // BUG FIX: If totalSize > 0 but no chunks and no content, this is a data integrity issue + if totalSize > 0 && len(entry.Content) == 0 { + glog.Errorf("streamFromVolumeServers: Data integrity error - entry reports size %d but has no content or chunks", totalSize) + // IMPORTANT: Write error status before returning, since headers haven't been written yet + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "Internal Server Error: data integrity issue (size %d reported but no content available)", totalSize) + return fmt.Errorf("data integrity error: size %d reported but no content available", totalSize) + } + // Empty object - set headers and write status + s3a.setResponseHeaders(w, entry, totalSize) + w.WriteHeader(http.StatusOK) + return nil + } - // Add SSE metadata headers based on object metadata before SSE processing - if objectEntryForSSE != nil { - s3a.addSSEHeadersToResponse(proxyResponse, objectEntryForSSE) + // Log chunk details (verbose only - high frequency) + if glog.V(4) { + for i, chunk := range chunks { + glog.Infof(" GET Chunk[%d]: fid=%s, offset=%d, size=%d", i, chunk.GetFileIdString(), chunk.Offset, chunk.Size) } + } - // Handle SSE decryption (both SSE-C and SSE-KMS) if needed - return s3a.handleSSEResponse(r, proxyResponse, w, objectEntryForSSE) - }) -} + // CRITICAL: Resolve chunks and prepare stream BEFORE WriteHeader + // This ensures we can write proper error responses if these operations fail + ctx := r.Context() + lookupFileIdFn := s3a.createLookupFileIdFunction() -func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) { + // Resolve chunk manifests with the requested range + tChunkResolve := time.Now() + resolvedChunks, _, err := filer.ResolveChunkManifest(ctx, lookupFileIdFn, chunks, offset, offset+size) + chunkResolveTime = time.Since(tChunkResolve) + if err != nil { + glog.Errorf("streamFromVolumeServers: failed to resolve chunks: %v", err) + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "Internal Server Error: failed to resolve chunks") + return fmt.Errorf("failed to resolve chunks: %v", err) + } + + // Prepare streaming function with simple master client wrapper + tStreamPrep := time.Now() + masterClient := &simpleMasterClient{lookupFn: lookupFileIdFn} + streamFn, err := filer.PrepareStreamContentWithThrottler( + ctx, + masterClient, + func(fileId string) string { + // Use volume server JWT (not filer JWT) for direct volume reads + return string(security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, fileId)) + }, + resolvedChunks, + offset, + size, + 0, // no throttling + ) + streamPrepTime = time.Since(tStreamPrep) + if err != nil { + glog.Errorf("streamFromVolumeServers: failed to prepare stream: %v", err) + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "Internal 
Server Error: failed to prepare stream") + return fmt.Errorf("failed to prepare stream: %v", err) + } - bucket, object := s3_constants.GetBucketAndObject(r) - glog.V(3).Infof("HeadObjectHandler %s %s", bucket, object) + // All validation and preparation successful - NOW set headers and write status + tHeaderSet := time.Now() + s3a.setResponseHeaders(w, entry, totalSize) - // Handle directory objects with shared logic - if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "HeadObjectHandler") { - return // Directory object request was handled + // Override/add range-specific headers if this is a range request + if isRangeRequest { + w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize)) + w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) } + headerSetTime = time.Since(tHeaderSet) - // Check conditional headers and handle early return if conditions fail - result, handled := s3a.processConditionalHeaders(w, r, bucket, object, "HeadObjectHandler") - if handled { - return + // Now write status code (headers are all set, stream is ready) + if isRangeRequest { + w.WriteHeader(http.StatusPartialContent) + } else { + w.WriteHeader(http.StatusOK) } - entryFromConditional := result.Entry - if s3a.rejectDirectoryObjectWithoutSlash(w, r, bucket, object, "HeadObjectHandler", &entryFromConditional) { - return + // Stream directly to response + tStreamExec := time.Now() + glog.V(4).Infof("streamFromVolumeServers: starting streamFn, offset=%d, size=%d", offset, size) + err = streamFn(w) + streamExecTime = time.Since(tStreamExec) + if err != nil { + glog.Errorf("streamFromVolumeServers: streamFn failed: %v", err) + } else { + glog.V(4).Infof("streamFromVolumeServers: streamFn completed successfully") } - if entryFromConditional != result.Entry { - result.Entry = entryFromConditional + return err +} + +// Shared HTTP client for volume server requests (connection pooling) +var volumeServerHTTPClient = &http.Client{ + Timeout: 5 * time.Minute, + Transport: &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 10, + IdleConnTimeout: 90 * time.Second, + }, +} + +// createLookupFileIdFunction creates a reusable lookup function for resolving volume URLs +func (s3a *S3ApiServer) createLookupFileIdFunction() func(context.Context, string) ([]string, error) { + return func(ctx context.Context, fileId string) ([]string, error) { + var urls []string + err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + vid := filer.VolumeId(fileId) + resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ + VolumeIds: []string{vid}, + }) + if err != nil { + return err + } + if locs, found := resp.LocationsMap[vid]; found { + for _, loc := range locs.Locations { + // Build complete URL with volume server address and fileId + // Format: http://host:port/volumeId,fileId + urls = append(urls, "http://"+loc.Url+"/"+fileId) + } + } + return nil + }) + glog.V(3).Infof("createLookupFileIdFunction: fileId=%s, resolved urls=%v", fileId, urls) + return urls, err } +} - // Check for specific version ID in query parameters - versionId := r.URL.Query().Get("versionId") +// streamFromVolumeServersWithSSE handles streaming with inline SSE decryption +func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) error { + // If not encrypted, use fast path without decryption + if sseType == "" || sseType == "None" { + return s3a.streamFromVolumeServers(w, r, entry, 
sseType) + } + // Profiling: Track SSE decryption stages + t0 := time.Now() var ( - destUrl string - entry *filer_pb.Entry // Declare entry at function scope for SSE processing - versioningConfigured bool - err error + rangeParseTime time.Duration + keyValidateTime time.Duration + headerSetTime time.Duration + streamFetchTime time.Duration + decryptSetupTime time.Duration + copyTime time.Duration ) + defer func() { + totalTime := time.Since(t0) + glog.V(2).Infof(" └─ streamFromVolumeServersWithSSE (%s): total=%v, rangeParse=%v, keyValidate=%v, headerSet=%v, streamFetch=%v, decryptSetup=%v, copy=%v", + sseType, totalTime, rangeParseTime, keyValidateTime, headerSetTime, streamFetchTime, decryptSetupTime, copyTime) + }() + + glog.V(2).Infof("streamFromVolumeServersWithSSE: Handling %s encrypted object with inline decryption", sseType) + + // Parse Range header BEFORE key validation + totalSize := int64(filer.FileSize(entry)) + tRangeParse := time.Now() + var offset int64 = 0 + var size int64 = totalSize + rangeHeader := r.Header.Get("Range") + isRangeRequest := false + + if rangeHeader != "" && strings.HasPrefix(rangeHeader, "bytes=") { + rangeSpec := rangeHeader[6:] + parts := strings.Split(rangeSpec, "-") + if len(parts) == 2 { + var startOffset, endOffset int64 + + if parts[0] == "" && parts[1] != "" { + // Suffix range: bytes=-N (last N bytes) + if suffixLen, err := strconv.ParseInt(parts[1], 10, 64); err == nil { + // RFC 7233: suffix range on empty object or zero-length suffix is unsatisfiable + if totalSize == 0 || suffixLen <= 0 { + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + return fmt.Errorf("invalid suffix range for empty object") + } + if suffixLen > totalSize { + suffixLen = totalSize + } + startOffset = totalSize - suffixLen + endOffset = totalSize - 1 + } else { + // Set header BEFORE WriteHeader + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + return fmt.Errorf("invalid suffix range") + } + } else { + // Regular range or open-ended range + startOffset = 0 + endOffset = totalSize - 1 - // Check if versioning is configured for the bucket (Enabled or Suspended) - // Note: We need to check this even if versionId is empty, because versioned buckets - // handle even "get latest version" requests differently (through .versions directory) - versioningConfigured, err = s3a.isVersioningConfigured(bucket) - if err != nil { - if err == filer_pb.ErrNotFound { - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) - return + if parts[0] != "" { + if parsed, err := strconv.ParseInt(parts[0], 10, 64); err == nil { + startOffset = parsed + } + } + if parts[1] != "" { + if parsed, err := strconv.ParseInt(parts[1], 10, 64); err == nil { + endOffset = parsed + } + } + + // Validate range + if startOffset < 0 || startOffset >= totalSize { + // Set header BEFORE WriteHeader + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + return fmt.Errorf("invalid range start") + } + + if endOffset >= totalSize { + endOffset = totalSize - 1 + } + + if endOffset < startOffset { + // Set header BEFORE WriteHeader + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + return fmt.Errorf("invalid range: end before start") + } + } + + offset = startOffset + size = endOffset - startOffset + 1 + 
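			// Worked example of the arithmetic above (illustrative): for
			// "bytes=500-999" on a 10240-byte object, startOffset=500 and
			// endOffset=999, so offset=500 and size=500; for the open-ended
			// "bytes=500-", endOffset defaults to totalSize-1 and size=9740.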
isRangeRequest = true + glog.V(2).Infof("streamFromVolumeServersWithSSE: Range request bytes %d-%d/%d (size=%d)", startOffset, endOffset, totalSize, size) } - glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return } + rangeParseTime = time.Since(tRangeParse) - if versioningConfigured { - // Handle versioned HEAD - all versions are stored in .versions directory - var targetVersionId string + // Validate SSE keys BEFORE streaming + tKeyValidate := time.Now() + var decryptionKey interface{} + switch sseType { + case s3_constants.SSETypeC: + customerKey, err := ParseSSECHeaders(r) + if err != nil { + s3err.WriteErrorResponse(w, r, MapSSECErrorToS3Error(err)) + return err + } + if customerKey == nil { + s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) + return fmt.Errorf("SSE-C key required") + } + // Validate key MD5 + if entry.Extended != nil { + storedKeyMD5 := string(entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]) + if storedKeyMD5 != "" && customerKey.KeyMD5 != storedKeyMD5 { + s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) + return fmt.Errorf("SSE-C key mismatch") + } + } + decryptionKey = customerKey + case s3_constants.SSETypeKMS: + // Extract KMS key from metadata (stored as raw bytes, matching filer behavior) + if entry.Extended == nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return fmt.Errorf("no SSE-KMS metadata") + } + kmsMetadataBytes := entry.Extended[s3_constants.SeaweedFSSSEKMSKey] + sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return err + } + decryptionKey = sseKMSKey + case s3_constants.SSETypeS3: + // Extract S3 key from metadata (stored as raw bytes, matching filer behavior) + if entry.Extended == nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return fmt.Errorf("no SSE-S3 metadata") + } + keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key] + keyManager := GetSSES3KeyManager() + sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return err + } + decryptionKey = sseS3Key + } + keyValidateTime = time.Since(tKeyValidate) - if versionId != "" { - // Request for specific version - glog.V(2).Infof("HeadObject: requesting specific version %s for %s%s", versionId, bucket, object) - entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId) - if err != nil { - glog.Errorf("Failed to get specific version %s: %v", versionId, err) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return + // Set response headers + // IMPORTANT: Set ALL headers BEFORE calling WriteHeader (headers are ignored after WriteHeader) + tHeaderSet := time.Now() + s3a.setResponseHeaders(w, entry, totalSize) + s3a.addSSEResponseHeadersFromEntry(w, r, entry, sseType) + + // Override/add range-specific headers if this is a range request + if isRangeRequest { + w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize)) + w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) + } + headerSetTime = time.Since(tHeaderSet) + + // Now write status code (headers are all set) + if isRangeRequest { + w.WriteHeader(http.StatusPartialContent) + } + + // Full Range Optimization: Use ViewFromChunks to only fetch/decrypt needed chunks + tDecryptSetup := time.Now() + + // Use range-aware chunk resolution (like filer 
does) + if isRangeRequest { + glog.V(2).Infof("Using range-aware SSE decryption for offset=%d size=%d", offset, size) + streamFetchTime = 0 // No full stream fetch in range-aware path + err := s3a.streamDecryptedRangeFromChunks(r.Context(), w, entry, offset, size, sseType, decryptionKey) + decryptSetupTime = time.Since(tDecryptSetup) + copyTime = decryptSetupTime // Streaming is included in decrypt setup for range-aware path + return err + } + + // Full object path: Optimize multipart vs single-part + var decryptedReader io.Reader + var err error + + switch sseType { + case s3_constants.SSETypeC: + customerKey := decryptionKey.(*SSECustomerKey) + + // Check if this is a multipart object (multiple chunks with SSE-C metadata) + isMultipartSSEC := false + ssecChunks := 0 + for _, chunk := range entry.GetChunks() { + if chunk.GetSseType() == filer_pb.SSEType_SSE_C && len(chunk.GetSseMetadata()) > 0 { + ssecChunks++ + } + } + isMultipartSSEC = ssecChunks > 1 + glog.V(3).Infof("SSE-C decryption: KeyMD5=%s, entry has %d chunks, isMultipart=%v, ssecChunks=%d", + customerKey.KeyMD5, len(entry.GetChunks()), isMultipartSSEC, ssecChunks) + + if isMultipartSSEC { + // For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly + // This saves one filer lookup/pipe creation + decryptedReader, err = s3a.createMultipartSSECDecryptedReaderDirect(r.Context(), nil, customerKey, entry) + glog.V(2).Infof("Using multipart SSE-C decryption for object with %d chunks (no prefetch)", len(entry.GetChunks())) + } else { + // For single-part, get encrypted stream and decrypt + tStreamFetch := time.Now() + encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry) + streamFetchTime = time.Since(tStreamFetch) + if streamErr != nil { + return streamErr + } + defer encryptedReader.Close() + + iv := entry.Extended[s3_constants.SeaweedFSSSEIV] + if len(iv) == 0 { + return fmt.Errorf("SSE-C IV not found in entry metadata") + } + glog.V(2).Infof("SSE-C decryption: IV length=%d, KeyMD5=%s", len(iv), customerKey.KeyMD5) + decryptedReader, err = CreateSSECDecryptedReader(encryptedReader, customerKey, iv) + } + + case s3_constants.SSETypeKMS: + sseKMSKey := decryptionKey.(*SSEKMSKey) + + // Check if this is a multipart object (multiple chunks with SSE-KMS metadata) + isMultipartSSEKMS := false + ssekmsChunks := 0 + for _, chunk := range entry.GetChunks() { + if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 { + ssekmsChunks++ + } + } + isMultipartSSEKMS = ssekmsChunks > 1 + glog.V(3).Infof("SSE-KMS decryption: isMultipart=%v, ssekmsChunks=%d", isMultipartSSEKMS, ssekmsChunks) + + if isMultipartSSEKMS { + // For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly + decryptedReader, err = s3a.createMultipartSSEKMSDecryptedReaderDirect(r.Context(), nil, entry) + glog.V(2).Infof("Using multipart SSE-KMS decryption for object with %d chunks (no prefetch)", len(entry.GetChunks())) + } else { + // For single-part, get encrypted stream and decrypt + tStreamFetch := time.Now() + encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry) + streamFetchTime = time.Since(tStreamFetch) + if streamErr != nil { + return streamErr + } + defer encryptedReader.Close() + + glog.V(2).Infof("SSE-KMS decryption: KeyID=%s, IV length=%d", sseKMSKey.KeyID, len(sseKMSKey.IV)) + decryptedReader, err = CreateSSEKMSDecryptedReader(encryptedReader, sseKMSKey) + } + + case s3_constants.SSETypeS3: + sseS3Key := decryptionKey.(*SSES3Key) + + // 
Check if this is a multipart object (multiple chunks with SSE-S3 metadata) + isMultipartSSES3 := false + sses3Chunks := 0 + for _, chunk := range entry.GetChunks() { + if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 && len(chunk.GetSseMetadata()) > 0 { + sses3Chunks++ + } + } + isMultipartSSES3 = sses3Chunks > 1 + glog.V(3).Infof("SSE-S3 decryption: isMultipart=%v, sses3Chunks=%d", isMultipartSSES3, sses3Chunks) + + if isMultipartSSES3 { + // For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly + decryptedReader, err = s3a.createMultipartSSES3DecryptedReaderDirect(r.Context(), nil, entry) + glog.V(2).Infof("Using multipart SSE-S3 decryption for object with %d chunks (no prefetch)", len(entry.GetChunks())) + } else { + // For single-part, get encrypted stream and decrypt + tStreamFetch := time.Now() + encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry) + streamFetchTime = time.Since(tStreamFetch) + if streamErr != nil { + return streamErr + } + defer encryptedReader.Close() + + keyManager := GetSSES3KeyManager() + iv, ivErr := GetSSES3IV(entry, sseS3Key, keyManager) + if ivErr != nil { + return fmt.Errorf("failed to get SSE-S3 IV: %w", ivErr) + } + glog.V(2).Infof("SSE-S3 decryption: KeyID=%s, IV length=%d", sseS3Key.KeyID, len(iv)) + decryptedReader, err = CreateSSES3DecryptedReader(encryptedReader, sseS3Key, iv) + } + } + decryptSetupTime = time.Since(tDecryptSetup) + + if err != nil { + glog.Errorf("SSE decryption error (%s): %v", sseType, err) + return fmt.Errorf("failed to create decrypted reader: %w", err) + } + + // Close the decrypted reader to avoid leaking HTTP bodies + if closer, ok := decryptedReader.(io.Closer); ok { + defer func() { + if closeErr := closer.Close(); closeErr != nil { + glog.V(3).Infof("Error closing decrypted reader: %v", closeErr) + } + }() + } + + // Stream full decrypted object to client + tCopy := time.Now() + buf := make([]byte, 128*1024) + copied, copyErr := io.CopyBuffer(w, decryptedReader, buf) + copyTime = time.Since(tCopy) + if copyErr != nil { + glog.Errorf("Failed to copy full object: copied %d bytes: %v", copied, copyErr) + return copyErr + } + glog.V(3).Infof("Full object request: copied %d bytes", copied) + return nil +} + +// streamDecryptedRangeFromChunks streams a range of decrypted data by only fetching needed chunks +// This implements the filer's ViewFromChunks approach for optimal range performance +func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io.Writer, entry *filer_pb.Entry, offset int64, size int64, sseType string, decryptionKey interface{}) error { + // Use filer's ViewFromChunks to resolve only needed chunks for the range + lookupFileIdFn := s3a.createLookupFileIdFunction() + chunkViews := filer.ViewFromChunks(ctx, lookupFileIdFn, entry.GetChunks(), offset, size) + + totalWritten := int64(0) + targetOffset := offset + + // Stream each chunk view + for x := chunkViews.Front(); x != nil; x = x.Next { + chunkView := x.Value + + // Handle gaps between chunks (write zeros) + if targetOffset < chunkView.ViewOffset { + gap := chunkView.ViewOffset - targetOffset + glog.V(4).Infof("Writing %d zero bytes for gap [%d,%d)", gap, targetOffset, chunkView.ViewOffset) + if err := writeZeroBytes(w, gap); err != nil { + return fmt.Errorf("failed to write zero padding: %w", err) + } + totalWritten += gap + targetOffset = chunkView.ViewOffset + } + + // Find the corresponding FileChunk for this chunkView + var fileChunk *filer_pb.FileChunk + for _, chunk := range 
entry.GetChunks() { + if chunk.GetFileIdString() == chunkView.FileId { + fileChunk = chunk + break + } + } + if fileChunk == nil { + return fmt.Errorf("chunk %s not found in entry", chunkView.FileId) + } + + // Fetch and decrypt this chunk view + var decryptedChunkReader io.Reader + var err error + + switch sseType { + case s3_constants.SSETypeC: + decryptedChunkReader, err = s3a.decryptSSECChunkView(ctx, fileChunk, chunkView, decryptionKey.(*SSECustomerKey)) + case s3_constants.SSETypeKMS: + decryptedChunkReader, err = s3a.decryptSSEKMSChunkView(ctx, fileChunk, chunkView) + case s3_constants.SSETypeS3: + decryptedChunkReader, err = s3a.decryptSSES3ChunkView(ctx, fileChunk, chunkView, entry) + default: + // Non-encrypted chunk + decryptedChunkReader, err = s3a.fetchChunkViewData(ctx, chunkView) + } + + if err != nil { + return fmt.Errorf("failed to decrypt chunk view %s: %w", chunkView.FileId, err) + } + + // Copy the decrypted chunk data + written, copyErr := io.Copy(w, decryptedChunkReader) + if closer, ok := decryptedChunkReader.(io.Closer); ok { + closeErr := closer.Close() + if closeErr != nil { + glog.Warningf("streamDecryptedRangeFromChunks: failed to close decrypted chunk reader: %v", closeErr) + } + } + if copyErr != nil { + glog.Errorf("streamDecryptedRangeFromChunks: copy error after writing %d bytes (expected %d): %v", written, chunkView.ViewSize, copyErr) + return fmt.Errorf("failed to copy decrypted chunk data: %w", copyErr) + } + + if written != int64(chunkView.ViewSize) { + glog.Errorf("streamDecryptedRangeFromChunks: size mismatch - wrote %d bytes but expected %d", written, chunkView.ViewSize) + return fmt.Errorf("size mismatch: wrote %d bytes but expected %d for chunk %s", written, chunkView.ViewSize, chunkView.FileId) + } + + totalWritten += written + targetOffset += written + glog.V(2).Infof("streamDecryptedRangeFromChunks: Wrote %d bytes from chunk %s [%d,%d), totalWritten=%d, targetSize=%d", written, chunkView.FileId, chunkView.ViewOffset, chunkView.ViewOffset+int64(chunkView.ViewSize), totalWritten, size) + } + + // Handle trailing zeros if needed + remaining := size - totalWritten + if remaining > 0 { + glog.V(4).Infof("Writing %d trailing zero bytes", remaining) + if err := writeZeroBytes(w, remaining); err != nil { + return fmt.Errorf("failed to write trailing zeros: %w", err) + } + } + + glog.V(3).Infof("Completed range-aware SSE decryption: wrote %d bytes for range [%d,%d)", totalWritten, offset, offset+size) + return nil +} + +// writeZeroBytes writes n zero bytes to writer using the package-level zero buffer +func writeZeroBytes(w io.Writer, n int64) error { + for n > 0 { + toWrite := min(n, int64(len(zeroBuf))) + written, err := w.Write(zeroBuf[:toWrite]) + if err != nil { + return err + } + n -= int64(written) + } + return nil +} + +// decryptSSECChunkView decrypts a specific chunk view with SSE-C +func (s3a *S3ApiServer) decryptSSECChunkView(ctx context.Context, fileChunk *filer_pb.FileChunk, chunkView *filer.ChunkView, customerKey *SSECustomerKey) (io.Reader, error) { + // For multipart SSE-C, each chunk has its own IV in chunk.SseMetadata + if fileChunk.GetSseType() == filer_pb.SSEType_SSE_C && len(fileChunk.GetSseMetadata()) > 0 { + ssecMetadata, err := DeserializeSSECMetadata(fileChunk.GetSseMetadata()) + if err != nil { + return nil, fmt.Errorf("failed to deserialize SSE-C metadata: %w", err) + } + chunkIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV) + if err != nil { + return nil, fmt.Errorf("failed to decode IV: %w", err) + } + + // Fetch 
FULL encrypted chunk + // Note: Fetching full chunk is necessary for proper CTR decryption stream + fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId) + if err != nil { + return nil, fmt.Errorf("failed to fetch full chunk: %w", err) + } + + // Calculate IV using PartOffset + // PartOffset is the position of this chunk within its part's encrypted stream + var adjustedIV []byte + var ivSkip int + if ssecMetadata.PartOffset > 0 { + adjustedIV, ivSkip = calculateIVWithOffset(chunkIV, ssecMetadata.PartOffset) + } else { + adjustedIV = chunkIV + ivSkip = 0 + } + + // Decrypt the full chunk + decryptedReader, decryptErr := CreateSSECDecryptedReader(fullChunkReader, customerKey, adjustedIV) + if decryptErr != nil { + fullChunkReader.Close() + return nil, fmt.Errorf("failed to create decrypted reader: %w", decryptErr) + } + + // CRITICAL: Skip intra-block bytes from CTR decryption (non-block-aligned offset handling) + if ivSkip > 0 { + _, err = io.CopyN(io.Discard, decryptedReader, int64(ivSkip)) + if err != nil { + if closer, ok := decryptedReader.(io.Closer); ok { + closer.Close() + } + return nil, fmt.Errorf("failed to skip intra-block bytes (%d): %w", ivSkip, err) + } + } + + // Skip to the position we need in the decrypted stream + if chunkView.OffsetInChunk > 0 { + _, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk) + if err != nil { + if closer, ok := decryptedReader.(io.Closer); ok { + closer.Close() + } + return nil, fmt.Errorf("failed to skip to offset %d: %w", chunkView.OffsetInChunk, err) + } + } + + // Return a reader that only reads ViewSize bytes with proper cleanup + limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize)) + return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil + } + + // Single-part SSE-C: use object-level IV (should not hit this in range path, but handle it) + encryptedReader, err := s3a.fetchChunkViewData(ctx, chunkView) + if err != nil { + return nil, err + } + // For single-part, the IV is stored at object level, already handled in non-range path + return encryptedReader, nil +} + +// decryptSSEKMSChunkView decrypts a specific chunk view with SSE-KMS +func (s3a *S3ApiServer) decryptSSEKMSChunkView(ctx context.Context, fileChunk *filer_pb.FileChunk, chunkView *filer.ChunkView) (io.Reader, error) { + if fileChunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(fileChunk.GetSseMetadata()) > 0 { + sseKMSKey, err := DeserializeSSEKMSMetadata(fileChunk.GetSseMetadata()) + if err != nil { + return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err) + } + + // Fetch FULL encrypted chunk + fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId) + if err != nil { + return nil, fmt.Errorf("failed to fetch full chunk: %w", err) + } + + // Calculate IV using ChunkOffset (same as PartOffset in SSE-C) + var adjustedIV []byte + var ivSkip int + if sseKMSKey.ChunkOffset > 0 { + adjustedIV, ivSkip = calculateIVWithOffset(sseKMSKey.IV, sseKMSKey.ChunkOffset) + } else { + adjustedIV = sseKMSKey.IV + ivSkip = 0 + } + + adjustedKey := &SSEKMSKey{ + KeyID: sseKMSKey.KeyID, + EncryptedDataKey: sseKMSKey.EncryptedDataKey, + EncryptionContext: sseKMSKey.EncryptionContext, + BucketKeyEnabled: sseKMSKey.BucketKeyEnabled, + IV: adjustedIV, + ChunkOffset: sseKMSKey.ChunkOffset, + } + + decryptedReader, decryptErr := CreateSSEKMSDecryptedReader(fullChunkReader, adjustedKey) + if decryptErr != nil { + fullChunkReader.Close() + return nil, fmt.Errorf("failed to create KMS decrypted reader: %w", decryptErr) 
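		// Worked example of the IV adjustment above (assuming AES-CTR with a
		// 16-byte block size, which the intra-block skip below implies): for
		// ChunkOffset=40, calculateIVWithOffset advances the counter by
		// 40/16 = 2 blocks and returns ivSkip = 40%16 = 8, so the first 8
		// decrypted bytes are discarded before the view's plaintext starts.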
+ } + + // CRITICAL: Skip intra-block bytes from CTR decryption (non-block-aligned offset handling) + if ivSkip > 0 { + _, err = io.CopyN(io.Discard, decryptedReader, int64(ivSkip)) + if err != nil { + if closer, ok := decryptedReader.(io.Closer); ok { + closer.Close() + } + return nil, fmt.Errorf("failed to skip intra-block bytes (%d): %w", ivSkip, err) + } + } + + // Skip to position and limit to ViewSize + if chunkView.OffsetInChunk > 0 { + _, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk) + if err != nil { + if closer, ok := decryptedReader.(io.Closer); ok { + closer.Close() + } + return nil, fmt.Errorf("failed to skip to offset: %w", err) + } + } + + limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize)) + return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil + } + + // Non-KMS encrypted chunk + return s3a.fetchChunkViewData(ctx, chunkView) +} + +// decryptSSES3ChunkView decrypts a specific chunk view with SSE-S3 +func (s3a *S3ApiServer) decryptSSES3ChunkView(ctx context.Context, fileChunk *filer_pb.FileChunk, chunkView *filer.ChunkView, entry *filer_pb.Entry) (io.Reader, error) { + // For multipart SSE-S3, each chunk has its own IV in chunk.SseMetadata + if fileChunk.GetSseType() == filer_pb.SSEType_SSE_S3 && len(fileChunk.GetSseMetadata()) > 0 { + keyManager := GetSSES3KeyManager() + + // Deserialize per-chunk SSE-S3 metadata to get chunk-specific IV + chunkSSES3Metadata, err := DeserializeSSES3Metadata(fileChunk.GetSseMetadata(), keyManager) + if err != nil { + return nil, fmt.Errorf("failed to deserialize chunk SSE-S3 metadata: %w", err) + } + + // Fetch FULL encrypted chunk (necessary for proper CTR decryption stream) + fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId) + if err != nil { + return nil, fmt.Errorf("failed to fetch full chunk: %w", err) + } + + // Use the chunk's IV directly (already adjusted for block offset during encryption) + // Note: SSE-S3 encryption flow: + // 1. Upload: CreateSSES3EncryptedReaderWithBaseIV(reader, key, baseIV, partOffset) + // calls calculateIVWithOffset(baseIV, partOffset) → (blockAlignedIV, skip) + // The blockAlignedIV is stored in chunk metadata + // 2. 
Download: We decrypt the FULL chunk from offset 0 using that blockAlignedIV + // Then skip to chunkView.OffsetInChunk in the PLAINTEXT (not ciphertext) + // This differs from SSE-C which stores base IV + PartOffset and calculates IV during decryption + // No ivSkip needed here because we're decrypting from chunk start (offset 0) + iv := chunkSSES3Metadata.IV + + glog.V(4).Infof("Decrypting multipart SSE-S3 chunk %s with chunk-specific IV length=%d", + chunkView.FileId, len(iv)) + + // Decrypt the full chunk starting from offset 0 + decryptedReader, decryptErr := CreateSSES3DecryptedReader(fullChunkReader, chunkSSES3Metadata, iv) + if decryptErr != nil { + fullChunkReader.Close() + return nil, fmt.Errorf("failed to create SSE-S3 decrypted reader: %w", decryptErr) + } + + // Skip to position within the decrypted chunk (plaintext offset, not ciphertext offset) + if chunkView.OffsetInChunk > 0 { + _, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk) + if err != nil { + if closer, ok := decryptedReader.(io.Closer); ok { + closer.Close() + } + return nil, fmt.Errorf("failed to skip to offset %d: %w", chunkView.OffsetInChunk, err) + } + } + + limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize)) + return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil + } + + // Single-part SSE-S3: use object-level IV and key (fallback path) + keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key] + keyManager := GetSSES3KeyManager() + sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager) + if err != nil { + return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err) + } + + // Fetch FULL encrypted chunk + fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId) + if err != nil { + return nil, fmt.Errorf("failed to fetch full chunk: %w", err) + } + + // Get base IV for single-part object + iv, err := GetSSES3IV(entry, sseS3Key, keyManager) + if err != nil { + fullChunkReader.Close() + return nil, fmt.Errorf("failed to get SSE-S3 IV: %w", err) + } + + glog.V(4).Infof("Decrypting single-part SSE-S3 chunk %s with entry-level IV length=%d", + chunkView.FileId, len(iv)) + + decryptedReader, decryptErr := CreateSSES3DecryptedReader(fullChunkReader, sseS3Key, iv) + if decryptErr != nil { + fullChunkReader.Close() + return nil, fmt.Errorf("failed to create S3 decrypted reader: %w", decryptErr) + } + + // Skip to position and limit to ViewSize + if chunkView.OffsetInChunk > 0 { + _, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk) + if err != nil { + if closer, ok := decryptedReader.(io.Closer); ok { + closer.Close() + } + return nil, fmt.Errorf("failed to skip to offset: %w", err) + } + } + + limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize)) + return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil +} + +// fetchFullChunk fetches the complete encrypted chunk from volume server +func (s3a *S3ApiServer) fetchFullChunk(ctx context.Context, fileId string) (io.ReadCloser, error) { + // Lookup the volume server URLs for this chunk + lookupFileIdFn := s3a.createLookupFileIdFunction() + urlStrings, err := lookupFileIdFn(ctx, fileId) + if err != nil || len(urlStrings) == 0 { + return nil, fmt.Errorf("failed to lookup chunk %s: %w", fileId, err) + } + + // Use the first URL + chunkUrl := urlStrings[0] + + // Generate JWT for volume server authentication + jwt := security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, fileId) + + // Create request WITHOUT 
+
+// fetchFullChunk fetches the complete encrypted chunk from volume server
+func (s3a *S3ApiServer) fetchFullChunk(ctx context.Context, fileId string) (io.ReadCloser, error) {
+	// Lookup the volume server URLs for this chunk
+	lookupFileIdFn := s3a.createLookupFileIdFunction()
+	urlStrings, err := lookupFileIdFn(ctx, fileId)
+	if err != nil {
+		return nil, fmt.Errorf("failed to lookup chunk %s: %w", fileId, err)
+	}
+	if len(urlStrings) == 0 {
+		return nil, fmt.Errorf("no locations found for chunk %s", fileId)
+	}
+
+	// Use the first URL
+	chunkUrl := urlStrings[0]
+
+	// Generate JWT for volume server authentication
+	jwt := security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, fileId)
+
+	// Create request WITHOUT Range header to get full chunk
+	req, err := http.NewRequestWithContext(ctx, "GET", chunkUrl, nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create request: %w", err)
+	}
+
+	// Set JWT for authentication
+	if jwt != "" {
+		req.Header.Set("Authorization", "BEARER "+string(jwt))
+	}
+
+	// Use shared HTTP client
+	resp, err := volumeServerHTTPClient.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("failed to fetch chunk: %w", err)
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		resp.Body.Close()
+		return nil, fmt.Errorf("unexpected status code %d for chunk %s", resp.StatusCode, fileId)
+	}
+
+	return resp.Body, nil
+}
+
+// fetchChunkViewData fetches encrypted data for a chunk view (with range)
+func (s3a *S3ApiServer) fetchChunkViewData(ctx context.Context, chunkView *filer.ChunkView) (io.ReadCloser, error) {
+	// Lookup the volume server URLs for this chunk
+	lookupFileIdFn := s3a.createLookupFileIdFunction()
+	urlStrings, err := lookupFileIdFn(ctx, chunkView.FileId)
+	if err != nil {
+		return nil, fmt.Errorf("failed to lookup chunk %s: %w", chunkView.FileId, err)
+	}
+	if len(urlStrings) == 0 {
+		return nil, fmt.Errorf("no locations found for chunk %s", chunkView.FileId)
+	}
+
+	// Use the first URL (already contains complete URL with fileId)
+	chunkUrl := urlStrings[0]
+
+	// Generate JWT for volume server authentication
+	jwt := security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, chunkView.FileId)
+
+	// Create request with Range header for the chunk view
+	// chunkUrl already contains the complete URL including fileId
+	req, err := http.NewRequestWithContext(ctx, "GET", chunkUrl, nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create request: %w", err)
+	}
+
+	// Set Range header to fetch only the needed portion of the chunk
+	if !chunkView.IsFullChunk() {
+		rangeEnd := chunkView.OffsetInChunk + int64(chunkView.ViewSize) - 1
+		req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", chunkView.OffsetInChunk, rangeEnd))
+	}
+
+	// Set JWT for authentication
+	if jwt != "" {
+		req.Header.Set("Authorization", "BEARER "+string(jwt))
+	}
+
+	// Use shared HTTP client with connection pooling
+	resp, err := volumeServerHTTPClient.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("failed to fetch chunk: %w", err)
+	}
+
+	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
+		resp.Body.Close()
+		return nil, fmt.Errorf("unexpected status code %d for chunk %s", resp.StatusCode, chunkView.FileId)
+	}
+
+	return resp.Body, nil
+}
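The Range header above uses inclusive bounds on both ends (RFC 9110). A tiny sketch of the same arithmetic, with assumed example values:

package main

import "fmt"

// rangeHeader maps a view of `size` bytes starting at `offset`
// to the inclusive "bytes=start-end" form.
func rangeHeader(offset, size int64) string {
	return fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)
}

func main() {
	fmt.Println(rangeHeader(1024, 4096)) // bytes=1024-5119
}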
+
+// getEncryptedStreamFromVolumes gets raw encrypted data stream from volume servers
+func (s3a *S3ApiServer) getEncryptedStreamFromVolumes(ctx context.Context, entry *filer_pb.Entry) (io.ReadCloser, error) {
+	// Handle inline content
+	if len(entry.Content) > 0 {
+		return io.NopCloser(bytes.NewReader(entry.Content)), nil
+	}
+
+	// Handle empty files
+	chunks := entry.GetChunks()
+	if len(chunks) == 0 {
+		return io.NopCloser(bytes.NewReader([]byte{})), nil
+	}
+
+	// Reuse shared lookup function to keep volume lookup logic in one place
+	lookupFileIdFn := s3a.createLookupFileIdFunction()
+
+	// Resolve chunks
+	totalSize := int64(filer.FileSize(entry))
+	resolvedChunks, _, err := filer.ResolveChunkManifest(ctx, lookupFileIdFn, chunks, 0, totalSize)
+	if err != nil {
+		return nil, err
+	}
+
+	// Create streaming reader
+	masterClient := &simpleMasterClient{lookupFn: lookupFileIdFn}
+	streamFn, err := filer.PrepareStreamContentWithThrottler(
+		ctx,
+		masterClient,
+		func(fileId string) string {
+			// Use volume server JWT (not filer JWT) for direct volume reads
+			return string(security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, fileId))
+		},
+		resolvedChunks,
+		0,
+		totalSize,
+		0,
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	// Create a pipe to get io.ReadCloser
+	pipeReader, pipeWriter := io.Pipe()
+	go func() {
+		defer pipeWriter.Close()
+		if err := streamFn(pipeWriter); err != nil {
+			glog.Errorf("getEncryptedStreamFromVolumes: streaming error: %v", err)
+			pipeWriter.CloseWithError(err)
+		}
+	}()
+
+	return pipeReader, nil
+}
+
+// addSSEResponseHeadersFromEntry adds appropriate SSE response headers based on entry metadata
+func (s3a *S3ApiServer) addSSEResponseHeadersFromEntry(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) {
+	if entry == nil || entry.Extended == nil {
+		return
+	}
+
+	switch sseType {
+	case s3_constants.SSETypeC:
+		// SSE-C: Echo back algorithm and key MD5
+		if algo, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm]; exists {
+			w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, string(algo))
+		}
+		if keyMD5, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]; exists {
+			w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, string(keyMD5))
+		}
+
+	case s3_constants.SSETypeKMS:
+		// SSE-KMS: Return algorithm and key ID
+		w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms")
+		if kmsMetadataBytes, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
+			sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
+			if err == nil {
+				AddSSEKMSResponseHeaders(w, sseKMSKey)
+			}
+		}
+
+	case s3_constants.SSETypeS3:
+		// SSE-S3: Return algorithm
+		w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
+	}
+}
+
+// pipeWriterWrapper wraps io.PipeWriter to implement http.ResponseWriter interface
+type pipeWriterWrapper struct {
+	*io.PipeWriter
+}
+
+func (pw *pipeWriterWrapper) Header() http.Header {
+	// Headers are already set on the real ResponseWriter, ignore here
+	return make(http.Header)
+}
+
+func (pw *pipeWriterWrapper) WriteHeader(statusCode int) {
+	// Status is already set on the real ResponseWriter, ignore here
+}
+
+// createSSECDecryptedReaderFromEntry creates an SSE-C decrypted reader from entry metadata
+func (s3a *S3ApiServer) createSSECDecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) {
+	// Parse SSE-C headers from request
+	customerKey, err := ParseSSECHeaders(r)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse SSE-C headers: %w", err)
+	}
+
+	if customerKey == nil {
+		return nil, fmt.Errorf("SSE-C key required but not provided")
+	}
+
+	// Validate key MD5 from entry metadata
+	if entry.Extended != nil {
+		storedKeyMD5 := string(entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5])
+		if storedKeyMD5 != "" && customerKey.KeyMD5 != storedKeyMD5 {
+			return nil, fmt.Errorf("SSE-C key mismatch")
+		}
+	}
+
+	// Get IV from entry metadata (stored as raw bytes, matching filer behavior)
+	iv := entry.Extended[s3_constants.SeaweedFSSSEIV]
+	if len(iv) == 0 {
+		return nil, fmt.Errorf("SSE-C IV not found in metadata")
+	}
+
+	// Create decrypted reader
+	return CreateSSECDecryptedReader(encryptedReader, customerKey, iv)
+}
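The io.Pipe pattern in getEncryptedStreamFromVolumes above relies on CloseWithError to propagate mid-stream failures; since Go 1.9 the first close wins, so the deferred Close cannot mask the error. A minimal standalone sketch of that behavior:

package main

import (
	"errors"
	"fmt"
	"io"
)

func main() {
	pr, pw := io.Pipe()
	go func() {
		pw.Write([]byte("partial"))
		// Reader's next Read returns this error instead of a silent io.EOF,
		// so a mid-stream failure is not mistaken for a complete object.
		pw.CloseWithError(errors.New("volume read failed"))
	}()

	buf := make([]byte, 32)
	n, _ := pr.Read(buf)   // "partial"
	_, err := pr.Read(buf) // the injected error, not io.EOF
	fmt.Println(string(buf[:n]), err)
}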
+
+// createSSEKMSDecryptedReaderFromEntry creates an SSE-KMS decrypted reader from entry metadata
+func (s3a *S3ApiServer) createSSEKMSDecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) {
+	// Extract SSE-KMS metadata from entry (stored as raw bytes, matching filer behavior)
+	if entry.Extended == nil {
+		return nil, fmt.Errorf("no extended metadata found")
+	}
+
+	kmsMetadataBytes, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]
+	if !exists {
+		return nil, fmt.Errorf("SSE-KMS metadata not found")
+	}
+
+	sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
+	if err != nil {
+		return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err)
+	}
+
+	// Create decrypted reader
+	return CreateSSEKMSDecryptedReader(encryptedReader, sseKMSKey)
+}
+
+// createSSES3DecryptedReaderFromEntry creates an SSE-S3 decrypted reader from entry metadata
+func (s3a *S3ApiServer) createSSES3DecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) {
+	// Extract SSE-S3 metadata from entry (stored as raw bytes, matching filer behavior)
+	if entry.Extended == nil {
+		return nil, fmt.Errorf("no extended metadata found")
+	}
+
+	keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]
+	if !exists {
+		return nil, fmt.Errorf("SSE-S3 metadata not found")
+	}
+
+	keyManager := GetSSES3KeyManager()
+	sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager)
+	if err != nil {
+		return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err)
+	}
+
+	// Get IV
+	iv, err := GetSSES3IV(entry, sseS3Key, keyManager)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get SSE-S3 IV: %w", err)
+	}
+
+	// Create decrypted reader
+	return CreateSSES3DecryptedReader(encryptedReader, sseS3Key, iv)
+}
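Taken together, the three constructors above form a dispatch table keyed by SSE type. A hedged sketch of the glue (wrapDecryption is a hypothetical name, it assumes the surrounding s3api package, and the real call site may differ):

// wrapDecryption chooses a decryption wrapper based on the detected SSE type,
// using the constructors defined above (illustrative glue only).
func (s3a *S3ApiServer) wrapDecryption(r *http.Request, enc io.Reader, entry *filer_pb.Entry) (io.Reader, error) {
	switch s3a.detectPrimarySSEType(entry) {
	case s3_constants.SSETypeC:
		return s3a.createSSECDecryptedReaderFromEntry(r, enc, entry)
	case s3_constants.SSETypeKMS:
		return s3a.createSSEKMSDecryptedReaderFromEntry(r, enc, entry)
	case s3_constants.SSETypeS3:
		return s3a.createSSES3DecryptedReaderFromEntry(r, enc, entry)
	default:
		return enc, nil // plaintext object, pass through
	}
}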
+
+// setResponseHeaders sets all standard HTTP response headers from entry metadata
+func (s3a *S3ApiServer) setResponseHeaders(w http.ResponseWriter, entry *filer_pb.Entry, totalSize int64) {
+	// Safety check: entry must be valid
+	if entry == nil {
+		glog.Errorf("setResponseHeaders: entry is nil")
+		return
+	}
+
+	// Set content length and accept ranges
+	w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
+	w.Header().Set("Accept-Ranges", "bytes")
+
+	// Set ETag (but don't overwrite if already set, e.g., for part-specific GET requests)
+	if w.Header().Get("ETag") == "" {
+		etag := filer.ETag(entry)
+		if etag != "" {
+			w.Header().Set("ETag", "\""+etag+"\"")
+		}
+	}
+
+	// Set Last-Modified in RFC1123 format
+	if entry.Attributes != nil {
+		modTime := time.Unix(entry.Attributes.Mtime, 0).UTC()
+		w.Header().Set("Last-Modified", modTime.Format(http.TimeFormat))
+	}
+
+	// Set Content-Type
+	mimeType := ""
+	if entry.Attributes != nil && entry.Attributes.Mime != "" {
+		mimeType = entry.Attributes.Mime
+	}
+	if mimeType == "" {
+		// Try to detect from entry name
+		if entry.Name != "" {
+			ext := filepath.Ext(entry.Name)
+			if ext != "" {
+				mimeType = mime.TypeByExtension(ext)
+			}
+		}
+	}
+	if mimeType != "" {
+		w.Header().Set("Content-Type", mimeType)
+	} else {
+		w.Header().Set("Content-Type", "application/octet-stream")
+	}
+
+	// Set custom headers from entry.Extended (user metadata)
+	// Use direct map assignment to preserve original header casing (matches proxy behavior)
+	if entry.Extended != nil {
+		for k, v := range entry.Extended {
+			// Skip internal SeaweedFS headers
+			if !strings.HasPrefix(k, "xattr-") && !s3_constants.IsSeaweedFSInternalHeader(k) {
+				// Support backward compatibility: migrate old non-canonical format to canonical format
+				// OLD: "x-amz-meta-foo" → NEW: "X-Amz-Meta-foo" (suffix lowercased below)
+				headerKey := k
+				if len(k) >= 11 && strings.EqualFold(k[:11], "x-amz-meta-") {
+					// Normalize to AWS S3 format: "X-Amz-Meta-" prefix with lowercase suffix
+					// AWS S3 returns user metadata with the suffix in lowercase
+					suffix := k[len("x-amz-meta-"):]
+					headerKey = s3_constants.AmzUserMetaPrefix + strings.ToLower(suffix)
+					if glog.V(4) && k != headerKey {
+						glog.Infof("Normalizing user metadata header %q to %q in response", k, headerKey)
+					}
+				}
+				w.Header()[headerKey] = []string{string(v)}
+			}
+		}
+	}
+
+	// Set tag count header (matches filer logic)
+	if entry.Extended != nil {
+		tagCount := 0
+		for k := range entry.Extended {
+			if strings.HasPrefix(k, s3_constants.AmzObjectTagging+"-") {
+				tagCount++
+			}
+		}
+		if tagCount > 0 {
+			w.Header().Set(s3_constants.AmzTagCount, strconv.Itoa(tagCount))
+		}
+	}
+}
+
+// simpleMasterClient implements the minimal interface for streaming
+type simpleMasterClient struct {
+	lookupFn func(ctx context.Context, fileId string) ([]string, error)
+}
+
+func (s *simpleMasterClient) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType {
+	return s.lookupFn
+}
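A standalone sketch of the user-metadata normalization performed in setResponseHeaders above (the constant mirrors s3_constants.AmzUserMetaPrefix; sample keys are illustrative):

package main

import (
	"fmt"
	"strings"
)

const amzUserMetaPrefix = "X-Amz-Meta-" // mirrors s3_constants.AmzUserMetaPrefix

// normalizeUserMetaKey rewrites legacy non-canonical keys to the
// "X-Amz-Meta-" prefix with a lowercase suffix, matching AWS S3 responses.
func normalizeUserMetaKey(k string) string {
	if len(k) >= 11 && strings.EqualFold(k[:11], "x-amz-meta-") {
		return amzUserMetaPrefix + strings.ToLower(k[len("x-amz-meta-"):])
	}
	return k
}

func main() {
	fmt.Println(normalizeUserMetaKey("x-amz-meta-Project-Name")) // X-Amz-Meta-project-name
	fmt.Println(normalizeUserMetaKey("Cache-Control"))           // unchanged
}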
+
+func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
+
+	bucket, object := s3_constants.GetBucketAndObject(r)
+	glog.V(3).Infof("HeadObjectHandler %s %s", bucket, object)
+
+	// Handle directory objects with shared logic
+	if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "HeadObjectHandler") {
+		return // Directory object request was handled
+	}
+
+	// Check conditional headers and handle early return if conditions fail
+	result, handled := s3a.processConditionalHeaders(w, r, bucket, object, "HeadObjectHandler")
+	if handled {
+		return
+	}
+
+	// Check for specific version ID in query parameters
+	versionId := r.URL.Query().Get("versionId")
+
+	var (
+		entry                *filer_pb.Entry // Declare entry at function scope for SSE processing
+		versioningConfigured bool
+		err                  error
+	)
+
+	// Check if versioning is configured for the bucket (Enabled or Suspended)
+	// Note: We need to check this even if versionId is empty, because versioned buckets
+	// handle even "get latest version" requests differently (through .versions directory)
+	versioningConfigured, err = s3a.isVersioningConfigured(bucket)
+	if err != nil {
+		if err == filer_pb.ErrNotFound {
+			s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
+			return
+		}
+		glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
+		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+		return
+	}
+
+	if versioningConfigured {
+		// Handle versioned HEAD - all versions are stored in .versions directory
+		var targetVersionId string
+
+		if versionId != "" {
+			// Request for specific version
+			glog.V(2).Infof("HeadObject: requesting specific version %s for %s%s", versionId, bucket, object)
+			entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
+			if err != nil {
+				glog.Errorf("Failed to get specific version %s: %v", versionId, err)
+				s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+				return
+			}
+			// Safety check: entry must be valid after successful retrieval
+			if entry == nil {
+				glog.Errorf("HeadObject: getSpecificObjectVersion returned nil entry without error for version %s", versionId)
+				s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+				return
+			}
+			targetVersionId = versionId
+		} else {
+			// Request for latest version - OPTIMIZATION:
+			// Check if .versions/ directory exists quickly (no retries) to decide path
+			// - If .versions/ exists: real versions available, use getLatestObjectVersion
+			// - If .versions/ doesn't exist (ErrNotFound): only null version at regular path, use it directly
+			// - If transient error: fall back to getLatestObjectVersion which has retry logic
+			bucketDir := s3a.option.BucketsPath + "/" + bucket
+			normalizedObject := removeDuplicateSlashes(object)
+			versionsDir := normalizedObject + s3_constants.VersionsFolder
+
+			// Quick check (no retries) for .versions/ directory
+			versionsEntry, versionsErr := s3a.getEntry(bucketDir, versionsDir)
+
+			if versionsErr == nil && versionsEntry != nil {
+				// .versions/ exists, meaning real versions are stored there
+				// Use getLatestObjectVersion which will properly find the newest version
+				entry, err = s3a.getLatestObjectVersion(bucket, object)
+				if err != nil {
+					glog.Errorf("HeadObject: Failed to get latest version for %s%s: %v", bucket, object, err)
+					s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+					return
+				}
+			} else if errors.Is(versionsErr, filer_pb.ErrNotFound) {
+				// .versions/ doesn't exist (confirmed not found), check regular path for null version
+				regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject)
+				if regularErr == nil && regularEntry != nil {
+					// Found object at regular path - this is the null version
+					entry = regularEntry
+					targetVersionId = "null"
+				} else {
+					// No object at regular path either - object doesn't exist
+					glog.Errorf("HeadObject: object not found at regular path or .versions for %s%s", bucket, object)
+					s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+					return
+				}
+			} else {
+				// Transient error checking .versions/, fall back to getLatestObjectVersion with retries
+				glog.V(2).Infof("HeadObject: transient error checking .versions for %s%s: %v, falling back to getLatestObjectVersion", bucket, object, versionsErr)
+				entry, err = s3a.getLatestObjectVersion(bucket, object)
+				if err != nil {
+					glog.Errorf("HeadObject: Failed to get latest version for %s%s: %v", bucket, object, err)
+					s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+					return
+				}
+			}
 		}
-		targetVersionId = versionId
-	} else {
-		// Request for latest version
-		glog.V(2).Infof("HeadObject: requesting latest version for %s%s", bucket, object)
-		entry, err = s3a.getLatestObjectVersion(bucket, object)
-		if err != nil {
-			glog.Errorf("Failed to get latest version: %v", err)
+		// Safety check: entry must be valid after successful retrieval
+		if entry == nil {
+			glog.Errorf("HeadObject: entry is nil after versioned lookup for %s%s", bucket, object)
 			s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
 			return
 		}
-		if entry.Extended != nil {
-			if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
-				targetVersionId = string(versionIdBytes)
-			}
-		}
-		// If no version ID found in entry, this is a pre-versioning object
+		// Extract version ID if not already set
 		if targetVersionId == "" {
-			targetVersionId = "null"
+			if entry.Extended != nil {
+				if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
+					targetVersionId = string(versionIdBytes)
+				}
+			}
+			// If no version ID found in entry, this is a pre-versioning object
+			if targetVersionId == "" {
+				targetVersionId = "null"
+			}
 		}
 	}
@@ -567,16 +2031,11 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
 		}
 	}
 
-	// Determine the actual file path based on whether this is a versioned or pre-versioning object
+	// For versioned objects, log the target version
 	if targetVersionId == "null" {
-		// Pre-versioning object - stored as regular file
-		destUrl = s3a.toFilerUrl(bucket, object)
-		glog.V(2).Infof("HeadObject: pre-versioning object URL: %s", destUrl)
+		glog.V(2).Infof("HeadObject: pre-versioning object %s/%s", bucket, object)
 	} else {
-		// Versioned object - stored in .versions directory
-		versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId)
-		destUrl = s3a.toFilerUrl(bucket, versionObjectPath)
-		glog.V(2).Infof("HeadObject: version %s URL: %s", targetVersionId, destUrl)
+		glog.V(2).Infof("HeadObject: version %s for %s/%s", targetVersionId, bucket, object)
 	}
 
 	// Set version ID in response header
@@ -584,9 +2043,6 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
 
 	// Add object lock metadata to response headers if present
 	s3a.addObjectLockHeadersToResponse(w, entry)
-	} else {
-		// Handle regular HEAD (non-versioned)
-		destUrl = s3a.toFilerUrl(bucket, object)
 	}
 
 	// Fetch the correct entry for SSE processing (respects versionId)
@@ -594,6 +2050,12 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
 	var objectEntryForSSE *filer_pb.Entry
 	if versioningConfigured {
 		objectEntryForSSE = entry
+		// Safety check - this should never happen as versioned path handles errors above
+		if objectEntryForSSE == nil {
+			glog.Errorf("HeadObjectHandler: unexpected nil entry for versioned object %s/%s", bucket, object)
+			s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+			return
+		}
 	} else {
 		// For non-versioned objects, try to reuse entry from conditional header check
 		if result.Entry != nil {
@@ -608,7 +2070,7 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
 			var fetchErr error
 			objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object)
 			if fetchErr != nil {
-				glog.Errorf("HeadObjectHandler: failed to get entry for SSE check: %v", fetchErr)
+				glog.Warningf("HeadObjectHandler: failed to get entry for %s/%s: %v", bucket, object, fetchErr)
 				s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
 				return
 			}
@@ -620,122 +2082,97 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
 		}
 	}
 
-	s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
-		// Handle SSE validation (both SSE-C and SSE-KMS) for HEAD requests
-		return s3a.handleSSEResponse(r, proxyResponse, w, objectEntryForSSE)
-	})
-}
-
-func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, isWrite bool, responseFn func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64)) {
-
-	glog.V(3).Infof("s3 proxying %s to %s", r.Method, destUrl)
-	start := time.Now()
-
-	proxyReq, err := http.NewRequest(r.Method, destUrl, r.Body)
-
-	if err != nil {
-		glog.Errorf("NewRequest %s: %v", destUrl, err)
-		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
-		return
-	}
-
-	proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
-	proxyReq.Header.Set("Accept-Encoding", "identity")
-	for k, v := range r.URL.Query() {
-		if _, ok := s3_constants.PassThroughHeaders[strings.ToLower(k)]; ok {
-			proxyReq.Header[k] = v
-		}
-		if k == "partNumber" {
-			proxyReq.Header[s3_constants.SeaweedFSPartNumber] = v
-		}
-	}
-	for header, values := range r.Header {
-		proxyReq.Header[header] = values
-	}
-	if proxyReq.ContentLength == 0 && r.ContentLength != 0 {
-		proxyReq.ContentLength = r.ContentLength
-	}
-
-	// ensure that the Authorization header is overriding any previous
-	// Authorization header which might be already present in proxyReq
-	s3a.maybeAddFilerJwtAuthorization(proxyReq, isWrite)
-	resp, postErr := s3a.client.Do(proxyReq)
-
-	if postErr != nil {
-		glog.Errorf("post to filer: %v", postErr)
+	// Safety check: entry must be valid
+	if objectEntryForSSE == nil {
+		glog.Errorf("HeadObjectHandler: objectEntryForSSE is nil for %s/%s (should not happen)", bucket, object)
 		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
 		return
 	}
-	defer util_http.CloseResponse(resp)
-
-	if resp.StatusCode == http.StatusPreconditionFailed {
-		s3err.WriteErrorResponse(w, r, s3err.ErrPreconditionFailed)
-		return
-	}
-
-	if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable {
-		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
-		return
-	}
+	// For HEAD requests, we already have all metadata - just set headers directly
+	totalSize := int64(filer.FileSize(objectEntryForSSE))
+	s3a.setResponseHeaders(w, objectEntryForSSE, totalSize)
 
-	if r.Method == http.MethodDelete {
-		if resp.StatusCode == http.StatusNotFound {
-			// this is normal
-			responseStatusCode, _ := responseFn(resp, w)
-			s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
-			return
-		}
-	}
+	// Check if PartNumber query parameter is present (for multipart objects)
+	// This logic matches the filer handler for consistency
+	partNumberStr := r.URL.Query().Get("partNumber")
+	if partNumberStr == "" {
+		partNumberStr = r.URL.Query().Get("PartNumber")
+	}
 
-	if resp.StatusCode == http.StatusNotFound {
-		s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
-		return
-	}
-
-	TimeToFirstByte(r.Method, start, r)
-	if resp.Header.Get(s3_constants.SeaweedFSIsDirectoryKey) == "true" {
-		responseStatusCode, _ := responseFn(resp, w)
-		s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
-		return
-	}
+	// If PartNumber is specified, set headers (matching filer logic)
+	if partNumberStr != "" {
+		if partNumber, parseErr := strconv.Atoi(partNumberStr); parseErr == nil && partNumber > 0 {
+			// Get actual parts count from metadata (not chunk count)
+			partsCount, partInfo := s3a.getMultipartInfo(objectEntryForSSE, partNumber)
 
-	if resp.StatusCode == http.StatusInternalServerError {
-		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
-		return
-	}
+			// Validate part number
+			if partNumber > partsCount {
+				glog.Warningf("HeadObject: Invalid part number %d, object has %d parts", partNumber, partsCount)
+				s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
+				return
+			}
 
-	// when HEAD a directory, it should be reported as no such key
-	// https://github.com/seaweedfs/seaweedfs/issues/3457
-	if resp.ContentLength == -1 && resp.StatusCode != http.StatusNotModified {
-		s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
-		return
-	}
+			// Set parts count header
+			w.Header().Set(s3_constants.AmzMpPartsCount, strconv.Itoa(partsCount))
+			glog.V(3).Infof("HeadObject: Set PartsCount=%d for part %d", partsCount, partNumber)
 
-	if resp.StatusCode == http.StatusBadRequest {
-		resp_body, _ := io.ReadAll(resp.Body)
-		switch string(resp_body) {
-		case "InvalidPart":
-			s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
-		default:
-			s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
-		}
-		resp.Body.Close()
-		return
-	}
+			// Override ETag with the part's ETag
+			if partInfo != nil {
+				// Use part ETag from metadata (accurate for multi-chunk parts)
+				w.Header().Set("ETag", "\""+partInfo.ETag+"\"")
+				glog.V(3).Infof("HeadObject: Override ETag with part %d ETag: %s (from metadata)", partNumber, partInfo.ETag)
+			} else {
+				// Fallback: use chunk's ETag (backward compatibility)
+				chunkIndex := partNumber - 1
+				if chunkIndex >= len(objectEntryForSSE.Chunks) {
+					glog.Warningf("HeadObject: Part %d chunk index %d out of range (chunks: %d)", partNumber, chunkIndex, len(objectEntryForSSE.Chunks))
+					s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
+					return
+				}
+				partChunk := objectEntryForSSE.Chunks[chunkIndex]
+				if partChunk.ETag != "" {
+					if md5Bytes, decodeErr := base64.StdEncoding.DecodeString(partChunk.ETag); decodeErr == nil {
+						partETag := fmt.Sprintf("%x", md5Bytes)
+						w.Header().Set("ETag", "\""+partETag+"\"")
+						glog.V(3).Infof("HeadObject: Override ETag with part %d ETag: %s (fallback from chunk)", partNumber, partETag)
+					}
+				}
+			}
+		}
 	}
 
-	setUserMetadataKeyToLowercase(resp)
-
-	responseStatusCode, bytesTransferred := responseFn(resp, w)
-	BucketTrafficSent(bytesTransferred, r)
-
-	s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
-}
-
-func setUserMetadataKeyToLowercase(resp *http.Response) {
-	for key, value := range resp.Header {
-		if strings.HasPrefix(key, s3_constants.AmzUserMetaPrefix) {
-			resp.Header[strings.ToLower(key)] = value
-			delete(resp.Header, key)
+	// Detect and handle SSE
+	glog.V(3).Infof("HeadObjectHandler: Retrieved entry for %s%s - %d chunks", bucket, object, len(objectEntryForSSE.Chunks))
+	sseType := s3a.detectPrimarySSEType(objectEntryForSSE)
+	glog.V(2).Infof("HeadObjectHandler: Detected SSE type: %s", sseType)
+	if sseType != "" && sseType != "None" {
+		// Validate SSE headers for encrypted objects
+		switch sseType {
+		case s3_constants.SSETypeC:
+			customerKey, err := ParseSSECHeaders(r)
+			if err != nil {
+				s3err.WriteErrorResponse(w, r, MapSSECErrorToS3Error(err))
+				return
+			}
+			if customerKey == nil {
+				s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
+				return
+			}
+			// Validate key MD5
+			if objectEntryForSSE.Extended != nil {
+				storedKeyMD5 := string(objectEntryForSSE.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5])
+				if storedKeyMD5 != "" && customerKey.KeyMD5 != storedKeyMD5 {
+					s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
+					return
+				}
+			}
 		}
+		// Add SSE response headers
+		s3a.addSSEResponseHeadersFromEntry(w, r, objectEntryForSSE, sseType)
 	}
+
+	w.WriteHeader(http.StatusOK)
 }
 
 func captureCORSHeaders(w http.ResponseWriter, headersToCapture []string) map[string]string {
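The fallback branch above converts a chunk's stored base64 ETag into the lowercase hex MD5 form S3 clients expect. A minimal sketch of that conversion (the sample value is the well-known MD5 of empty input):

package main

import (
	"encoding/base64"
	"fmt"
)

// chunkETagToPartETag decodes a base64-encoded MD5 and renders it as hex.
func chunkETagToPartETag(chunkETag string) (string, error) {
	md5Bytes, err := base64.StdEncoding.DecodeString(chunkETag)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%x", md5Bytes), nil
}

func main() {
	etag, _ := chunkETagToPartETag("1B2M2Y8AsgTpgAmY7PhCfg==")
	fmt.Println(etag) // d41d8cd98f00b204e9800998ecf8427e
}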
@@ -1315,6 +2752,11 @@ func (s3a *S3ApiServer) addSSEHeadersToResponse(proxyResponse *http.Response, en
 
 // detectPrimarySSEType determines the primary SSE type by examining chunk metadata
 func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string {
+	// Safety check: handle nil entry
+	if entry == nil {
+		return "None"
+	}
+
 	if len(entry.GetChunks()) == 0 {
 		// No chunks - check object-level metadata only (single objects or smallContent)
 		hasSSEC := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] != nil
@@ -1395,9 +2837,243 @@ func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string {
 	return "None"
 }
 
+// createMultipartSSECDecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-C objects (direct volume path)
+// Note: encryptedStream parameter is unused (always nil) as this function fetches chunks directly to avoid double I/O.
+// It's kept in the signature for API consistency with non-Direct versions.
+func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, customerKey *SSECustomerKey, entry *filer_pb.Entry) (io.Reader, error) {
+	// Sort chunks by offset to ensure correct order
+	chunks := entry.GetChunks()
+	sort.Slice(chunks, func(i, j int) bool {
+		return chunks[i].GetOffset() < chunks[j].GetOffset()
+	})
+
+	// Create readers for each chunk, decrypting them independently
+	var readers []io.Reader
+
+	for _, chunk := range chunks {
+		// Get this chunk's encrypted data
+		chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create chunk reader: %v", err)
+		}
+
+		// Handle based on chunk's encryption type
+		if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
+			// Check if this chunk has per-chunk SSE-C metadata
+			if len(chunk.GetSseMetadata()) == 0 {
+				chunkReader.Close()
+				return nil, fmt.Errorf("SSE-C chunk %s missing per-chunk metadata", chunk.GetFileIdString())
+			}
+
+			// Deserialize the SSE-C metadata
+			ssecMetadata, err := DeserializeSSECMetadata(chunk.GetSseMetadata())
+			if err != nil {
+				chunkReader.Close()
+				return nil, fmt.Errorf("failed to deserialize SSE-C metadata for chunk %s: %v", chunk.GetFileIdString(), err)
+			}
+
+			// Decode the IV from the metadata
+			chunkIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV)
+			if err != nil {
+				chunkReader.Close()
+				return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), err)
+			}
+
+			glog.V(4).Infof("Decrypting SSE-C chunk %s with IV=%x, PartOffset=%d",
+				chunk.GetFileIdString(), chunkIV[:8], ssecMetadata.PartOffset)
+
+			// Note: SSE-C multipart behavior (differs from SSE-KMS/SSE-S3):
+			// - Upload: CreateSSECEncryptedReader generates RANDOM IV per part (no base IV + offset)
+			// - Metadata: PartOffset is stored but not used during encryption
+			// - Decryption: Use stored random IV directly (no offset adjustment needed)
+			//
+			// This differs from:
+			// - SSE-KMS/SSE-S3: Use base IV + calculateIVWithOffset(partOffset) during encryption
+			// - CopyObject: Applies calculateIVWithOffset to SSE-C (which may be incorrect)
+			//
+			// TODO: Investigate CopyObject SSE-C PartOffset handling for consistency
+			decryptedChunkReader, decErr := CreateSSECDecryptedReader(chunkReader, customerKey, chunkIV)
+			if decErr != nil {
+				chunkReader.Close()
+				return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr)
+			}
+
+			// Use the streaming decrypted reader directly
+			readers = append(readers, struct {
+				io.Reader
+				io.Closer
+			}{
+				Reader: decryptedChunkReader,
+				Closer: chunkReader,
+			})
+			glog.V(4).Infof("Added streaming decrypted reader for SSE-C chunk %s", chunk.GetFileIdString())
+		} else {
+			// Non-SSE-C chunk, use as-is
+			readers = append(readers, chunkReader)
+			glog.V(4).Infof("Added non-encrypted reader for chunk %s", chunk.GetFileIdString())
+		}
+	}
+
+	// Close the original encrypted stream since we're reading chunks individually
+	if encryptedStream != nil {
+		encryptedStream.Close()
+	}
+
+	return NewMultipartSSEReader(readers), nil
+}
+
+// createMultipartSSEKMSDecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-KMS objects (direct volume path)
+// Note: encryptedStream parameter is unused (always nil) as this function fetches chunks directly to avoid double I/O.
+// It's kept in the signature for API consistency with non-Direct versions.
+func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, entry *filer_pb.Entry) (io.Reader, error) {
+	// Sort chunks by offset to ensure correct order
+	chunks := entry.GetChunks()
+	sort.Slice(chunks, func(i, j int) bool {
+		return chunks[i].GetOffset() < chunks[j].GetOffset()
+	})
+
+	// Create readers for each chunk, decrypting them independently
+	var readers []io.Reader
+
+	for _, chunk := range chunks {
+		// Get this chunk's encrypted data
+		chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create chunk reader: %v", err)
+		}
+
+		// Handle based on chunk's encryption type
+		if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS {
+			// Check if this chunk has per-chunk SSE-KMS metadata
+			if len(chunk.GetSseMetadata()) == 0 {
+				chunkReader.Close()
+				return nil, fmt.Errorf("SSE-KMS chunk %s missing per-chunk metadata", chunk.GetFileIdString())
+			}
+
+			// Use the per-chunk SSE-KMS metadata
+			kmsKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata())
+			if err != nil {
+				chunkReader.Close()
+				return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err)
+			}
+
+			glog.V(4).Infof("Decrypting SSE-KMS chunk %s with KeyID=%s",
+				chunk.GetFileIdString(), kmsKey.KeyID)
+
+			// Create decrypted reader for this chunk
+			decryptedChunkReader, decErr := CreateSSEKMSDecryptedReader(chunkReader, kmsKey)
+			if decErr != nil {
+				chunkReader.Close()
+				return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr)
+			}
+
+			// Use the streaming decrypted reader directly
+			readers = append(readers, struct {
+				io.Reader
+				io.Closer
+			}{
+				Reader: decryptedChunkReader,
+				Closer: chunkReader,
+			})
+			glog.V(4).Infof("Added streaming decrypted reader for SSE-KMS chunk %s", chunk.GetFileIdString())
+		} else {
+			// Non-SSE-KMS chunk, use as-is
+			readers = append(readers, chunkReader)
+			glog.V(4).Infof("Added non-encrypted reader for chunk %s", chunk.GetFileIdString())
+		}
+	}
+
+	// Close the original encrypted stream since we're reading chunks individually
+	if encryptedStream != nil {
+		encryptedStream.Close()
+	}
+
+	return NewMultipartSSEReader(readers), nil
+}
+
+// createMultipartSSES3DecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-S3 objects (direct volume path)
+// Note: encryptedStream parameter is unused (always nil) as this function fetches chunks directly to avoid double I/O.
+// It's kept in the signature for API consistency with non-Direct versions.
+func (s3a *S3ApiServer) createMultipartSSES3DecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, entry *filer_pb.Entry) (io.Reader, error) {
+	// Sort chunks by offset to ensure correct order
+	chunks := entry.GetChunks()
+	sort.Slice(chunks, func(i, j int) bool {
+		return chunks[i].GetOffset() < chunks[j].GetOffset()
+	})
+
+	// Create readers for each chunk, decrypting them independently
+	var readers []io.Reader
+
+	// Get key manager and SSE-S3 key from entry metadata
+	keyManager := GetSSES3KeyManager()
+	keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key]
+	sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager)
+	if err != nil {
+		return nil, fmt.Errorf("failed to deserialize SSE-S3 key from entry metadata: %v", err)
+	}
+
+	for _, chunk := range chunks {
+		// Get this chunk's encrypted data
+		chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create chunk reader: %v", err)
+		}
+
+		// Handle based on chunk's encryption type
+		if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 {
+			// Check if this chunk has per-chunk SSE-S3 metadata
+			if len(chunk.GetSseMetadata()) == 0 {
+				chunkReader.Close()
+				return nil, fmt.Errorf("SSE-S3 chunk %s missing per-chunk metadata", chunk.GetFileIdString())
+			}
+
+			// Deserialize the per-chunk SSE-S3 metadata to get the IV
+			chunkSSES3Metadata, err := DeserializeSSES3Metadata(chunk.GetSseMetadata(), keyManager)
+			if err != nil {
+				chunkReader.Close()
+				return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata for chunk %s: %v", chunk.GetFileIdString(), err)
+			}
+
+			// Use the IV from the chunk metadata
+			iv := chunkSSES3Metadata.IV
+			glog.V(4).Infof("Decrypting SSE-S3 chunk %s with KeyID=%s, IV length=%d",
+				chunk.GetFileIdString(), sseS3Key.KeyID, len(iv))
+
+			// Create decrypted reader for this chunk
+			decryptedChunkReader, decErr := CreateSSES3DecryptedReader(chunkReader, sseS3Key, iv)
+			if decErr != nil {
+				chunkReader.Close()
+				return nil, fmt.Errorf("failed to decrypt SSE-S3 chunk: %v", decErr)
+			}
+
+			// Use the streaming decrypted reader directly
+			readers = append(readers, struct {
+				io.Reader
+				io.Closer
+			}{
+				Reader: decryptedChunkReader,
+				Closer: chunkReader,
+			})
+			glog.V(4).Infof("Added streaming decrypted reader for SSE-S3 chunk %s", chunk.GetFileIdString())
+		} else {
+			// Non-SSE-S3 chunk, use as-is
+			readers = append(readers, chunkReader)
+			glog.V(4).Infof("Added non-encrypted reader for chunk %s", chunk.GetFileIdString())
+		}
+	}
+
+	// Close the original encrypted stream since we're reading chunks individually
+	if encryptedStream != nil {
+		encryptedStream.Close()
+	}
+
+	return NewMultipartSSEReader(readers), nil
+}
+
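All three Direct readers above concatenate per-chunk decrypted streams while keeping every underlying body closable. A simplified standalone sketch of that pattern (a stand-in for NewMultipartSSEReader, not the actual type):

package main

import (
	"fmt"
	"io"
	"strings"
)

// multiReadCloser concatenates readers with io.MultiReader while remembering
// every underlying closer, so the whole object stream tears down at once.
type multiReadCloser struct {
	io.Reader
	closers []io.Closer
}

func (m *multiReadCloser) Close() error {
	var firstErr error
	for _, c := range m.closers {
		if err := c.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() {
	a := io.NopCloser(strings.NewReader("part1-"))
	b := io.NopCloser(strings.NewReader("part2"))
	rc := &multiReadCloser{Reader: io.MultiReader(a, b), closers: []io.Closer{a, b}}
	data, _ := io.ReadAll(rc)
	rc.Close()
	fmt.Println(string(data)) // part1-part2
}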
 // createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects
 func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) {
 	// Entry is passed from caller to avoid redundant filer lookup
+	ctx := r.Context()
 
 	// Sort chunks by offset to ensure correct order
 	chunks := entry.GetChunks()
@@ -1410,7 +3086,7 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, pr
 
 	for _, chunk := range chunks {
 		// Get this chunk's encrypted data
-		chunkReader, err := s3a.createEncryptedChunkReader(chunk)
+		chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
 		if err != nil {
 			return nil, fmt.Errorf("failed to create chunk reader: %v", err)
 		}
@@ -1459,6 +3135,8 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, pr
 
 // createMultipartSSES3DecryptedReader creates a reader for multipart SSE-S3 objects
 func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, entry *filer_pb.Entry) (io.Reader, error) {
+	ctx := r.Context()
+
 	// Sort chunks by offset to ensure correct order
 	chunks := entry.GetChunks()
 	sort.Slice(chunks, func(i, j int) bool {
@@ -1471,7 +3149,7 @@ func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, ent
 
 	for _, chunk := range chunks {
 		// Get this chunk's encrypted data
-		chunkReader, err := s3a.createEncryptedChunkReader(chunk)
+		chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
 		if err != nil {
 			return nil, fmt.Errorf("failed to create chunk reader: %v", err)
 		}
@@ -1538,21 +3216,28 @@ func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, ent
 }
 
 // createEncryptedChunkReader creates a reader for a single encrypted chunk
-func (s3a *S3ApiServer) createEncryptedChunkReader(chunk *filer_pb.FileChunk) (io.ReadCloser, error) {
+// Context propagation ensures cancellation if the S3 client disconnects
+func (s3a *S3ApiServer) createEncryptedChunkReader(ctx context.Context, chunk *filer_pb.FileChunk) (io.ReadCloser, error) {
 	// Get chunk URL
 	srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString())
 	if err != nil {
 		return nil, fmt.Errorf("lookup volume URL for chunk %s: %v", chunk.GetFileIdString(), err)
 	}
 
-	// Create HTTP request for chunk data
-	req, err := http.NewRequest("GET", srcUrl, nil)
+	// Create HTTP request with context for cancellation propagation
+	req, err := http.NewRequestWithContext(ctx, "GET", srcUrl, nil)
 	if err != nil {
 		return nil, fmt.Errorf("create HTTP request for chunk: %v", err)
 	}
 
-	// Execute request
-	resp, err := http.DefaultClient.Do(req)
+	// Attach volume server JWT for authentication (matches filer behavior)
+	jwt := security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, chunk.GetFileIdString())
+	if jwt != "" {
+		req.Header.Set("Authorization", "BEARER "+string(jwt))
+	}
+
+	// Use shared HTTP client with connection pooling
+	resp, err := volumeServerHTTPClient.Do(req)
 	if err != nil {
 		return nil, fmt.Errorf("execute HTTP request for chunk: %v", err)
 	}
@@ -1574,9 +3259,10 @@ type MultipartSSEReader struct {
 
 // SSERangeReader applies range logic to an underlying reader
 type SSERangeReader struct {
 	reader    io.Reader
-	offset    int64 // bytes to skip from the beginning
-	remaining int64 // bytes remaining to read (-1 for unlimited)
-	skipped   int64 // bytes already skipped
+	offset    int64  // bytes to skip from the beginning
+	remaining int64  // bytes remaining to read (-1 for unlimited)
+	skipped   int64  // bytes already skipped
+	skipBuf   []byte // reusable buffer for skipping bytes (avoids per-call allocation)
 }
 
 // NewMultipartSSEReader creates a new multipart reader that can properly close all underlying readers
@@ -1608,22 +3294,28 @@ func (m *MultipartSSEReader) Close() error {
 
 // Read implements the io.Reader interface for SSERangeReader
 func (r *SSERangeReader) Read(p []byte) (n int, err error) {
-
-	// If we need to skip bytes and haven't skipped enough yet
-	if r.skipped < r.offset {
+	// Skip bytes iteratively (no recursion) until we reach the offset
+	for r.skipped < r.offset {
 		skipNeeded := r.offset - r.skipped
-		skipBuf := make([]byte, min(int64(len(p)), skipNeeded))
-		skipRead, skipErr := r.reader.Read(skipBuf)
+
+		// Lazily allocate skip buffer on first use, reuse thereafter
+		if r.skipBuf == nil {
+			// Use a fixed 32KB buffer for skipping (avoids per-call allocation)
+			r.skipBuf = make([]byte, 32*1024)
+		}
+
+		// Determine how much to skip in this iteration
+		bufSize := int64(len(r.skipBuf))
+		if skipNeeded < bufSize {
+			bufSize = skipNeeded
+		}
+
+		skipRead, skipErr := r.reader.Read(r.skipBuf[:bufSize])
 		r.skipped += int64(skipRead)
 		if skipErr != nil {
 			return 0, skipErr
 		}
-
-		// If we still need to skip more, recurse
-		if r.skipped < r.offset {
-			return r.Read(p)
-		}
 	}
 
 	// If we have a remaining limit and it's reached
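The skip loop above avoids both recursion and per-call allocation. Functionally it matches the stdlib pattern below (a simplified equivalent of SSERangeReader; the production code additionally reuses a fixed buffer instead of io.CopyN):

package main

import (
	"fmt"
	"io"
	"strings"
)

// readRange skips `offset` bytes of src, then reads at most `length` bytes.
func readRange(src io.Reader, offset, length int64) ([]byte, error) {
	if _, err := io.CopyN(io.Discard, src, offset); err != nil {
		return nil, err
	}
	return io.ReadAll(io.LimitReader(src, length))
}

func main() {
	data, _ := readRange(strings.NewReader("hello, seaweedfs"), 7, 8)
	fmt.Println(string(data)) // seaweedf
}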
@@ -1649,6 +3341,8 @@ func (r *SSERangeReader) Read(p []byte) (n int, err error) {
 // createMultipartSSECDecryptedReader creates a decrypted reader for multipart SSE-C objects
 // Each chunk has its own IV and encryption key from the original multipart parts
 func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) {
+	ctx := r.Context()
+
 	// Parse SSE-C headers from the request for decryption key
 	customerKey, err := ParseSSECHeaders(r)
 	if err != nil {
@@ -1708,7 +3402,7 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, prox
 
 	for _, chunk := range neededChunks {
 		// Get this chunk's encrypted data
-		chunkReader, err := s3a.createEncryptedChunkReader(chunk)
+		chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
 		if err != nil {
 			return nil, fmt.Errorf("failed to create chunk reader: %v", err)
 		}
@@ -1728,16 +3422,10 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, prox
 				return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), ivErr)
 			}
 
-			// Calculate the correct IV for this chunk using within-part offset
-			var chunkIV []byte
-			if ssecMetadata.PartOffset > 0 {
-				var skip int
-				chunkIV, skip = calculateIVWithOffset(iv, ssecMetadata.PartOffset)
-				// The caller handles range alignment, so we can safely ignore the skip value for SSE-C chunks.
-				_ = skip
-			} else {
-				chunkIV = iv
-			}
+			// Note: For multipart SSE-C, each part was encrypted with offset=0
+			// So we use the stored IV directly without offset adjustment
+			// PartOffset is stored for informational purposes, but encryption uses offset=0
+			chunkIV := iv
 
 			decryptedReader, decErr := CreateSSECDecryptedReader(chunkReader, customerKey, chunkIV)
 			if decErr != nil {
@@ -1777,3 +3465,55 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, prox
 
 	return multiReader, nil
 }
+
+// PartBoundaryInfo holds information about a part's chunk boundaries
+type PartBoundaryInfo struct {
+	PartNumber int    `json:"part"`
+	StartChunk int    `json:"start"`
+	EndChunk   int    `json:"end"` // exclusive
+	ETag       string `json:"etag"`
+}
+
+// rc is a helper type that wraps a Reader and Closer for proper resource cleanup
+type rc struct {
+	io.Reader
+	io.Closer
+}
+
+// getMultipartInfo retrieves multipart metadata for a given part number
+// Returns: (partsCount, partInfo)
+//   - partsCount: total number of parts in the multipart object
+//   - partInfo: boundary information for the requested part (nil if not found or not a multipart object)
+func (s3a *S3ApiServer) getMultipartInfo(entry *filer_pb.Entry, partNumber int) (int, *PartBoundaryInfo) {
+	if entry == nil {
+		return 0, nil
+	}
+	if entry.Extended == nil {
+		// Not a multipart object or no metadata
+		return len(entry.GetChunks()), nil
+	}
+
+	// Try to get parts count from metadata
+	partsCount := len(entry.GetChunks()) // default fallback
+	if partsCountBytes, exists := entry.Extended[s3_constants.SeaweedFSMultipartPartsCount]; exists {
+		if count, err := strconv.Atoi(string(partsCountBytes)); err == nil && count > 0 {
+			partsCount = count
+		}
+	}
+
+	// Try to get part boundaries from metadata
+	if boundariesJSON, exists := entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries]; exists {
+		var boundaries []PartBoundaryInfo
+		if err := json.Unmarshal(boundariesJSON, &boundaries); err == nil {
+			// Find the requested part
+			for i := range boundaries {
+				if boundaries[i].PartNumber == partNumber {
+					return partsCount, &boundaries[i]
+				}
+			}
+		}
+	}
+
+	// No part boundaries metadata or part not found
+	return partsCount, nil
+}
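For reference, a sketch of how the part-boundary records defined above serialize; the filer stores a JSON array of these in an extended attribute, and getMultipartInfo scans it for the requested part. The ETag values are illustrative:

package main

import (
	"encoding/json"
	"fmt"
)

type PartBoundaryInfo struct {
	PartNumber int    `json:"part"`
	StartChunk int    `json:"start"`
	EndChunk   int    `json:"end"` // exclusive
	ETag       string `json:"etag"`
}

func main() {
	boundaries := []PartBoundaryInfo{
		{PartNumber: 1, StartChunk: 0, EndChunk: 2, ETag: "9e107d9d372bb6826bd81d3542a419d6"},
		{PartNumber: 2, StartChunk: 2, EndChunk: 3, ETag: "e4d909c290d0fb1ca068ffaddf22cbd0"},
	}
	b, _ := json.Marshal(boundaries)
	fmt.Println(string(b))
	// [{"part":1,"start":0,"end":2,"etag":"..."},{"part":2,"start":2,"end":3,"etag":"..."}]
}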
diff --git a/weed/s3api/s3api_object_handlers_multipart.go b/weed/s3api/s3api_object_handlers_multipart.go
index d1ba8eea3..07d484d80 100644
--- a/weed/s3api/s3api_object_handlers_multipart.go
+++ b/weed/s3api/s3api_object_handlers_multipart.go
@@ -403,7 +403,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
 	glog.V(2).Infof("PutObjectPart: bucket=%s, object=%s, uploadId=%s, partNumber=%d, size=%d", bucket, object, uploadID, partID, r.ContentLength)
 
-	etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket, partID)
+	etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, dataReader, bucket, partID)
 	if errCode != s3err.ErrNone {
 		glog.Errorf("PutObjectPart: putToFiler failed with error code %v for bucket=%s, object=%s, partNumber=%d",
 			errCode, bucket, object, partID)
@@ -412,14 +412,12 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
 	glog.V(2).Infof("PutObjectPart: SUCCESS - bucket=%s, object=%s, partNumber=%d, etag=%s, sseType=%s",
-		bucket, object, partID, etag, sseType)
+		bucket, object, partID, etag, sseMetadata.SSEType)
 
 	setEtag(w, etag)
 
 	// Set SSE response headers for multipart uploads
-	if sseType == s3_constants.SSETypeS3 {
-		w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
-	}
+	s3a.setSSEResponseHeaders(w, r, sseMetadata)
 
 	writeSuccessResponseEmpty(w, r)
 
diff --git a/weed/s3api/s3api_object_handlers_postpolicy.go b/weed/s3api/s3api_object_handlers_postpolicy.go
index c392d98f0..ecb2ac8d1 100644
--- a/weed/s3api/s3api_object_handlers_postpolicy.go
+++ b/weed/s3api/s3api_object_handlers_postpolicy.go
@@ -136,7 +136,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
 		}
 	}
 
-	etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, fileBody, "", bucket, 1)
+	etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, fileBody, bucket, 1)
 
 	if errCode != s3err.ErrNone {
 		s3err.WriteErrorResponse(w, r, errCode)
@@ -153,9 +153,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
 	setEtag(w, etag)
 
 	// Include SSE response headers (important for bucket-default encryption)
-	if sseType == s3_constants.SSETypeS3 {
-		w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
-	}
+	s3a.setSSEResponseHeaders(w, r, sseMetadata)
 
 	// Decide what http response to send depending on success_action_status parameter
 	switch successStatus {
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index 6ce48429f..6181c34d6 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -1,6 +1,7 @@
 package s3api
 
 import (
+	"context"
 	"crypto/md5"
 	"encoding/base64"
 	"encoding/json"
@@ -8,18 +9,21 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+	"net/url"
+	"path/filepath"
 	"strconv"
 	"strings"
 	"time"
 
 	"github.com/pquerna/cachecontrol/cacheobject"
+	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
 	"github.com/seaweedfs/seaweedfs/weed/security"
-	weed_server "github.com/seaweedfs/seaweedfs/weed/server"
 	stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
 	"github.com/seaweedfs/seaweedfs/weed/util/constants"
 )
@@ -60,6 +64,13 @@ type BucketDefaultEncryptionResult struct {
 	SSEKMSKey *SSEKMSKey
 }
 
+// SSEResponseMetadata holds encryption metadata needed for HTTP response headers
+type SSEResponseMetadata struct {
+	SSEType          string
+	KMSKeyID         string
+	BucketKeyEnabled bool
+}
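The patch calls s3a.setSSEResponseHeaders(w, r, sseMetadata) throughout, but that helper's body is outside these hunks. The sketch below is an assumed shape inferred from the SSEResponseMetadata fields above, using literal AWS header names rather than possibly nonexistent constants; the real implementation may differ:

// setSSEResponseHeadersSketch is a hypothetical reconstruction, assuming the
// surrounding s3api package; it is NOT the implementation from this patch.
func (s3a *S3ApiServer) setSSEResponseHeadersSketch(w http.ResponseWriter, meta SSEResponseMetadata) {
	switch meta.SSEType {
	case s3_constants.SSETypeS3:
		w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
	case s3_constants.SSETypeKMS:
		w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms")
		if meta.KMSKeyID != "" {
			// Standard AWS response header for the KMS key ID
			w.Header().Set("x-amz-server-side-encryption-aws-kms-key-id", meta.KMSKeyID)
		}
		if meta.BucketKeyEnabled {
			// Standard AWS response header for S3 Bucket Keys
			w.Header().Set("x-amz-server-side-encryption-bucket-key-enabled", "true")
		}
	}
}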
+
 func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
 
 	// http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
@@ -135,7 +146,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 	versioningEnabled := (versioningState == s3_constants.VersioningEnabled)
 	versioningConfigured := (versioningState != "")
 
-	glog.V(2).Infof("PutObjectHandler: bucket=%s, object=%s, versioningState='%s', versioningEnabled=%v, versioningConfigured=%v", bucket, object, versioningState, versioningEnabled, versioningConfigured)
+	glog.V(3).Infof("PutObjectHandler: bucket=%s, object=%s, versioningState='%s', versioningEnabled=%v, versioningConfigured=%v", bucket, object, versioningState, versioningEnabled, versioningConfigured)
 
 	// Validate object lock headers before processing
 	if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil {
@@ -158,29 +169,34 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 	switch versioningState {
 	case s3_constants.VersioningEnabled:
 		// Handle enabled versioning - create new versions with real version IDs
-		glog.V(0).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object)
-		versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
+		glog.V(3).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object)
+		versionId, etag, errCode, sseMetadata := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
 		if errCode != s3err.ErrNone {
 			glog.Errorf("PutObjectHandler: putVersionedObject failed with errCode=%v for %s/%s", errCode, bucket, object)
 			s3err.WriteErrorResponse(w, r, errCode)
 			return
 		}
-		glog.V(0).Infof("PutObjectHandler: putVersionedObject returned versionId=%s, etag=%s for %s/%s", versionId, etag, bucket, object)
+		glog.V(3).Infof("PutObjectHandler: putVersionedObject returned versionId=%s, etag=%s for %s/%s", versionId, etag, bucket, object)
 
 		// Set version ID in response header
 		if versionId != "" {
 			w.Header().Set("x-amz-version-id", versionId)
-			glog.V(0).Infof("PutObjectHandler: set x-amz-version-id header to %s for %s/%s", versionId, bucket, object)
+			glog.V(3).Infof("PutObjectHandler: set x-amz-version-id header to %s for %s/%s", versionId, bucket, object)
 		} else {
 			glog.Errorf("PutObjectHandler: CRITICAL - versionId is EMPTY for versioned bucket %s, object %s", bucket, object)
 		}
 
 		// Set ETag in response
 		setEtag(w, etag)
+
+		// Set SSE response headers for versioned objects
+		s3a.setSSEResponseHeaders(w, r, sseMetadata)
+
 	case s3_constants.VersioningSuspended:
 		// Handle suspended versioning - overwrite with "null" version ID but preserve existing versions
-		etag, errCode := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
+		glog.V(3).Infof("PutObjectHandler: SUSPENDED versioning detected for %s/%s, calling putSuspendedVersioningObject", bucket, object)
+		etag, errCode, sseMetadata := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
 		if errCode != s3err.ErrNone {
 			s3err.WriteErrorResponse(w, r, errCode)
 			return
@@ -191,6 +207,9 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 
 		// Set ETag in response
 		setEtag(w, etag)
+
+		// Set SSE response headers for suspended versioning
+		s3a.setSSEResponseHeaders(w, r, sseMetadata)
 	default:
 		// Handle regular PUT (never configured versioning)
 		uploadUrl := s3a.toFilerUrl(bucket, object)
@@ -198,7 +217,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 			dataReader = mimeDetect(r, dataReader)
 		}
 
-		etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket, 1)
+		etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, dataReader, bucket, 1)
 
 		if errCode != s3err.ErrNone {
 			s3err.WriteErrorResponse(w, r, errCode)
@@ -209,9 +228,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 		setEtag(w, etag)
 
 		// Set SSE response headers based on encryption type used
-		if sseType == s3_constants.SSETypeS3 {
-			w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
-		}
+		s3a.setSSEResponseHeaders(w, r, sseMetadata)
 	}
 	stats_collect.RecordBucketActiveTime(bucket)
 
@@ -220,15 +237,18 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 	writeSuccessResponseEmpty(w, r)
 }
 
-func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseType string) {
-	// Calculate unique offset for each part to prevent IV reuse in multipart uploads
-	// This is critical for CTR mode encryption security
-	partOffset := calculatePartOffset(partNumber)
+func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
+	// NEW OPTIMIZATION: Write directly to volume servers, bypassing filer proxy
+	// This eliminates the filer proxy overhead for PUT operations
 
-	// Handle all SSE encryption types in a unified manner to eliminate repetitive dataReader assignments
+	// For SSE, encrypt with offset=0 for all parts
+	// Each part is encrypted independently, then decrypted using metadata during GET
+	partOffset := int64(0)
+
+	// Handle all SSE encryption types in a unified manner
 	sseResult, sseErrorCode := s3a.handleAllSSEEncryption(r, dataReader, partOffset)
 	if sseErrorCode != s3err.ErrNone {
-		return "", sseErrorCode, ""
+		return "", sseErrorCode, SSEResponseMetadata{}
 	}
 
 	// Extract results from unified SSE handling
@@ -249,7 +269,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
 		encryptionResult, applyErr := s3a.applyBucketDefaultEncryption(bucket, r, dataReader)
 		if applyErr != nil {
 			glog.Errorf("Failed to apply bucket default encryption: %v", applyErr)
-			return "", s3err.ErrInternalError, ""
+			return "", s3err.ErrInternalError, SSEResponseMetadata{}
 		}
 
 		// Update variables based on the result
@@ -263,115 +283,289 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
 			sseS3Metadata, metaErr = SerializeSSES3Metadata(sseS3Key)
 			if metaErr != nil {
 				glog.Errorf("Failed to serialize SSE-S3 metadata for bucket default encryption: %v", metaErr)
-				return "", s3err.ErrInternalError, ""
+				return "", s3err.ErrInternalError, SSEResponseMetadata{}
 			}
 		}
 	} else {
 		glog.V(4).Infof("putToFiler: explicit encryption already applied, skipping bucket default encryption")
 	}
 
-	hash := md5.New()
-	var body = io.TeeReader(dataReader, hash)
-
-	proxyReq, err := http.NewRequest(http.MethodPut, uploadUrl, body)
-
-	if err != nil {
-		glog.Errorf("NewRequest %s: %v", uploadUrl, err)
-		return "", s3err.ErrInternalError, ""
+	// Parse the upload URL to extract the file path
+	// uploadUrl format: http://filer:8888/path/to/bucket/object (or https://, IPv6, etc.)
+	// Use proper URL parsing instead of string manipulation for robustness
+	parsedUrl, parseErr := url.Parse(uploadUrl)
+	if parseErr != nil {
+		glog.Errorf("putToFiler: failed to parse uploadUrl %q: %v", uploadUrl, parseErr)
+		return "", s3err.ErrInternalError, SSEResponseMetadata{}
 	}
 
-	proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
-	if destination != "" {
-		proxyReq.Header.Set(s3_constants.SeaweedStorageDestinationHeader, destination)
-	}
+	// Use parsedUrl.Path directly - it's already decoded by url.Parse()
+	// Per Go documentation: "Path is stored in decoded form: /%47%6f%2f becomes /Go/"
+	// Calling PathUnescape again would double-decode and fail on keys like "b%ar"
+	filePath := parsedUrl.Path
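The decoding caveat noted above is easy to verify: url.Parse stores Path already percent-decoded, so a second PathUnescape would double-decode and corrupt keys containing '%'. A standalone check with an assumed example URL:

package main

import (
	"fmt"
	"net/url"
)

func main() {
	u, _ := url.Parse("http://filer:8888/bucket/dir%2Fkey%20one")
	fmt.Println(u.Path)          // /bucket/dir/key one (already decoded)
	fmt.Println(u.EscapedPath()) // /bucket/dir%2Fkey%20one (original escaped form)
}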
+
+	// Step 1 & 2: Use auto-chunking to handle large files without OOM
+	// This splits large uploads into 8MB chunks, preventing memory issues on both S3 API and volume servers
+	const chunkSize = 8 * 1024 * 1024  // 8MB chunks (S3 standard)
+	const smallFileLimit = 256 * 1024  // 256KB - store inline in filer
+
+	collection := ""
 	if s3a.option.FilerGroup != "" {
-		query := proxyReq.URL.Query()
-		query.Add("collection", s3a.getCollectionName(bucket))
-		proxyReq.URL.RawQuery = query.Encode()
+		collection = s3a.getCollectionName(bucket)
+	}
+
+	// Create assign function for chunked upload
+	assignFunc := func(ctx context.Context, count int) (*operation.VolumeAssignRequest, *operation.AssignResult, error) {
+		var assignResult *filer_pb.AssignVolumeResponse
+		err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+			resp, err := client.AssignVolume(ctx, &filer_pb.AssignVolumeRequest{
+				Count:       int32(count),
+				Replication: "",
+				Collection:  collection,
+				DiskType:    "",
+				DataCenter:  s3a.option.DataCenter,
+				Path:        filePath,
+			})
+			if err != nil {
+				return fmt.Errorf("assign volume: %w", err)
+			}
+			if resp.Error != "" {
+				return fmt.Errorf("assign volume: %v", resp.Error)
+			}
+			assignResult = resp
+			return nil
+		})
+		if err != nil {
+			return nil, nil, err
+		}
+
+		// Convert filer_pb.AssignVolumeResponse to operation.AssignResult
+		return nil, &operation.AssignResult{
+			Fid:       assignResult.FileId,
+			Url:       assignResult.Location.Url,
+			PublicUrl: assignResult.Location.PublicUrl,
+			Count:     uint64(count),
+			Auth:      security.EncodedJwt(assignResult.Auth),
+		}, nil
+	}
+
+	// Upload with auto-chunking
+	// Use context.Background() to ensure chunk uploads complete even if HTTP request is cancelled
+	// This prevents partial uploads and data corruption
+	chunkResult, err := operation.UploadReaderInChunks(context.Background(), dataReader, &operation.ChunkedUploadOption{
+		ChunkSize:       chunkSize,
+		SmallFileLimit:  smallFileLimit,
+		Collection:      collection,
+		DataCenter:      s3a.option.DataCenter,
+		SaveSmallInline: false, // S3 API always creates chunks, never stores inline
+		MimeType:        r.Header.Get("Content-Type"),
+		AssignFunc:      assignFunc,
+	})
+	if err != nil {
+		glog.Errorf("putToFiler: chunked upload failed: %v", err)
+		if strings.Contains(err.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
+			return "", s3err.ErrInvalidDigest, SSEResponseMetadata{}
+		}
+		return "", s3err.ErrInternalError, SSEResponseMetadata{}
 	}
details for debugging (verbose only - high frequency) + if glog.V(4) { + for i, chunk := range chunkResult.FileChunks { + glog.Infof(" PUT Chunk[%d]: fid=%s, offset=%d, size=%d", i, chunk.GetFileIdString(), chunk.Offset, chunk.Size) } } - // Log version ID header for debugging - if versionIdHeader := proxyReq.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" { - glog.V(0).Infof("putToFiler: version ID header set: %s=%s for %s", s3_constants.ExtVersionIdKey, versionIdHeader, uploadUrl) + // Add SSE metadata to all chunks if present + if customerKey != nil { + // SSE-C: Create per-chunk metadata (matches filer logic) + for _, chunk := range chunkResult.FileChunks { + chunk.SseType = filer_pb.SSEType_SSE_C + if len(sseIV) > 0 { + // PartOffset tracks position within the encrypted stream + // Since ALL uploads (single-part and multipart parts) encrypt starting from offset 0, + // PartOffset = chunk.Offset represents where this chunk is in that encrypted stream + // - Single-part: chunk.Offset is position in the file's encrypted stream + // - Multipart: chunk.Offset is position in this part's encrypted stream + ssecMetadataStruct := struct { + Algorithm string `json:"algorithm"` + IV string `json:"iv"` + KeyMD5 string `json:"keyMD5"` + PartOffset int64 `json:"partOffset"` + }{ + Algorithm: "AES256", + IV: base64.StdEncoding.EncodeToString(sseIV), + KeyMD5: customerKey.KeyMD5, + PartOffset: chunk.Offset, // Position within the encrypted stream (always encrypted from 0) + } + if ssecMetadata, serErr := json.Marshal(ssecMetadataStruct); serErr == nil { + chunk.SseMetadata = ssecMetadata + } + } + } + } else if sseKMSKey != nil { + // SSE-KMS: Store serialized metadata in all chunks + for _, chunk := range chunkResult.FileChunks { + chunk.SseType = filer_pb.SSEType_SSE_KMS + chunk.SseMetadata = sseKMSMetadata + } + } else if sseS3Key != nil { + // SSE-S3: Store serialized metadata in all chunks + for _, chunk := range chunkResult.FileChunks { + chunk.SseType = filer_pb.SSEType_SSE_S3 + chunk.SseMetadata = sseS3Metadata + } } - // Set object owner header for filer to extract + // Step 4: Create metadata entry + now := time.Now() + mimeType := r.Header.Get("Content-Type") + if mimeType == "" { + mimeType = "application/octet-stream" + } + + // Create entry + entry := &filer_pb.Entry{ + Name: filepath.Base(filePath), + IsDirectory: false, + Attributes: &filer_pb.FuseAttributes{ + Crtime: now.Unix(), + Mtime: now.Unix(), + FileMode: 0660, + Uid: 0, + Gid: 0, + Mime: mimeType, + FileSize: uint64(chunkResult.TotalSize), + }, + Chunks: chunkResult.FileChunks, // All chunks from auto-chunking + Extended: make(map[string][]byte), + } + + // Set Md5 attribute based on context: + // 1. For multipart upload PARTS (stored in .uploads/ directory): ALWAYS set Md5 + // - Parts must use simple MD5 ETags, never composite format + // - Even if a part has multiple chunks internally, its ETag is MD5 of entire part + // 2. 
For regular object uploads: only set Md5 for single-chunk uploads + // - Multi-chunk regular objects use composite "md5-count" format + isMultipartPart := strings.Contains(uploadUrl, "/.uploads/") + if isMultipartPart || len(chunkResult.FileChunks) == 1 { + entry.Attributes.Md5 = md5Sum + } + + // Calculate ETag using the same logic as GET to ensure consistency + // For single chunk: uses entry.Attributes.Md5 + // For multiple chunks: uses filer.ETagChunks() which returns "-" + etag = filer.ETag(entry) + glog.V(4).Infof("putToFiler: Calculated ETag=%s for %d chunks", etag, len(chunkResult.FileChunks)) + + // Set object owner amzAccountId := r.Header.Get(s3_constants.AmzAccountId) if amzAccountId != "" { - proxyReq.Header.Set(s3_constants.ExtAmzOwnerKey, amzAccountId) - glog.V(2).Infof("putToFiler: setting owner header %s for object %s", amzAccountId, uploadUrl) + entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId) + glog.V(2).Infof("putToFiler: setting owner %s for object %s", amzAccountId, filePath) + } + + // Set version ID if present + if versionIdHeader := r.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" { + entry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionIdHeader) + glog.V(3).Infof("putToFiler: setting version ID %s for object %s", versionIdHeader, filePath) } - // Set SSE-C metadata headers for the filer if encryption was applied + // Set TTL-based S3 expiry + entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true") + + // Copy user metadata and standard headers + for k, v := range r.Header { + if len(v) > 0 && len(v[0]) > 0 { + if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) { + // Go's HTTP server canonicalizes headers (e.g., x-amz-meta-foo → X-Amz-Meta-Foo) + // We store them as they come in (after canonicalization) to preserve the user's intent + entry.Extended[k] = []byte(v[0]) + } else if k == "Cache-Control" || k == "Expires" || k == "Content-Disposition" { + entry.Extended[k] = []byte(v[0]) + } + if k == "Response-Content-Disposition" { + entry.Extended["Content-Disposition"] = []byte(v[0]) + } + } + } + + // Set SSE-C metadata if customerKey != nil && len(sseIV) > 0 { - proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, "AES256") - proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, customerKey.KeyMD5) - // Store IV in a custom header that the filer can use to store in entry metadata - proxyReq.Header.Set(s3_constants.SeaweedFSSSEIVHeader, base64.StdEncoding.EncodeToString(sseIV)) + // Store IV as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes) + entry.Extended[s3_constants.SeaweedFSSSEIV] = sseIV + entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256") + entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(customerKey.KeyMD5) + glog.V(3).Infof("putToFiler: storing SSE-C metadata - IV len=%d", len(sseIV)) } - // Set SSE-KMS metadata headers for the filer if KMS encryption was applied + // Set SSE-KMS metadata if sseKMSKey != nil { - // Use already-serialized SSE-KMS metadata from helper function - // Store serialized KMS metadata in a custom header that the filer can use - proxyReq.Header.Set(s3_constants.SeaweedFSSSEKMSKeyHeader, base64.StdEncoding.EncodeToString(sseKMSMetadata)) - - glog.V(3).Infof("putToFiler: storing SSE-KMS metadata for object %s with keyID %s", uploadUrl, sseKMSKey.KeyID) - } else { - glog.V(4).Infof("putToFiler: no SSE-KMS encryption detected") + // 
Store metadata as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
+		entry.Extended[s3_constants.SeaweedFSSSEKMSKey] = sseKMSMetadata
+		// Set standard SSE headers for detection
+		entry.Extended[s3_constants.AmzServerSideEncryption] = []byte("aws:kms")
+		entry.Extended[s3_constants.AmzServerSideEncryptionAwsKmsKeyId] = []byte(sseKMSKey.KeyID)
+		glog.V(3).Infof("putToFiler: storing SSE-KMS metadata - keyID=%s, raw len=%d", sseKMSKey.KeyID, len(sseKMSMetadata))
 	}
 
-	// Set SSE-S3 metadata headers for the filer if S3 encryption was applied
+	// Set SSE-S3 metadata
 	if sseS3Key != nil && len(sseS3Metadata) > 0 {
-		// Store serialized S3 metadata in a custom header that the filer can use
-		proxyReq.Header.Set(s3_constants.SeaweedFSSSES3Key, base64.StdEncoding.EncodeToString(sseS3Metadata))
-		glog.V(3).Infof("putToFiler: storing SSE-S3 metadata for object %s with keyID %s", uploadUrl, sseS3Key.KeyID)
-	}
-	// Set TTL-based S3 expiry (modification time)
-	proxyReq.Header.Set(s3_constants.SeaweedFSExpiresS3, "true")
-	// ensure that the Authorization header is overriding any previous
-	// Authorization header which might be already present in proxyReq
-	s3a.maybeAddFilerJwtAuthorization(proxyReq, true)
-	resp, postErr := s3a.client.Do(proxyReq)
-
-	if postErr != nil {
-		glog.Errorf("post to filer: %v", postErr)
-		if strings.Contains(postErr.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
-			return "", s3err.ErrInvalidDigest, ""
+		// Store metadata as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
+		entry.Extended[s3_constants.SeaweedFSSSES3Key] = sseS3Metadata
+		// Set standard SSE header for detection
+		entry.Extended[s3_constants.AmzServerSideEncryption] = []byte("AES256")
+		glog.V(3).Infof("putToFiler: storing SSE-S3 metadata - keyID=%s, raw len=%d", sseS3Key.KeyID, len(sseS3Metadata))
+	}
+
+	// Step 5: Save metadata to filer via gRPC
+	// Use context.Background() to ensure metadata save completes even if HTTP request is cancelled
+	// This matches the chunk upload behavior and prevents orphaned chunks
+	glog.V(3).Infof("putToFiler: About to create entry - dir=%s, name=%s, chunks=%d, extended keys=%d",
+		filepath.Dir(filePath), filepath.Base(filePath), len(entry.Chunks), len(entry.Extended))
+	createErr := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		req := &filer_pb.CreateEntryRequest{
+			Directory: filepath.Dir(filePath),
+			Entry:     entry,
 		}
-		return "", s3err.ErrInternalError, ""
+		glog.V(3).Infof("putToFiler: Calling CreateEntry for %s", filePath)
+		_, err := client.CreateEntry(context.Background(), req)
+		if err != nil {
+			glog.Errorf("putToFiler: CreateEntry returned error: %v", err)
+		}
+		return err
+	})
+	if createErr != nil {
+		glog.Errorf("putToFiler: failed to create entry for %s: %v", filePath, createErr)
+		return "", filerErrorToS3Error(createErr.Error()), SSEResponseMetadata{}
 	}
-	defer resp.Body.Close()
+	glog.V(3).Infof("putToFiler: CreateEntry SUCCESS for %s", filePath)
 
-	etag = fmt.Sprintf("%x", hash.Sum(nil))
+	glog.V(2).Infof("putToFiler: Metadata saved SUCCESS - path=%s, etag(hex)=%s, size=%d, partNumber=%d",
+		filePath, etag, entry.Attributes.FileSize, partNumber)
 
-	resp_body, ra_err := io.ReadAll(resp.Body)
-	if ra_err != nil {
-		glog.Errorf("upload to filer response read %d: %v", resp.StatusCode, ra_err)
-		return etag, s3err.ErrInternalError, ""
-	}
-	var ret weed_server.FilerPostResult
-	unmarshal_err := json.Unmarshal(resp_body, &ret)
-	if unmarshal_err != nil {
-		
glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body)) - return "", s3err.ErrInternalError, "" - } - if ret.Error != "" { - glog.Errorf("upload to filer error: %v", ret.Error) - return "", filerErrorToS3Error(ret.Error), "" + BucketTrafficReceived(chunkResult.TotalSize, r) + + // Build SSE response metadata with encryption details + responseMetadata := SSEResponseMetadata{ + SSEType: sseResult.SSEType, } - BucketTrafficReceived(ret.Size, r) + // For SSE-KMS, include key ID and bucket-key-enabled flag from stored metadata + if sseKMSKey != nil { + responseMetadata.KMSKeyID = sseKMSKey.KeyID + responseMetadata.BucketKeyEnabled = sseKMSKey.BucketKeyEnabled + glog.V(4).Infof("putToFiler: returning SSE-KMS metadata - keyID=%s, bucketKeyEnabled=%v", + sseKMSKey.KeyID, sseKMSKey.BucketKeyEnabled) + } - // Return the SSE type determined by the unified handler - return etag, s3err.ErrNone, sseResult.SSEType + return etag, s3err.ErrNone, responseMetadata } func setEtag(w http.ResponseWriter, etag string) { @@ -384,6 +578,43 @@ func setEtag(w http.ResponseWriter, etag string) { } } +// setSSEResponseHeaders sets appropriate SSE response headers based on encryption type +func (s3a *S3ApiServer) setSSEResponseHeaders(w http.ResponseWriter, r *http.Request, sseMetadata SSEResponseMetadata) { + switch sseMetadata.SSEType { + case s3_constants.SSETypeS3: + // SSE-S3: Return the encryption algorithm + w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256) + + case s3_constants.SSETypeC: + // SSE-C: Echo back the customer-provided algorithm and key MD5 + if algo := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm); algo != "" { + w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, algo) + } + if keyMD5 := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerKeyMD5); keyMD5 != "" { + w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, keyMD5) + } + + case s3_constants.SSETypeKMS: + // SSE-KMS: Return the KMS key ID and algorithm + w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms") + + // Use metadata from stored encryption config (for bucket-default encryption) + // or fall back to request headers (for explicit encryption) + if sseMetadata.KMSKeyID != "" { + w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, sseMetadata.KMSKeyID) + } else if keyID := r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId); keyID != "" { + w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, keyID) + } + + // Set bucket-key-enabled header if it was enabled + if sseMetadata.BucketKeyEnabled { + w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true") + } else if bucketKeyEnabled := r.Header.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled); bucketKeyEnabled == "true" { + w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true") + } + } +} + func filerErrorToS3Error(errString string) s3err.ErrorCode { switch { case errString == constants.ErrMsgBadDigest: @@ -446,18 +677,18 @@ func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_ // // For suspended versioning, objects are stored as regular files (version ID "null") in the bucket directory, // while existing versions from when versioning was enabled remain preserved in the .versions subdirectory. 
-func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode) { +func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) { // Normalize object path to ensure consistency with toFilerUrl behavior normalizedObject := removeDuplicateSlashes(object) // Enable detailed logging for testobjbar isTestObj := (normalizedObject == "testobjbar") - glog.V(0).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s, isTestObj=%v", + glog.V(3).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s, isTestObj=%v", bucket, object, normalizedObject, isTestObj) if isTestObj { - glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject START ===") + glog.V(3).Infof("=== TESTOBJBAR: putSuspendedVersioningObject START ===") } bucketDir := s3a.option.BucketsPath + "/" + bucket @@ -470,20 +701,20 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob entries, _, err := s3a.list(versionsDir, "", "", false, 1000) if err == nil { // .versions directory exists - glog.V(0).Infof("putSuspendedVersioningObject: found %d entries in .versions for %s/%s", len(entries), bucket, object) + glog.V(3).Infof("putSuspendedVersioningObject: found %d entries in .versions for %s/%s", len(entries), bucket, object) for _, entry := range entries { if entry.Extended != nil { if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok { versionId := string(versionIdBytes) - glog.V(0).Infof("putSuspendedVersioningObject: found version '%s' in .versions", versionId) + glog.V(3).Infof("putSuspendedVersioningObject: found version '%s' in .versions", versionId) if versionId == "null" { // Only delete null version - preserve real versioned entries - glog.V(0).Infof("putSuspendedVersioningObject: deleting null version from .versions") + glog.V(3).Infof("putSuspendedVersioningObject: deleting null version from .versions") err := s3a.rm(versionsDir, entry.Name, true, false) if err != nil { glog.Warningf("putSuspendedVersioningObject: failed to delete null version: %v", err) } else { - glog.V(0).Infof("putSuspendedVersioningObject: successfully deleted null version") + glog.V(3).Infof("putSuspendedVersioningObject: successfully deleted null version") } break } @@ -491,7 +722,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob } } } else { - glog.V(0).Infof("putSuspendedVersioningObject: no .versions directory for %s/%s", bucket, object) + glog.V(3).Infof("putSuspendedVersioningObject: no .versions directory for %s/%s", bucket, object) } uploadUrl := s3a.toFilerUrl(bucket, normalizedObject) @@ -509,7 +740,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob // Set version ID to "null" for suspended versioning r.Header.Set(s3_constants.ExtVersionIdKey, "null") if isTestObj { - glog.V(0).Infof("=== TESTOBJBAR: set version header before putToFiler, r.Header[%s]=%s ===", + glog.V(3).Infof("=== TESTOBJBAR: set version header before putToFiler, r.Header[%s]=%s ===", s3_constants.ExtVersionIdKey, r.Header.Get(s3_constants.ExtVersionIdKey)) } @@ -528,7 +759,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob parsedTime, err := time.Parse(time.RFC3339, explicitRetainUntilDate) if err != nil { 
glog.Errorf("putSuspendedVersioningObject: failed to parse retention until date: %v", err) - return "", s3err.ErrInvalidRequest + return "", s3err.ErrInvalidRequest, SSEResponseMetadata{} } r.Header.Set(s3_constants.ExtRetentionUntilDateKey, strconv.FormatInt(parsedTime.Unix(), 10)) glog.V(2).Infof("putSuspendedVersioningObject: setting retention until date header (timestamp: %d)", parsedTime.Unix()) @@ -540,7 +771,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob glog.V(2).Infof("putSuspendedVersioningObject: setting legal hold header: %s", legalHold) } else { glog.Errorf("putSuspendedVersioningObject: invalid legal hold value: %s", legalHold) - return "", s3err.ErrInvalidRequest + return "", s3err.ErrInvalidRequest, SSEResponseMetadata{} } } @@ -563,15 +794,15 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob // Upload the file using putToFiler - this will create the file with version metadata if isTestObj { - glog.V(0).Infof("=== TESTOBJBAR: calling putToFiler ===") + glog.V(3).Infof("=== TESTOBJBAR: calling putToFiler ===") } - etag, errCode, _ = s3a.putToFiler(r, uploadUrl, body, "", bucket, 1) + etag, errCode, sseMetadata = s3a.putToFiler(r, uploadUrl, body, bucket, 1) if errCode != s3err.ErrNone { glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode) - return "", errCode + return "", errCode, SSEResponseMetadata{} } if isTestObj { - glog.V(0).Infof("=== TESTOBJBAR: putToFiler completed, etag=%s ===", etag) + glog.V(3).Infof("=== TESTOBJBAR: putToFiler completed, etag=%s ===", etag) } // Verify the metadata was set correctly during file creation @@ -581,19 +812,19 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob for attempt := 1; attempt <= maxRetries; attempt++ { verifyEntry, verifyErr := s3a.getEntry(bucketDir, normalizedObject) if verifyErr == nil { - glog.V(0).Infof("=== TESTOBJBAR: verify attempt %d, entry.Extended=%v ===", attempt, verifyEntry.Extended) + glog.V(3).Infof("=== TESTOBJBAR: verify attempt %d, entry.Extended=%v ===", attempt, verifyEntry.Extended) if verifyEntry.Extended != nil { if versionIdBytes, ok := verifyEntry.Extended[s3_constants.ExtVersionIdKey]; ok { - glog.V(0).Infof("=== TESTOBJBAR: verification SUCCESSFUL, version=%s ===", string(versionIdBytes)) + glog.V(3).Infof("=== TESTOBJBAR: verification SUCCESSFUL, version=%s ===", string(versionIdBytes)) } else { - glog.V(0).Infof("=== TESTOBJBAR: verification FAILED, ExtVersionIdKey not found ===") + glog.V(3).Infof("=== TESTOBJBAR: verification FAILED, ExtVersionIdKey not found ===") } } else { - glog.V(0).Infof("=== TESTOBJBAR: verification FAILED, Extended is nil ===") + glog.V(3).Infof("=== TESTOBJBAR: verification FAILED, Extended is nil ===") } break } else { - glog.V(0).Infof("=== TESTOBJBAR: getEntry failed on attempt %d: %v ===", attempt, verifyErr) + glog.V(3).Infof("=== TESTOBJBAR: getEntry failed on attempt %d: %v ===", attempt, verifyErr) } if attempt < maxRetries { time.Sleep(time.Millisecond * 10) @@ -610,9 +841,9 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob glog.V(2).Infof("putSuspendedVersioningObject: successfully created null version for %s/%s", bucket, object) if isTestObj { - glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject COMPLETED ===") + glog.V(3).Infof("=== TESTOBJBAR: putSuspendedVersioningObject COMPLETED ===") } - return etag, s3err.ErrNone + return etag, s3err.ErrNone, sseMetadata } // 
updateIsLatestFlagsForSuspendedVersioning sets IsLatest=false on all existing versions/delete markers @@ -684,7 +915,7 @@ func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object return nil } -func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) { +func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) { // Generate version ID versionId = generateVersionId() @@ -709,7 +940,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin }) if err != nil { glog.Errorf("putVersionedObject: failed to create .versions directory: %v", err) - return "", "", s3err.ErrInternalError + return "", "", s3err.ErrInternalError, SSEResponseMetadata{} } hash := md5.New() @@ -720,10 +951,10 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl) - etag, errCode, _ = s3a.putToFiler(r, versionUploadUrl, body, "", bucket, 1) + etag, errCode, sseMetadata = s3a.putToFiler(r, versionUploadUrl, body, bucket, 1) if errCode != s3err.ErrNone { glog.Errorf("putVersionedObject: failed to upload version: %v", errCode) - return "", "", errCode + return "", "", errCode, SSEResponseMetadata{} } // Get the uploaded entry to add versioning metadata @@ -745,7 +976,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin if err != nil { glog.Errorf("putVersionedObject: failed to get version entry after %d attempts: %v", maxRetries, err) - return "", "", s3err.ErrInternalError + return "", "", s3err.ErrInternalError, SSEResponseMetadata{} } // Add versioning metadata to this version @@ -766,7 +997,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin // Extract and store object lock metadata from request headers if err := s3a.extractObjectLockMetadataFromRequest(r, versionEntry); err != nil { glog.Errorf("putVersionedObject: failed to extract object lock metadata: %v", err) - return "", "", s3err.ErrInvalidRequest + return "", "", s3err.ErrInvalidRequest, SSEResponseMetadata{} } // Update the version entry with metadata @@ -777,17 +1008,17 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin }) if err != nil { glog.Errorf("putVersionedObject: failed to update version metadata: %v", err) - return "", "", s3err.ErrInternalError + return "", "", s3err.ErrInternalError, SSEResponseMetadata{} } // Update the .versions directory metadata to indicate this is the latest version err = s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName) if err != nil { glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err) - return "", "", s3err.ErrInternalError + return "", "", s3err.ErrInternalError, SSEResponseMetadata{} } glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject) - return versionId, etag, s3err.ErrNone + return versionId, etag, s3err.ErrNone, sseMetadata } // updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version @@ -963,7 +1194,8 @@ func (s3a *S3ApiServer) 
applySSEKMSDefaultEncryption(bucket string, r *http.Requ bucketKeyEnabled := encryptionConfig.BucketKeyEnabled // Build encryption context for KMS - bucket, object := s3_constants.GetBucketAndObject(r) + // Use bucket parameter passed to function (not from request parsing) + _, object := s3_constants.GetBucketAndObject(r) encryptionContext := BuildEncryptionContext(bucket, object, bucketKeyEnabled) // Create SSE-KMS encrypted reader
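// Editor's note: a hedged sketch (not part of this patch) of what the encryption
// context built above binds together. Assuming BuildEncryptionContext follows the
// AWS SSE-KMS convention of keying the context on the object ARN, the result is
// roughly equivalent to:
//
//	ctx := map[string]string{
//		"aws:s3:arn": "arn:aws:s3:::" + bucket + "/" + object,
//	}
//
// Using the bucket parameter passed into this function, rather than re-parsing it
// from the request, keeps that ARN stable even if the request URL was rewritten
// upstream, so the ciphertext stays decryptable under the same context.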