diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index f6dff6f92..707eb3595 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -45,6 +45,34 @@ var corsHeaders = []string{ // Package-level to avoid per-call allocations in writeZeroBytes var zeroBuf = make([]byte, 32*1024) +// StreamError is returned when streaming functions encounter errors. +// It tracks whether an HTTP response has already been written to prevent +// double WriteHeader calls that would create malformed S3 error responses. +type StreamError struct { + // Err is the underlying error + Err error + // ResponseWritten indicates if HTTP headers/status have been written to ResponseWriter + ResponseWritten bool +} + +func (e *StreamError) Error() string { + return e.Err.Error() +} + +func (e *StreamError) Unwrap() error { + return e.Err +} + +// newStreamError creates a StreamError for cases where response hasn't been written yet +func newStreamError(err error) *StreamError { + return &StreamError{Err: err, ResponseWritten: false} +} + +// newStreamErrorWithResponse creates a StreamError for cases where response was already written +func newStreamErrorWithResponse(err error) *StreamError { + return &StreamError{Err: err, ResponseWritten: true} +} + func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser { mimeBuffer := make([]byte, 512) size, _ := dataReader.Read(mimeBuffer) @@ -686,9 +714,14 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) streamTime = time.Since(tStream) if err != nil { glog.Errorf("GetObjectHandler: failed to stream from volume servers: %v", err) - // Try to write error response. The HTTP library will gracefully handle cases - // where headers are already sent (e.g., during streaming errors). - // For early validation errors, this ensures client gets proper error response. + // Check if the streaming function already wrote an HTTP response + var streamErr *StreamError + if errors.As(err, &streamErr) && streamErr.ResponseWritten { + // Response already written (headers + status code), don't write again + // to avoid "superfluous response.WriteHeader call" and malformed S3 error bodies + return + } + // Response not yet written - safe to write S3 error response s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } @@ -713,10 +746,9 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R }() if entry == nil { - // Early validation error: write proper HTTP response - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, "Internal Server Error: entry is nil") - return fmt.Errorf("entry is nil") + // Early validation error: write S3-compliant XML error response + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return newStreamErrorWithResponse(fmt.Errorf("entry is nil")) } // Get file size @@ -809,21 +841,19 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R // Safely convert int64 to int for slice indexing - validate BEFORE WriteHeader // Use MaxInt32 for portability across 32-bit and 64-bit platforms if offset < 0 || offset > int64(math.MaxInt32) || size < 0 || size > int64(math.MaxInt32) { - // Early validation error: write proper HTTP response BEFORE headers + // Early validation error: write S3-compliant error response w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) - w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) - fmt.Fprintf(w, "Range too large for platform: offset=%d, size=%d", offset, size) - return fmt.Errorf("range too large for platform: offset=%d, size=%d", offset, size) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) + return newStreamErrorWithResponse(fmt.Errorf("range too large for platform: offset=%d, size=%d", offset, size)) } start := int(offset) end := start + int(size) // Bounds check (should already be validated, but double-check) - BEFORE WriteHeader if start < 0 || start > len(entry.Content) || end > len(entry.Content) || end < start { - // Early validation error: write proper HTTP response BEFORE headers + // Early validation error: write S3-compliant error response w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) - w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) - fmt.Fprintf(w, "Invalid range for inline content: start=%d, end=%d, len=%d)", start, end, len(entry.Content)) - return fmt.Errorf("invalid range for inline content: start=%d, end=%d, len=%d", start, end, len(entry.Content)) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) + return newStreamErrorWithResponse(fmt.Errorf("invalid range for inline content: start=%d, end=%d, len=%d", start, end, len(entry.Content))) } // Validation passed - now set headers and write s3a.setResponseHeaders(w, entry, totalSize) @@ -849,10 +879,9 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R // BUG FIX: If totalSize > 0 but no chunks and no content, this is a data integrity issue if totalSize > 0 && len(entry.Content) == 0 { glog.Errorf("streamFromVolumeServers: Data integrity error - entry reports size %d but has no content or chunks", totalSize) - // IMPORTANT: Write error status before returning, since headers haven't been written yet - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, "Internal Server Error: data integrity issue (size %d reported but no content available)", totalSize) - return fmt.Errorf("data integrity error: size %d reported but no content available", totalSize) + // Write S3-compliant XML error response + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return newStreamErrorWithResponse(fmt.Errorf("data integrity error: size %d reported but no content available", totalSize)) } // Empty object - set headers and write status s3a.setResponseHeaders(w, entry, totalSize) @@ -878,9 +907,9 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R chunkResolveTime = time.Since(tChunkResolve) if err != nil { glog.Errorf("streamFromVolumeServers: failed to resolve chunks: %v", err) - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, "Internal Server Error: failed to resolve chunks") - return fmt.Errorf("failed to resolve chunks: %v", err) + // Write S3-compliant XML error response + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return newStreamErrorWithResponse(fmt.Errorf("failed to resolve chunks: %v", err)) } // Prepare streaming function with simple master client wrapper @@ -901,9 +930,9 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R streamPrepTime = time.Since(tStreamPrep) if err != nil { glog.Errorf("streamFromVolumeServers: failed to prepare stream: %v", err) - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, "Internal Server Error: failed to prepare stream") - return fmt.Errorf("failed to prepare stream: %v", err) + // Write S3-compliant XML error response + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return newStreamErrorWithResponse(fmt.Errorf("failed to prepare stream: %v", err)) } // All validation and preparation successful - NOW set headers and write status @@ -931,10 +960,11 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R streamExecTime = time.Since(tStreamExec) if err != nil { glog.Errorf("streamFromVolumeServers: streamFn failed: %v", err) - } else { - glog.V(4).Infof("streamFromVolumeServers: streamFn completed successfully") + // Streaming error after WriteHeader was called - response already partially written + return newStreamErrorWithResponse(err) } - return err + glog.V(4).Infof("streamFromVolumeServers: streamFn completed successfully") + return nil } // Shared HTTP client for volume server requests (connection pooling) @@ -1019,7 +1049,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r if totalSize == 0 || suffixLen <= 0 { w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) - return fmt.Errorf("invalid suffix range for empty object") + return newStreamErrorWithResponse(fmt.Errorf("invalid suffix range for empty object")) } if suffixLen > totalSize { suffixLen = totalSize @@ -1030,7 +1060,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r // Set header BEFORE WriteHeader w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) - return fmt.Errorf("invalid suffix range") + return newStreamErrorWithResponse(fmt.Errorf("invalid suffix range")) } } else { // Regular range or open-ended range @@ -1053,7 +1083,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r // Set header BEFORE WriteHeader w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) - return fmt.Errorf("invalid range start") + return newStreamErrorWithResponse(fmt.Errorf("invalid range start")) } if endOffset >= totalSize { @@ -1064,7 +1094,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r // Set header BEFORE WriteHeader w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) - return fmt.Errorf("invalid range: end before start") + return newStreamErrorWithResponse(fmt.Errorf("invalid range: end before start")) } } @@ -1084,18 +1114,18 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r customerKey, err := ParseSSECHeaders(r) if err != nil { s3err.WriteErrorResponse(w, r, MapSSECErrorToS3Error(err)) - return err + return newStreamErrorWithResponse(err) } if customerKey == nil { s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) - return fmt.Errorf("SSE-C key required") + return newStreamErrorWithResponse(fmt.Errorf("SSE-C key required")) } // Validate key MD5 if entry.Extended != nil { storedKeyMD5 := string(entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]) if storedKeyMD5 != "" && customerKey.KeyMD5 != storedKeyMD5 { s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) - return fmt.Errorf("SSE-C key mismatch") + return newStreamErrorWithResponse(fmt.Errorf("SSE-C key mismatch")) } } decryptionKey = customerKey @@ -1103,27 +1133,27 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r // Extract KMS key from metadata (stored as raw bytes, matching filer behavior) if entry.Extended == nil { s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return fmt.Errorf("no SSE-KMS metadata") + return newStreamErrorWithResponse(fmt.Errorf("no SSE-KMS metadata")) } kmsMetadataBytes := entry.Extended[s3_constants.SeaweedFSSSEKMSKey] sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes) if err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return err + return newStreamErrorWithResponse(err) } decryptionKey = sseKMSKey case s3_constants.SSETypeS3: // Extract S3 key from metadata (stored as raw bytes, matching filer behavior) if entry.Extended == nil { s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return fmt.Errorf("no SSE-S3 metadata") + return newStreamErrorWithResponse(fmt.Errorf("no SSE-S3 metadata")) } keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key] keyManager := GetSSES3KeyManager() sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager) if err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return err + return newStreamErrorWithResponse(err) } decryptionKey = sseS3Key } @@ -1157,7 +1187,11 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r err := s3a.streamDecryptedRangeFromChunks(r.Context(), w, entry, offset, size, sseType, decryptionKey) decryptSetupTime = time.Since(tDecryptSetup) copyTime = decryptSetupTime // Streaming is included in decrypt setup for range-aware path - return err + if err != nil { + // Error after WriteHeader - response already written + return newStreamErrorWithResponse(err) + } + return nil } // Full object path: Optimize multipart vs single-part @@ -1191,13 +1225,15 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry) streamFetchTime = time.Since(tStreamFetch) if streamErr != nil { - return streamErr + // Error after WriteHeader - response already written + return newStreamErrorWithResponse(streamErr) } defer encryptedReader.Close() iv := entry.Extended[s3_constants.SeaweedFSSSEIV] if len(iv) == 0 { - return fmt.Errorf("SSE-C IV not found in entry metadata") + // Error after WriteHeader - response already written + return newStreamErrorWithResponse(fmt.Errorf("SSE-C IV not found in entry metadata")) } glog.V(2).Infof("SSE-C decryption: IV length=%d, KeyMD5=%s", len(iv), customerKey.KeyMD5) decryptedReader, err = CreateSSECDecryptedReader(encryptedReader, customerKey, iv) @@ -1227,7 +1263,8 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry) streamFetchTime = time.Since(tStreamFetch) if streamErr != nil { - return streamErr + // Error after WriteHeader - response already written + return newStreamErrorWithResponse(streamErr) } defer encryptedReader.Close() @@ -1259,14 +1296,16 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry) streamFetchTime = time.Since(tStreamFetch) if streamErr != nil { - return streamErr + // Error after WriteHeader - response already written + return newStreamErrorWithResponse(streamErr) } defer encryptedReader.Close() keyManager := GetSSES3KeyManager() iv, ivErr := GetSSES3IV(entry, sseS3Key, keyManager) if ivErr != nil { - return fmt.Errorf("failed to get SSE-S3 IV: %w", ivErr) + // Error after WriteHeader - response already written + return newStreamErrorWithResponse(fmt.Errorf("failed to get SSE-S3 IV: %w", ivErr)) } glog.V(2).Infof("SSE-S3 decryption: KeyID=%s, IV length=%d", sseS3Key.KeyID, len(iv)) decryptedReader, err = CreateSSES3DecryptedReader(encryptedReader, sseS3Key, iv) @@ -1276,7 +1315,8 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r if err != nil { glog.Errorf("SSE decryption error (%s): %v", sseType, err) - return fmt.Errorf("failed to create decrypted reader: %w", err) + // Error after WriteHeader - response already written + return newStreamErrorWithResponse(fmt.Errorf("failed to create decrypted reader: %w", err)) } // Close the decrypted reader to avoid leaking HTTP bodies @@ -1295,7 +1335,8 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r copyTime = time.Since(tCopy) if copyErr != nil { glog.Errorf("Failed to copy full object: copied %d bytes: %v", copied, copyErr) - return copyErr + // Error after WriteHeader - response already written + return newStreamErrorWithResponse(copyErr) } glog.V(3).Infof("Full object request: copied %d bytes", copied) return nil