diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 6dd760859..ab2f73f65 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -2,17 +2,12 @@ package s3api import ( "bytes" - "context" "encoding/base64" - "encoding/json" "errors" "fmt" "io" - "math" - "mime" "net/http" "net/url" - "path/filepath" "sort" "strconv" "strings" @@ -20,14 +15,13 @@ import ( "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" - "github.com/seaweedfs/seaweedfs/weed/security" - "github.com/seaweedfs/seaweedfs/weed/wdclient" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/util/mem" "github.com/seaweedfs/seaweedfs/weed/glog" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) // corsHeaders defines the CORS headers that need to be preserved @@ -41,10 +35,6 @@ var corsHeaders = []string{ "Access-Control-Allow-Credentials", } -// zeroBuf is a reusable buffer of zero bytes for padding operations -// Package-level to avoid per-call allocations in writeZeroBytes -var zeroBuf = make([]byte, 32*1024) - func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser { mimeBuffer := make([]byte, 512) size, _ := dataReader.Read(mimeBuffer) @@ -133,13 +123,6 @@ func (s3a *S3ApiServer) checkDirectoryObject(bucket, object string) (*filer_pb.E // serveDirectoryContent serves the content of a directory object directly func (s3a *S3ApiServer) serveDirectoryContent(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry) { - // Defensive nil checks - entry and attributes should never be nil, but guard against it - if entry == nil || entry.Attributes == nil { - glog.Errorf("serveDirectoryContent: entry or attributes is nil") - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } - // Set content type - use stored MIME type or default contentType := entry.Attributes.Mime if contentType == "" { @@ -194,6 +177,40 @@ func (s3a *S3ApiServer) handleDirectoryObjectRequest(w http.ResponseWriter, r *h return false // Not a directory object, continue with normal processing } +// rejectDirectoryObjectWithoutSlash rejects requests that target directories without using the trailing slash form. +// AWS S3 treats such requests as missing objects, which is relied upon by readers that distinguish +// files vs. prefixes via HEAD calls (e.g. PyArrow datasets). Seaweed internally stores directories +// as filer entries, so we need to hide them unless the caller explicitly uses the directory form. 
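// Illustrative behavior this guard is meant to produce (hypothetical bucket
// and key names, shown as a sketch only, not taken from this change):
//
//	HEAD /bucket/dataset           -> 404 NoSuchKey  (directory entry hidden)
//	HEAD /bucket/dataset/          -> 200 OK         (explicit directory form)
//	GET  /bucket/dataset/a.parquet -> 200 OK         (regular objects unaffected)
//
// Caller sketch (mirrors the use in GetObjectHandler below): the double
// pointer lets a caller that already holds an entry (e.g. from the
// conditional-header check) share it, while a caller passing a nil slot gets
// the fetched entry written back, avoiding a second filer lookup:
//
//	entry := result.Entry
//	if s3a.rejectDirectoryObjectWithoutSlash(w, r, bucket, object, "GetObjectHandler", &entry) {
//	    return // 404 response already written
//	}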
+func (s3a *S3ApiServer) rejectDirectoryObjectWithoutSlash(w http.ResponseWriter, r *http.Request, bucket, object, handlerName string, entryRef **filer_pb.Entry) bool { + if strings.HasSuffix(object, "/") { + return false + } + + var currentEntry *filer_pb.Entry + if entryRef != nil && *entryRef != nil { + currentEntry = *entryRef + } else { + var err error + currentEntry, err = s3a.fetchObjectEntry(bucket, object) + if err != nil { + glog.Errorf("%s: failed to fetch entry for %s/%s: %v", handlerName, bucket, object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return true + } + if entryRef != nil { + *entryRef = currentEntry + } + } + + if currentEntry != nil && currentEntry.IsDirectory { + glog.V(2).Infof("%s: object %s/%s is a directory but requested without trailing slash", handlerName, bucket, object) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return true + } + + return false +} + func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bucketPrefix string, fetchOwner bool, isDirectory bool, encodingTypeUrl bool, iam AccountManager) (listEntry ListEntry) { storageClass := "STANDARD" if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok { @@ -289,44 +306,36 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) bucket, object := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("GetObjectHandler %s %s", bucket, object) - // TTFB Profiling: Track all stages until first byte - tStart := time.Now() - var ( - conditionalHeadersTime time.Duration - versioningCheckTime time.Duration - entryFetchTime time.Duration - streamTime time.Duration - ) - defer func() { - totalTime := time.Since(tStart) - glog.V(2).Infof("GET TTFB PROFILE %s/%s: total=%v | conditional=%v, versioning=%v, entryFetch=%v, stream=%v", - bucket, object, totalTime, conditionalHeadersTime, versioningCheckTime, entryFetchTime, streamTime) - }() - // Handle directory objects with shared logic if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "GetObjectHandler") { return // Directory object request was handled } // Check conditional headers and handle early return if conditions fail - tConditional := time.Now() result, handled := s3a.processConditionalHeaders(w, r, bucket, object, "GetObjectHandler") - conditionalHeadersTime = time.Since(tConditional) if handled { return } + entryFromConditional := result.Entry + if s3a.rejectDirectoryObjectWithoutSlash(w, r, bucket, object, "GetObjectHandler", &entryFromConditional) { + return + } + if entryFromConditional != result.Entry { + result.Entry = entryFromConditional + } + // Check for specific version ID in query parameters versionId := r.URL.Query().Get("versionId") var ( + destUrl string entry *filer_pb.Entry // Declare entry at function scope for SSE processing versioningConfigured bool err error ) // Check if versioning is configured for the bucket (Enabled or Suspended) - tVersioning := time.Now() // Note: We need to check this even if versionId is empty, because versioned buckets // handle even "get latest version" requests differently (through .versions directory) versioningConfigured, err = s3a.isVersioningConfigured(bucket) @@ -339,91 +348,40 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } - glog.V(3).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId) + glog.V(1).Infof("GetObject: bucket %s, object %s, 
versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId) if versioningConfigured { - // Handle versioned GET - check if specific version requested + // Handle versioned GET - all versions are stored in .versions directory var targetVersionId string if versionId != "" { - // Request for specific version - must look in .versions directory - glog.V(3).Infof("GetObject: requesting specific version %s for %s%s", versionId, bucket, object) + // Request for specific version + glog.V(2).Infof("GetObject: requesting specific version %s for %s%s", versionId, bucket, object) entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId) if err != nil { glog.Errorf("Failed to get specific version %s: %v", versionId, err) s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) return } - // Safety check: entry must be valid after successful retrieval - if entry == nil { - glog.Errorf("GetObject: getSpecificObjectVersion returned nil entry without error for version %s", versionId) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return - } targetVersionId = versionId } else { - // Request for latest version - OPTIMIZATION: - // Check if .versions/ directory exists quickly (no retries) to decide path - // - If .versions/ exists: real versions available, use getLatestObjectVersion - // - If .versions/ doesn't exist (ErrNotFound): only null version at regular path, use it directly - // - If transient error: fall back to getLatestObjectVersion which has retry logic - bucketDir := s3a.option.BucketsPath + "/" + bucket - normalizedObject := removeDuplicateSlashes(object) - versionsDir := normalizedObject + s3_constants.VersionsFolder - - // Quick check (no retries) for .versions/ directory - versionsEntry, versionsErr := s3a.getEntry(bucketDir, versionsDir) - - if versionsErr == nil && versionsEntry != nil { - // .versions/ exists, meaning real versions are stored there - // Use getLatestObjectVersion which will properly find the newest version - entry, err = s3a.getLatestObjectVersion(bucket, object) - if err != nil { - glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return - } - } else if errors.Is(versionsErr, filer_pb.ErrNotFound) { - // .versions/ doesn't exist (confirmed not found), check regular path for null version - regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject) - if regularErr == nil && regularEntry != nil { - // Found object at regular path - this is the null version - entry = regularEntry - targetVersionId = "null" - } else { - // No object at regular path either - object doesn't exist - glog.Errorf("GetObject: object not found at regular path or .versions for %s%s", bucket, object) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return - } - } else { - // Transient error checking .versions/, fall back to getLatestObjectVersion with retries - glog.V(2).Infof("GetObject: transient error checking .versions for %s%s: %v, falling back to getLatestObjectVersion", bucket, object, versionsErr) - entry, err = s3a.getLatestObjectVersion(bucket, object) - if err != nil { - glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return - } - } - // Safety check: entry must be valid after successful retrieval - if entry == nil { - glog.Errorf("GetObject: entry is nil after versioned lookup for %s%s", bucket, object) + // Request for latest version + 
glog.V(1).Infof("GetObject: requesting latest version for %s%s", bucket, object) + entry, err = s3a.getLatestObjectVersion(bucket, object) + if err != nil { + glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err) s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) return } - // Extract version ID if not already set - if targetVersionId == "" { - if entry.Extended != nil { - if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { - targetVersionId = string(versionIdBytes) - } - } - // If no version ID found in entry, this is a pre-versioning object - if targetVersionId == "" { - targetVersionId = "null" + if entry.Extended != nil { + if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { + targetVersionId = string(versionIdBytes) } } + // If no version ID found in entry, this is a pre-versioning object + if targetVersionId == "" { + targetVersionId = "null" + } } // Check if this is a delete marker @@ -434,11 +392,16 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) } } - // For versioned objects, log the target version + // Determine the actual file path based on whether this is a versioned or pre-versioning object if targetVersionId == "null" { - glog.V(2).Infof("GetObject: pre-versioning object %s/%s", bucket, object) + // Pre-versioning object - stored as regular file + destUrl = s3a.toFilerUrl(bucket, object) + glog.V(2).Infof("GetObject: pre-versioning object URL: %s", destUrl) } else { - glog.V(2).Infof("GetObject: version %s for %s/%s", targetVersionId, bucket, object) + // Versioned object - stored in .versions directory + versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId) + destUrl = s3a.toFilerUrl(bucket, versionObjectPath) + glog.V(2).Infof("GetObject: version %s URL: %s", targetVersionId, destUrl) } // Set version ID in response header @@ -446,25 +409,21 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) // Add object lock metadata to response headers if present s3a.addObjectLockHeadersToResponse(w, entry) + } else { + // Handle regular GET (non-versioned) + destUrl = s3a.toFilerUrl(bucket, object) } - versioningCheckTime = time.Since(tVersioning) - // Fetch the correct entry for SSE processing (respects versionId) // This consolidates entry lookups to avoid multiple filer calls - tEntryFetch := time.Now() var objectEntryForSSE *filer_pb.Entry + originalRangeHeader := r.Header.Get("Range") + var sseObject = false // Optimization: Reuse already-fetched entry to avoid redundant metadata fetches if versioningConfigured { // For versioned objects, reuse the already-fetched entry objectEntryForSSE = entry - // Safety check - this should never happen as versioned path handles errors above - if objectEntryForSSE == nil { - glog.Errorf("GetObjectHandler: unexpected nil entry for versioned object %s/%s", bucket, object) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return - } } else { // For non-versioned objects, try to reuse entry from conditional header check if result.Entry != nil { @@ -480,7 +439,7 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) var fetchErr error objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object) if fetchErr != nil { - glog.Warningf("GetObjectHandler: failed to get entry for %s/%s: %v", bucket, object, fetchErr) + glog.Errorf("GetObjectHandler: failed to get entry for SSE check: %v", fetchErr) 
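// Fail closed: without the entry we cannot tell whether the object is
// SSE-encrypted, and proxying it blindly could return raw ciphertext or
// mishandle Range requests, so return an error rather than continuing.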
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } @@ -491,1536 +450,114 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) } } } - entryFetchTime = time.Since(tEntryFetch) - - // Check if PartNumber query parameter is present (for multipart GET requests) - partNumberStr := r.URL.Query().Get("partNumber") - if partNumberStr == "" { - partNumberStr = r.URL.Query().Get("PartNumber") - } - - // If PartNumber is specified, set headers and modify Range to read only that part - // This replicates the filer handler logic - if partNumberStr != "" { - if partNumber, parseErr := strconv.Atoi(partNumberStr); parseErr == nil && partNumber > 0 { - // Get actual parts count from metadata (not chunk count) - partsCount, partInfo := s3a.getMultipartInfo(objectEntryForSSE, partNumber) - - // Validate part number - if partNumber > partsCount { - glog.Warningf("GetObject: Invalid part number %d, object has %d parts", partNumber, partsCount) - s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) - return - } - - // Set parts count header - w.Header().Set(s3_constants.AmzMpPartsCount, strconv.Itoa(partsCount)) - glog.V(3).Infof("GetObject: Set PartsCount=%d for multipart GET with PartNumber=%d", partsCount, partNumber) - - // Calculate the byte range for this part - var startOffset, endOffset int64 - if partInfo != nil { - // Use part boundaries from metadata (accurate for multi-chunk parts) - startOffset = objectEntryForSSE.Chunks[partInfo.StartChunk].Offset - lastChunk := objectEntryForSSE.Chunks[partInfo.EndChunk-1] - endOffset = lastChunk.Offset + int64(lastChunk.Size) - 1 - - // Override ETag with the part's ETag from metadata - w.Header().Set("ETag", "\""+partInfo.ETag+"\"") - glog.V(3).Infof("GetObject: Override ETag with part %d ETag: %s (from metadata)", partNumber, partInfo.ETag) - } else { - // Fallback: assume 1:1 part-to-chunk mapping (backward compatibility) - chunkIndex := partNumber - 1 - if chunkIndex >= len(objectEntryForSSE.Chunks) { - glog.Warningf("GetObject: Part %d chunk index %d out of range (chunks: %d)", partNumber, chunkIndex, len(objectEntryForSSE.Chunks)) - s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) - return - } - partChunk := objectEntryForSSE.Chunks[chunkIndex] - startOffset = partChunk.Offset - endOffset = partChunk.Offset + int64(partChunk.Size) - 1 - - // Override ETag with chunk's ETag (fallback) - if partChunk.ETag != "" { - if md5Bytes, decodeErr := base64.StdEncoding.DecodeString(partChunk.ETag); decodeErr == nil { - partETag := fmt.Sprintf("%x", md5Bytes) - w.Header().Set("ETag", "\""+partETag+"\"") - glog.V(3).Infof("GetObject: Override ETag with part %d ETag: %s (fallback from chunk)", partNumber, partETag) - } - } - } - // CRITICAL: Set Range header to read only this part's bytes (matches filer logic) - // This ensures we stream only the specific part, not the entire object - rangeHeader := fmt.Sprintf("bytes=%d-%d", startOffset, endOffset) - r.Header.Set("Range", rangeHeader) - glog.V(3).Infof("GetObject: Set Range header for part %d: %s", partNumber, rangeHeader) + // Check if this is an SSE object for Range request handling + // This applies to both versioned and non-versioned objects + if originalRangeHeader != "" && objectEntryForSSE != nil { + primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE) + if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS { + sseObject = true + // Temporarily remove Range header to get full encrypted data from filer + 
r.Header.Del("Range") } } - // NEW OPTIMIZATION: Stream directly from volume servers, bypassing filer proxy - // This eliminates the 19ms filer proxy overhead - // SSE decryption is handled inline during streaming - - // Safety check: entry must be valid before streaming - if objectEntryForSSE == nil { - glog.Errorf("GetObjectHandler: objectEntryForSSE is nil for %s/%s (should not happen)", bucket, object) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } - - // Detect SSE encryption type - primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE) - - // Stream directly from volume servers with SSE support - tStream := time.Now() - err = s3a.streamFromVolumeServersWithSSE(w, r, objectEntryForSSE, primarySSEType) - streamTime = time.Since(tStream) - if err != nil { - glog.Errorf("GetObjectHandler: failed to stream from volume servers: %v", err) - // Try to write error response. The HTTP library will gracefully handle cases - // where headers are already sent (e.g., during streaming errors). - // For early validation errors, this ensures client gets proper error response. - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } -} - -// streamFromVolumeServers streams object data directly from volume servers, bypassing filer proxy -// This eliminates the ~19ms filer proxy overhead by reading chunks directly -func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) error { - // Profiling: Track overall and stage timings - t0 := time.Now() - var ( - rangeParseTime time.Duration - headerSetTime time.Duration - chunkResolveTime time.Duration - streamPrepTime time.Duration - streamExecTime time.Duration - ) - defer func() { - totalTime := time.Since(t0) - glog.V(2).Infof(" └─ streamFromVolumeServers: total=%v, rangeParse=%v, headerSet=%v, chunkResolve=%v, streamPrep=%v, streamExec=%v", - totalTime, rangeParseTime, headerSetTime, chunkResolveTime, streamPrepTime, streamExecTime) - }() - - if entry == nil { - // Early validation error: write proper HTTP response - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, "Internal Server Error: entry is nil") - return fmt.Errorf("entry is nil") - } - - // Get file size - totalSize := int64(filer.FileSize(entry)) - - // Parse Range header if present - tRangeParse := time.Now() - var offset int64 = 0 - var size int64 = totalSize - rangeHeader := r.Header.Get("Range") - isRangeRequest := false - - if rangeHeader != "" && strings.HasPrefix(rangeHeader, "bytes=") { - rangeSpec := rangeHeader[6:] - parts := strings.Split(rangeSpec, "-") - if len(parts) == 2 { - var startOffset, endOffset int64 - - // Handle different Range formats: - // 1. "bytes=0-499" - first 500 bytes (parts[0]="0", parts[1]="499") - // 2. "bytes=500-" - from byte 500 to end (parts[0]="500", parts[1]="") - // 3. 
"bytes=-500" - last 500 bytes (parts[0]="", parts[1]="500") - - if parts[0] == "" && parts[1] != "" { - // Suffix range: bytes=-N (last N bytes) - if suffixLen, err := strconv.ParseInt(parts[1], 10, 64); err == nil { - // RFC 7233: suffix range on empty object or zero-length suffix is unsatisfiable - if totalSize == 0 || suffixLen <= 0 { - w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) - w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) - return fmt.Errorf("invalid suffix range for empty object") - } - if suffixLen > totalSize { - suffixLen = totalSize - } - startOffset = totalSize - suffixLen - endOffset = totalSize - 1 - } else { - // Set header BEFORE WriteHeader - w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) - w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) - return fmt.Errorf("invalid suffix range") - } - } else { - // Regular range or open-ended range - startOffset = 0 - endOffset = totalSize - 1 - - if parts[0] != "" { - if parsed, err := strconv.ParseInt(parts[0], 10, 64); err == nil { - startOffset = parsed - } - } - if parts[1] != "" { - if parsed, err := strconv.ParseInt(parts[1], 10, 64); err == nil { - endOffset = parsed - } - } - - // Validate range - if startOffset < 0 || startOffset >= totalSize { - // Set header BEFORE WriteHeader - w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) - w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) - return fmt.Errorf("invalid range start") - } - - if endOffset >= totalSize { - endOffset = totalSize - 1 - } - - if endOffset < startOffset { - // Set header BEFORE WriteHeader - w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) - w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) - return fmt.Errorf("invalid range: end before start") - } - } - - offset = startOffset - size = endOffset - startOffset + 1 - isRangeRequest = true + s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { + // Restore the original Range header for SSE processing + if sseObject && originalRangeHeader != "" { + r.Header.Set("Range", originalRangeHeader) } - } - rangeParseTime = time.Since(tRangeParse) - - // For small files stored inline in entry.Content - validate BEFORE setting headers - if len(entry.Content) > 0 && totalSize == int64(len(entry.Content)) { - if isRangeRequest { - // Safely convert int64 to int for slice indexing - validate BEFORE WriteHeader - // Use MaxInt32 for portability across 32-bit and 64-bit platforms - if offset < 0 || offset > int64(math.MaxInt32) || size < 0 || size > int64(math.MaxInt32) { - // Early validation error: write proper HTTP response BEFORE headers - w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) - w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) - fmt.Fprintf(w, "Range too large for platform: offset=%d, size=%d", offset, size) - return fmt.Errorf("range too large for platform: offset=%d, size=%d", offset, size) - } - start := int(offset) - end := start + int(size) - // Bounds check (should already be validated, but double-check) - BEFORE WriteHeader - if start < 0 || start > len(entry.Content) || end > len(entry.Content) || end < start { - // Early validation error: write proper HTTP response BEFORE headers - w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) - w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) - fmt.Fprintf(w, "Invalid range for inline content: start=%d, end=%d, 
len=%d)", start, end, len(entry.Content)) - return fmt.Errorf("invalid range for inline content: start=%d, end=%d, len=%d", start, end, len(entry.Content)) - } - // Validation passed - now set headers and write - s3a.setResponseHeaders(w, entry, totalSize) - w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize)) - w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) - w.WriteHeader(http.StatusPartialContent) - _, err := w.Write(entry.Content[start:end]) - return err - } - // Non-range request for inline content - s3a.setResponseHeaders(w, entry, totalSize) - w.WriteHeader(http.StatusOK) - _, err := w.Write(entry.Content) - return err - } - - // Get chunks and validate BEFORE setting headers - chunks := entry.GetChunks() - glog.V(4).Infof("streamFromVolumeServers: entry has %d chunks, totalSize=%d, isRange=%v, offset=%d, size=%d", - len(chunks), totalSize, isRangeRequest, offset, size) - - if len(chunks) == 0 { - // BUG FIX: If totalSize > 0 but no chunks and no content, this is a data integrity issue - if totalSize > 0 && len(entry.Content) == 0 { - glog.Errorf("streamFromVolumeServers: Data integrity error - entry reports size %d but has no content or chunks", totalSize) - // IMPORTANT: Write error status before returning, since headers haven't been written yet - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, "Internal Server Error: data integrity issue (size %d reported but no content available)", totalSize) - return fmt.Errorf("data integrity error: size %d reported but no content available", totalSize) - } - // Empty object - set headers and write status - s3a.setResponseHeaders(w, entry, totalSize) - w.WriteHeader(http.StatusOK) - return nil - } - // Log chunk details (verbose only - high frequency) - if glog.V(4) { - for i, chunk := range chunks { - glog.Infof(" GET Chunk[%d]: fid=%s, offset=%d, size=%d", i, chunk.GetFileIdString(), chunk.Offset, chunk.Size) + // Add SSE metadata headers based on object metadata before SSE processing + if objectEntryForSSE != nil { + s3a.addSSEHeadersToResponse(proxyResponse, objectEntryForSSE) } - } - // CRITICAL: Resolve chunks and prepare stream BEFORE WriteHeader - // This ensures we can write proper error responses if these operations fail - ctx := r.Context() - lookupFileIdFn := s3a.createLookupFileIdFunction() + // Handle SSE decryption (both SSE-C and SSE-KMS) if needed + return s3a.handleSSEResponse(r, proxyResponse, w, objectEntryForSSE) + }) +} - // Resolve chunk manifests with the requested range - tChunkResolve := time.Now() - resolvedChunks, _, err := filer.ResolveChunkManifest(ctx, lookupFileIdFn, chunks, offset, offset+size) - chunkResolveTime = time.Since(tChunkResolve) - if err != nil { - glog.Errorf("streamFromVolumeServers: failed to resolve chunks: %v", err) - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, "Internal Server Error: failed to resolve chunks") - return fmt.Errorf("failed to resolve chunks: %v", err) - } - - // Prepare streaming function with simple master client wrapper - tStreamPrep := time.Now() - masterClient := &simpleMasterClient{lookupFn: lookupFileIdFn} - streamFn, err := filer.PrepareStreamContentWithThrottler( - ctx, - masterClient, - func(fileId string) string { - // Use volume server JWT (not filer JWT) for direct volume reads - return string(security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, fileId)) - }, - resolvedChunks, - offset, - size, - 0, // no throttling - ) - 
streamPrepTime = time.Since(tStreamPrep) - if err != nil { - glog.Errorf("streamFromVolumeServers: failed to prepare stream: %v", err) - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, "Internal Server Error: failed to prepare stream") - return fmt.Errorf("failed to prepare stream: %v", err) - } +func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) { - // All validation and preparation successful - NOW set headers and write status - tHeaderSet := time.Now() - s3a.setResponseHeaders(w, entry, totalSize) + bucket, object := s3_constants.GetBucketAndObject(r) + glog.V(3).Infof("HeadObjectHandler %s %s", bucket, object) - // Override/add range-specific headers if this is a range request - if isRangeRequest { - w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize)) - w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) + // Handle directory objects with shared logic + if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "HeadObjectHandler") { + return // Directory object request was handled } - headerSetTime = time.Since(tHeaderSet) - // Now write status code (headers are all set, stream is ready) - if isRangeRequest { - w.WriteHeader(http.StatusPartialContent) - } else { - w.WriteHeader(http.StatusOK) + // Check conditional headers and handle early return if conditions fail + result, handled := s3a.processConditionalHeaders(w, r, bucket, object, "HeadObjectHandler") + if handled { + return } - // Stream directly to response - tStreamExec := time.Now() - glog.V(4).Infof("streamFromVolumeServers: starting streamFn, offset=%d, size=%d", offset, size) - err = streamFn(w) - streamExecTime = time.Since(tStreamExec) - if err != nil { - glog.Errorf("streamFromVolumeServers: streamFn failed: %v", err) - } else { - glog.V(4).Infof("streamFromVolumeServers: streamFn completed successfully") + entryFromConditional := result.Entry + if s3a.rejectDirectoryObjectWithoutSlash(w, r, bucket, object, "HeadObjectHandler", &entryFromConditional) { + return } - return err -} - -// Shared HTTP client for volume server requests (connection pooling) -var volumeServerHTTPClient = &http.Client{ - Timeout: 5 * time.Minute, - Transport: &http.Transport{ - MaxIdleConns: 100, - MaxIdleConnsPerHost: 10, - IdleConnTimeout: 90 * time.Second, - }, -} - -// createLookupFileIdFunction creates a reusable lookup function for resolving volume URLs -func (s3a *S3ApiServer) createLookupFileIdFunction() func(context.Context, string) ([]string, error) { - return func(ctx context.Context, fileId string) ([]string, error) { - var urls []string - err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - vid := filer.VolumeId(fileId) - resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ - VolumeIds: []string{vid}, - }) - if err != nil { - return err - } - if locs, found := resp.LocationsMap[vid]; found { - for _, loc := range locs.Locations { - // Build complete URL with volume server address and fileId - // Format: http://host:port/volumeId,fileId - urls = append(urls, "http://"+loc.Url+"/"+fileId) - } - } - return nil - }) - glog.V(3).Infof("createLookupFileIdFunction: fileId=%s, resolved urls=%v", fileId, urls) - return urls, err + if entryFromConditional != result.Entry { + result.Entry = entryFromConditional } -} -// streamFromVolumeServersWithSSE handles streaming with inline SSE decryption -func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r *http.Request, entry 
*filer_pb.Entry, sseType string) error { - // If not encrypted, use fast path without decryption - if sseType == "" || sseType == "None" { - return s3a.streamFromVolumeServers(w, r, entry, sseType) - } + // Check for specific version ID in query parameters + versionId := r.URL.Query().Get("versionId") - // Profiling: Track SSE decryption stages - t0 := time.Now() var ( - rangeParseTime time.Duration - keyValidateTime time.Duration - headerSetTime time.Duration - streamFetchTime time.Duration - decryptSetupTime time.Duration - copyTime time.Duration + destUrl string + entry *filer_pb.Entry // Declare entry at function scope for SSE processing + versioningConfigured bool + err error ) - defer func() { - totalTime := time.Since(t0) - glog.V(2).Infof(" └─ streamFromVolumeServersWithSSE (%s): total=%v, rangeParse=%v, keyValidate=%v, headerSet=%v, streamFetch=%v, decryptSetup=%v, copy=%v", - sseType, totalTime, rangeParseTime, keyValidateTime, headerSetTime, streamFetchTime, decryptSetupTime, copyTime) - }() - - glog.V(2).Infof("streamFromVolumeServersWithSSE: Handling %s encrypted object with inline decryption", sseType) - - // Parse Range header BEFORE key validation - totalSize := int64(filer.FileSize(entry)) - tRangeParse := time.Now() - var offset int64 = 0 - var size int64 = totalSize - rangeHeader := r.Header.Get("Range") - isRangeRequest := false - - if rangeHeader != "" && strings.HasPrefix(rangeHeader, "bytes=") { - rangeSpec := rangeHeader[6:] - parts := strings.Split(rangeSpec, "-") - if len(parts) == 2 { - var startOffset, endOffset int64 - - if parts[0] == "" && parts[1] != "" { - // Suffix range: bytes=-N (last N bytes) - if suffixLen, err := strconv.ParseInt(parts[1], 10, 64); err == nil { - // RFC 7233: suffix range on empty object or zero-length suffix is unsatisfiable - if totalSize == 0 || suffixLen <= 0 { - w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) - w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) - return fmt.Errorf("invalid suffix range for empty object") - } - if suffixLen > totalSize { - suffixLen = totalSize - } - startOffset = totalSize - suffixLen - endOffset = totalSize - 1 - } else { - // Set header BEFORE WriteHeader - w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) - w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) - return fmt.Errorf("invalid suffix range") - } - } else { - // Regular range or open-ended range - startOffset = 0 - endOffset = totalSize - 1 - - if parts[0] != "" { - if parsed, err := strconv.ParseInt(parts[0], 10, 64); err == nil { - startOffset = parsed - } - } - if parts[1] != "" { - if parsed, err := strconv.ParseInt(parts[1], 10, 64); err == nil { - endOffset = parsed - } - } - - // Validate range - if startOffset < 0 || startOffset >= totalSize { - // Set header BEFORE WriteHeader - w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) - w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) - return fmt.Errorf("invalid range start") - } - - if endOffset >= totalSize { - endOffset = totalSize - 1 - } - - if endOffset < startOffset { - // Set header BEFORE WriteHeader - w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) - w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) - return fmt.Errorf("invalid range: end before start") - } - } - - offset = startOffset - size = endOffset - startOffset + 1 - isRangeRequest = true - glog.V(2).Infof("streamFromVolumeServersWithSSE: Range request bytes %d-%d/%d (size=%d)", startOffset, endOffset, totalSize, 
size) - } - } - rangeParseTime = time.Since(tRangeParse) - - // Validate SSE keys BEFORE streaming - tKeyValidate := time.Now() - var decryptionKey interface{} - switch sseType { - case s3_constants.SSETypeC: - customerKey, err := ParseSSECHeaders(r) - if err != nil { - s3err.WriteErrorResponse(w, r, MapSSECErrorToS3Error(err)) - return err - } - if customerKey == nil { - s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) - return fmt.Errorf("SSE-C key required") - } - // Validate key MD5 - if entry.Extended != nil { - storedKeyMD5 := string(entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]) - if storedKeyMD5 != "" && customerKey.KeyMD5 != storedKeyMD5 { - s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) - return fmt.Errorf("SSE-C key mismatch") - } - } - decryptionKey = customerKey - case s3_constants.SSETypeKMS: - // Extract KMS key from metadata (stored as raw bytes, matching filer behavior) - if entry.Extended == nil { - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return fmt.Errorf("no SSE-KMS metadata") - } - kmsMetadataBytes := entry.Extended[s3_constants.SeaweedFSSSEKMSKey] - sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes) - if err != nil { - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return err - } - decryptionKey = sseKMSKey - case s3_constants.SSETypeS3: - // Extract S3 key from metadata (stored as raw bytes, matching filer behavior) - if entry.Extended == nil { - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return fmt.Errorf("no SSE-S3 metadata") - } - keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key] - keyManager := GetSSES3KeyManager() - sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager) - if err != nil { - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return err - } - decryptionKey = sseS3Key - } - keyValidateTime = time.Since(tKeyValidate) - - // Set response headers - // IMPORTANT: Set ALL headers BEFORE calling WriteHeader (headers are ignored after WriteHeader) - tHeaderSet := time.Now() - s3a.setResponseHeaders(w, entry, totalSize) - s3a.addSSEResponseHeadersFromEntry(w, r, entry, sseType) - - // Override/add range-specific headers if this is a range request - if isRangeRequest { - w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize)) - w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) - } - headerSetTime = time.Since(tHeaderSet) - - // Now write status code (headers are all set) - if isRangeRequest { - w.WriteHeader(http.StatusPartialContent) - } - - // Full Range Optimization: Use ViewFromChunks to only fetch/decrypt needed chunks - tDecryptSetup := time.Now() - - // Use range-aware chunk resolution (like filer does) - if isRangeRequest { - glog.V(2).Infof("Using range-aware SSE decryption for offset=%d size=%d", offset, size) - streamFetchTime = 0 // No full stream fetch in range-aware path - err := s3a.streamDecryptedRangeFromChunks(r.Context(), w, entry, offset, size, sseType, decryptionKey) - decryptSetupTime = time.Since(tDecryptSetup) - copyTime = decryptSetupTime // Streaming is included in decrypt setup for range-aware path - return err - } - - // Full object path: Optimize multipart vs single-part - var decryptedReader io.Reader - var err error - - switch sseType { - case s3_constants.SSETypeC: - customerKey := decryptionKey.(*SSECustomerKey) - - // Check if this is a multipart object (multiple chunks with SSE-C metadata) - isMultipartSSEC := false - ssecChunks := 0 - for _, chunk := range 
entry.GetChunks() { - if chunk.GetSseType() == filer_pb.SSEType_SSE_C && len(chunk.GetSseMetadata()) > 0 { - ssecChunks++ - } - } - isMultipartSSEC = ssecChunks > 1 - glog.V(3).Infof("SSE-C decryption: KeyMD5=%s, entry has %d chunks, isMultipart=%v, ssecChunks=%d", - customerKey.KeyMD5, len(entry.GetChunks()), isMultipartSSEC, ssecChunks) - - if isMultipartSSEC { - // For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly - // This saves one filer lookup/pipe creation - decryptedReader, err = s3a.createMultipartSSECDecryptedReaderDirect(r.Context(), nil, customerKey, entry) - glog.V(2).Infof("Using multipart SSE-C decryption for object with %d chunks (no prefetch)", len(entry.GetChunks())) - } else { - // For single-part, get encrypted stream and decrypt - tStreamFetch := time.Now() - encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry) - streamFetchTime = time.Since(tStreamFetch) - if streamErr != nil { - return streamErr - } - defer encryptedReader.Close() - - iv := entry.Extended[s3_constants.SeaweedFSSSEIV] - if len(iv) == 0 { - return fmt.Errorf("SSE-C IV not found in entry metadata") - } - glog.V(2).Infof("SSE-C decryption: IV length=%d, KeyMD5=%s", len(iv), customerKey.KeyMD5) - decryptedReader, err = CreateSSECDecryptedReader(encryptedReader, customerKey, iv) - } - - case s3_constants.SSETypeKMS: - sseKMSKey := decryptionKey.(*SSEKMSKey) - - // Check if this is a multipart object (multiple chunks with SSE-KMS metadata) - isMultipartSSEKMS := false - ssekmsChunks := 0 - for _, chunk := range entry.GetChunks() { - if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 { - ssekmsChunks++ - } - } - isMultipartSSEKMS = ssekmsChunks > 1 - glog.V(3).Infof("SSE-KMS decryption: isMultipart=%v, ssekmsChunks=%d", isMultipartSSEKMS, ssekmsChunks) - - if isMultipartSSEKMS { - // For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly - decryptedReader, err = s3a.createMultipartSSEKMSDecryptedReaderDirect(r.Context(), nil, entry) - glog.V(2).Infof("Using multipart SSE-KMS decryption for object with %d chunks (no prefetch)", len(entry.GetChunks())) - } else { - // For single-part, get encrypted stream and decrypt - tStreamFetch := time.Now() - encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry) - streamFetchTime = time.Since(tStreamFetch) - if streamErr != nil { - return streamErr - } - defer encryptedReader.Close() - - glog.V(2).Infof("SSE-KMS decryption: KeyID=%s, IV length=%d", sseKMSKey.KeyID, len(sseKMSKey.IV)) - decryptedReader, err = CreateSSEKMSDecryptedReader(encryptedReader, sseKMSKey) - } - - case s3_constants.SSETypeS3: - sseS3Key := decryptionKey.(*SSES3Key) - - // Check if this is a multipart object (multiple chunks with SSE-S3 metadata) - isMultipartSSES3 := false - sses3Chunks := 0 - for _, chunk := range entry.GetChunks() { - if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 && len(chunk.GetSseMetadata()) > 0 { - sses3Chunks++ - } - } - isMultipartSSES3 = sses3Chunks > 1 - glog.V(3).Infof("SSE-S3 decryption: isMultipart=%v, sses3Chunks=%d", isMultipartSSES3, sses3Chunks) - - if isMultipartSSES3 { - // For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly - decryptedReader, err = s3a.createMultipartSSES3DecryptedReaderDirect(r.Context(), nil, entry) - glog.V(2).Infof("Using multipart SSE-S3 decryption for object with %d chunks (no prefetch)", len(entry.GetChunks())) - } else { - // For single-part, get encrypted stream and 
decrypt - tStreamFetch := time.Now() - encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry) - streamFetchTime = time.Since(tStreamFetch) - if streamErr != nil { - return streamErr - } - defer encryptedReader.Close() - - keyManager := GetSSES3KeyManager() - iv, ivErr := GetSSES3IV(entry, sseS3Key, keyManager) - if ivErr != nil { - return fmt.Errorf("failed to get SSE-S3 IV: %w", ivErr) - } - glog.V(2).Infof("SSE-S3 decryption: KeyID=%s, IV length=%d", sseS3Key.KeyID, len(iv)) - decryptedReader, err = CreateSSES3DecryptedReader(encryptedReader, sseS3Key, iv) - } - } - decryptSetupTime = time.Since(tDecryptSetup) + // Check if versioning is configured for the bucket (Enabled or Suspended) + // Note: We need to check this even if versionId is empty, because versioned buckets + // handle even "get latest version" requests differently (through .versions directory) + versioningConfigured, err = s3a.isVersioningConfigured(bucket) if err != nil { - glog.Errorf("SSE decryption error (%s): %v", sseType, err) - return fmt.Errorf("failed to create decrypted reader: %w", err) + if err == filer_pb.ErrNotFound { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) + return + } + glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return } - // Close the decrypted reader to avoid leaking HTTP bodies - if closer, ok := decryptedReader.(io.Closer); ok { - defer func() { - if closeErr := closer.Close(); closeErr != nil { - glog.V(3).Infof("Error closing decrypted reader: %v", closeErr) - } - }() - } - - // Stream full decrypted object to client - tCopy := time.Now() - buf := make([]byte, 128*1024) - copied, copyErr := io.CopyBuffer(w, decryptedReader, buf) - copyTime = time.Since(tCopy) - if copyErr != nil { - glog.Errorf("Failed to copy full object: copied %d bytes: %v", copied, copyErr) - return copyErr - } - glog.V(3).Infof("Full object request: copied %d bytes", copied) - return nil -} - -// streamDecryptedRangeFromChunks streams a range of decrypted data by only fetching needed chunks -// This implements the filer's ViewFromChunks approach for optimal range performance -func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io.Writer, entry *filer_pb.Entry, offset int64, size int64, sseType string, decryptionKey interface{}) error { - // Use filer's ViewFromChunks to resolve only needed chunks for the range - lookupFileIdFn := s3a.createLookupFileIdFunction() - chunkViews := filer.ViewFromChunks(ctx, lookupFileIdFn, entry.GetChunks(), offset, size) - - totalWritten := int64(0) - targetOffset := offset - - // Stream each chunk view - for x := chunkViews.Front(); x != nil; x = x.Next { - chunkView := x.Value - - // Handle gaps between chunks (write zeros) - if targetOffset < chunkView.ViewOffset { - gap := chunkView.ViewOffset - targetOffset - glog.V(4).Infof("Writing %d zero bytes for gap [%d,%d)", gap, targetOffset, chunkView.ViewOffset) - if err := writeZeroBytes(w, gap); err != nil { - return fmt.Errorf("failed to write zero padding: %w", err) - } - totalWritten += gap - targetOffset = chunkView.ViewOffset - } - - // Find the corresponding FileChunk for this chunkView - var fileChunk *filer_pb.FileChunk - for _, chunk := range entry.GetChunks() { - if chunk.GetFileIdString() == chunkView.FileId { - fileChunk = chunk - break - } - } - if fileChunk == nil { - return fmt.Errorf("chunk %s not found in entry", chunkView.FileId) - } - - // Fetch and decrypt this chunk 
view - var decryptedChunkReader io.Reader - var err error - - switch sseType { - case s3_constants.SSETypeC: - decryptedChunkReader, err = s3a.decryptSSECChunkView(ctx, fileChunk, chunkView, decryptionKey.(*SSECustomerKey)) - case s3_constants.SSETypeKMS: - decryptedChunkReader, err = s3a.decryptSSEKMSChunkView(ctx, fileChunk, chunkView) - case s3_constants.SSETypeS3: - decryptedChunkReader, err = s3a.decryptSSES3ChunkView(ctx, fileChunk, chunkView, entry) - default: - // Non-encrypted chunk - decryptedChunkReader, err = s3a.fetchChunkViewData(ctx, chunkView) - } - - if err != nil { - return fmt.Errorf("failed to decrypt chunk view %s: %w", chunkView.FileId, err) - } - - // Copy the decrypted chunk data - written, copyErr := io.Copy(w, decryptedChunkReader) - if closer, ok := decryptedChunkReader.(io.Closer); ok { - closeErr := closer.Close() - if closeErr != nil { - glog.Warningf("streamDecryptedRangeFromChunks: failed to close decrypted chunk reader: %v", closeErr) - } - } - if copyErr != nil { - glog.Errorf("streamDecryptedRangeFromChunks: copy error after writing %d bytes (expected %d): %v", written, chunkView.ViewSize, copyErr) - return fmt.Errorf("failed to copy decrypted chunk data: %w", copyErr) - } - - if written != int64(chunkView.ViewSize) { - glog.Errorf("streamDecryptedRangeFromChunks: size mismatch - wrote %d bytes but expected %d", written, chunkView.ViewSize) - return fmt.Errorf("size mismatch: wrote %d bytes but expected %d for chunk %s", written, chunkView.ViewSize, chunkView.FileId) - } - - totalWritten += written - targetOffset += written - glog.V(2).Infof("streamDecryptedRangeFromChunks: Wrote %d bytes from chunk %s [%d,%d), totalWritten=%d, targetSize=%d", written, chunkView.FileId, chunkView.ViewOffset, chunkView.ViewOffset+int64(chunkView.ViewSize), totalWritten, size) - } - - // Handle trailing zeros if needed - remaining := size - totalWritten - if remaining > 0 { - glog.V(4).Infof("Writing %d trailing zero bytes", remaining) - if err := writeZeroBytes(w, remaining); err != nil { - return fmt.Errorf("failed to write trailing zeros: %w", err) - } - } - - glog.V(3).Infof("Completed range-aware SSE decryption: wrote %d bytes for range [%d,%d)", totalWritten, offset, offset+size) - return nil -} - -// writeZeroBytes writes n zero bytes to writer using the package-level zero buffer -func writeZeroBytes(w io.Writer, n int64) error { - for n > 0 { - toWrite := min(n, int64(len(zeroBuf))) - written, err := w.Write(zeroBuf[:toWrite]) - if err != nil { - return err - } - n -= int64(written) - } - return nil -} - -// decryptSSECChunkView decrypts a specific chunk view with SSE-C -func (s3a *S3ApiServer) decryptSSECChunkView(ctx context.Context, fileChunk *filer_pb.FileChunk, chunkView *filer.ChunkView, customerKey *SSECustomerKey) (io.Reader, error) { - // For multipart SSE-C, each chunk has its own IV in chunk.SseMetadata - if fileChunk.GetSseType() == filer_pb.SSEType_SSE_C && len(fileChunk.GetSseMetadata()) > 0 { - ssecMetadata, err := DeserializeSSECMetadata(fileChunk.GetSseMetadata()) - if err != nil { - return nil, fmt.Errorf("failed to deserialize SSE-C metadata: %w", err) - } - chunkIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV) - if err != nil { - return nil, fmt.Errorf("failed to decode IV: %w", err) - } - - // Fetch FULL encrypted chunk - // Note: Fetching full chunk is necessary for proper CTR decryption stream - fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId) - if err != nil { - return nil, fmt.Errorf("failed to fetch full chunk: 
%w", err) - } - - // Calculate IV using PartOffset - // PartOffset is the position of this chunk within its part's encrypted stream - var adjustedIV []byte - var ivSkip int - if ssecMetadata.PartOffset > 0 { - adjustedIV, ivSkip = calculateIVWithOffset(chunkIV, ssecMetadata.PartOffset) - } else { - adjustedIV = chunkIV - ivSkip = 0 - } - - // Decrypt the full chunk - decryptedReader, decryptErr := CreateSSECDecryptedReader(fullChunkReader, customerKey, adjustedIV) - if decryptErr != nil { - fullChunkReader.Close() - return nil, fmt.Errorf("failed to create decrypted reader: %w", decryptErr) - } - - // CRITICAL: Skip intra-block bytes from CTR decryption (non-block-aligned offset handling) - if ivSkip > 0 { - _, err = io.CopyN(io.Discard, decryptedReader, int64(ivSkip)) - if err != nil { - if closer, ok := decryptedReader.(io.Closer); ok { - closer.Close() - } - return nil, fmt.Errorf("failed to skip intra-block bytes (%d): %w", ivSkip, err) - } - } - - // Skip to the position we need in the decrypted stream - if chunkView.OffsetInChunk > 0 { - _, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk) - if err != nil { - if closer, ok := decryptedReader.(io.Closer); ok { - closer.Close() - } - return nil, fmt.Errorf("failed to skip to offset %d: %w", chunkView.OffsetInChunk, err) - } - } - - // Return a reader that only reads ViewSize bytes with proper cleanup - limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize)) - return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil - } - - // Single-part SSE-C: use object-level IV (should not hit this in range path, but handle it) - encryptedReader, err := s3a.fetchChunkViewData(ctx, chunkView) - if err != nil { - return nil, err - } - // For single-part, the IV is stored at object level, already handled in non-range path - return encryptedReader, nil -} - -// decryptSSEKMSChunkView decrypts a specific chunk view with SSE-KMS -func (s3a *S3ApiServer) decryptSSEKMSChunkView(ctx context.Context, fileChunk *filer_pb.FileChunk, chunkView *filer.ChunkView) (io.Reader, error) { - if fileChunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(fileChunk.GetSseMetadata()) > 0 { - sseKMSKey, err := DeserializeSSEKMSMetadata(fileChunk.GetSseMetadata()) - if err != nil { - return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err) - } - - // Fetch FULL encrypted chunk - fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId) - if err != nil { - return nil, fmt.Errorf("failed to fetch full chunk: %w", err) - } - - // Calculate IV using ChunkOffset (same as PartOffset in SSE-C) - var adjustedIV []byte - var ivSkip int - if sseKMSKey.ChunkOffset > 0 { - adjustedIV, ivSkip = calculateIVWithOffset(sseKMSKey.IV, sseKMSKey.ChunkOffset) - } else { - adjustedIV = sseKMSKey.IV - ivSkip = 0 - } - - adjustedKey := &SSEKMSKey{ - KeyID: sseKMSKey.KeyID, - EncryptedDataKey: sseKMSKey.EncryptedDataKey, - EncryptionContext: sseKMSKey.EncryptionContext, - BucketKeyEnabled: sseKMSKey.BucketKeyEnabled, - IV: adjustedIV, - ChunkOffset: sseKMSKey.ChunkOffset, - } - - decryptedReader, decryptErr := CreateSSEKMSDecryptedReader(fullChunkReader, adjustedKey) - if decryptErr != nil { - fullChunkReader.Close() - return nil, fmt.Errorf("failed to create KMS decrypted reader: %w", decryptErr) - } - - // CRITICAL: Skip intra-block bytes from CTR decryption (non-block-aligned offset handling) - if ivSkip > 0 { - _, err = io.CopyN(io.Discard, decryptedReader, int64(ivSkip)) - if err != nil { - if closer, ok := 
decryptedReader.(io.Closer); ok { - closer.Close() - } - return nil, fmt.Errorf("failed to skip intra-block bytes (%d): %w", ivSkip, err) - } - } - - // Skip to position and limit to ViewSize - if chunkView.OffsetInChunk > 0 { - _, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk) - if err != nil { - if closer, ok := decryptedReader.(io.Closer); ok { - closer.Close() - } - return nil, fmt.Errorf("failed to skip to offset: %w", err) - } - } - - limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize)) - return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil - } - - // Non-KMS encrypted chunk - return s3a.fetchChunkViewData(ctx, chunkView) -} - -// decryptSSES3ChunkView decrypts a specific chunk view with SSE-S3 -func (s3a *S3ApiServer) decryptSSES3ChunkView(ctx context.Context, fileChunk *filer_pb.FileChunk, chunkView *filer.ChunkView, entry *filer_pb.Entry) (io.Reader, error) { - // For multipart SSE-S3, each chunk has its own IV in chunk.SseMetadata - if fileChunk.GetSseType() == filer_pb.SSEType_SSE_S3 && len(fileChunk.GetSseMetadata()) > 0 { - keyManager := GetSSES3KeyManager() - - // Deserialize per-chunk SSE-S3 metadata to get chunk-specific IV - chunkSSES3Metadata, err := DeserializeSSES3Metadata(fileChunk.GetSseMetadata(), keyManager) - if err != nil { - return nil, fmt.Errorf("failed to deserialize chunk SSE-S3 metadata: %w", err) - } - - // Fetch FULL encrypted chunk (necessary for proper CTR decryption stream) - fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId) - if err != nil { - return nil, fmt.Errorf("failed to fetch full chunk: %w", err) - } - - // Use the chunk's IV directly (already adjusted for block offset during encryption) - // Note: SSE-S3 encryption flow: - // 1. Upload: CreateSSES3EncryptedReaderWithBaseIV(reader, key, baseIV, partOffset) - // calls calculateIVWithOffset(baseIV, partOffset) → (blockAlignedIV, skip) - // The blockAlignedIV is stored in chunk metadata - // 2. 
Download: We decrypt the FULL chunk from offset 0 using that blockAlignedIV - // Then skip to chunkView.OffsetInChunk in the PLAINTEXT (not ciphertext) - // This differs from SSE-C which stores base IV + PartOffset and calculates IV during decryption - // No ivSkip needed here because we're decrypting from chunk start (offset 0) - iv := chunkSSES3Metadata.IV - - glog.V(4).Infof("Decrypting multipart SSE-S3 chunk %s with chunk-specific IV length=%d", - chunkView.FileId, len(iv)) - - // Decrypt the full chunk starting from offset 0 - decryptedReader, decryptErr := CreateSSES3DecryptedReader(fullChunkReader, chunkSSES3Metadata, iv) - if decryptErr != nil { - fullChunkReader.Close() - return nil, fmt.Errorf("failed to create SSE-S3 decrypted reader: %w", decryptErr) - } - - // Skip to position within the decrypted chunk (plaintext offset, not ciphertext offset) - if chunkView.OffsetInChunk > 0 { - _, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk) - if err != nil { - if closer, ok := decryptedReader.(io.Closer); ok { - closer.Close() - } - return nil, fmt.Errorf("failed to skip to offset %d: %w", chunkView.OffsetInChunk, err) - } - } - - limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize)) - return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil - } - - // Single-part SSE-S3: use object-level IV and key (fallback path) - keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key] - keyManager := GetSSES3KeyManager() - sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager) - if err != nil { - return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err) - } - - // Fetch FULL encrypted chunk - fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId) - if err != nil { - return nil, fmt.Errorf("failed to fetch full chunk: %w", err) - } - - // Get base IV for single-part object - iv, err := GetSSES3IV(entry, sseS3Key, keyManager) - if err != nil { - fullChunkReader.Close() - return nil, fmt.Errorf("failed to get SSE-S3 IV: %w", err) - } - - glog.V(4).Infof("Decrypting single-part SSE-S3 chunk %s with entry-level IV length=%d", - chunkView.FileId, len(iv)) - - decryptedReader, decryptErr := CreateSSES3DecryptedReader(fullChunkReader, sseS3Key, iv) - if decryptErr != nil { - fullChunkReader.Close() - return nil, fmt.Errorf("failed to create S3 decrypted reader: %w", decryptErr) - } - - // Skip to position and limit to ViewSize - if chunkView.OffsetInChunk > 0 { - _, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk) - if err != nil { - if closer, ok := decryptedReader.(io.Closer); ok { - closer.Close() - } - return nil, fmt.Errorf("failed to skip to offset: %w", err) - } - } - - limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize)) - return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil -} - -// fetchFullChunk fetches the complete encrypted chunk from volume server -func (s3a *S3ApiServer) fetchFullChunk(ctx context.Context, fileId string) (io.ReadCloser, error) { - // Lookup the volume server URLs for this chunk - lookupFileIdFn := s3a.createLookupFileIdFunction() - urlStrings, err := lookupFileIdFn(ctx, fileId) - if err != nil || len(urlStrings) == 0 { - return nil, fmt.Errorf("failed to lookup chunk %s: %w", fileId, err) - } - - // Use the first URL - chunkUrl := urlStrings[0] - - // Generate JWT for volume server authentication - jwt := security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, fileId) - - // Create request WITHOUT 
Range header to get full chunk - req, err := http.NewRequestWithContext(ctx, "GET", chunkUrl, nil) - if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) - } - - // Set JWT for authentication - if jwt != "" { - req.Header.Set("Authorization", "BEARER "+string(jwt)) - } - - // Use shared HTTP client - resp, err := volumeServerHTTPClient.Do(req) - if err != nil { - return nil, fmt.Errorf("failed to fetch chunk: %w", err) - } - - if resp.StatusCode != http.StatusOK { - resp.Body.Close() - return nil, fmt.Errorf("unexpected status code %d for chunk %s", resp.StatusCode, fileId) - } - - return resp.Body, nil -} - -// fetchChunkViewData fetches encrypted data for a chunk view (with range) -func (s3a *S3ApiServer) fetchChunkViewData(ctx context.Context, chunkView *filer.ChunkView) (io.ReadCloser, error) { - // Lookup the volume server URLs for this chunk - lookupFileIdFn := s3a.createLookupFileIdFunction() - urlStrings, err := lookupFileIdFn(ctx, chunkView.FileId) - if err != nil || len(urlStrings) == 0 { - return nil, fmt.Errorf("failed to lookup chunk %s: %w", chunkView.FileId, err) - } - - // Use the first URL (already contains complete URL with fileId) - chunkUrl := urlStrings[0] - - // Generate JWT for volume server authentication - jwt := security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, chunkView.FileId) - - // Create request with Range header for the chunk view - // chunkUrl already contains the complete URL including fileId - req, err := http.NewRequestWithContext(ctx, "GET", chunkUrl, nil) - if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) - } - - // Set Range header to fetch only the needed portion of the chunk - if !chunkView.IsFullChunk() { - rangeEnd := chunkView.OffsetInChunk + int64(chunkView.ViewSize) - 1 - req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", chunkView.OffsetInChunk, rangeEnd)) - } - - // Set JWT for authentication - if jwt != "" { - req.Header.Set("Authorization", "BEARER "+string(jwt)) - } - - // Use shared HTTP client with connection pooling - resp, err := volumeServerHTTPClient.Do(req) - if err != nil { - return nil, fmt.Errorf("failed to fetch chunk: %w", err) - } - - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { - resp.Body.Close() - return nil, fmt.Errorf("unexpected status code %d for chunk %s", resp.StatusCode, chunkView.FileId) - } - - return resp.Body, nil -} - -// getEncryptedStreamFromVolumes gets raw encrypted data stream from volume servers -func (s3a *S3ApiServer) getEncryptedStreamFromVolumes(ctx context.Context, entry *filer_pb.Entry) (io.ReadCloser, error) { - // Handle inline content - if len(entry.Content) > 0 { - return io.NopCloser(bytes.NewReader(entry.Content)), nil - } - - // Handle empty files - chunks := entry.GetChunks() - if len(chunks) == 0 { - return io.NopCloser(bytes.NewReader([]byte{})), nil - } - - // Reuse shared lookup function to keep volume lookup logic in one place - lookupFileIdFn := s3a.createLookupFileIdFunction() - - // Resolve chunks - totalSize := int64(filer.FileSize(entry)) - resolvedChunks, _, err := filer.ResolveChunkManifest(ctx, lookupFileIdFn, chunks, 0, totalSize) - if err != nil { - return nil, err - } - - // Create streaming reader - masterClient := &simpleMasterClient{lookupFn: lookupFileIdFn} - streamFn, err := filer.PrepareStreamContentWithThrottler( - ctx, - masterClient, - func(fileId string) string { - // Use volume server JWT (not filer JWT) for direct volume 
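
Editor's sketch (not part of this patch): fetchChunkViewData above maps a chunk view onto an inclusive HTTP byte range, and accepts both 206 (range honored) and 200 (server returned the full chunk). The arithmetic in isolation, with hypothetical names:

package sketch

import (
	"fmt"
	"net/http"
)

// chunkRangeHeader renders the inclusive Range header for a chunk view:
// a view of viewSize bytes starting at offsetInChunk covers
// bytes offsetInChunk..offsetInChunk+viewSize-1.
func chunkRangeHeader(offsetInChunk, viewSize int64) string {
	return fmt.Sprintf("bytes=%d-%d", offsetInChunk, offsetInChunk+viewSize-1)
}

// rangeStatusOK mirrors the status check above: a volume server may honor
// the range (206) or ignore it and stream the whole chunk (200).
func rangeStatusOK(code int) bool {
	return code == http.StatusOK || code == http.StatusPartialContent
}

// chunkRangeHeader(10, 5) == "bytes=10-14"
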
reads - return string(security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, fileId)) - }, - resolvedChunks, - 0, - totalSize, - 0, - ) - if err != nil { - return nil, err - } - - // Create a pipe to get io.ReadCloser - pipeReader, pipeWriter := io.Pipe() - go func() { - defer pipeWriter.Close() - if err := streamFn(pipeWriter); err != nil { - glog.Errorf("getEncryptedStreamFromVolumes: streaming error: %v", err) - pipeWriter.CloseWithError(err) - } - }() - - return pipeReader, nil -} - -// addSSEResponseHeadersFromEntry adds appropriate SSE response headers based on entry metadata -func (s3a *S3ApiServer) addSSEResponseHeadersFromEntry(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) { - if entry == nil || entry.Extended == nil { - return - } - - switch sseType { - case s3_constants.SSETypeC: - // SSE-C: Echo back algorithm and key MD5 - if algo, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm]; exists { - w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, string(algo)) - } - if keyMD5, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]; exists { - w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, string(keyMD5)) - } - - case s3_constants.SSETypeKMS: - // SSE-KMS: Return algorithm and key ID - w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms") - if kmsMetadataBytes, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists { - sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes) - if err == nil { - AddSSEKMSResponseHeaders(w, sseKMSKey) - } - } - - case s3_constants.SSETypeS3: - // SSE-S3: Return algorithm - w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256) - } -} - -// pipeWriterWrapper wraps io.PipeWriter to implement http.ResponseWriter interface -type pipeWriterWrapper struct { - *io.PipeWriter -} - -func (pw *pipeWriterWrapper) Header() http.Header { - // Headers are already set on the real ResponseWriter, ignore here - return make(http.Header) -} - -func (pw *pipeWriterWrapper) WriteHeader(statusCode int) { - // Status is already set on the real ResponseWriter, ignore here -} - -// createSSECDecryptedReaderFromEntry creates an SSE-C decrypted reader from entry metadata -func (s3a *S3ApiServer) createSSECDecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) { - // Parse SSE-C headers from request - customerKey, err := ParseSSECHeaders(r) - if err != nil { - return nil, fmt.Errorf("failed to parse SSE-C headers: %w", err) - } - - if customerKey == nil { - return nil, fmt.Errorf("SSE-C key required but not provided") - } - - // Validate key MD5 from entry metadata - if entry.Extended != nil { - storedKeyMD5 := string(entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]) - if storedKeyMD5 != "" && customerKey.KeyMD5 != storedKeyMD5 { - return nil, fmt.Errorf("SSE-C key mismatch") - } - } - - // Get IV from entry metadata (stored as raw bytes, matching filer behavior) - iv := entry.Extended[s3_constants.SeaweedFSSSEIV] - if len(iv) == 0 { - return nil, fmt.Errorf("SSE-C IV not found in metadata") - } - - // Create decrypted reader - return CreateSSECDecryptedReader(encryptedReader, customerKey, iv) -} - -// createSSEKMSDecryptedReaderFromEntry creates an SSE-KMS decrypted reader from entry metadata -func (s3a *S3ApiServer) createSSEKMSDecryptedReaderFromEntry(r *http.Request, 
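
Editor's sketch (not part of this patch): getEncryptedStreamFromVolumes above turns a write-style stream function into an io.ReadCloser with io.Pipe, and surfaces streaming errors to the reader through CloseWithError. The pattern in isolation:

package sketch

import "io"

// readerFromStreamFn adapts func(io.Writer) error into an io.ReadCloser.
// If streamFn fails, the reader's next Read returns that error;
// CloseWithError(nil) is equivalent to Close, so the reader then sees io.EOF.
func readerFromStreamFn(streamFn func(io.Writer) error) io.ReadCloser {
	pr, pw := io.Pipe()
	go func() {
		pw.CloseWithError(streamFn(pw))
	}()
	return pr
}
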
encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) { - // Extract SSE-KMS metadata from entry (stored as raw bytes, matching filer behavior) - if entry.Extended == nil { - return nil, fmt.Errorf("no extended metadata found") - } - - kmsMetadataBytes, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey] - if !exists { - return nil, fmt.Errorf("SSE-KMS metadata not found") - } - - sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes) - if err != nil { - return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err) - } - - // Create decrypted reader - return CreateSSEKMSDecryptedReader(encryptedReader, sseKMSKey) -} - -// createSSES3DecryptedReaderFromEntry creates an SSE-S3 decrypted reader from entry metadata -func (s3a *S3ApiServer) createSSES3DecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) { - // Extract SSE-S3 metadata from entry (stored as raw bytes, matching filer behavior) - if entry.Extended == nil { - return nil, fmt.Errorf("no extended metadata found") - } - - keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key] - if !exists { - return nil, fmt.Errorf("SSE-S3 metadata not found") - } - - keyManager := GetSSES3KeyManager() - sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager) - if err != nil { - return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err) - } - - // Get IV - iv, err := GetSSES3IV(entry, sseS3Key, keyManager) - if err != nil { - return nil, fmt.Errorf("failed to get SSE-S3 IV: %w", err) - } - - // Create decrypted reader - return CreateSSES3DecryptedReader(encryptedReader, sseS3Key, iv) -} - -// setResponseHeaders sets all standard HTTP response headers from entry metadata -func (s3a *S3ApiServer) setResponseHeaders(w http.ResponseWriter, entry *filer_pb.Entry, totalSize int64) { - // Safety check: entry must be valid - if entry == nil { - glog.Errorf("setResponseHeaders: entry is nil") - return - } - - // Set content length and accept ranges - w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) - w.Header().Set("Accept-Ranges", "bytes") - - // Set ETag (but don't overwrite if already set, e.g., for part-specific GET requests) - if w.Header().Get("ETag") == "" { - etag := filer.ETag(entry) - if etag != "" { - w.Header().Set("ETag", "\""+etag+"\"") - } - } - - // Set Last-Modified in RFC1123 format - if entry.Attributes != nil { - modTime := time.Unix(entry.Attributes.Mtime, 0).UTC() - w.Header().Set("Last-Modified", modTime.Format(http.TimeFormat)) - } - - // Set Content-Type - mimeType := "" - if entry.Attributes != nil && entry.Attributes.Mime != "" { - mimeType = entry.Attributes.Mime - } - if mimeType == "" { - // Try to detect from entry name - if entry.Name != "" { - ext := filepath.Ext(entry.Name) - if ext != "" { - mimeType = mime.TypeByExtension(ext) - } - } - } - if mimeType != "" { - w.Header().Set("Content-Type", mimeType) - } else { - w.Header().Set("Content-Type", "application/octet-stream") - } - - // Set custom headers from entry.Extended (user metadata) - // Use direct map assignment to preserve original header casing (matches proxy behavior) - if entry.Extended != nil { - for k, v := range entry.Extended { - // Skip internal SeaweedFS headers - if !strings.HasPrefix(k, "xattr-") && !s3_constants.IsSeaweedFSInternalHeader(k) { - // Support backward compatibility: migrate old non-canonical format to canonical format - // OLD: "x-amz-meta-foo" → NEW: "X-Amz-Meta-foo" (preserving suffix case) - 
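
Editor's sketch (not part of this patch): setResponseHeaders above resolves Content-Type through a three-step fallback — the MIME type stored on the entry, then detection from the file extension, then application/octet-stream. Condensed:

package sketch

import (
	"mime"
	"path/filepath"
)

// resolveContentType mirrors the fallback chain in setResponseHeaders.
func resolveContentType(storedMime, entryName string) string {
	if storedMime != "" {
		return storedMime
	}
	if ext := filepath.Ext(entryName); ext != "" {
		if byExt := mime.TypeByExtension(ext); byExt != "" {
			return byExt
		}
	}
	return "application/octet-stream"
}

// resolveContentType("", "report.pdf") == "application/pdf"
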
headerKey := k - if len(k) >= 11 && strings.EqualFold(k[:11], "x-amz-meta-") { - // Normalize to AWS S3 format: "X-Amz-Meta-" prefix with lowercase suffix - // AWS S3 returns user metadata with the suffix in lowercase - suffix := k[len("x-amz-meta-"):] - headerKey = s3_constants.AmzUserMetaPrefix + strings.ToLower(suffix) - if glog.V(4) && k != headerKey { - glog.Infof("Normalizing user metadata header %q to %q in response", k, headerKey) - } - } - w.Header()[headerKey] = []string{string(v)} - } - } - } - - // Set tag count header (matches filer logic) - if entry.Extended != nil { - tagCount := 0 - for k := range entry.Extended { - if strings.HasPrefix(k, s3_constants.AmzObjectTagging+"-") { - tagCount++ - } - } - if tagCount > 0 { - w.Header().Set(s3_constants.AmzTagCount, strconv.Itoa(tagCount)) - } - } -} - -// simpleMasterClient implements the minimal interface for streaming -type simpleMasterClient struct { - lookupFn func(ctx context.Context, fileId string) ([]string, error) -} - -func (s *simpleMasterClient) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType { - return s.lookupFn -} - -func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) { - - bucket, object := s3_constants.GetBucketAndObject(r) - glog.V(3).Infof("HeadObjectHandler %s %s", bucket, object) - - // Handle directory objects with shared logic - if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "HeadObjectHandler") { - return // Directory object request was handled - } - - // Check conditional headers and handle early return if conditions fail - result, handled := s3a.processConditionalHeaders(w, r, bucket, object, "HeadObjectHandler") - if handled { - return - } - - // Check for specific version ID in query parameters - versionId := r.URL.Query().Get("versionId") - - var ( - entry *filer_pb.Entry // Declare entry at function scope for SSE processing - versioningConfigured bool - err error - ) - - // Check if versioning is configured for the bucket (Enabled or Suspended) - // Note: We need to check this even if versionId is empty, because versioned buckets - // handle even "get latest version" requests differently (through .versions directory) - versioningConfigured, err = s3a.isVersioningConfigured(bucket) - if err != nil { - if err == filer_pb.ErrNotFound { - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) - return - } - glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } - - if versioningConfigured { - // Handle versioned HEAD - all versions are stored in .versions directory - var targetVersionId string + if versioningConfigured { + // Handle versioned HEAD - all versions are stored in .versions directory + var targetVersionId string if versionId != "" { // Request for specific version glog.V(2).Infof("HeadObject: requesting specific version %s for %s%s", versionId, bucket, object) entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId) - if err != nil { - glog.Errorf("Failed to get specific version %s: %v", versionId, err) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return - } - // Safety check: entry must be valid after successful retrieval - if entry == nil { - glog.Errorf("HeadObject: getSpecificObjectVersion returned nil entry without error for version %s", versionId) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return - } - targetVersionId = versionId - } else { - // Request for latest version - OPTIMIZATION: - // Check if .versions/ 
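
Editor's sketch (not part of this patch): the user-metadata normalization shown above accepts any casing of the "x-amz-meta-" prefix and rewrites it to the canonical form with a lowercased suffix, matching how AWS S3 returns user metadata. Assuming the canonical prefix constant is "X-Amz-Meta-":

package sketch

import "strings"

// normalizeUserMetaKey rewrites e.g. "x-amz-meta-Foo" or "X-AMZ-META-Foo"
// to "X-Amz-Meta-foo" (canonical prefix, lowercased suffix).
func normalizeUserMetaKey(k string) string {
	const prefix = "x-amz-meta-"
	if len(k) >= len(prefix) && strings.EqualFold(k[:len(prefix)], prefix) {
		return "X-Amz-Meta-" + strings.ToLower(k[len(prefix):])
	}
	return k
}
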
directory exists quickly (no retries) to decide path - // - If .versions/ exists: real versions available, use getLatestObjectVersion - // - If .versions/ doesn't exist (ErrNotFound): only null version at regular path, use it directly - // - If transient error: fall back to getLatestObjectVersion which has retry logic - bucketDir := s3a.option.BucketsPath + "/" + bucket - normalizedObject := removeDuplicateSlashes(object) - versionsDir := normalizedObject + s3_constants.VersionsFolder - - // Quick check (no retries) for .versions/ directory - versionsEntry, versionsErr := s3a.getEntry(bucketDir, versionsDir) - - if versionsErr == nil && versionsEntry != nil { - // .versions/ exists, meaning real versions are stored there - // Use getLatestObjectVersion which will properly find the newest version - entry, err = s3a.getLatestObjectVersion(bucket, object) - if err != nil { - glog.Errorf("HeadObject: Failed to get latest version for %s%s: %v", bucket, object, err) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return - } - } else if errors.Is(versionsErr, filer_pb.ErrNotFound) { - // .versions/ doesn't exist (confirmed not found), check regular path for null version - regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject) - if regularErr == nil && regularEntry != nil { - // Found object at regular path - this is the null version - entry = regularEntry - targetVersionId = "null" - } else { - // No object at regular path either - object doesn't exist - glog.Errorf("HeadObject: object not found at regular path or .versions for %s%s", bucket, object) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return - } - } else { - // Transient error checking .versions/, fall back to getLatestObjectVersion with retries - glog.V(2).Infof("HeadObject: transient error checking .versions for %s%s: %v, falling back to getLatestObjectVersion", bucket, object, versionsErr) - entry, err = s3a.getLatestObjectVersion(bucket, object) - if err != nil { - glog.Errorf("HeadObject: Failed to get latest version for %s%s: %v", bucket, object, err) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return - } + if err != nil { + glog.Errorf("Failed to get specific version %s: %v", versionId, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return } - // Safety check: entry must be valid after successful retrieval - if entry == nil { - glog.Errorf("HeadObject: entry is nil after versioned lookup for %s%s", bucket, object) + targetVersionId = versionId + } else { + // Request for latest version + glog.V(2).Infof("HeadObject: requesting latest version for %s%s", bucket, object) + entry, err = s3a.getLatestObjectVersion(bucket, object) + if err != nil { + glog.Errorf("Failed to get latest version: %v", err) s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) return } - // Extract version ID if not already set - if targetVersionId == "" { - if entry.Extended != nil { - if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { - targetVersionId = string(versionIdBytes) - } - } - // If no version ID found in entry, this is a pre-versioning object - if targetVersionId == "" { - targetVersionId = "null" + if entry.Extended != nil { + if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { + targetVersionId = string(versionIdBytes) } } + // If no version ID found in entry, this is a pre-versioning object + if targetVersionId == "" { + targetVersionId = "null" + } } // Check if this is a delete marker @@ -2031,11 +568,16 @@ func (s3a 
*S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request } } - // For versioned objects, log the target version + // Determine the actual file path based on whether this is a versioned or pre-versioning object if targetVersionId == "null" { - glog.V(2).Infof("HeadObject: pre-versioning object %s/%s", bucket, object) + // Pre-versioning object - stored as regular file + destUrl = s3a.toFilerUrl(bucket, object) + glog.V(2).Infof("HeadObject: pre-versioning object URL: %s", destUrl) } else { - glog.V(2).Infof("HeadObject: version %s for %s/%s", targetVersionId, bucket, object) + // Versioned object - stored in .versions directory + versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId) + destUrl = s3a.toFilerUrl(bucket, versionObjectPath) + glog.V(2).Infof("HeadObject: version %s URL: %s", targetVersionId, destUrl) } // Set version ID in response header @@ -2043,6 +585,9 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request // Add object lock metadata to response headers if present s3a.addObjectLockHeadersToResponse(w, entry) + } else { + // Handle regular HEAD (non-versioned) + destUrl = s3a.toFilerUrl(bucket, object) } // Fetch the correct entry for SSE processing (respects versionId) @@ -2050,12 +595,6 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request var objectEntryForSSE *filer_pb.Entry if versioningConfigured { objectEntryForSSE = entry - // Safety check - this should never happen as versioned path handles errors above - if objectEntryForSSE == nil { - glog.Errorf("HeadObjectHandler: unexpected nil entry for versioned object %s/%s", bucket, object) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return - } } else { // For non-versioned objects, try to reuse entry from conditional header check if result.Entry != nil { @@ -2070,7 +609,7 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request var fetchErr error objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object) if fetchErr != nil { - glog.Warningf("HeadObjectHandler: failed to get entry for %s/%s: %v", bucket, object, fetchErr) + glog.Errorf("HeadObjectHandler: failed to get entry for SSE check: %v", fetchErr) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } @@ -2082,97 +621,122 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request } } - // Safety check: entry must be valid - if objectEntryForSSE == nil { - glog.Errorf("HeadObjectHandler: objectEntryForSSE is nil for %s/%s (should not happen)", bucket, object) + s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { + // Handle SSE validation (both SSE-C and SSE-KMS) for HEAD requests + return s3a.handleSSEResponse(r, proxyResponse, w, objectEntryForSSE) + }) +} + +func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, isWrite bool, responseFn func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64)) { + + glog.V(3).Infof("s3 proxying %s to %s", r.Method, destUrl) + start := time.Now() + + proxyReq, err := http.NewRequest(r.Method, destUrl, r.Body) + + if err != nil { + glog.Errorf("NewRequest %s: %v", destUrl, err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } - // For HEAD requests, we already have all metadata - just set headers directly - totalSize := int64(filer.FileSize(objectEntryForSSE)) - 
s3a.setResponseHeaders(w, objectEntryForSSE, totalSize) - - // Check if PartNumber query parameter is present (for multipart objects) - // This logic matches the filer handler for consistency - partNumberStr := r.URL.Query().Get("partNumber") - if partNumberStr == "" { - partNumberStr = r.URL.Query().Get("PartNumber") + proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr) + proxyReq.Header.Set("Accept-Encoding", "identity") + for k, v := range r.URL.Query() { + if _, ok := s3_constants.PassThroughHeaders[strings.ToLower(k)]; ok { + proxyReq.Header[k] = v + } + if k == "partNumber" { + proxyReq.Header[s3_constants.SeaweedFSPartNumber] = v + } + } + for header, values := range r.Header { + proxyReq.Header[header] = values + } + if proxyReq.ContentLength == 0 && r.ContentLength != 0 { + proxyReq.ContentLength = r.ContentLength } - // If PartNumber is specified, set headers (matching filer logic) - if partNumberStr != "" { - if partNumber, parseErr := strconv.Atoi(partNumberStr); parseErr == nil && partNumber > 0 { - // Get actual parts count from metadata (not chunk count) - partsCount, partInfo := s3a.getMultipartInfo(objectEntryForSSE, partNumber) + // ensure that the Authorization header is overriding any previous + // Authorization header which might be already present in proxyReq + s3a.maybeAddFilerJwtAuthorization(proxyReq, isWrite) + resp, postErr := s3a.client.Do(proxyReq) - // Validate part number - if partNumber > partsCount { - glog.Warningf("HeadObject: Invalid part number %d, object has %d parts", partNumber, partsCount) - s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) - return - } + if postErr != nil { + glog.Errorf("post to filer: %v", postErr) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + defer util_http.CloseResponse(resp) - // Set parts count header - w.Header().Set(s3_constants.AmzMpPartsCount, strconv.Itoa(partsCount)) - glog.V(3).Infof("HeadObject: Set PartsCount=%d for part %d", partsCount, partNumber) + if resp.StatusCode == http.StatusPreconditionFailed { + s3err.WriteErrorResponse(w, r, s3err.ErrPreconditionFailed) + return + } - // Override ETag with the part's ETag - if partInfo != nil { - // Use part ETag from metadata (accurate for multi-chunk parts) - w.Header().Set("ETag", "\""+partInfo.ETag+"\"") - glog.V(3).Infof("HeadObject: Override ETag with part %d ETag: %s (from metadata)", partNumber, partInfo.ETag) - } else { - // Fallback: use chunk's ETag (backward compatibility) - chunkIndex := partNumber - 1 - if chunkIndex >= len(objectEntryForSSE.Chunks) { - glog.Warningf("HeadObject: Part %d chunk index %d out of range (chunks: %d)", partNumber, chunkIndex, len(objectEntryForSSE.Chunks)) - s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) - return - } - partChunk := objectEntryForSSE.Chunks[chunkIndex] - if partChunk.ETag != "" { - if md5Bytes, decodeErr := base64.StdEncoding.DecodeString(partChunk.ETag); decodeErr == nil { - partETag := fmt.Sprintf("%x", md5Bytes) - w.Header().Set("ETag", "\""+partETag+"\"") - glog.V(3).Infof("HeadObject: Override ETag with part %d ETag: %s (fallback from chunk)", partNumber, partETag) - } - } - } + if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable { + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) + return + } + + if r.Method == http.MethodDelete { + if resp.StatusCode == http.StatusNotFound { + // this is normal + responseStatusCode, _ := responseFn(resp, w) + s3err.PostLog(r, responseStatusCode, s3err.ErrNone) + return } } + if resp.StatusCode == http.StatusNotFound { + 
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } - // Detect and handle SSE - glog.V(3).Infof("HeadObjectHandler: Retrieved entry for %s%s - %d chunks", bucket, object, len(objectEntryForSSE.Chunks)) - sseType := s3a.detectPrimarySSEType(objectEntryForSSE) - glog.V(2).Infof("HeadObjectHandler: Detected SSE type: %s", sseType) - if sseType != "" && sseType != "None" { - // Validate SSE headers for encrypted objects - switch sseType { - case s3_constants.SSETypeC: - customerKey, err := ParseSSECHeaders(r) - if err != nil { - s3err.WriteErrorResponse(w, r, MapSSECErrorToS3Error(err)) - return - } - if customerKey == nil { - s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) - return - } - // Validate key MD5 - if objectEntryForSSE.Extended != nil { - storedKeyMD5 := string(objectEntryForSSE.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]) - if storedKeyMD5 != "" && customerKey.KeyMD5 != storedKeyMD5 { - s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) - return - } - } + TimeToFirstByte(r.Method, start, r) + if resp.Header.Get(s3_constants.SeaweedFSIsDirectoryKey) == "true" { + responseStatusCode, _ := responseFn(resp, w) + s3err.PostLog(r, responseStatusCode, s3err.ErrNone) + return + } + + if resp.StatusCode == http.StatusInternalServerError { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + + // when HEAD a directory, it should be reported as no such key + // https://github.com/seaweedfs/seaweedfs/issues/3457 + if resp.ContentLength == -1 && resp.StatusCode != http.StatusNotModified { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + + if resp.StatusCode == http.StatusBadRequest { + resp_body, _ := io.ReadAll(resp.Body) + switch string(resp_body) { + case "InvalidPart": + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) + default: + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest) } - // Add SSE response headers - s3a.addSSEResponseHeadersFromEntry(w, r, objectEntryForSSE, sseType) + resp.Body.Close() + return } + setUserMetadataKeyToLowercase(resp) - w.WriteHeader(http.StatusOK) + responseStatusCode, bytesTransferred := responseFn(resp, w) + BucketTrafficSent(bytesTransferred, r) + + s3err.PostLog(r, responseStatusCode, s3err.ErrNone) +} + +func setUserMetadataKeyToLowercase(resp *http.Response) { + for key, value := range resp.Header { + if strings.HasPrefix(key, s3_constants.AmzUserMetaPrefix) { + resp.Header[strings.ToLower(key)] = value + delete(resp.Header, key) + } + } } func captureCORSHeaders(w http.ResponseWriter, headersToCapture []string) map[string]string { @@ -2752,11 +1316,6 @@ func (s3a *S3ApiServer) addSSEHeadersToResponse(proxyResponse *http.Response, en // detectPrimarySSEType determines the primary SSE type by examining chunk metadata func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string { - // Safety check: handle nil entry - if entry == nil { - return "None" - } - if len(entry.GetChunks()) == 0 { // No chunks - check object-level metadata only (single objects or smallContent) hasSSEC := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] != nil @@ -2837,243 +1396,9 @@ func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string { return "None" } -// createMultipartSSECDecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-C objects (direct volume path) -// Note: encryptedStream parameter is unused (always nil) as this function fetches chunks directly to avoid double I/O. 
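
Editor's sketch (not part of this patch): setUserMetadataKeyToLowercase above rewrites canonicalized user-metadata keys via direct map access, deliberately bypassing http.Header's canonicalization. Deleting while ranging over a Go map is safe, and a freshly inserted lowercase key no longer matches the prefix, so it is never reprocessed even if the iteration happens to visit it:

package sketch

import (
	"net/http"
	"strings"
)

// lowercaseUserMeta turns "X-Amz-Meta-Foo" into "x-amz-meta-foo" in place.
// Note: afterwards h.Get("x-amz-meta-foo") will NOT find the value, because
// Get re-canonicalizes its argument; callers must read the map directly.
func lowercaseUserMeta(h http.Header, userMetaPrefix string) {
	for key, value := range h {
		if strings.HasPrefix(key, userMetaPrefix) {
			h[strings.ToLower(key)] = value
			delete(h, key)
		}
	}
}
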
-// It's kept in the signature for API consistency with non-Direct versions. -func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, customerKey *SSECustomerKey, entry *filer_pb.Entry) (io.Reader, error) { - // Sort chunks by offset to ensure correct order - chunks := entry.GetChunks() - sort.Slice(chunks, func(i, j int) bool { - return chunks[i].GetOffset() < chunks[j].GetOffset() - }) - - // Create readers for each chunk, decrypting them independently - var readers []io.Reader - - for _, chunk := range chunks { - // Get this chunk's encrypted data - chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk) - if err != nil { - return nil, fmt.Errorf("failed to create chunk reader: %v", err) - } - - // Handle based on chunk's encryption type - if chunk.GetSseType() == filer_pb.SSEType_SSE_C { - // Check if this chunk has per-chunk SSE-C metadata - if len(chunk.GetSseMetadata()) == 0 { - chunkReader.Close() - return nil, fmt.Errorf("SSE-C chunk %s missing per-chunk metadata", chunk.GetFileIdString()) - } - - // Deserialize the SSE-C metadata - ssecMetadata, err := DeserializeSSECMetadata(chunk.GetSseMetadata()) - if err != nil { - chunkReader.Close() - return nil, fmt.Errorf("failed to deserialize SSE-C metadata for chunk %s: %v", chunk.GetFileIdString(), err) - } - - // Decode the IV from the metadata - chunkIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV) - if err != nil { - chunkReader.Close() - return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), err) - } - - glog.V(4).Infof("Decrypting SSE-C chunk %s with IV=%x, PartOffset=%d", - chunk.GetFileIdString(), chunkIV[:8], ssecMetadata.PartOffset) - - // Note: SSE-C multipart behavior (differs from SSE-KMS/SSE-S3): - // - Upload: CreateSSECEncryptedReader generates RANDOM IV per part (no base IV + offset) - // - Metadata: PartOffset is stored but not used during encryption - // - Decryption: Use stored random IV directly (no offset adjustment needed) - // - // This differs from: - // - SSE-KMS/SSE-S3: Use base IV + calculateIVWithOffset(partOffset) during encryption - // - CopyObject: Applies calculateIVWithOffset to SSE-C (which may be incorrect) - // - // TODO: Investigate CopyObject SSE-C PartOffset handling for consistency - decryptedChunkReader, decErr := CreateSSECDecryptedReader(chunkReader, customerKey, chunkIV) - if decErr != nil { - chunkReader.Close() - return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr) - } - - // Use the streaming decrypted reader directly - readers = append(readers, struct { - io.Reader - io.Closer - }{ - Reader: decryptedChunkReader, - Closer: chunkReader, - }) - glog.V(4).Infof("Added streaming decrypted reader for SSE-C chunk %s", chunk.GetFileIdString()) - } else { - // Non-SSE-C chunk, use as-is - readers = append(readers, chunkReader) - glog.V(4).Infof("Added non-encrypted reader for chunk %s", chunk.GetFileIdString()) - } - } - - // Close the original encrypted stream since we're reading chunks individually - if encryptedStream != nil { - encryptedStream.Close() - } - - return NewMultipartSSEReader(readers), nil -} - -// createMultipartSSEKMSDecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-KMS objects (direct volume path) -// Note: encryptedStream parameter is unused (always nil) as this function fetches chunks directly to avoid double I/O. -// It's kept in the signature for API consistency with non-Direct versions. 
-func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, entry *filer_pb.Entry) (io.Reader, error) { - // Sort chunks by offset to ensure correct order - chunks := entry.GetChunks() - sort.Slice(chunks, func(i, j int) bool { - return chunks[i].GetOffset() < chunks[j].GetOffset() - }) - - // Create readers for each chunk, decrypting them independently - var readers []io.Reader - - for _, chunk := range chunks { - // Get this chunk's encrypted data - chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk) - if err != nil { - return nil, fmt.Errorf("failed to create chunk reader: %v", err) - } - - // Handle based on chunk's encryption type - if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS { - // Check if this chunk has per-chunk SSE-KMS metadata - if len(chunk.GetSseMetadata()) == 0 { - chunkReader.Close() - return nil, fmt.Errorf("SSE-KMS chunk %s missing per-chunk metadata", chunk.GetFileIdString()) - } - - // Use the per-chunk SSE-KMS metadata - kmsKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata()) - if err != nil { - chunkReader.Close() - return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err) - } - - glog.V(4).Infof("Decrypting SSE-KMS chunk %s with KeyID=%s", - chunk.GetFileIdString(), kmsKey.KeyID) - - // Create decrypted reader for this chunk - decryptedChunkReader, decErr := CreateSSEKMSDecryptedReader(chunkReader, kmsKey) - if decErr != nil { - chunkReader.Close() - return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr) - } - - // Use the streaming decrypted reader directly - readers = append(readers, struct { - io.Reader - io.Closer - }{ - Reader: decryptedChunkReader, - Closer: chunkReader, - }) - glog.V(4).Infof("Added streaming decrypted reader for SSE-KMS chunk %s", chunk.GetFileIdString()) - } else { - // Non-SSE-KMS chunk, use as-is - readers = append(readers, chunkReader) - glog.V(4).Infof("Added non-encrypted reader for chunk %s", chunk.GetFileIdString()) - } - } - - // Close the original encrypted stream since we're reading chunks individually - if encryptedStream != nil { - encryptedStream.Close() - } - - return NewMultipartSSEReader(readers), nil -} - -// createMultipartSSES3DecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-S3 objects (direct volume path) -// Note: encryptedStream parameter is unused (always nil) as this function fetches chunks directly to avoid double I/O. -// It's kept in the signature for API consistency with non-Direct versions. 
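
Editor's sketch (not part of this patch): the per-chunk decrypt paths in this file concatenate one decrypted reader per chunk and must later close every underlying volume-server response; that is what the MultipartSSEReader returned by NewMultipartSSEReader provides. A generic version of the idea:

package sketch

import "io"

// multiReadCloser concatenates readers like io.MultiReader, but also closes
// every underlying ReadCloser, returning the first close error it sees.
type multiReadCloser struct {
	io.Reader
	closers []io.Closer
}

func newMultiReadCloser(parts ...io.ReadCloser) io.ReadCloser {
	readers := make([]io.Reader, len(parts))
	closers := make([]io.Closer, len(parts))
	for i, p := range parts {
		readers[i], closers[i] = p, p
	}
	return &multiReadCloser{Reader: io.MultiReader(readers...), closers: closers}
}

func (m *multiReadCloser) Close() error {
	var firstErr error
	for _, c := range m.closers {
		if err := c.Close(); err != nil && firstErr == nil {
			firstErr = err // keep closing the rest; report the first failure
		}
	}
	return firstErr
}
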
-func (s3a *S3ApiServer) createMultipartSSES3DecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, entry *filer_pb.Entry) (io.Reader, error) { - // Sort chunks by offset to ensure correct order - chunks := entry.GetChunks() - sort.Slice(chunks, func(i, j int) bool { - return chunks[i].GetOffset() < chunks[j].GetOffset() - }) - - // Create readers for each chunk, decrypting them independently - var readers []io.Reader - - // Get key manager and SSE-S3 key from entry metadata - keyManager := GetSSES3KeyManager() - keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key] - sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager) - if err != nil { - return nil, fmt.Errorf("failed to deserialize SSE-S3 key from entry metadata: %v", err) - } - - for _, chunk := range chunks { - // Get this chunk's encrypted data - chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk) - if err != nil { - return nil, fmt.Errorf("failed to create chunk reader: %v", err) - } - - // Handle based on chunk's encryption type - if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 { - // Check if this chunk has per-chunk SSE-S3 metadata - if len(chunk.GetSseMetadata()) == 0 { - chunkReader.Close() - return nil, fmt.Errorf("SSE-S3 chunk %s missing per-chunk metadata", chunk.GetFileIdString()) - } - - // Deserialize the per-chunk SSE-S3 metadata to get the IV - chunkSSES3Metadata, err := DeserializeSSES3Metadata(chunk.GetSseMetadata(), keyManager) - if err != nil { - chunkReader.Close() - return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata for chunk %s: %v", chunk.GetFileIdString(), err) - } - - // Use the IV from the chunk metadata - iv := chunkSSES3Metadata.IV - glog.V(4).Infof("Decrypting SSE-S3 chunk %s with KeyID=%s, IV length=%d", - chunk.GetFileIdString(), sseS3Key.KeyID, len(iv)) - - // Create decrypted reader for this chunk - decryptedChunkReader, decErr := CreateSSES3DecryptedReader(chunkReader, sseS3Key, iv) - if decErr != nil { - chunkReader.Close() - return nil, fmt.Errorf("failed to decrypt SSE-S3 chunk: %v", decErr) - } - - // Use the streaming decrypted reader directly - readers = append(readers, struct { - io.Reader - io.Closer - }{ - Reader: decryptedChunkReader, - Closer: chunkReader, - }) - glog.V(4).Infof("Added streaming decrypted reader for SSE-S3 chunk %s", chunk.GetFileIdString()) - } else { - // Non-SSE-S3 chunk, use as-is - readers = append(readers, chunkReader) - glog.V(4).Infof("Added non-encrypted reader for chunk %s", chunk.GetFileIdString()) - } - } - - // Close the original encrypted stream since we're reading chunks individually - if encryptedStream != nil { - encryptedStream.Close() - } - - return NewMultipartSSEReader(readers), nil -} - // createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) { // Entry is passed from caller to avoid redundant filer lookup - ctx := r.Context() // Sort chunks by offset to ensure correct order chunks := entry.GetChunks() @@ -3086,7 +1411,7 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, pr for _, chunk := range chunks { // Get this chunk's encrypted data - chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk) + chunkReader, err := s3a.createEncryptedChunkReader(chunk) if err != nil { return nil, fmt.Errorf("failed to create chunk reader: %v", err) } @@ -3135,8 
+1460,6 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, pr
 // createMultipartSSES3DecryptedReader creates a reader for multipart SSE-S3 objects
 func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, entry *filer_pb.Entry) (io.Reader, error) {
-	ctx := r.Context()
-
 	// Sort chunks by offset to ensure correct order
 	chunks := entry.GetChunks()
 	sort.Slice(chunks, func(i, j int) bool {
@@ -3149,7 +1472,7 @@ func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, ent
 	for _, chunk := range chunks {
 		// Get this chunk's encrypted data
-		chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
+		chunkReader, err := s3a.createEncryptedChunkReader(chunk)
 		if err != nil {
 			return nil, fmt.Errorf("failed to create chunk reader: %v", err)
 		}
@@ -3216,28 +1539,21 @@ func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, ent
 }

 // createEncryptedChunkReader creates a reader for a single encrypted chunk
-// Context propagation ensures cancellation if the S3 client disconnects
-func (s3a *S3ApiServer) createEncryptedChunkReader(ctx context.Context, chunk *filer_pb.FileChunk) (io.ReadCloser, error) {
+func (s3a *S3ApiServer) createEncryptedChunkReader(chunk *filer_pb.FileChunk) (io.ReadCloser, error) {
 	// Get chunk URL
 	srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString())
 	if err != nil {
 		return nil, fmt.Errorf("lookup volume URL for chunk %s: %v", chunk.GetFileIdString(), err)
 	}

-	// Create HTTP request with context for cancellation propagation
-	req, err := http.NewRequestWithContext(ctx, "GET", srcUrl, nil)
+	// Create HTTP request for chunk data
+	req, err := http.NewRequest("GET", srcUrl, nil)
 	if err != nil {
 		return nil, fmt.Errorf("create HTTP request for chunk: %v", err)
 	}

-	// Attach volume server JWT for authentication (matches filer behavior)
-	jwt := security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, chunk.GetFileIdString())
-	if jwt != "" {
-		req.Header.Set("Authorization", "BEARER "+string(jwt))
-	}
-
-	// Use shared HTTP client with connection pooling
-	resp, err := volumeServerHTTPClient.Do(req)
+	// Execute request
+	resp, err := http.DefaultClient.Do(req)
 	if err != nil {
 		return nil, fmt.Errorf("execute HTTP request for chunk: %v", err)
 	}
@@ -3259,10 +1575,9 @@ type MultipartSSEReader struct {
 // SSERangeReader applies range logic to an underlying reader
 type SSERangeReader struct {
 	reader    io.Reader
-	offset    int64  // bytes to skip from the beginning
-	remaining int64  // bytes remaining to read (-1 for unlimited)
-	skipped   int64  // bytes already skipped
-	skipBuf   []byte // reusable buffer for skipping bytes (avoids per-call allocation)
+	offset    int64 // bytes to skip from the beginning
+	remaining int64 // bytes remaining to read (-1 for unlimited)
+	skipped   int64 // bytes already skipped
 }

 // NewMultipartSSEReader creates a new multipart reader that can properly close all underlying readers
@@ -3294,28 +1609,22 @@ func (m *MultipartSSEReader) Close() error {
 // Read implements the io.Reader interface for SSERangeReader
 func (r *SSERangeReader) Read(p []byte) (n int, err error) {
-	// Skip bytes iteratively (no recursion) until we reach the offset
-	for r.skipped < r.offset {
-		skipNeeded := r.offset - r.skipped
-
-		// Lazily allocate skip buffer on first use, reuse thereafter
-		if r.skipBuf == nil {
-			// Use a fixed 32KB buffer for skipping (avoids per-call allocation)
-			r.skipBuf = make([]byte, 32*1024)
-		}
-
-		// Determine how much to skip in this iteration
-		bufSize := int64(len(r.skipBuf))
-		if skipNeeded < bufSize {
-			bufSize = skipNeeded
-		}
-		skipRead, skipErr := r.reader.Read(r.skipBuf[:bufSize])
+	// If we need to skip bytes and haven't skipped enough yet
+	if r.skipped < r.offset {
+		skipNeeded := r.offset - r.skipped
+		skipBuf := make([]byte, min(int64(len(p)), skipNeeded))
+		skipRead, skipErr := r.reader.Read(skipBuf)
 		r.skipped += int64(skipRead)
 		if skipErr != nil {
 			return 0, skipErr
 		}
+
+		// If we still need to skip more, recurse
+		if r.skipped < r.offset {
+			return r.Read(p)
+		}
 	}

 	// If we have a remaining limit and it's reached
@@ -3341,8 +1650,6 @@ func (r *SSERangeReader) Read(p []byte) (n int, err error) {
 // createMultipartSSECDecryptedReader creates a decrypted reader for multipart SSE-C objects
 // Each chunk has its own IV and encryption key from the original multipart parts
 func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) {
-	ctx := r.Context()
-
 	// Parse SSE-C headers from the request for decryption key
 	customerKey, err := ParseSSECHeaders(r)
 	if err != nil {
@@ -3402,7 +1709,7 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, prox
 	for _, chunk := range neededChunks {
 		// Get this chunk's encrypted data
-		chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
+		chunkReader, err := s3a.createEncryptedChunkReader(chunk)
 		if err != nil {
 			return nil, fmt.Errorf("failed to create chunk reader: %v", err)
 		}
@@ -3422,10 +1729,16 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, prox
 			return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), ivErr)
 		}

-		// Note: For multipart SSE-C, each part was encrypted with offset=0
-		// So we use the stored IV directly without offset adjustment
-		// PartOffset is stored for informational purposes, but encryption uses offset=0
-		chunkIV := iv
+		// Calculate the correct IV for this chunk using the within-part offset
+		var chunkIV []byte
+		if ssecMetadata.PartOffset > 0 {
+			var skip int
+			chunkIV, skip = calculateIVWithOffset(iv, ssecMetadata.PartOffset)
+			// The caller handles range alignment, so we can safely ignore the skip value for SSE-C chunks.
+ _ = skip + } else { + chunkIV = iv + } decryptedReader, decErr := CreateSSECDecryptedReader(chunkReader, customerKey, chunkIV) if decErr != nil { @@ -3465,55 +1778,3 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, prox return multiReader, nil } - -// PartBoundaryInfo holds information about a part's chunk boundaries -type PartBoundaryInfo struct { - PartNumber int `json:"part"` - StartChunk int `json:"start"` - EndChunk int `json:"end"` // exclusive - ETag string `json:"etag"` -} - -// rc is a helper type that wraps a Reader and Closer for proper resource cleanup -type rc struct { - io.Reader - io.Closer -} - -// getMultipartInfo retrieves multipart metadata for a given part number -// Returns: (partsCount, partInfo) -// - partsCount: total number of parts in the multipart object -// - partInfo: boundary information for the requested part (nil if not found or not a multipart object) -func (s3a *S3ApiServer) getMultipartInfo(entry *filer_pb.Entry, partNumber int) (int, *PartBoundaryInfo) { - if entry == nil { - return 0, nil - } - if entry.Extended == nil { - // Not a multipart object or no metadata - return len(entry.GetChunks()), nil - } - - // Try to get parts count from metadata - partsCount := len(entry.GetChunks()) // default fallback - if partsCountBytes, exists := entry.Extended[s3_constants.SeaweedFSMultipartPartsCount]; exists { - if count, err := strconv.Atoi(string(partsCountBytes)); err == nil && count > 0 { - partsCount = count - } - } - - // Try to get part boundaries from metadata - if boundariesJSON, exists := entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries]; exists { - var boundaries []PartBoundaryInfo - if err := json.Unmarshal(boundariesJSON, &boundaries); err == nil { - // Find the requested part - for i := range boundaries { - if boundaries[i].PartNumber == partNumber { - return partsCount, &boundaries[i] - } - } - } - } - - // No part boundaries metadata or part not found - return partsCount, nil -} diff --git a/weed/s3api/s3api_object_handlers_multipart.go b/weed/s3api/s3api_object_handlers_multipart.go index 07d484d80..d1ba8eea3 100644 --- a/weed/s3api/s3api_object_handlers_multipart.go +++ b/weed/s3api/s3api_object_handlers_multipart.go @@ -403,7 +403,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ glog.V(2).Infof("PutObjectPart: bucket=%s, object=%s, uploadId=%s, partNumber=%d, size=%d", bucket, object, uploadID, partID, r.ContentLength) - etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, dataReader, bucket, partID) + etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket, partID) if errCode != s3err.ErrNone { glog.Errorf("PutObjectPart: putToFiler failed with error code %v for bucket=%s, object=%s, partNumber=%d", errCode, bucket, object, partID) @@ -412,12 +412,14 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ } glog.V(2).Infof("PutObjectPart: SUCCESS - bucket=%s, object=%s, partNumber=%d, etag=%s, sseType=%s", - bucket, object, partID, etag, sseMetadata.SSEType) + bucket, object, partID, etag, sseType) setEtag(w, etag) // Set SSE response headers for multipart uploads - s3a.setSSEResponseHeaders(w, r, sseMetadata) + if sseType == s3_constants.SSETypeS3 { + w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256) + } writeSuccessResponseEmpty(w, r) diff --git a/weed/s3api/s3api_object_handlers_postpolicy.go b/weed/s3api/s3api_object_handlers_postpolicy.go index ecb2ac8d1..c392d98f0 
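
Editor's sketch (not part of this patch): calculateIVWithOffset is called above but not defined in this hunk. For AES-CTR, such a helper conventionally advances the 16-byte IV, read as a big-endian counter, by offset/16 blocks and reports offset%16 bytes to discard from the first decrypted block; the code below is an assumption about that convention, not the actual SeaweedFS implementation.

package sketch

import "crypto/aes"

// ivWithOffset advances a big-endian CTR counter by offset/BlockSize blocks
// and returns the intra-block skip (offset % BlockSize).
func ivWithOffset(iv []byte, offset int64) (adjusted []byte, skip int) {
	adjusted = make([]byte, aes.BlockSize)
	copy(adjusted, iv)
	carry := uint64(offset / aes.BlockSize)
	// Add carry to the counter, least-significant byte first.
	for i := aes.BlockSize - 1; i >= 0 && carry > 0; i-- {
		sum := uint64(adjusted[i]) + (carry & 0xff)
		adjusted[i] = byte(sum)
		carry = (carry >> 8) + (sum >> 8)
	}
	return adjusted, int(offset % aes.BlockSize)
}
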
100644 --- a/weed/s3api/s3api_object_handlers_postpolicy.go +++ b/weed/s3api/s3api_object_handlers_postpolicy.go @@ -136,7 +136,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R } } - etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, fileBody, bucket, 1) + etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, fileBody, "", bucket, 1) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) @@ -153,7 +153,9 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R setEtag(w, etag) // Include SSE response headers (important for bucket-default encryption) - s3a.setSSEResponseHeaders(w, r, sseMetadata) + if sseType == s3_constants.SSETypeS3 { + w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256) + } // Decide what http response to send depending on success_action_status parameter switch successStatus { diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 6181c34d6..6ce48429f 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -1,7 +1,6 @@ package s3api import ( - "context" "crypto/md5" "encoding/base64" "encoding/json" @@ -9,21 +8,18 @@ import ( "fmt" "io" "net/http" - "net/url" - "path/filepath" "strconv" "strings" "time" "github.com/pquerna/cachecontrol/cacheobject" - "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/security" + weed_server "github.com/seaweedfs/seaweedfs/weed/server" stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util/constants" ) @@ -64,13 +60,6 @@ type BucketDefaultEncryptionResult struct { SSEKMSKey *SSEKMSKey } -// SSEResponseMetadata holds encryption metadata needed for HTTP response headers -type SSEResponseMetadata struct { - SSEType string - KMSKeyID string - BucketKeyEnabled bool -} - func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) { // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html @@ -146,7 +135,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) versioningEnabled := (versioningState == s3_constants.VersioningEnabled) versioningConfigured := (versioningState != "") - glog.V(3).Infof("PutObjectHandler: bucket=%s, object=%s, versioningState='%s', versioningEnabled=%v, versioningConfigured=%v", bucket, object, versioningState, versioningEnabled, versioningConfigured) + glog.V(2).Infof("PutObjectHandler: bucket=%s, object=%s, versioningState='%s', versioningEnabled=%v, versioningConfigured=%v", bucket, object, versioningState, versioningEnabled, versioningConfigured) // Validate object lock headers before processing if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil { @@ -169,34 +158,29 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) switch versioningState { case s3_constants.VersioningEnabled: // Handle enabled versioning - create new versions with real version IDs - glog.V(3).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object) - versionId, etag, errCode, sseMetadata := 
s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType) + glog.V(0).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object) + versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType) if errCode != s3err.ErrNone { glog.Errorf("PutObjectHandler: putVersionedObject failed with errCode=%v for %s/%s", errCode, bucket, object) s3err.WriteErrorResponse(w, r, errCode) return } - glog.V(3).Infof("PutObjectHandler: putVersionedObject returned versionId=%s, etag=%s for %s/%s", versionId, etag, bucket, object) + glog.V(0).Infof("PutObjectHandler: putVersionedObject returned versionId=%s, etag=%s for %s/%s", versionId, etag, bucket, object) // Set version ID in response header if versionId != "" { w.Header().Set("x-amz-version-id", versionId) - glog.V(3).Infof("PutObjectHandler: set x-amz-version-id header to %s for %s/%s", versionId, bucket, object) + glog.V(0).Infof("PutObjectHandler: set x-amz-version-id header to %s for %s/%s", versionId, bucket, object) } else { glog.Errorf("PutObjectHandler: CRITICAL - versionId is EMPTY for versioned bucket %s, object %s", bucket, object) } // Set ETag in response setEtag(w, etag) - - // Set SSE response headers for versioned objects - s3a.setSSEResponseHeaders(w, r, sseMetadata) - case s3_constants.VersioningSuspended: // Handle suspended versioning - overwrite with "null" version ID but preserve existing versions - glog.V(3).Infof("PutObjectHandler: SUSPENDED versioning detected for %s/%s, calling putSuspendedVersioningObject", bucket, object) - etag, errCode, sseMetadata := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType) + etag, errCode := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) return @@ -207,9 +191,6 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) // Set ETag in response setEtag(w, etag) - - // Set SSE response headers for suspended versioning - s3a.setSSEResponseHeaders(w, r, sseMetadata) default: // Handle regular PUT (never configured versioning) uploadUrl := s3a.toFilerUrl(bucket, object) @@ -217,7 +198,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) dataReader = mimeDetect(r, dataReader) } - etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, dataReader, bucket, 1) + etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket, 1) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) @@ -228,7 +209,9 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) setEtag(w, etag) // Set SSE response headers based on encryption type used - s3a.setSSEResponseHeaders(w, r, sseMetadata) + if sseType == s3_constants.SSETypeS3 { + w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256) + } } } stats_collect.RecordBucketActiveTime(bucket) @@ -237,18 +220,15 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) writeSuccessResponseEmpty(w, r) } -func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) { - // NEW OPTIMIZATION: Write directly to volume servers, bypassing filer proxy - // This eliminates the filer proxy overhead for PUT operations +func (s3a *S3ApiServer) 
putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseType string) { + // Calculate unique offset for each part to prevent IV reuse in multipart uploads + // This is critical for CTR mode encryption security + partOffset := calculatePartOffset(partNumber) - // For SSE, encrypt with offset=0 for all parts - // Each part is encrypted independently, then decrypted using metadata during GET - partOffset := int64(0) - - // Handle all SSE encryption types in a unified manner + // Handle all SSE encryption types in a unified manner to eliminate repetitive dataReader assignments sseResult, sseErrorCode := s3a.handleAllSSEEncryption(r, dataReader, partOffset) if sseErrorCode != s3err.ErrNone { - return "", sseErrorCode, SSEResponseMetadata{} + return "", sseErrorCode, "" } // Extract results from unified SSE handling @@ -269,7 +249,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader encryptionResult, applyErr := s3a.applyBucketDefaultEncryption(bucket, r, dataReader) if applyErr != nil { glog.Errorf("Failed to apply bucket default encryption: %v", applyErr) - return "", s3err.ErrInternalError, SSEResponseMetadata{} + return "", s3err.ErrInternalError, "" } // Update variables based on the result @@ -283,289 +263,115 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader sseS3Metadata, metaErr = SerializeSSES3Metadata(sseS3Key) if metaErr != nil { glog.Errorf("Failed to serialize SSE-S3 metadata for bucket default encryption: %v", metaErr) - return "", s3err.ErrInternalError, SSEResponseMetadata{} + return "", s3err.ErrInternalError, "" } } } else { glog.V(4).Infof("putToFiler: explicit encryption already applied, skipping bucket default encryption") } - // Parse the upload URL to extract the file path - // uploadUrl format: http://filer:8888/path/to/bucket/object (or https://, IPv6, etc.) 
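
Editor's sketch (not part of this patch): calculatePartOffset's formula is not shown in this hunk, but the comment above it states the goal — each part must consume a disjoint CTR counter range, because reusing a (key, IV) pair reuses keystream. The demonstration below shows the hazard being avoided: with a shared IV, the XOR of two ciphertexts equals the XOR of the plaintexts. Hypothetical inputs throughout.

package sketch

import (
	"crypto/aes"
	"crypto/cipher"
)

// keystreamReuse returns true when two equal-length plaintexts encrypted
// under the same key and IV leak their XOR: c1^c2 == p1^p2, because the
// identical keystream cancels out.
func keystreamReuse(key, iv, p1, p2 []byte) bool {
	if len(p1) != len(p2) {
		return false
	}
	block, err := aes.NewCipher(key)
	if err != nil {
		return false
	}
	c1 := make([]byte, len(p1))
	c2 := make([]byte, len(p2))
	cipher.NewCTR(block, iv).XORKeyStream(c1, p1) // same IV for both parts:
	cipher.NewCTR(block, iv).XORKeyStream(c2, p2) // overlapping counters
	for i := range c1 {
		if c1[i]^c2[i] != p1[i]^p2[i] {
			return false
		}
	}
	return true // always true for same key+IV: ciphertext XOR leaks plaintext XOR
}
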
- // Use proper URL parsing instead of string manipulation for robustness - parsedUrl, parseErr := url.Parse(uploadUrl) - if parseErr != nil { - glog.Errorf("putToFiler: failed to parse uploadUrl %q: %v", uploadUrl, parseErr) - return "", s3err.ErrInternalError, SSEResponseMetadata{} - } - - // Use parsedUrl.Path directly - it's already decoded by url.Parse() - // Per Go documentation: "Path is stored in decoded form: /%47%6f%2f becomes /Go/" - // Calling PathUnescape again would double-decode and fail on keys like "b%ar" - filePath := parsedUrl.Path + hash := md5.New() + var body = io.TeeReader(dataReader, hash) - // Step 1 & 2: Use auto-chunking to handle large files without OOM - // This splits large uploads into 8MB chunks, preventing memory issues on both S3 API and volume servers - const chunkSize = 8 * 1024 * 1024 // 8MB chunks (S3 standard) - const smallFileLimit = 256 * 1024 // 256KB - store inline in filer + proxyReq, err := http.NewRequest(http.MethodPut, uploadUrl, body) - collection := "" - if s3a.option.FilerGroup != "" { - collection = s3a.getCollectionName(bucket) - } - - // Create assign function for chunked upload - assignFunc := func(ctx context.Context, count int) (*operation.VolumeAssignRequest, *operation.AssignResult, error) { - var assignResult *filer_pb.AssignVolumeResponse - err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.AssignVolume(ctx, &filer_pb.AssignVolumeRequest{ - Count: int32(count), - Replication: "", - Collection: collection, - DiskType: "", - DataCenter: s3a.option.DataCenter, - Path: filePath, - }) - if err != nil { - return fmt.Errorf("assign volume: %w", err) - } - if resp.Error != "" { - return fmt.Errorf("assign volume: %v", resp.Error) - } - assignResult = resp - return nil - }) - if err != nil { - return nil, nil, err - } - - // Convert filer_pb.AssignVolumeResponse to operation.AssignResult - return nil, &operation.AssignResult{ - Fid: assignResult.FileId, - Url: assignResult.Location.Url, - PublicUrl: assignResult.Location.PublicUrl, - Count: uint64(count), - Auth: security.EncodedJwt(assignResult.Auth), - }, nil - } - - // Upload with auto-chunking - // Use context.Background() to ensure chunk uploads complete even if HTTP request is cancelled - // This prevents partial uploads and data corruption - chunkResult, err := operation.UploadReaderInChunks(context.Background(), dataReader, &operation.ChunkedUploadOption{ - ChunkSize: chunkSize, - SmallFileLimit: smallFileLimit, - Collection: collection, - DataCenter: s3a.option.DataCenter, - SaveSmallInline: false, // S3 API always creates chunks, never stores inline - MimeType: r.Header.Get("Content-Type"), - AssignFunc: assignFunc, - }) if err != nil { - glog.Errorf("putToFiler: chunked upload failed: %v", err) - if strings.Contains(err.Error(), s3err.ErrMsgPayloadChecksumMismatch) { - return "", s3err.ErrInvalidDigest, SSEResponseMetadata{} - } - return "", s3err.ErrInternalError, SSEResponseMetadata{} + glog.Errorf("NewRequest %s: %v", uploadUrl, err) + return "", s3err.ErrInternalError, "" } - // Step 3: Calculate MD5 hash and add SSE metadata to chunks - md5Sum := chunkResult.Md5Hash.Sum(nil) - - glog.V(4).Infof("putToFiler: Chunked upload SUCCESS - path=%s, chunks=%d, size=%d", - filePath, len(chunkResult.FileChunks), chunkResult.TotalSize) - - // Log chunk details for debugging (verbose only - high frequency) - if glog.V(4) { - for i, chunk := range chunkResult.FileChunks { - glog.Infof(" PUT Chunk[%d]: fid=%s, offset=%d, size=%d", i, 
 
-	// Step 3: Calculate MD5 hash and add SSE metadata to chunks
-	md5Sum := chunkResult.Md5Hash.Sum(nil)
-
-	glog.V(4).Infof("putToFiler: Chunked upload SUCCESS - path=%s, chunks=%d, size=%d",
-		filePath, len(chunkResult.FileChunks), chunkResult.TotalSize)
-
-	// Log chunk details for debugging (verbose only - high frequency)
-	if glog.V(4) {
-		for i, chunk := range chunkResult.FileChunks {
-			glog.Infof("  PUT Chunk[%d]: fid=%s, offset=%d, size=%d", i,
-				chunk.GetFileIdString(), chunk.Offset, chunk.Size)
-		}
+	proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
+	if destination != "" {
+		proxyReq.Header.Set(s3_constants.SeaweedStorageDestinationHeader, destination)
 	}
 
-	// Add SSE metadata to all chunks if present
-	if customerKey != nil {
-		// SSE-C: Create per-chunk metadata (matches filer logic)
-		for _, chunk := range chunkResult.FileChunks {
-			chunk.SseType = filer_pb.SSEType_SSE_C
-			if len(sseIV) > 0 {
-				// PartOffset tracks position within the encrypted stream
-				// Since ALL uploads (single-part and multipart parts) encrypt starting from offset 0,
-				// PartOffset = chunk.Offset represents where this chunk is in that encrypted stream
-				// - Single-part: chunk.Offset is position in the file's encrypted stream
-				// - Multipart: chunk.Offset is position in this part's encrypted stream
-				ssecMetadataStruct := struct {
-					Algorithm  string `json:"algorithm"`
-					IV         string `json:"iv"`
-					KeyMD5     string `json:"keyMD5"`
-					PartOffset int64  `json:"partOffset"`
-				}{
-					Algorithm:  "AES256",
-					IV:         base64.StdEncoding.EncodeToString(sseIV),
-					KeyMD5:     customerKey.KeyMD5,
-					PartOffset: chunk.Offset, // Position within the encrypted stream (always encrypted from 0)
-				}
-				if ssecMetadata, serErr := json.Marshal(ssecMetadataStruct); serErr == nil {
-					chunk.SseMetadata = ssecMetadata
-				}
-			}
-		}
-	} else if sseKMSKey != nil {
-		// SSE-KMS: Store serialized metadata in all chunks
-		for _, chunk := range chunkResult.FileChunks {
-			chunk.SseType = filer_pb.SSEType_SSE_KMS
-			chunk.SseMetadata = sseKMSMetadata
-		}
-	} else if sseS3Key != nil {
-		// SSE-S3: Store serialized metadata in all chunks
-		for _, chunk := range chunkResult.FileChunks {
-			chunk.SseType = filer_pb.SSEType_SSE_S3
-			chunk.SseMetadata = sseS3Metadata
-		}
+	if s3a.option.FilerGroup != "" {
+		query := proxyReq.URL.Query()
+		query.Add("collection", s3a.getCollectionName(bucket))
+		proxyReq.URL.RawQuery = query.Encode()
 	}
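The removed branch above serializes per-chunk SSE-C metadata as JSON. A hedged, standalone sketch of that record, with fabricated key and IV values (the real type is the anonymous struct literal in the deleted code):

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// ssecChunkMetadata mirrors the anonymous struct in the removed code.
type ssecChunkMetadata struct {
	Algorithm  string `json:"algorithm"`
	IV         string `json:"iv"`
	KeyMD5     string `json:"keyMD5"`
	PartOffset int64  `json:"partOffset"`
}

func main() {
	iv := make([]byte, 16) // AES block-sized IV; zeroed here purely for illustration
	meta := ssecChunkMetadata{
		Algorithm:  "AES256",
		IV:         base64.StdEncoding.EncodeToString(iv),
		KeyMD5:     "placeholder-key-md5",
		PartOffset: 8 * 1024 * 1024, // e.g. the second 8MB chunk of the encrypted stream
	}
	b, err := json.Marshal(meta)
	fmt.Println(string(b), err)
}
```

Since every upload encrypts from offset 0, `PartOffset` equal to the chunk offset is enough to seed decryption at any chunk boundary.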
 
-	// Step 4: Create metadata entry
-	now := time.Now()
-	mimeType := r.Header.Get("Content-Type")
-	if mimeType == "" {
-		mimeType = "application/octet-stream"
-	}
-
-	// Create entry
-	entry := &filer_pb.Entry{
-		Name:        filepath.Base(filePath),
-		IsDirectory: false,
-		Attributes: &filer_pb.FuseAttributes{
-			Crtime:   now.Unix(),
-			Mtime:    now.Unix(),
-			FileMode: 0660,
-			Uid:      0,
-			Gid:      0,
-			Mime:     mimeType,
-			FileSize: uint64(chunkResult.TotalSize),
-		},
-		Chunks:   chunkResult.FileChunks, // All chunks from auto-chunking
-		Extended: make(map[string][]byte),
-	}
-
-	// Set Md5 attribute based on context:
-	// 1. For multipart upload PARTS (stored in .uploads/ directory): ALWAYS set Md5
-	//    - Parts must use simple MD5 ETags, never composite format
-	//    - Even if a part has multiple chunks internally, its ETag is MD5 of entire part
-	// 2. For regular object uploads: only set Md5 for single-chunk uploads
-	//    - Multi-chunk regular objects use composite "md5-count" format
-	isMultipartPart := strings.Contains(uploadUrl, "/.uploads/")
-	if isMultipartPart || len(chunkResult.FileChunks) == 1 {
-		entry.Attributes.Md5 = md5Sum
-	}
-
-	// Calculate ETag using the same logic as GET to ensure consistency
-	// For single chunk: uses entry.Attributes.Md5
-	// For multiple chunks: uses filer.ETagChunks() which returns "<md5>-<count>"
-	etag = filer.ETag(entry)
-	glog.V(4).Infof("putToFiler: Calculated ETag=%s for %d chunks", etag, len(chunkResult.FileChunks))
-
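The deleted comments distinguish simple MD5 ETags from the composite multipart form. Assuming `filer.ETagChunks` follows the standard S3 convention, the composite value is the MD5 of the concatenated binary part digests, suffixed with the part count; a sketch:

```go
package main

import (
	"crypto/md5"
	"fmt"
)

// compositeETag computes an S3-style multipart ETag: MD5 over the
// concatenation of each part's raw MD5 digest, suffixed with "-<count>".
func compositeETag(parts [][]byte) string {
	outer := md5.New()
	for _, part := range parts {
		sum := md5.Sum(part) // digest of the individual part
		outer.Write(sum[:])  // feed the raw digest bytes, not their hex form
	}
	return fmt.Sprintf("%x-%d", outer.Sum(nil), len(parts))
}

func main() {
	fmt.Println(compositeETag([][]byte{[]byte("part one"), []byte("part two")}))
}
```

This is why multipart parts must keep plain MD5 ETags: the composite hash is computed over the parts' digests, not over object bytes.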
-	// Set object owner
-	amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
-	if amzAccountId != "" {
-		entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
-		glog.V(2).Infof("putToFiler: setting owner %s for object %s", amzAccountId, filePath)
+	for header, values := range r.Header {
+		for _, value := range values {
+			proxyReq.Header.Add(header, value)
+		}
 	}
 
-	// Set version ID if present
-	if versionIdHeader := r.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" {
-		entry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionIdHeader)
-		glog.V(3).Infof("putToFiler: setting version ID %s for object %s", versionIdHeader, filePath)
+	// Log version ID header for debugging
+	if versionIdHeader := proxyReq.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" {
+		glog.V(0).Infof("putToFiler: version ID header set: %s=%s for %s", s3_constants.ExtVersionIdKey, versionIdHeader, uploadUrl)
 	}
 
-	// Set TTL-based S3 expiry
-	entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
-
-	// Copy user metadata and standard headers
-	for k, v := range r.Header {
-		if len(v) > 0 && len(v[0]) > 0 {
-			if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) {
-				// Go's HTTP server canonicalizes headers (e.g., x-amz-meta-foo → X-Amz-Meta-Foo)
-				// We store them as they come in (after canonicalization) to preserve the user's intent
-				entry.Extended[k] = []byte(v[0])
-			} else if k == "Cache-Control" || k == "Expires" || k == "Content-Disposition" {
-				entry.Extended[k] = []byte(v[0])
-			}
-			if k == "Response-Content-Disposition" {
-				entry.Extended["Content-Disposition"] = []byte(v[0])
-			}
-		}
+	// Set object owner header for filer to extract
+	amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
+	if amzAccountId != "" {
+		proxyReq.Header.Set(s3_constants.ExtAmzOwnerKey, amzAccountId)
+		glog.V(2).Infof("putToFiler: setting owner header %s for object %s", amzAccountId, uploadUrl)
 	}
 
-	// Set SSE-C metadata
+	// Set SSE-C metadata headers for the filer if encryption was applied
 	if customerKey != nil && len(sseIV) > 0 {
-		// Store IV as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
-		entry.Extended[s3_constants.SeaweedFSSSEIV] = sseIV
-		entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256")
-		entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(customerKey.KeyMD5)
-		glog.V(3).Infof("putToFiler: storing SSE-C metadata - IV len=%d", len(sseIV))
+		proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, "AES256")
+		proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, customerKey.KeyMD5)
+		// Store IV in a custom header that the filer can use to store in entry metadata
+		proxyReq.Header.Set(s3_constants.SeaweedFSSSEIVHeader, base64.StdEncoding.EncodeToString(sseIV))
 	}
 
-	// Set SSE-KMS metadata
+	// Set SSE-KMS metadata headers for the filer if KMS encryption was applied
 	if sseKMSKey != nil {
-		// Store metadata as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
-		entry.Extended[s3_constants.SeaweedFSSSEKMSKey] = sseKMSMetadata
-		// Set standard SSE headers for detection
-		entry.Extended[s3_constants.AmzServerSideEncryption] = []byte("aws:kms")
-		entry.Extended[s3_constants.AmzServerSideEncryptionAwsKmsKeyId] = []byte(sseKMSKey.KeyID)
-		glog.V(3).Infof("putToFiler: storing SSE-KMS metadata - keyID=%s, raw len=%d", sseKMSKey.KeyID, len(sseKMSMetadata))
+		// Use already-serialized SSE-KMS metadata from helper function
+		// Store serialized KMS metadata in a custom header that the filer can use
+		proxyReq.Header.Set(s3_constants.SeaweedFSSSEKMSKeyHeader, base64.StdEncoding.EncodeToString(sseKMSMetadata))
+
+		glog.V(3).Infof("putToFiler: storing SSE-KMS metadata for object %s with keyID %s", uploadUrl, sseKMSKey.KeyID)
+	} else {
+		glog.V(4).Infof("putToFiler: no SSE-KMS encryption detected")
 	}
 
-	// Set SSE-S3 metadata
+	// Set SSE-S3 metadata headers for the filer if S3 encryption was applied
 	if sseS3Key != nil && len(sseS3Metadata) > 0 {
-		// Store metadata as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
-		entry.Extended[s3_constants.SeaweedFSSSES3Key] = sseS3Metadata
-		// Set standard SSE header for detection
-		entry.Extended[s3_constants.AmzServerSideEncryption] = []byte("AES256")
-		glog.V(3).Infof("putToFiler: storing SSE-S3 metadata - keyID=%s, raw len=%d", sseS3Key.KeyID, len(sseS3Metadata))
-	}
-
-	// Step 4: Save metadata to filer via gRPC
-	// Use context.Background() to ensure metadata save completes even if HTTP request is cancelled
-	// This matches the chunk upload behavior and prevents orphaned chunks
-	glog.V(3).Infof("putToFiler: About to create entry - dir=%s, name=%s, chunks=%d, extended keys=%d",
-		filepath.Dir(filePath), filepath.Base(filePath), len(entry.Chunks), len(entry.Extended))
-	createErr := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-		req := &filer_pb.CreateEntryRequest{
-			Directory: filepath.Dir(filePath),
-			Entry:     entry,
+		// Store serialized S3 metadata in a custom header that the filer can use
+		proxyReq.Header.Set(s3_constants.SeaweedFSSSES3Key, base64.StdEncoding.EncodeToString(sseS3Metadata))
+		glog.V(3).Infof("putToFiler: storing SSE-S3 metadata for object %s with keyID %s", uploadUrl, sseS3Key.KeyID)
+	}
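All three `+` branches above ship binary SSE metadata to the filer as base64 header values, which the filer decodes back to raw bytes before persisting (per the removed comments). A round-trip sketch; the header name is a stand-in, not the actual `s3_constants` value:

```go
package main

import (
	"bytes"
	"encoding/base64"
	"fmt"
	"net/http"
)

func main() {
	raw := []byte{0x00, 0x01, 0xfe, 0xff} // pretend serialized SSE metadata (binary-safe)

	// Sending side: HTTP header values must be text, so encode to base64.
	h := http.Header{}
	h.Set("X-Demo-Sse-Metadata", base64.StdEncoding.EncodeToString(raw))

	// Receiving side: decode back to the exact raw bytes before storage.
	decoded, err := base64.StdEncoding.DecodeString(h.Get("X-Demo-Sse-Metadata"))
	fmt.Println(bytes.Equal(raw, decoded), err)
}
```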
glog.V(2).Infof("putToFiler: Metadata saved SUCCESS - path=%s, etag(hex)=%s, size=%d, partNumber=%d", - filePath, etag, entry.Attributes.FileSize, partNumber) + defer resp.Body.Close() - BucketTrafficReceived(chunkResult.TotalSize, r) + etag = fmt.Sprintf("%x", hash.Sum(nil)) - // Build SSE response metadata with encryption details - responseMetadata := SSEResponseMetadata{ - SSEType: sseResult.SSEType, + resp_body, ra_err := io.ReadAll(resp.Body) + if ra_err != nil { + glog.Errorf("upload to filer response read %d: %v", resp.StatusCode, ra_err) + return etag, s3err.ErrInternalError, "" } - - // For SSE-KMS, include key ID and bucket-key-enabled flag from stored metadata - if sseKMSKey != nil { - responseMetadata.KMSKeyID = sseKMSKey.KeyID - responseMetadata.BucketKeyEnabled = sseKMSKey.BucketKeyEnabled - glog.V(4).Infof("putToFiler: returning SSE-KMS metadata - keyID=%s, bucketKeyEnabled=%v", - sseKMSKey.KeyID, sseKMSKey.BucketKeyEnabled) + var ret weed_server.FilerPostResult + unmarshal_err := json.Unmarshal(resp_body, &ret) + if unmarshal_err != nil { + glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body)) + return "", s3err.ErrInternalError, "" } + if ret.Error != "" { + glog.Errorf("upload to filer error: %v", ret.Error) + return "", filerErrorToS3Error(ret.Error), "" + } + + BucketTrafficReceived(ret.Size, r) - return etag, s3err.ErrNone, responseMetadata + // Return the SSE type determined by the unified handler + return etag, s3err.ErrNone, sseResult.SSEType } func setEtag(w http.ResponseWriter, etag string) { @@ -578,43 +384,6 @@ func setEtag(w http.ResponseWriter, etag string) { } } -// setSSEResponseHeaders sets appropriate SSE response headers based on encryption type -func (s3a *S3ApiServer) setSSEResponseHeaders(w http.ResponseWriter, r *http.Request, sseMetadata SSEResponseMetadata) { - switch sseMetadata.SSEType { - case s3_constants.SSETypeS3: - // SSE-S3: Return the encryption algorithm - w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256) - - case s3_constants.SSETypeC: - // SSE-C: Echo back the customer-provided algorithm and key MD5 - if algo := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm); algo != "" { - w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, algo) - } - if keyMD5 := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerKeyMD5); keyMD5 != "" { - w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, keyMD5) - } - - case s3_constants.SSETypeKMS: - // SSE-KMS: Return the KMS key ID and algorithm - w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms") - - // Use metadata from stored encryption config (for bucket-default encryption) - // or fall back to request headers (for explicit encryption) - if sseMetadata.KMSKeyID != "" { - w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, sseMetadata.KMSKeyID) - } else if keyID := r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId); keyID != "" { - w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, keyID) - } - - // Set bucket-key-enabled header if it was enabled - if sseMetadata.BucketKeyEnabled { - w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true") - } else if bucketKeyEnabled := r.Header.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled); bucketKeyEnabled == "true" { - w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true") - } - } -} - func filerErrorToS3Error(errString string) s3err.ErrorCode { switch 
@@ -578,43 +384,6 @@ func setEtag(w http.ResponseWriter, etag string) {
 	}
 }
 
-// setSSEResponseHeaders sets appropriate SSE response headers based on encryption type
-func (s3a *S3ApiServer) setSSEResponseHeaders(w http.ResponseWriter, r *http.Request, sseMetadata SSEResponseMetadata) {
-	switch sseMetadata.SSEType {
-	case s3_constants.SSETypeS3:
-		// SSE-S3: Return the encryption algorithm
-		w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
-
-	case s3_constants.SSETypeC:
-		// SSE-C: Echo back the customer-provided algorithm and key MD5
-		if algo := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm); algo != "" {
-			w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, algo)
-		}
-		if keyMD5 := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerKeyMD5); keyMD5 != "" {
-			w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, keyMD5)
-		}
-
-	case s3_constants.SSETypeKMS:
-		// SSE-KMS: Return the KMS key ID and algorithm
-		w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms")
-
-		// Use metadata from stored encryption config (for bucket-default encryption)
-		// or fall back to request headers (for explicit encryption)
-		if sseMetadata.KMSKeyID != "" {
-			w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, sseMetadata.KMSKeyID)
-		} else if keyID := r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId); keyID != "" {
-			w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, keyID)
-		}
-
-		// Set bucket-key-enabled header if it was enabled
-		if sseMetadata.BucketKeyEnabled {
-			w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true")
-		} else if bucketKeyEnabled := r.Header.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled); bucketKeyEnabled == "true" {
-			w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true")
-		}
-	}
-}
-
 func filerErrorToS3Error(errString string) s3err.ErrorCode {
 	switch {
 	case errString == constants.ErrMsgBadDigest:
@@ -677,18 +446,18 @@ func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_
 //
 // For suspended versioning, objects are stored as regular files (version ID "null") in the bucket directory,
 // while existing versions from when versioning was enabled remain preserved in the .versions subdirectory.
-func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
+func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode) {
 	// Normalize object path to ensure consistency with toFilerUrl behavior
 	normalizedObject := removeDuplicateSlashes(object)
 
 	// Enable detailed logging for testobjbar
 	isTestObj := (normalizedObject == "testobjbar")
-	glog.V(3).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s, isTestObj=%v",
+	glog.V(0).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s, isTestObj=%v",
 		bucket, object, normalizedObject, isTestObj)
 
 	if isTestObj {
-		glog.V(3).Infof("=== TESTOBJBAR: putSuspendedVersioningObject START ===")
+		glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject START ===")
 	}
 
 	bucketDir := s3a.option.BucketsPath + "/" + bucket
@@ -701,20 +470,20 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 	entries, _, err := s3a.list(versionsDir, "", "", false, 1000)
 	if err == nil {
 		// .versions directory exists
-		glog.V(3).Infof("putSuspendedVersioningObject: found %d entries in .versions for %s/%s", len(entries), bucket, object)
+		glog.V(0).Infof("putSuspendedVersioningObject: found %d entries in .versions for %s/%s", len(entries), bucket, object)
 		for _, entry := range entries {
 			if entry.Extended != nil {
 				if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok {
 					versionId := string(versionIdBytes)
-					glog.V(3).Infof("putSuspendedVersioningObject: found version '%s' in .versions", versionId)
+					glog.V(0).Infof("putSuspendedVersioningObject: found version '%s' in .versions", versionId)
 					if versionId == "null" {
 						// Only delete null version - preserve real versioned entries
-						glog.V(3).Infof("putSuspendedVersioningObject: deleting null version from .versions")
+						glog.V(0).Infof("putSuspendedVersioningObject: deleting null version from .versions")
 						err := s3a.rm(versionsDir, entry.Name, true, false)
 						if err != nil {
 							glog.Warningf("putSuspendedVersioningObject: failed to delete null version: %v", err)
 						} else {
-							glog.V(3).Infof("putSuspendedVersioningObject: successfully deleted null version")
+							glog.V(0).Infof("putSuspendedVersioningObject: successfully deleted null version")
 						}
 						break
 					}
@@ -722,7 +491,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 			}
 		}
 	} else {
-		glog.V(3).Infof("putSuspendedVersioningObject: no .versions directory for %s/%s", bucket, object)
+		glog.V(0).Infof("putSuspendedVersioningObject: no .versions directory for %s/%s", bucket, object)
 	}
 
 	uploadUrl := s3a.toFilerUrl(bucket, normalizedObject)
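The hunk above encodes the S3 suspended-versioning rule: a new PUT replaces only the "null" version, while versions minted when versioning was enabled stay intact. A toy sketch of that selection logic over simplified entries (the types and names here are stand-ins for the filer entries):

```go
package main

import "fmt"

// entry is a simplified stand-in for a filer entry in a .versions directory.
type entry struct {
	Name      string
	VersionID string
}

// nullVersionsToDelete returns the entries a suspended-versioning PUT may
// remove: only those carrying the literal version ID "null".
func nullVersionsToDelete(entries []entry) []string {
	var doomed []string
	for _, e := range entries {
		if e.VersionID == "null" {
			doomed = append(doomed, e.Name)
		}
	}
	return doomed
}

func main() {
	fmt.Println(nullVersionsToDelete([]entry{
		{Name: "obj.v-3sL4kqtJlcpXro", VersionID: "3sL4kqtJlcpXro"}, // kept
		{Name: "obj.v-null", VersionID: "null"},                     // replaced
	}))
}
```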
@@ -740,7 +509,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 	// Set version ID to "null" for suspended versioning
 	r.Header.Set(s3_constants.ExtVersionIdKey, "null")
 	if isTestObj {
-		glog.V(3).Infof("=== TESTOBJBAR: set version header before putToFiler, r.Header[%s]=%s ===",
+		glog.V(0).Infof("=== TESTOBJBAR: set version header before putToFiler, r.Header[%s]=%s ===",
 			s3_constants.ExtVersionIdKey, r.Header.Get(s3_constants.ExtVersionIdKey))
 	}
 
@@ -759,7 +528,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 		parsedTime, err := time.Parse(time.RFC3339, explicitRetainUntilDate)
 		if err != nil {
 			glog.Errorf("putSuspendedVersioningObject: failed to parse retention until date: %v", err)
-			return "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
+			return "", s3err.ErrInvalidRequest
 		}
 		r.Header.Set(s3_constants.ExtRetentionUntilDateKey, strconv.FormatInt(parsedTime.Unix(), 10))
 		glog.V(2).Infof("putSuspendedVersioningObject: setting retention until date header (timestamp: %d)", parsedTime.Unix())
@@ -771,7 +540,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 			glog.V(2).Infof("putSuspendedVersioningObject: setting legal hold header: %s", legalHold)
 		} else {
 			glog.Errorf("putSuspendedVersioningObject: invalid legal hold value: %s", legalHold)
-			return "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
+			return "", s3err.ErrInvalidRequest
 		}
 	}
@@ -794,15 +563,15 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 	// Upload the file using putToFiler - this will create the file with version metadata
 	if isTestObj {
-		glog.V(3).Infof("=== TESTOBJBAR: calling putToFiler ===")
+		glog.V(0).Infof("=== TESTOBJBAR: calling putToFiler ===")
 	}
-	etag, errCode, sseMetadata = s3a.putToFiler(r, uploadUrl, body, bucket, 1)
+	etag, errCode, _ = s3a.putToFiler(r, uploadUrl, body, "", bucket, 1)
 	if errCode != s3err.ErrNone {
 		glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode)
-		return "", errCode, SSEResponseMetadata{}
+		return "", errCode
 	}
 	if isTestObj {
-		glog.V(3).Infof("=== TESTOBJBAR: putToFiler completed, etag=%s ===", etag)
+		glog.V(0).Infof("=== TESTOBJBAR: putToFiler completed, etag=%s ===", etag)
 	}
 
 	// Verify the metadata was set correctly during file creation
@@ -812,19 +581,19 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 	for attempt := 1; attempt <= maxRetries; attempt++ {
 		verifyEntry, verifyErr := s3a.getEntry(bucketDir, normalizedObject)
 		if verifyErr == nil {
-			glog.V(3).Infof("=== TESTOBJBAR: verify attempt %d, entry.Extended=%v ===", attempt, verifyEntry.Extended)
+			glog.V(0).Infof("=== TESTOBJBAR: verify attempt %d, entry.Extended=%v ===", attempt, verifyEntry.Extended)
 			if verifyEntry.Extended != nil {
 				if versionIdBytes, ok := verifyEntry.Extended[s3_constants.ExtVersionIdKey]; ok {
-					glog.V(3).Infof("=== TESTOBJBAR: verification SUCCESSFUL, version=%s ===", string(versionIdBytes))
+					glog.V(0).Infof("=== TESTOBJBAR: verification SUCCESSFUL, version=%s ===", string(versionIdBytes))
 				} else {
-					glog.V(3).Infof("=== TESTOBJBAR: verification FAILED, ExtVersionIdKey not found ===")
+					glog.V(0).Infof("=== TESTOBJBAR: verification FAILED, ExtVersionIdKey not found ===")
 				}
 			} else {
-				glog.V(3).Infof("=== TESTOBJBAR: verification FAILED, Extended is nil ===")
+				glog.V(0).Infof("=== TESTOBJBAR: verification FAILED, Extended is nil ===")
 			}
 			break
 		} else {
-			glog.V(3).Infof("=== TESTOBJBAR: getEntry failed on attempt %d: %v ===", attempt, verifyErr)
+			glog.V(0).Infof("=== TESTOBJBAR: getEntry failed on attempt %d: %v ===", attempt, verifyErr)
 		}
 		if attempt < maxRetries {
 			time.Sleep(time.Millisecond * 10)
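The verification loop above is a bounded read-after-write retry: the entry written through putToFiler may not be immediately visible, so it polls a few times with a short sleep. The shape of that pattern, with a stand-in lookup function:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryLookup polls lookup up to maxRetries times, sleeping briefly between
// attempts, and surfaces the last error if the value never becomes visible.
func retryLookup(lookup func() (string, error), maxRetries int) (string, error) {
	var lastErr error
	for attempt := 1; attempt <= maxRetries; attempt++ {
		v, err := lookup()
		if err == nil {
			return v, nil
		}
		lastErr = err
		if attempt < maxRetries {
			time.Sleep(10 * time.Millisecond)
		}
	}
	return "", fmt.Errorf("not visible after %d attempts: %w", maxRetries, lastErr)
}

func main() {
	calls := 0
	fmt.Println(retryLookup(func() (string, error) {
		if calls++; calls < 3 {
			return "", errors.New("entry not found yet")
		}
		return "testobjbar", nil
	}, 5))
}
```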
@@ -841,9 +610,9 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 	glog.V(2).Infof("putSuspendedVersioningObject: successfully created null version for %s/%s", bucket, object)
 
 	if isTestObj {
-		glog.V(3).Infof("=== TESTOBJBAR: putSuspendedVersioningObject COMPLETED ===")
+		glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject COMPLETED ===")
 	}
-	return etag, s3err.ErrNone, sseMetadata
+	return etag, s3err.ErrNone
 }
 
 // updateIsLatestFlagsForSuspendedVersioning sets IsLatest=false on all existing versions/delete markers
@@ -915,7 +684,7 @@ func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object
 	return nil
 }
 
-func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
+func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) {
 	// Generate version ID
 	versionId = generateVersionId()
@@ -940,7 +709,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
 	})
 	if err != nil {
 		glog.Errorf("putVersionedObject: failed to create .versions directory: %v", err)
-		return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
+		return "", "", s3err.ErrInternalError
 	}
 
 	hash := md5.New()
@@ -951,10 +720,10 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
 	glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl)
 
-	etag, errCode, sseMetadata = s3a.putToFiler(r, versionUploadUrl, body, bucket, 1)
+	etag, errCode, _ = s3a.putToFiler(r, versionUploadUrl, body, "", bucket, 1)
 	if errCode != s3err.ErrNone {
 		glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
-		return "", "", errCode, SSEResponseMetadata{}
+		return "", "", errCode
 	}
 
 	// Get the uploaded entry to add versioning metadata
@@ -976,7 +745,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
 
 	if err != nil {
 		glog.Errorf("putVersionedObject: failed to get version entry after %d attempts: %v", maxRetries, err)
-		return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
+		return "", "", s3err.ErrInternalError
 	}
 
 	// Add versioning metadata to this version
@@ -997,7 +766,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
 	// Extract and store object lock metadata from request headers
 	if err := s3a.extractObjectLockMetadataFromRequest(r, versionEntry); err != nil {
 		glog.Errorf("putVersionedObject: failed to extract object lock metadata: %v", err)
-		return "", "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
+		return "", "", s3err.ErrInvalidRequest
 	}
 
 	// Update the version entry with metadata
@@ -1008,17 +777,17 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
 	})
 	if err != nil {
 		glog.Errorf("putVersionedObject: failed to update version metadata: %v", err)
-		return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
+		return "", "", s3err.ErrInternalError
 	}
 
 	// Update the .versions directory metadata to indicate this is the latest version
 	err = s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName)
 	if err != nil {
 		glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
-		return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
+		return "", "", s3err.ErrInternalError
 	}
glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject) - return versionId, etag, s3err.ErrNone, sseMetadata + return versionId, etag, s3err.ErrNone } // updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version @@ -1194,8 +963,7 @@ func (s3a *S3ApiServer) applySSEKMSDefaultEncryption(bucket string, r *http.Requ bucketKeyEnabled := encryptionConfig.BucketKeyEnabled // Build encryption context for KMS - // Use bucket parameter passed to function (not from request parsing) - _, object := s3_constants.GetBucketAndObject(r) + bucket, object := s3_constants.GetBucketAndObject(r) encryptionContext := BuildEncryptionContext(bucket, object, bucketKeyEnabled) // Create SSE-KMS encrypted reader